likes
comments
collection
share

实践|如何将传统 SQL 引擎的数据并行模型迁移到关系型流处理引擎上

作者站长头像
站长
· 阅读数 3

RisingWave 是一个开源的流式数据库,其流计算引擎从 Day 1 就是分布式计算系统,能够大规模并行处理数据。RisingWave 流处理引擎能够充分利用多节点和多核的计算资源,集群的处理能力理论上可以用增加计算节点的方式无限扩展。

传统 SQL 引擎的并行计算理论和实践都相当完善,其切分输入数据并行的方式理论上可以达到无限的扩展性,在很多系统经过了广泛的实现和验证。然而,当尝试将其迁移到关系型流处理引擎 RisingWave 时发现,流计算的计算模型对变更流操作的顺序要求比传统 SQL 计算模型更高,而传统数据并行模型将输入分片发送至多个处理节点的操作天然的丢失了不同节点上操作之间的原始顺序信息

本文将围绕”在关系型流处理引擎中如何高效的保序和恢复顺序”这一问题展开。

本文建立在前几篇文章基础之上,需要读者对相关概念有所了解,还敬请读者回顾一下前几篇文章中的 Stream key、Barrier 等概念,以获得更好的阅读体验。

1. SQL 查询引擎的数据并行模型

现代的 SQL 查询引擎多支持多个计算节点并行计算,这要求并行的任务之间不能共享访问内存状态,这也被称作 MPP(Massively Parallel Processing)架构。在实现中,SQL 查询引擎对特定的 SQL 算子,按照一定规则将输入数据分发(Shuffle)到不同的分片上,每个分片都是独立的 SQL 算子,只需要处理自己分片上的数据,最后将所有分片并行计算的结果合并即可得到计算结果。

如下面的例子包含商品表 items 和订单表 orders ,为简化问题,此处每个订单只对应一种商品。我们希望在订单表的基础上获得每一个订单中商品的详细信息,用一个 join 查询来实现。

CREATE TABLE items(id int, name text, PRIMARY KEY id);
CREATE TABLE orders(item_id int, id intPRIMARY KEY id);

INSERT INTO items VALUES (1'foo'), (2'bar'), (3'baz');
INSERT INTO orders VALUES (1101), (2102), (3103), (2104);

SELECT orders.*, item.name FROM orders JOIN items ON orders.item_id = item.id;

这样一个查询,在 MPP 架构下常采用 Shuffle Join 实现,如图所示。分别将两个表按照连接键(Join key)进行分区,将相同 Join Key 的记录重分布到同一分片上,每个分片上的数据单独执行串行的 Join 算法,所有分片 Join 的结果就是查询的结果。

实践|如何将传统 SQL 引擎的数据并行模型迁移到关系型流处理引擎上

我们可以注意到,将输入的数据打散到不同的节点时,原始输入数据的顺序丢失了,我们无法知晓在不同分片上的两行数据的先后关系。而在合并时,可以保证上游同一分片上输出的先后关系,不同上游分片上的数据合并到下游,其先后关系是任意的。举例来讲,图中合并之后 (2, 102, 'bar') 一定在 (2, 104, 'bar') 之前,但 (1, 101, 'foo') 和 (2, 104, 'bar') 之间的关系是任意的。这样做其实依赖于关系代数的良好性质,即关系(relation)是行的无序集合,因此从表中扫描出的原始数据顺序并无特殊意义,在算子的计算中也不必须尊重原始的输入顺序。

2. 变更流上的数据并行模型

我们继续使用上面的例子,对于上面的查询构建基于变更流的计算模型,即对于 items 和 orders 表的变更计算出 join 查询结果的变更流。

CREATE TABLE items(id int, name text, PRIMARY KEY id);
CREATE TABLE orders(item_id int, id intPRIMARY KEY id);

CREATE MATERIALIZED VIEW order_with_item_name AS
SELECT orders.*, item.name FROM orders JOIN items ON orders.item_id = item.id;

INSERT INTO items VALUES (1'foo');
INSERT INTO orders VALUES (1101);
UPDATE orders SET item_id = 2 WHERE id = 101;

仿照前文 MPP 架构的计算过程,如下图所示。

实践|如何将传统 SQL 引擎的数据并行模型迁移到关系型流处理引擎上

图中在合并时,刻意打乱了合并的顺序。该条 orders 表上的更新操作被拆成 DELETE 和 INSERT 两条消息(在图中标号为 ② 和 ③)被分发到不同的分片上,而它们对应产生的结果变更在合并到分片上的时候顺序颠倒了过来。

假设 MV order_with_item_name 是一张关系表,可以用下面的 SQL 来模拟对关系表上的变更。

CREATE TABLE order_with_item_name (item_id int, id int, item_name varchar);

INSERT INTO order_with_item_name VALUES (1101'foo');
INSERT INTO order_with_item_name VALUES (2101'bar');
DELETE FROM order_with_item_name WHERE item_id = 1 AND id = 101 AND item_name = 'foo';

3. 引入 Stream Key 后的乱序问题

上一节描述的关系表变更是低效的,在没有主键和索引的关系表上执行如上的 DELETE 操作需要遍历整张表。本系列文章第二篇《计算模型》中,我们在变更流上引入了 Stream Key 的概念来帮助快速的更改目标表。

只有 INSERT 和 DELETE 而没有其他保证的变更流对存储引擎是不友好的(试想从一个无序列表里找到某个值),要支持快速的插入和删除,通常需要一个唯一的 Key 去进行数据的索引。因此为了保证每个物化视图之上都一定存在一个可以用作主键的 Key,RisingWave 的优化器会对算子树做自底向上的推导,保证每个算子的输出都一定存在 Unique Key。它也被称作Stream Key,代表变更流上的操作是针对这个 Key 进行的。

因此实际的情况中,优化器会推导出在结果中, id 列是一个 Unique Key,将其设为输出的 Stream Key,物化视图使用这个 Stream Key 来作为表存储的主键。用 SQL 模拟结果表以及其变更如下所示。

CREATE TABLE order_with_item_name (
 item_id int, id int, item_name varchar,
 PRIMARY KEY(id)
);

INSERT INTO order_with_item_name VALUES (1101'foo');
INSERT INTO order_with_item_name VALUES (2101'bar');
DELETE FROM order_with_item_name WHERE id = 101;

在这样的定义下, INSERT (2, 101, 'far') 和 DELETE (1, 101, 'foo') 的顺序错误是不可接受的。由于 id 列是表上的主键,表中只能同时存在一个 id = 101 的记录,两条连续的 INSERT 违背了主键唯一的性质。而无论第二条插入的行为是覆盖已经存在的记录,或是忽视新插入的数据,表中这条唯一的记录都会被下面的删除操作删掉,导致结果错误。

到这里我们可以发现,输入分片发送至多个处理节点的操作天然的丢失了不同节点上操作之间的原始顺序信息,而这样的乱序在用 Stream Key 索引记录的回撤流上会产生正确性问题。这样的问题也会出现在其他用 Key 索引的变更流模型上,比如 Upsert Stream。

4. RisingWave 如何应对乱序

4.1 借助 Barrier

在前一篇文章中,我们引入了 Barrier。将 Barrier 引入并行计算模型中,和多输入的算子对 Barrier 的处理类似,下游接收到上游分片发来的 Barrier 时,所有分片需要进行对齐(align)操作,收齐上游所有分片的 Barrier 后,再向下游发 Barrier。而当有多个下游分片时,也需要向所有的下游分片发送 Barrier。

Barrier 将输入流切分成很多段,由于 Barrier 的性质,每个算子只能按照顺序依次处理这每一段数据,数据的乱序不会跨过 Barrier。我们用刚才的例子,在每条变更之间插入 Barrier 来防止乱序,如下图所示。在合并读取上游的分片 2 的 BarrierB 时,为了对齐所有上游分片的 BarrierB,需要先处理完其他分片 BarrierB 之前的变更,即分片 1 上的 -(1, 101, 'foo'),才能继续处理分片 2 上 BarrierB 之后的变更+(2, 101, 'bar')。这就防止了前文例子中这两个条变更操作的乱序。

实践|如何将传统 SQL 引擎的数据并行模型迁移到关系型流处理引擎上

但是由于前一篇文章提到的,Barrier 实际控制了算子攒批优化的上限,因此为了性能方面的考虑,我们仍需要灵活的 Barrier 注入策略,为了保序给每条变更之间插入 Barrier 的代价不可接受。仍需要思考其他方式处理 Barrier 内的乱序问题。

4.2 扩展 Stream Key

考察乱序导致问题的场景,由于不同分片上有涉及同一个 Stream Key 的变更,而我们无法确定不同分片间操作的顺序,进而导致同一个 Key 上的操作顺序不明确。用上文的例子来讲,Join 输出使用 id 列作为 Stream Key。在分片 1 和分片 2 上都出现了针对 id = 101 这条记录的变更。

实践|如何将传统 SQL 引擎的数据并行模型迁移到关系型流处理引擎上

CREATE TABLE order_with_item_name (
 item_id int, id int, item_name varchar,
 PRIMARY KEY(id)
);

INSERT INTO order_with_item_name VALUES (1101'foo');
INSERT INTO order_with_item_name VALUES (2101'bar');
DELETE FROM order_with_item_name WHERE id = 101;

这启发我们拓展 Stream Key 的定义来避免乱序带来的正确性问题,即在推导算子的 Stream Key 时,同时考虑算子并行的分片情况。

在一次 Shuffle 的情况下,下游算子的 Stream Key 不仅需要满足是结果关系表的 Unique Key,同时还必须包含当前算子输出的分区键(Shard Key)。这样对于不同分片上的变更一定涉及到的是不同的 Stream Key,彼此之间的更改不会冲突。而相同的 Stream Key 的变更,一定分布在同一分片上,不会出现乱序问题。

还是以上文的例子, Join 算子结果的分区键是 Join Key item_id,因此推导出 Stream Key 为 (item_id, id),物化视图也可以使用这个 Stream Key 作为存储表的主键。用下面的 SQL 模拟结果表上的变更,可以验证乱序的操作之间也没有发生冲突的情况。

CREATE TABLE order_with_item_name (
 item_id int, id int, item_name varchar,
 PRIMARY KEY(item_id, id)
);

INSERT INTO order_with_item_name VALUES (1101'foo');
INSERT INTO order_with_item_name VALUES (2101'bar');
DELETE FROM order_with_item_name WHERE item_id = 1 AND id = 101;

而在多次 Shuffle 的情况下,在上游多个分片的数据在下游的各个分片上发生合并,此时上游分区键之间的乱序已经发生。因此下游的 Stream Key 还需要包含上游的 Stream Key 来保证已经发生过的乱序不会影响结果。

实践|如何将传统 SQL 引擎的数据并行模型迁移到关系型流处理引擎上

5. 任意主键的处理

到这里,我们通过扩展 Stream Key 保证了乱序一定发生在不同的 Stream Key 之间,进一步通过控制结果表的主键就可以。但这样会有两个问题:

  1. 复杂的查询通常包括多次 Shuffle,这有可能导致 Stream Key 不停的增长,而一个很长的 Key 对计算存储各方面性能都不友好。
  2. 除了内部的物化视图,RisingWave 还支持创建 Sink,将查询结果的变更流注入到外部存储系统当中。而这些外部系统的主键是由用户定义的,RisingWave 无法控制。

这两点都要求能够有某种方式,能够将某一个 Stream Key 定义下的变更流再转化为另一个 Unique key 的变更流,在该 Key 下不产生乱序问题。以下给出 RisingWave 对这个问题的一个解决算法。

首先,我们知道变更的乱序不会越过 Barrier ,因此只需要处理每一个 Barrier 内的乱序。RisinWave 对每个 Barrier 内的所有操作:

  1. 按照流的原始 Stream Key 进行分组,由 Stream Key 和回撤流的性质易知,分组后对于同一个 Stream Key 的变更,一定是 INSERT 和 DELETE 操作交替出现。对于同一个 Stream Key 的变更流 (..., INSERT, DELETE, INSERT, ...) ,相邻的 (INSERT, DELETE) 组合可以消除。在进行消除之后,这个 Barrier 内对于同一个 Stream Key 的变更序列,只存在三种情况,即 (DELETE) 、 (INSERT) 和 (DELETE, INSERT) 。对于目标关系表上,正确顺序下的任何一个 Unique Key 的操作,其实也只存在这三种情况。用反证法证明,假设存在针对一个 Unique Key 的(INSERT, DELETE) 组合,由回撤流的性质可以知道这两行的值是相同的,那么他们的 Stream Key 也是相同的,这种情况下应该在针对该 Stream Key 操作的合并中已经消除掉。
  2. 将所有的 DELETE 置于所有 INSERT 操作前。由于在 1 中已经排除了对于任意 Unique key 上 (INSERT, DELETE) 操作组合的可能,因此这样做不会改变此变更流的语义,还一定能把对于目标 Key 错误的 INSERT 在 DELETE 前面的乱序情况纠正过来。
  3. 有时,Sink 目标的外部的存储表可能存在外键等其他约束(constraint)。这时如果针对同一个主键的操作被拆成了 DELETE 和 INSERT 两个操作会导致删除时打破约束不成功。所以我们还需要用外部表的主键再做一次合并,将不相邻的针对同一个主键的修改,保证原子的更改每一个主键。

我们还是以本文中的查询为例,但对于基表的更新更加复杂。

CREATE TABLE items(id int, name text, PRIMARY KEY id);
CREATE TABLE orders(item_id int, id intPRIMARY KEY id);

CREATE TABLE order_with_item_name (
 item_id int, id int, item_name varchar,
 PRIMARY KEY(id)
);

CREATE SINK s into order_with_item_name AS
SELECT orders.*, item.name FROM orders JOIN items ON orders.item_id = item.id;

INSERT INTO items VALUES (1'foo');
INSERT INTO orders VALUES (1101);
UPDATE orders SET item_id = 2 WHERE id = 101;
UPDATE orders SET item_id = 3 WHERE id = 101;

流程如图所示,为描述清晰,只保留两条更新语句对应的变更流,并且他们是某一相邻 barrier 之间的全部变更。

实践|如何将传统 SQL 引擎的数据并行模型迁移到关系型流处理引擎上

6. 小结

本文展示了 RisingWave 流处理引擎并行计算模型。RisingWave 为了实现资源的无限扩展,参考了传统 SQL 引擎的数据并行模型,但该模型将输入分片发送至多个处理节点的操作天然的丢失了不同节点上操作之间的原始顺序信息,使结果变更流发生乱序,在使用 Stream Key 索引的回撤流计算模型上会导致正确性问题。RisingWave 通过拓展 Stream Key 的方式规避了乱序导致的正确性问题,并且给出了将其转变为任意 Unique Key 上的变更流的解决方案。

补充一点讨论,和这篇文章主题关系不大,因此并没有加进去。对于计算引擎,未来应用层的计算框架或者rpc协议需要更多的去理清自己对“保序”需求的边界在哪里。目前来看其实不少应用都依赖了tcp的流式抽象,但这种保序在网络实现上并不是免费的午餐。像是AWS SRD(放宽范围nvlink可能也在此之列)这种“可靠但不保序”的协议也许才是未来。

7. 关于 RisingWave

🔍关于更多常见问题及答案,欢迎大家来这里搜索留言: risingwavelabs/discussions