likes
comments
collection
share

火山模型中的查询并行技术解析

作者站长头像
站长
· 阅读数 75

简介

本文主要参考 1994 年 Graefe 的论文《Volcano-An Extensible and Parallel Query Evaluation System》,来解释火山模型中对查询进行并行的方法

火山模型下,每一种关系代数操作会被抽象为一个算子,从而将整个 SQL 构建成一棵算子树(Operator Tree),从根节点到叶子结点自上而下地递归调用 next() 函数,从而实现计算。

文章虽然很老了,火山模型也由于性能问题也逐渐被向量化引擎和编译执行所取代,但是其中将查询表示为算子树进行并行和优化的思路,现在依然被延用。本文主要聚焦于根据算子树对查询进行并行的方法

根据算子树实现并行查询主要包括以下两个方向:

  1. 算子间并行(inter-operator parallelism):查询算子树可能包含多种运算,通过并行执行其中一些互不依赖的算子来实现算子树的计算并行化,这种并行方式称为称为算子间并行。
  2. 算子内并行(intra-operator parallelism):由于关系运算是在大量元组集合上进行,因此可以将每个算子的 input 数据进行独立的分区,从而在不同的数据分区上并行地执行相同的 operator,这种并行方式称为算子内并行。

火山模型中负责实现并行执行的是 Exchange 运算符,它是一个有 open, next, close 方法的 iterator,可以被插到查询执行树中的任何一个或多个位置。图5所示是一个插入了 Exchange 算子的查询计划。

火山模型中的查询并行技术解析

因为主要参考 1994 年的论文,所以下面的表述中不区分进程和线程,统一用进程表示

算子间并行(inter-operator parallelism)

Exchange 的 open 方法用于创建进程,Exchange 算子上方作为父进程,下方作为子进程,例如图5中的查询树执行 open 方法后,创建的进程将如图6所示。

火山模型中的查询并行技术解析

Exchange 采用生产者消费者模型,父进程会作为消费者,子进程会作为生产者,同时在共享内存中创建一个数据结构 port 用于同步和数据交换。例如 Scan 算子会作为生产者,上方的 Join 算子作为消费者。

生产者端的 exchange 算子会作为 driver 驱动查询执行,其输出会放到 packet 里面临时存储。 packet 被填满后,会被放到 port 中,同时发送一个信号量来提醒消费者可以进行消费。

消费者端的 exchange 算子就和普通的迭代器一样,只不过它接收输入时会通过进程间的通信而不是内部的方法调用。

注意,火山模型中所有其他模块都是基于 demand-driven,即 iterator 调用 Next() 方法后,数据流再从下游传到上游,控制流和数据流的方向相反。而 Exchange 算子则是基于data-driven,生产者侧的数据就绪后再通知消费者执行,数据流和控制流的方向相同。可参见下图的 Pull 模型和 Push 模型的比较,容易理解。

这主要有两方面的原因:1. Data-driven 的方式更容易实现算子内的并行,因为算子内并行需要对数据进行分区,然后基于不交叉的数据进行; 2. 这种模式避免了多余的控制流来 Request data,进程间通信时这些不避免要的控制流会导致延迟。

火山模型中的查询并行技术解析

同时,data-driven 的模式下允许流量控制(flow control) 或者说反压(back pressure)。比如说,当生产者的生产速度大于消费者的消费速度时,会导致数据堆积,占用较大内存的问题。这时可以通过消费者端发送一个信号量,告诉生产者降低生产速度或停止生产,等消费者消费完后再进行,从而解决问题。

算子内并行(intra-operator parallelism)

算子内的并行需要对输入数据进行分区,输入数据主要包括数据存储和中间结果。

  • 数据存储的分区主要依赖物理分区,比如不同设备,不同文件。
  • 中间结果分区则主要依靠在 port 中使用不同的队列。生产者使用分区 support function 来决定放到哪个队列里。

图7中展示了为了实现算子内并行创建的进程,Join 算子有三个进程执行,Scan 算子由一个或两个进程执行。通过规定并发度(degree of parallelism)来确定执行的进程数。因为同时有三个进程在执行 Join 算子,因此必须对 Scan 得到的数据进行重分区,以交给不同的进程执行。

火山模型中的查询并行技术解析

所有的 Scan 进程都可以传递数据给所有的 Join 进程,但是 Join 算子间的数据传递只允许在每个 Join 进程内部进行。此时,如果使用了基于分区的并行 Join 方法,且图7中两个 Join 是针对不同属性进行的,则会导致出现问题。因为第一个 Join 是用属性 1 做的分区,此时属性2 相同的 tuple 可能落在不同的 Join 进程中。这个问题可以使用 exchange 算子的变式来解决,称为 inter-change.

Exchange 算子的变式

目前,我们提到的 exchange 算子都只能在一个进程的顶部或底部出现(要么提供输入,要么进行输出)。除此之外,Exchange 还可以在一个进程的 operator tree 的中间出现,其功能只限于提供一个数据交换的窗口。其 next 方法从下游的算子中获取输入,并可能把它发送给同一个 Group 的其他进程(如果属于自己的分区就自己用)。这种操作模式称为 inter-change.

另外,还有能把输出广播给所有消费者的 exchange 算子, 比如 HashJoin 中广播小表构建的哈希表;根据 producer 把 input 分别存储的 exchange 算子,以便上游可以区分输入的来源。

参考

Volcano - An Extensible and Parallel Query Evaluation System

zhuanlan.zhihu.com/p/219516250

Push vs. Pull-Based Loop Fusion in Query Engines

转载自:https://juejin.cn/post/7282604743631847435
评论
请登录