查看原文
其他

两篇文章从此让你告别AQE(上)

涤生-健哥 涤生大数据
2024-12-05

基于成本的优化 (Cost-BasedOptimization,CBO) 并不是什么新鲜事,多年来,它已在RDBMS世界中广泛使用。然而,按照Databricks里的专门做spark的相关人员的说法,在分布式、存储/计算分离的系统(如 Spark)中使用 CBO 是一个“极其复杂的问题”,因为它要收集和维护一组分布在物理机器上的Spark最新的统计结果数据,而这项工作是及其具有挑战性和昂贵的操作。

自从Spark 3.0 正式发布的自适应查询(Adaptive Query Execution,AQE)框架利用运行时统计信息,在查询执行过程中持续优化查询计划。在正常的非自适应查询Non-AQE过程中,一旦创建物理计划并开始运行,物理计划就无法再更新,即使运行时统计信息显示可能基于日期统计信息生成的查询计划效率较低。相反,允许AQE进程根据更准确的运行时统计信息重新优化,并根据更新的查询计划执行剩余的查询阶段

Spark 查询作业根据查询计划中的宽依赖关系可以分为多个Stage,而Spark 的多Stage作业执行模型使 Spark AQE的执行成为可能。Spark 查询的物理执行由Stage运行的并行Task组成,其中从Stage创建 TaskSet,并将 TaskSet 中的任务分发到各个Executor上并在执行。查询过程中处理的数据的最新和最准确的统计信息就是在Stage的执行过程中产生的,而AQE正是利用Stage执行生成的运行时统计信息,优化查询计划,并使用优化的计划执行其余阶段。

说明:由于篇幅问题,关于Spark SQL中的AQE我将发布两篇文章说明,本片将重点介绍AQE执行框架在后台的工作原理,然后在下一篇博文中探讨一些重要的 AQE 物理优化器规则的实现细节。

在深入研究AQE之前,先看一下正常的Non-AQE过程是如何工作的。在某种程度上,AQE可以看作是Non-AQE的变体,因此充分了解Non-AQE可以帮助我们更加轻松的去理解AQE。

Non-AQE

概括地说,当触发 RDD 的操作时,Spark 将作业提交到由当前 SparkContext 初始化的 DAGScheduler里。DAGScheduler 计算已提交作业的整体Stage的 DAG,其中Stage是通过在宽依赖处中断 RDD 来创建的。为作业创建两种类型的 Spark Stage,ResultStage 是执行管道中用于执行操作的最后Stage,ShuffleMapStage 是中间阶段,用于为 shuffle 写入映射输出文件。为每个 Spark Stage创建一组任务,即 Spark 中的 TaskSet。如下图所示,从下往上看,左边的图就是Spark 作业的DAG,中间就是DAG具体的Stage的类型,而右边的图则表示每个Stage由一组Task组成,即TaskSet。

看一下下面的查询,该查询联接了“sales”表和“items”表,然后对联接表进行聚合。

SELECTa.date, SUM(a.quantity)FROM (SELECT s.date, i.item_id, s.quantity, i.priceFROM sales sINNER JOIN items iON s.item_id = i.item_idWHERE i.price < 10) aGROUP BY a.date

这是根据此查询生成的 DAG。查询执行分为四个Stage:前两个阶段从数据文件中读取数据;第三个连接两个表并进行部分聚合;最后一个阶段是结果阶段,进行最终汇总并得到结果。

接下来我们分析一下这些Stage是如何在后台创建和执行的。首先DAGScheduler 自下而上遍历 RDD 管道,并以递归方式创建Stage。下图显示了示例查询的 RDD 管道的简化版本。以红色突出显示的箭头表示RDD中的宽依赖,即产生新的Stage。

Stage创建的执行流程从 DAGScheduler 的 createResultStage 方法开始。顾名思义,createResultStage 方法负责创建Result Stage。但是,在此方法创建Result Stage之前,它必须确保创建此Result Stage的所有父Stage必须创建完成。因此,它必须遍历上游 RDD 节点,查找 ShuffleDependency 并首先创建父Stage。同样,父Stage必须确保在创建父Stage之前创建自己的父Stage。这个过程一直持续到到达根Stage(即没有父Stage的阶段)。在那里,物理上创建根Stage,递归函数调用开始返回之旅。在返回途中,将物理创建相应函数调用中的Stage,从而可以创建其子Stage,直到创建Result Stage。

当一个ShuffleMapStage被创建时,它首先会在shuffledIdToMapStage 中注册,然后将宽依赖的ID和相应的ShuffleMapStage进行一一映射,如下图所示。需要注意的是shuffledIdToMapStage 仅缓存属于当前正在运行的作业的Stage及稍后将访问的一些Stage。

当 createResultStage 方法创建并返回最终Result Stage时,将提交最终Stage以运行一个完整的作业。与创建Stage类似,Stage是递归提交的。在提交当前Stage之前,需要先提交所有父Stage。父Stage由 getMissingParentStages 方法获取,该方法首先查找当前阶段依赖的父Stage,并在 shuffledIdToMapStage 哈希映射中查找父Stage的ShuffleMapStage。如果父Stage的shuffleMapStage 在 shuffledIdToMapStage 的哈希映射中缺失,则会为创建新的ShuffleMapStage,并在哈希映射中注册。

AQE

有了以上关于Non-AQE的相关的知识,现在我们就可以讨论AQE了。首先,我们重新运行上面讨论的示例查询,但这次启用 AQE。

SET spark.sql.adaptive.enabled = true;SELECTa.date, SUM(a.quantity)FROM (SELECT s.date, i.item_id, s.quantity, i.priceFROM sales sINNER JOIN items iON s.item_id = i.item_idWHERE i.price < 10) aGROUP BY a.date

开启AQE:

通过比较在 AQE 关闭和 AQE 打开的情况下执行的同一查询的物理计划,我们可以看到,当 AQE 启用时,联接算法已经从排序-合并联接变为广播哈希联接。正如之前积累的关于Spark SQL Join策略的相关知识我们知道,与排序-合并连接相比,广播哈希连接是首选策略,后者不需要额外的排序步骤。要启用广播哈希联接,联接中至少有一个表需要很小。从查询中,我们可以看到“items”表在与“sales”表联接之前被过滤了。查询执行统计信息显示,筛选器将“items”表的数据大小从 3000 万行 (约150MB) 减少到 30万行 (约5MB)。因此,在开启 AQE 的情况下生成的物理计划比在关闭 AQE 时生成的物理计划更优化。

启用 AQE 后,EXPLAIN 命令打印两个物理计划,即初始计划和最终计划。初始计划是通过 Spark Catalyst 优化器生成的物理计划的第一个版本,尚未进行任何调整。可见,初始版本选择排序-合并联接算法,该算法与 AQE 关闭时生成的物理计划相同。

即使 EXPLAIN 命令没有将相关信息打印出来,但是我们应该知道的是在初始计划和最终计划之间也存在中间计划。AQE 自下而上遍历物理计划,创建并执行查询Stage,并重新优化计划,并为其余物理计划段创建和执行Stage,直到执行整个物理计划。

深入了解 AQE 的源代码,看看它具体是如何工作的。在准备执行所选物理计划的查询执行准备Stage,将应用 InsertAdaptiveSparkPlan 规则,该规则使用 AdaptiveSparkPlanExec 实例包装查询计划。AdaptiveSparkPlanExec 封装了 AQE 的主执行框架,该框架自适应地驱动查询计划的执行。

执行 AdaptiveSparkPlanExec 时,它会调用 getFinalPhysicalPlan 方法来启动执行流。与上面讨论的Non-AQE相同,AQE 也会进行递归函数调用,以遍历创建Stage的物理计划。AdaptiveSparkPlanExec 定义一个专用的 createQueryStages 方法。以递归方式调用此方法以自下而上遍历计划。如果当前节点是 Exchange 节点,并且其所有子Stage都已实现,则会创建并返回新的 QueryStage,即 ShuffleQueryStageExec 或 BroadcastQueryStageExec(具体取决于 Exchange 节点的类型)。在执行新Stage之前,物理优化器规则列表将应用于新Stage。这些优化器规则封装了 Spark 3.0 提供的核心性能优化功能。我将在下一篇文章中更加具体的介绍这些规则,这里简单的了解一下:

然后,将具体化 createQueryStages 方法返回的新Stage,该方法在内部将Stage提交到 DAGScheduler 以独立运行,并返回Stage执行的映射输出统计信息。然后,根据新的统计信息重新优化和重新规划查询计划。然后评估新规划的物理计划的成本(通过 CostEvaluator 的实现),并将其与旧物理计划的成本进行比较。如果新的物理计划的运行成本比旧计划便宜,请使用新的物理计划进行剩余的处理。

说实话,这里涉及到大量的源码剖析,可能部分同学看到大量的源码就会脑袋发懵,但是我又想将这部分描述的更加具体,好让同学们知道整个AQE的递归执行流程。因此,我基于前面使用的“销售/物料”示例来演练AQE计划的演变。

首先,在根节点上调用 createQueryStages 方法,这里会有两个“如果”的条件:

  • 如果节点具有子节点,但该子节点不是 Exchange 节点;

  • 该节点是 Exchange 节点,但并非所有子节点都具体化。

(allChildStagesMaterialized=fasle)

则会在当前节点的子节点上进行内部 createQueryStages 方法调用。重复该过程,直到不满足任何“如果”条件。由于这是第一次运行,因此尚未实现任何节点。因此,createQueryStages 方法将递归调用到没有子节点的底部节点。

由于底部节点没有子节点,因此底部节点的 allChildStagesMaterialized 属性设置为 true。在递归 createQueryStages 方法调用的返回旅程中,底部节点的父节点是宽依赖的 Exchange 节点。由于底部节点不是 Exchange 节点,并且底部节点的 allChildStagesMaterialized 属性为 true,因此底部节点本身可以标记为 materialized,因此其父节点 Exchange 节点的 allChildStagesMaterialized 属性也为 true。现在,创建新 QueryStage 的条件已满足:当前节点是 Exchange 节点,并且其所有子阶段都已实现。

在我们的示例查询中,底部节点是用于读取“items”表和“sales”表的文件扫描节点。因此,将创建两个 ShuffleQueryStageExec 对象。之后,递归 createQueryStages 方法调用的返回旅程将继续。但是,由于这两个 Exchange 节点尚未实现,因此不会为 Exchange 节点的所有祖先节点创建查询阶段。

当顶级 createQueryStages 方法调用完成后,将调用两个新创建的 ShuffleQueryStageExec 的 materialize 方法执行该阶段并返回运行时统计信息。之后,与 ShuffleQueryStageExec 对应的逻辑节点将替换为 LogicalQueryStage。根据更新的统计数据重新优化和重新规划逻辑计划,并生成新的物理计划。在我们的示例中,统计数据显示“items”数据集的大小足够小,可以限定广播哈希联接的使用。因此,在删除排序运算符的新物理计划中,SortMergeJoin 将替换为 BroadcastHashJoin。此时,AQE的第一次迭代已完成。

接下来,在新的物理计划上调用 createQueryStages 方法以启动新的迭代并重复该过程以执行下一阶段。

涤生大数据往期精彩推荐

1.企业数仓DQC数据质量管理实践篇

2.企业数据治理实战总结--数仓面试必备

3.OneData理论案例实战—企业级数仓业务过程

4.中大厂数仓模型规范与度量指标有哪些?

5.手把手教你搭建用户画像系统(入门篇上)

6.手把手教你搭建用户画像系统(入门篇下)

7.SQL优化之诊断篇:快速定位生产性能问题实践

8.SQL之优化篇:一文搞懂如何优化线上任务性能,增效降本!

9.新能源趋势下一个简单的数仓项目,助力理解数仓模型

10.基于FlinkSQL +Hbase在O2O场景营销域实时数仓的实践

11.开发实战角度:distinct实现原理及具体优化总结

12.涤生大数据实战:基于Flink+ODPS历史累计计算项目分析与优化(一)

13.涤生大数据实战:基于Flink+ODPS历史累计计算项目分析与优化(二)

14.5分钟了解实时车联网,车联网(IoV)OLAP 解决方案是怎样的?

15.企业级Apache Kafka集群策略:Kakfa最佳实践总结

16.玩转Spark小文件合并与文件读写提交机制

17.一文详解Spark内存模型原理,面试轻松搞定

18.大厂8年老司机漫谈数仓架构

19.一文带你深入吃透Spark的窗口函数

20.大数据实战:基于Flink+ODPS进行最近N天实时标签构建

21.数仓面试还不懂什么是基线管理?


继续滑动看下一个
涤生大数据
向上滑动看下一个

您可能也对以下帖子感兴趣

文章有问题?点此查看未经处理的缓存