两篇文章从此让你告别AQE(下)
在上一篇公众号中 两篇文章从此让你告别AQE(上),我们探讨了如何在 Spark SQL 中实现AQE框架。在本篇文章中我们将重点介绍了两个核心 AQE 优化器规则,即 CoalesceShufflePartitoins 规则和 OptimizeSkewedJoin 规则,以及如何在后台实现。
1.动态合并shuffle分区
如果读者已经使用 Spark 一段时间了,可能会熟悉 200 这个数字。似乎总有 200 个任务在运行,无论数据有多小。从以下示例中我们可以看出,有 200 个任务用于处理一个非常小的数据集,每个任务的数据量少于 1kb。
200 个任务背后的原因是,在 Spark 作业执行期间,shuffle分区的数量是固定的,默认值为 200,其实对应的就是spark.sql.shuffle.partitions参数决定的。Shuffle分区的数量决定了Mapper任务写入的bucket数量和Reducer端的输出分区数量。
Shuffle分区的数量可能会显著影响 Spark 性能。如果shuffle分区太少,每个分区包含的数据太多,这会使任务过多,并抵消并行性的好处。当内存不足以处理那么多数据时,可能会触发磁盘溢出,从而导致成本高昂的 I/O 操作,或者发生 OOM 以终止作业。另一方面,太多的shuffle分区会导致太多的小任务,从而导致任务调度和管理的开销过大。因此,选择合适的shuffle分区数对于实现令人满意的性能非常重要。可以通过调整 spark.sql.shuffle.partition 属性来手动配置shuffle分区的数量。这实际上是Spark性能调优最常用的操作之一。但是Spark 作业通常涉及多个Stage,数据大小可能会在管道中的不同Stage发生巨大变化,例如,这些Stage涉及filter或aggregate算子等。由于 Spark 作业中所有Stage的shuffle分区数都是预定义的,因此对某个Stage最佳的shuffle分区数可能会导致另一个Stage的性能不佳。
因此,需要通过作业执行的pipeline,根据运行时特定Stage的数据量,动态调整shuffle分区数。这正是AQE可以提供帮助的使用场景。AQE将Spark作业拆分为多个查询Stage,并根据已完成的上游查询Stage收集的运行时统计信息,重新优化下游查询Stage的查询计划。CoalesceShufflePartitoins 规则是为动态配置shuffle分区数而创建的 AQE 优化器规则,此规则根据输出分区的目标大小合并连续的shuffle分区。对于上面使用的示例,如果我们启用 AQE 并重新运行相同的查询,我们可以看到此时只创建了一个任务。由于此示例中使用的整个数据集非常小,因此只需要一个分区。
若要合并shuffle分区,CoalesceShufflePartitoins 规则首先需要知道合并分区的目标大小,该大小是根据一下三个参数决定的:
i、advisoryTargetSize(spark.sql.adaptive.advisoryPartitionSizeInBytes,默认值为 64MB)
ii、minPartitionSize(spark.sql.adaptive.coalescePartitions.minPartitionSize,默认值为 1MB)
iii、minNumPartitions(spark.sql.adaptive.coalescePartitions.minPartitionNum,如果未设置,则回退到 Spark 默认并行度)
CoalesceShufflePartitions 规则从Mapper端输出统计信息中汇总shuffle输入数据大小,并将其除以 minNumPartitions,以获得合并分区的最大目标大小 maxTargetSize。如果 advisoryTargetSize 大于 maxTargetSize,则目标大小设置为 maxTargetSize,以便实现预期的并行性。如果目标大小非常小,甚至小于 minPartitionSize,则没有必要使目标大小小于 minPartitionSize,而是使用 minPartitionSize 作为目标大小。
准备好目标大小后,CoalesceShufflePartitions 规则可以开始根据目标大小合并分区,并定义合并分区规范 (CoalescedPartitionSpec),该规范稍后将用于创建shuffle reader。先用一个例子来解释这个过程。如下图所示,我们在Mapper端有一个带有两个输入分区的shuffle操作。如果未应用 CoalesceShufflePartitions 规则,则shuffle数据将读入五个输出分区,即使其中一些分区很小。
应用 CoalesceShufflePartitions 规则时,它会从Mapper Stage侧收集的 MapOutputStatistics 中获取所有shuffle分区的大小统计信息。在我们的示例中,我们有两个shuffle reader,每个shuffle reader有五个shuffle分区:
CoalesceShufflePartitions 规则将遍历所有shuffle分区,将所有shuffle分区的总大小相加,将具有连续索引的shuffle分区打包到单个合并分区,直到再添加一个分区超过目标大小。在我们的示例中,我们使用默认的advisory target大小 (64MB) 作为目标大小。第一个shuffle分区的总大小为 50MB,尝试合并第一个和第二个shuffle分区 (20MB) 最终会得到合并后的分区大小 (70MB),大于目标大小,因此,两个分区无法合并,第一个分区将作为单独的输出分区输出。为第一个分区创建一个 ColeascedPartitionSpec 对象,其中 startReducerIndex 为 0(含),endReducerIndex 为 1(不含)。
CoalesceShufflePartitions 规则继续添加第二个和第三个分区,结果为 50MB,小于目标大小。
然后继续添加第四个分区,第二个、第三个和第四个分区的总大小 (60MB) 仍未超过目标大小。
然后再尝试添加第五个分区但是超过目标大小的分区大小 (140MB)。因此,第二个、第三个和第四个分区合并为一个分区 (60MB)。将创建一个 CoalescedPartitionSpec 对象,该对象的 startReducerIndex 为 1,endReducerIndex 为 5。
由于没有要处理的进一步分区,因此将为最后一个(第五个)分区创建另一个 CoalescedPartitionSpec。最终,我们现在有三个输出分区,包括由三个小分区组成的输出分区。
上面的示例显示了只要小分区的总大小不超过目标大小,就会合并小分区的正常情况。但是,存在一种例外情况,即分区小于 minPartitionSize,但如果添加小分区,则两端与其相邻的分区将超过目标大小。对于此方案,小分区将与两个相邻分区之间的较小分区合并。
此时,CoalesceShufflePartitions 规则创建了一系列 CoalescedPartitionSpec 对象,每个对象都定义其中一个输出分区的规范。shuffle reader将使用这些分区规格来相应地输出分区。CoalesceShufflePartitoins 规则创建一个 AQEShuffleReadExec 运算符,该运算符包装当前shuffle排序查询Stage,并使用上面定义的 CoalescedPartitoinSpecs 创建 ShuffledRowRDD。对于由一个 CoalescedPartitionSpec 定义的 ShuffledRowRDD 的每个分区,从 CoalescedPartitionSpec 的 startReducerIndex 和 endReducerIndex 定义的map侧输出分区范围中读取shuffle相关的block用来创建shuffle reader。
在内部,调用 MapOutputTracker 的 getMapSizesByExecutorId 方法获取要读取的 shuffle block的元数据,包括BlockManager id、shuffle block id、shuffle block大小和map索引。然后创建一个 BlockStoreShuffleReader,它初始化一个 ShuffleBlockFetcheriterator,用于执行物理读取操作以从其他节点的块存储中获取块。
通过对上一节的解释动态shuffle分区合并的充分理解,就很容易理解动态倾斜连接优化,这有点像分区合并的“反向”操作,它将倾斜的分区拆分为多个较小的分区。
下面的示例显示了具有倾斜分区的作业执行。倾斜的分区有 5.8GB 的数据,而其他分区只有不到 20MB 的数据。运行倾斜分区的任务需要 6.7 分钟,而运行其他分区的任务需要不到 1 秒。具有倾斜分区的任务运行时间较长,使得作业总执行时间为 7.7 分钟。
然后,我们在应用动态偏斜join优化的情况下启用 AQE,并再次重新运行相同的查询。由于倾斜的分区已被拆分为多个小分区,因此最大的分区现在为 234MB,运行分区需要 29 秒。由于并行度的增加,作业总执行时间从 7.7 分钟减少到 1 分钟。
OptimizeSkewedJoin 规则是负责动态倾斜join优化的 AQE 优化器。在较高级别上,此规则拆分倾斜的分区,并在join的另一端复制其匹配的分区,以便创建更多任务来并行执行join,以避免拖慢作业完成速度的执行时间较长的任务。
在内部,OptimizeSkewedJoin 规则首先检查要优化的join是SortMergeJoinExec 还是ShuffledHashJoinExec。Spark 3.2 中的 OptimizeSkewdJoin 规则仅支持这两种类型的join。
接下来,OptimizeSkewedJoin 规则检测倾斜的分区,如果分区大于指定的倾斜分区阈值(spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes,默认值为 256MB),并且大于偏斜分区因子的中值分区大小(spark.sql.adaptive.skewJoin.skewedPartitionFactor,默认值为 5),则该分区被视为倾斜分区。
找到倾斜分区后,我们可以计算拆分分区的目标大小,该大小是非倾斜分区的平均大小或建议分区大小(spark.sql.adaptive.advisoryPartitionSizeInBytes,默认为 64MB)取决于哪个更大。
OptimizeSkewedJoin 规则根据Map输出大小和目标大小定义分区拆分。它首先获取所有Mapper侧输出的大小,以便对倾斜分区进行shuffle排序,然后它会逐个遍历所有Mapper侧输出大小,并尝试合并相邻的Mapper侧输出大小,以便对Mapper侧输出进行分组,其总大小接近目标大小。然后,为每个组创建一个 PartialReducerPartitionSpec,该 Spec 封装了 Reducer 的 id(用于倾斜分区)、开始Mapper索引(组中映射输出的开始索引)和结束Mapper索引(组中映射输出的结束索引)。
现在,OptimizeSkewedJoin 规则已准备好 PartialReducerPartitionSpecs 列表,用于创建物理 AQEshuffleReadExec 运算符。物理shuffle block读取的其余步骤与前面提到的合并shuffle分区的步骤基本相同。主要区别在于,由 OptimizeSkewedJoin 规则创建的shuffle reader指定要读取的Mapper侧输出列表的 startMapIndex 和 endMapIndex。
当倾斜的分区被拆分为多个较小的分区时,其位于join另一端的匹配分区将复制到与拆分分区数匹配的相同数量的副本。join发生在倾斜分区的一个拆分和复制分区的一个副本之间。
涤生大数据往期精彩推荐
8.SQL之优化篇:一文搞懂如何优化线上任务性能,增效降本!
10.基于FlinkSQL +Hbase在O2O场景营销域实时数仓的实践
12.涤生大数据实战:基于Flink+ODPS历史累计计算项目分析与优化(一)
13.涤生大数据实战:基于Flink+ODPS历史累计计算项目分析与优化(二)
14.5分钟了解实时车联网,车联网(IoV)OLAP 解决方案是怎样的?
15.企业级Apache Kafka集群策略:Kakfa最佳实践总结
20.大数据实战:基于Flink+ODPS进行最近N天实时标签构建