likes
comments
collection
share

Dremio SQL 查询内存预估原理

作者站长头像
站长
· 阅读数 23

引言

我们在生产环境的两种场景经常会遇到一种错误:预估内存超过限制,导致查询在 Execution Planning 阶段报错直接失败。报错信息如下:

Query was cancelled because the initial memory requirement (130G) is greater than the job memory limit set by the administrator (128G).

两种场景分别为:

场景一:

select * from "Very splits table" limit 30;

SQL 很简单,select 一张 splits 数非常多的表并且只 limit 30 条数据。

场景二:

select
(select _col1 from table t1 where t0._col1 = t1._col1 group by _col1),
(select _col2 from table t2 where t0._col2 = t2._col2 group by _col2),
(select _col3 from table t3 where t0._col3 = t3._col3 group by _col3),
...
from table t0;

自关联非常多的查询,Dremio 在生成逻辑计划时会进行解关联,将自关联转换为多表 join。这就导致查询计划中多了很多的 scan、project、exchange 和 join 算子。

这种报错是 Dremio 中一种保护机制,在 Execution Planning 阶段通过计算由物理执行计划生成的 Fragment 中所有 PhysicalOperator 预估内存的总和,和 Dremio executor 内存总和比较,如果分配到同一个 executor 的 PhysicalOperator 内存总和超过节点总内存会拒绝执行。

关键问题

想要弄明白为什么报错就要确认以下几个关键问题:

  1. Fragment 中 PhysicalOperator 的内存是如何评估的?
  2. Physical Plan 又是如何转换为 Fragment 的?

PhysicalOperator 的内存评估

Operator 的内存计算

我们以 TPC-DS 中的 inventory 表和call_center 表为例,这两张表在数据生成基数为 1GB 的情况下差别很大,inventory 有 11,745,000 行数据,call_center 只有6 行。

那么在 Dremio 中 select * from inventory;select * from call_center; 的内存评估一样吗?从 debug 的结果发现都是一样的,18,000,000(B)。代码位置:com.dremio.exec.util.MemoryAllocationUtilities#setMemory

Dremio SQL 查询内存预估原理

从这次实验的结果来看 Dremio 的内存评估好像和表的大小没有直接关系。我们再看下一个案例,将数据生成基数调整为 100GB,此时 inventory 表的行数为:399,330,000,重新执行 select * from inventory;,此时预计内存变为了 38000000(B)。观察内存的统计逻辑发现,1GB 基数的 PhysicalOperator 有 18 个,而 100GB 基数的 PhysicalOperator 有 38 个,多了很多的 HiveGroupScan、Project、UnionExchange 和 EasyWriter 算子。

Dremio SQL 查询内存预估原理

通过观察这些算子的 Props 发现内存消耗都为 1,000,000(B),所以 Dremio 的内存估计其实非常粗糙,在初始化这些算子的时候就设置了 memReserve,但是大多数算子都是用了默认值,当然也有例外,例如 HashAggPrel 就考虑了它的 inputs 是否支持向量化计算和是否可以 Spill。

以上案例还存在一个疑点:100GB 的表为什么多出了很多物理算子?观察二者的 Physical Plan 发现只有一处不一致,HiveScan 中 1GB 表的 splits 数为 2,100GB 表的 splits 数为 34。

1GB inventory 表 Physical Plan

00-00    Screen : rowType = RecordType(VARCHAR(65536) Fragment, BIGINT Records, VARCHAR(65536) Path, VARBINARY(65536) Metadata, INTEGER Partition, BIGINT FileSize, VARBINARY(65536) IcebergMetadata, VARBINARY(65536) fileschema): rowcount = 5.8038919E7, cumulative cost = {4.120763249E8 rows, 8.1835804412704E8 cpu, 2.32155676E8 io, 1.902051453468E12 network, 0.0 memory}, id = 2624
00-01      Project(Fragment=[$0], Records=[$1], Path=[$2], Metadata=[$3], Partition=[$4], FileSize=[$5], IcebergMetadata=[$6], fileschema=[$7]) : rowType = RecordType(VARCHAR(65536) Fragment, BIGINT Records, VARCHAR(65536) Path, VARBINARY(65536) Metadata, INTEGER Partition, BIGINT FileSize, VARBINARY(65536) IcebergMetadata, VARBINARY(65536) fileschema): rowcount = 5.8038919E7, cumulative cost = {4.06272433E8 rows, 8.1255415222704E8 cpu, 2.32155676E8 io, 1.902051453468E12 network, 0.0 memory}, id = 2623
00-02        WriterCommitter(final=[/Users/luochunyi/project/dremio-oss-17/distribution/server/target/dremio-community-17.0.0-202107060524010627-31b5222b/dremio-community-17.0.0-202107060524010627-31b5222b/data/pdfs/results/1bed3ce8-225a-2118-efbc-b11043f7f800]) : rowType = RecordType(VARCHAR(65536) Fragment, BIGINT Records, VARCHAR(65536) Path, VARBINARY(65536) Metadata, INTEGER Partition, BIGINT FileSize, VARBINARY(65536) IcebergMetadata, VARBINARY(65536) fileschema): rowcount = 5.8038919E7, cumulative cost = {3.48233514E8 rows, 8.1254950911352E8 cpu, 2.32155676E8 io, 1.902051453468E12 network, 0.0 memory}, id = 2622
00-03          UnionExchange : rowType = RecordType(VARCHAR(65536) Fragment, BIGINT Records, VARCHAR(65536) Path, VARBINARY(65536) Metadata, INTEGER Partition, BIGINT FileSize, VARBINARY(65536) IcebergMetadata, VARBINARY(65536) fileschema): rowcount = 5.8038919E7, cumulative cost = {2.90194595E8 rows, 7.5451059011352E8 cpu, 2.32155676E8 io, 1.902051453468E12 network, 0.0 memory}, id = 2621
01-01            Writer : rowType = RecordType(VARCHAR(65536) Fragment, BIGINT Records, VARCHAR(65536) Path, VARBINARY(65536) Metadata, INTEGER Partition, BIGINT FileSize, VARBINARY(65536) IcebergMetadata, VARBINARY(65536) fileschema): rowcount = 5.8038919E7, cumulative cost = {2.32155676E8 rows, 2.9019923811352E8 cpu, 2.32155676E8 io, 2.32155676E8 network, 0.0 memory}, id = 2620
01-02              Project(inv_date_sk=[$0], inv_item_sk=[$1], inv_warehouse_sk=[$2], inv_quantity_on_hand=[$3]) : rowType = RecordType(BIGINT inv_date_sk, BIGINT inv_item_sk, BIGINT inv_warehouse_sk, INTEGER inv_quantity_on_hand): rowcount = 5.8038919E7, cumulative cost = {1.74116757E8 rows, 2.3216031911352003E8 cpu, 2.32155676E8 io, 2.32155676E8 network, 0.0 memory}, id = 2619
01-03                Project(inv_date_sk=[$0], inv_item_sk=[$1], inv_warehouse_sk=[$2], inv_quantity_on_hand=[$3]) : rowType = RecordType(BIGINT inv_date_sk, BIGINT inv_item_sk, BIGINT inv_warehouse_sk, INTEGER inv_quantity_on_hand): rowcount = 5.8038919E7, cumulative cost = {1.16077838E8 rows, 2.3215799755676E8 cpu, 2.32155676E8 io, 2.32155676E8 network, 0.0 memory}, id = 2618
01-04                  HiveScan(table=[hive2.tpcds_2g_parquet.inventory], columns=[`inv_date_sk`, `inv_item_sk`, `inv_warehouse_sk`, `inv_quantity_on_hand`], splits=[2], mode=[NATIVE_PARQUET]) : rowType = RecordType(BIGINT inv_date_sk, BIGINT inv_item_sk, BIGINT inv_warehouse_sk, INTEGER inv_quantity_on_hand): rowcount = 5.8038919E7, cumulative cost = {5.8038919E7 rows, 2.32155676E8 cpu, 2.32155676E8 io, 2.32155676E8 network, 0.0 memory}, id = 2617

100GB inventory 表 Physical Plan

00-00    Screen : rowType = RecordType(VARCHAR(65536) Fragment, BIGINT Records, VARCHAR(65536) Path, VARBINARY(65536) Metadata, INTEGER Partition, BIGINT FileSize, VARBINARY(65536) IcebergMetadata, VARBINARY(65536) fileschema): rowcount = 3.982527964E9, cumulative cost = {2.82759485444E10 rows, 5.6154281496874245E10 cpu, 1.5930111856E10 io, 1.30515406436208E14 network, 0.0 memory}, id = 1598
00-01      Project(Fragment=[$0], Records=[$1], Path=[$2], Metadata=[$3], Partition=[$4], FileSize=[$5], IcebergMetadata=[$6], fileschema=[$7]) : rowType = RecordType(VARCHAR(65536) Fragment, BIGINT Records, VARCHAR(65536) Path, VARBINARY(65536) Metadata, INTEGER Partition, BIGINT FileSize, VARBINARY(65536) IcebergMetadata, VARBINARY(65536) fileschema): rowcount = 3.982527964E9, cumulative cost = {2.7877695748E10 rows, 5.575602870047424E10 cpu, 1.5930111856E10 io, 1.30515406436208E14 network, 0.0 memory}, id = 1597
00-02        WriterCommitter(final=[/Users/luochunyi/project/dremio-oss-17/distribution/server/target/dremio-community-17.0.0-202107060524010627-31b5222b/dremio-community-17.0.0-202107060524010627-31b5222b/data/pdfs/results/1bed3f3d-2d08-18fe-3df4-b84867fcf500]) : rowType = RecordType(VARCHAR(65536) Fragment, BIGINT Records, VARCHAR(65536) Path, VARBINARY(65536) Metadata, INTEGER Partition, BIGINT FileSize, VARBINARY(65536) IcebergMetadata, VARBINARY(65536) fileschema): rowcount = 3.982527964E9, cumulative cost = {2.3895167784E10 rows, 5.575571009823712E10 cpu, 1.5930111856E10 io, 1.30515406436208E14 network, 0.0 memory}, id = 1596
00-03          UnionExchange : rowType = RecordType(VARCHAR(65536) Fragment, BIGINT Records, VARCHAR(65536) Path, VARBINARY(65536) Metadata, INTEGER Partition, BIGINT FileSize, VARBINARY(65536) IcebergMetadata, VARBINARY(65536) fileschema): rowcount = 3.982527964E9, cumulative cost = {1.991263982E10 rows, 5.177318213423712E10 cpu, 1.5930111856E10 io, 1.30515406436208E14 network, 0.0 memory}, id = 1595
01-01            Writer : rowType = RecordType(VARCHAR(65536) Fragment, BIGINT Records, VARCHAR(65536) Path, VARBINARY(65536) Metadata, INTEGER Partition, BIGINT FileSize, VARBINARY(65536) IcebergMetadata, VARBINARY(65536) fileschema): rowcount = 3.982527964E9, cumulative cost = {1.5930111856E10 rows, 1.991295842223712E10 cpu, 1.5930111856E10 io, 1.5930111856E10 network, 0.0 memory}, id = 1594
01-02              Project(inv_date_sk=[$0], inv_item_sk=[$1], inv_warehouse_sk=[$2], inv_quantity_on_hand=[$3]) : rowType = RecordType(BIGINT inv_date_sk, BIGINT inv_item_sk, BIGINT inv_warehouse_sk, INTEGER inv_quantity_on_hand): rowcount = 3.982527964E9, cumulative cost = {1.1947583892E10 rows, 1.5930430458237122E10 cpu, 1.5930111856E10 io, 1.5930111856E10 network, 0.0 memory}, id = 1593
01-03                Project(inv_date_sk=[$0], inv_item_sk=[$1], inv_warehouse_sk=[$2], inv_quantity_on_hand=[$3]) : rowType = RecordType(BIGINT inv_date_sk, BIGINT inv_item_sk, BIGINT inv_warehouse_sk, INTEGER inv_quantity_on_hand): rowcount = 3.982527964E9, cumulative cost = {7.965055928E9 rows, 1.593027115711856E10 cpu, 1.5930111856E10 io, 1.5930111856E10 network, 0.0 memory}, id = 1592
01-04                  HiveScan(table=[hive2.tpcds_100g_parquet.inventory], columns=[`inv_date_sk`, `inv_item_sk`, `inv_warehouse_sk`, `inv_quantity_on_hand`], splits=[34], mode=[NATIVE_PARQUET]) : rowType = RecordType(BIGINT inv_date_sk, BIGINT inv_item_sk, BIGINT inv_warehouse_sk, INTEGER inv_quantity_on_hand): rowcount = 3.982527964E9, cumulative cost = {3.982527964E9 rows, 1.5930111856E10 cpu, 1.5930111856E10 io, 1.5930111856E10 network, 0.0 memory}, id = 1591

那么一定是因为 splits 数目导致最终生成的 Fragment 不一样,最终内存计算结果不一样。Fragment 的生成逻辑下一节会讲到。

内存评估限制

Dremio 还考虑了算子是否是内存昂贵的,只有查询中存在昂贵的算子时才进行内存预估限制,Dremio 17 中只有 HashAggPrel(reserve_bytes:1,000,000)、HashJoinPrel(reserve_bytes:1,000,000) 和 SortPrel(reserve_bytes:20,000,000) 三种算子是内存昂贵的。所以 Dremio 的内存预估策略也能在一定程度上可以避免 join、sort 等算子过多导致资源不足,但肯定不是实际消耗的内存。下图整体的内存限制策略,补充了部分细节。

Dremio SQL 查询内存预估原理

Operator 预估内存上界和下界的作用

上图中 Memory Bound 操作会将内存预估值设置到每个 Operator 的 Props 中。在生成整个执行计划的 Pipeline 时会基于设置的内存构造的算子的 BufferAllocator 对象。代码位置在:com.dremio.sabot.exec.fragment.OperatorContextCreator#newOperatorContext。

BufferAllocator 是 Arrow 格式的内存分配器,使用 Arrow 的任何功能前都必须初始化 BufferAllocator 对象,Arrow 的内存分配都必须经过这个 Allocator。ChildAllocator 必须从 RootAllocator 中创建,方便每个算子单独释放内存和排查是否内存泄漏。(这 Java 用了 Arrow 还要考虑内存分配和释放了???)

Physical-Plan convert to Fragment

构建 Fragment 时序图

Dremio SQL 查询内存预估原理

Dremio 中的 Fragment(片段)分为 Major Fragment 和 Minor Fragment。

Major Fragment 描述查询中被切分出来的一个阶段,是一种逻辑概念,例如下图不同颜色就代表不同的 Major Fragment,他们之间用 Exchange 算子切分。Exchange 算子负责数据转移,通常是由一个 Sender 和 Receiver 组成,其中 Sender 属于一个 Major Fragment,而 Receiver 属于一个另一个 Major Fragment。

Dremio SQL 查询内存预估原理

Minor Fragment 是执行查询线程调度的最小单元,一个 Major Fragment 中通常包含一个或多个 Minor Fragment。

构建 Major Fragment

首先拆分物理计划,上面提到 Major Fragment 由 Exchange 算子进行拆分,所以只需要遍历一次物理计划即可,并使用 com.dremio.exec.planner.fragment.Wrapper 包装。

Dremio SQL 查询内存预估原理

当然 rootFragment 可以直接循环遍历 ExchangeFragmentPair 一定是实现了 iterator() 方法,直接返回了 Exchange 算子集合的迭代器。

构建 Minor Fragment

构建 Minor Fragment 前,会遍历每个 FragmentWrapper 生成 FragmentParallelizer,从字面意思就可以看出是将 Major Fragment 并行化,也就是为构建 Minor Fragment 做准备。FragmentParallelizer 分为 SoftAffinityFragmentParallelizer 和 HardAffinityFragmentParallelizer,如果某个 fragment 必须在特定的 node 上运行,那么他就属于 HardAffinity,否则是 SoftAffinity。

这里我们只讲 SoftAffinityFragmentParallelizer 的逻辑,HardAffinityFragmentParallelizer 同理,只是可使用的 EndPoints 不同。

  1. 确定并行度(Width)

Width 参数的初始化

Width 参数是包装在com.dremio.exec.planner.fragment.ParallelizationInfo.ParallelizationInfoCollector类中。maxWidth 一般由 Scan 算子确定,例如 Parquet Scan 有 10 个 splits,那么最理想的 width 就是 10,如果有 10 个 EndPoint,每个 EndPoint 扫描一个 split 效率一般来说是最好的。如果只有一个 EndPoint,width 也不会降为 1,还存在一个 widthConstraint 变量,它一般由 Exchange 算子确定,物理计划中如果有两个 exchange 算子,那么至少存在两个 Scan 算子,minWidth 就为 2,width 初始化完成。

计算单次查询实际的 width

源码:com.dremio.exec.planner.fragment.SoftAffinityFragmentParallelizer#getWidth

  private int getWidth(final Stats stats, final int minWidth, final int maxWidth,
                       final ParallelizationParameters parameters, int numEndpoints)

参数 minWidth 和 maxWidth 就是刚刚初始化好的 width,numEndpoints 是当前存活的 worker 数量。

步骤一:根据物理计划的 cost / 计划中运算符的最大的 cost,称为 width1,这一步我认为没有太大的意义,因为算出的值往往很大。

步骤二:width2 = Math.min(width1, Math.min(maxWidth, 配置的全局单个查询最大 width 数,默认 1000)); 这一步很好理解,就是不超过 maxWidth。

步骤三:width3 = Math.min(width2, sysMaxWidth); sysMaxWidth 是单个节点的 width 限制 * 节点数。节点的 width 限制会根据 worker 的 CPU 核心数和当前 worker 负载大小浮动。当节点负载高于一定阈值时会将限制数乘以一个系数,降低 width,具体计算逻辑见下面的示意图。

步骤四:限制 1 ≤ minWidth ≤ width3 ≤ maxWidth。

Dremio SQL 查询内存预估原理

根据 width 选择 Endpoints

源码:com.dremio.exec.planner.fragment.SoftAffinityFragmentParallelizer#findEndpoints

根据 Fragment 的节点亲缘性(Affinity)分配 Endpoint,大多数情况都是 sortedAffinity(软亲缘,也就是可以算计分发)。

如果遇到 Endpoint 的数量小于 width,就会将 Endpoint 列表构建为循环迭代器,重复分发,当然如果只有一个 Endpoint,10 个 width,也会生成 size 为 10 的 assignedEndpointList。

根据 Endpoint 列表构建作业(assignments

源码:com.dremio.exec.store.schedule.AssignmentCreator2#makeAssignments

构建作业其实就是在分配 Scan 算子,准确的说是在分配 Scan 算子中的 splits。

主要逻辑是在有 Affinity 要求时优先选择列表中负载最小的 Endpoint,尽可能让所有的 Endpoint 负载均衡。剩余的 splits 的 partitionChunk 大小均分到每个 Endpoint。(使用了优先队列对 Endpoint 列表排序)

首先 Dremio 会尝试进行三轮分发,如果 Fragment 有 Affinity 要求则每次都选择负载最低的 Endpoint,否则直接加入等待列表。如果碰到将 Fragment 分发给 Endpoint 后,Endpoint 的负载会超过阈值则将这个 split 加入到等待列表中等待下一轮分发。直到这个等待列表为空。

如果三轮分发后等待列表还不为空,那就按照剩余的 splits 的 partitionChunk 大小均分到每个 Endpoint。

最后会将每个 Endpoints 上的分到的作业(assignments) 包装成 Minor Fragment。

总结

Dremio 中 PhysicalOperator 的内存评估相对粗糙,是变相的限制算子数量的多少,当然也留了很多扩展的接口,每个算子的内存消耗都可以随时修改或者调整初始大小。例如可以结合统计信息更精确的评估 Scan 算子的消耗,再结合直方图评估 join 的中间结果集消耗。

Fragment 的构建分为 Major Fragment 和 Minor Fragment,Major Fragment 是根据 Exchange 算子进行切分,Minor Fragment 则是根据 width 进行拆分,width 的确定和很多因素都有关系,包括 Scan 算子的 split 数量、worker 的 CPU 核心数、集群的负载和人为参数的干预。总的来说就是合理拆分 Scan 算子的 split,尽可能扩大并行数,并且使机器的负载尽可能平均。