实际开发中:如何有效运用Spark Catalyst的执行流程
当我们使用spark sql执行一条sql时,到底经历了哪些过程:
spark.sql(“select * from table;”)
首先我们来看看sql操作的源码部分,如下:
def sql(sqlText: String): DataFrame = withActive {
val tracker = new QueryPlanningTracker
val plan = tracker.measurePhase(QueryPlanningTracker.PARSING) {
sessionState.sqlParser.parsePlan(sqlText)
}
Dataset.ofRows(self, plan, tracker)
}
上述代码主要是先初始化QueryPlanningTracker,该类的主要作用用于在查询计划中跟踪运行时和相关统计数据。该方法中的核心方法就是:parsePlan,而从parsePlan方法开始,就进入我们的Catalyst,如下图:
而从上图中我们可以看到,整个Catalyst中最核心的类之一就是QueryExecution类,该类包装了使用Catalyst执行关系查询的主要工作流,封装了用于执行查询执行流中涉及的每个主要阶段的方法,这些方法是惰性方法,仅在触发要查询的数据集的动作时执行。
本文中通过一个查询示例来说明查询执行流程,从文本sql语句到解析、分析、优化,直至物理计划。此示例查询用于查询示例数据集中的数据,该数据集将“sales”表联接到“items”表,并选择“i_price”列值小于10的项目。
df = spark.sql(‘’’SELECT s_date, s_quantity, i_price
FROM
sales
J0IN
items
ON
s_item id = i_item_id
WHERE
i_price < 10
‘’’)
df.explain(extended=True)
SQL Parsing SQL 解析
Catalyst 中查询计划的内部数据结构是树形的,在每个阶段都由规则进行转换。首先需要将文本格式的输入 SQL 语句解析为树形。调用 spark 会话的 sql 方法时,SQL 解析器首先将 sql 语句解析为 ANTLR ParseTree,然后再将 ANTLR ParseTree 转换为未解析的逻辑计划。然后,将未解析的逻辑计划传递到 Dataset 的 ofRows 方法中,以创建 QueryExecution 实例。
def sql(sqlText: String): DataFrame = withActive {
val tracker = new QueryPlanningTracker
val plan = tracker.measurePhase(QueryPlanningTracker.PARSING) {
sessionState.sqlParser.parsePlan(sqlText)
}
Dataset.ofRows(self, plan, tracker)
}
对于我们的 sql 示例,这里是从我们的 sql 语句中解析的未解析的逻辑计划。“sales”表和“items”表均未解析,未验证列名,并且列上没有类型信息。
Analysis 解析
Spark SQL Catalyst 提供了一个逻辑查询计划分析器,该分析器从 SessionCatalog 中检索表和属性信息,并将 UnresolvedAttributes 和 UnresolvedRelations 转换为完全类型化的对象。SessionCatalog 是基础元存储(如 Hive 元存储Metastore )的代理,可以认为元存储是您可以查找表和列的完整类型信息的地方。逻辑查询计划分析器在分析阶段执行以下操作:
i、在 SessionCatalog 中按名称查找关系
ii、映射命名属性,例如列名
iii、确定具有相同值的属性并给出唯一ID
iv、通过表达式传播和强制类型
回到我们的示例,在分析了未解决的逻辑计划之后,我们得到了以下分析的逻辑计划。逻辑计划现在知道关系的全名和文件格式,列的数据类型也是可识别的,并且每个列都关联了唯一的 ID。
Logical Optimisation 逻辑优化
Logical Optimisation 逻辑优化
分析逻辑计划后,逻辑计划优化器将基于规则的标准优化应用于逻辑计划。Catalyst 附带了大约 70 个预构建的逻辑计划优化规则(RBO),每个规则都将结构化查询的一部分的查询计划转换为优化的逻辑计划。这些规则在规则批处理中分组和执行。
在我们的示例中,PushDownPredicate规则应用于分析的逻辑计划,并将过滤器从联接操作后移动到联接操作之前和数据加载后。预计这种优化将减小要联接的数据集的大小,并确保在物理规划阶段将谓词向下推送到数据源。
Physical Planning 物理规划
Physical Planning 物理规划
逻辑计划与平台无关,Spark 无法解释和执行。逻辑计划需要通过 SparkerPlanner 转换为 SparkPlan。QueryExecution 提供了一个 createSparkPlan 方法,该方法将优化后的逻辑计划作为输入,并调用 SparkPlanner 的计划方法,该方法应用节点匹配策略返回候选物理计划列表。
预计基于成本的模式(CBO)将分析每个候选物理计划的成本并选择最佳计划。需要注意的是对于 Spark 版本 3.0.0,基于成本的模型仍不可用,并且正在使用临时解决方案,该解决方案采用 SparkPlanner 返回的第一个计划。
回到我们的示例中,优化的逻辑计划中的逻辑运算符将转换为其物理对应符。DataSoruce 策略将关系转换为 FileScan 节点,该节点将过滤器谓词向下推送到数据源,以便仅读取查询所需的数据。Join Inner 逻辑运算符已被 Join 策略转换为 SortMergeJoin(建议 Join 策略选择更高效的 Join 算法,例如 broadcast。我将在以后的文章中详细介绍 Join 策略和优化方法)。
Execution Preparation 执行准备
Execution Preparation 执行准备
SparkPlanner将逻辑计划转换为物理计划后,需要执行一系列准备规则,为物理计划进行执行准备。
例如,EnsureRequirements 规则可确保满足物理操作员的所有要求(如果否,则为满足要求而需要执行的操作)。在我们的示例中,SortMergeJoin 运算符计划用于查询中的联接操作。SortMergeJoin 运算符在执行之前需要重新洗牌和排序。EnsureRequirements 规则会将这些必需的操作添加到其中。
除了保证需求外,准备规则还将确保子查询被规划,数据分区和排序正确使用,交换和子查询在应用时被重用。此外,在此阶段插入 WholeStageCodeGen。
Catalyst 规则执行器
Catalyst 规则执行器
Spark Catalyst 查询计划在内部表示为树,定义为 TreeNode 类型的子类。每个树节点可以有零个或多个树节点子节点,这构成了一个相互递归的结构。TreeNode 类型封装了用于遍历树和转换节点的方法列表。
QueryPlan 是 LogicalPlan 和 SparkPlan 的父级,是 TreeNode 类型的子类。这意味着 Spark SQL 查询执行过程中涉及的逻辑计划和 Spark 计划是递归树结构。将查询计划从一个阶段转换为另一个阶段的过程,例如,从已解决的逻辑计划转换为优化的逻辑计划,是应用规则来转换节点或子树的过程。
在内部,用于转换逻辑计划的 Catalyst 规则需要扩展抽象的基 Rule 类,该类定义了apply(plan:TreeType):TreeType 方法。所有逻辑计划分析器规则和优化器规则都必须从抽象基规则类继承,并使用转换逻辑覆盖和实现 apply 方法。
RuleExecutor 类由 Catalyst 提供,用于协调逻辑计划处理阶段的规则执行。Analyzer 类和 Optimizer 类都是 RuleExecutor 的子类。Analyzer 类负责将未解析的逻辑计划转换为已解析的逻辑计划,而 Optimizer 类负责将已解析的逻辑计划转换为优化的逻辑计划,然后再馈送到物理计划处理管道中。
物理计划处理阶段没有 RuleExecutor 子类。严格来说,物理计划不是 Spark Catalyst 的一部分。Spark Catalyst 被设计为一个独立于平台的 SQL 优化器,理论上应该能够用于任何平台。物理计划是特定于 Spark 的实现,它是 Spark SQL 的一部分。这就是为什么 Catalyst 位于单独的包装org.apache.spark.sql.catalyst中的原因。SparkPlan 是通过 Spark 策略 [SparkStrategy 类型] 从优化的逻辑计划转换而来的。
RuleExecutor 包含一个batches 属性,该属性被子类 Analyzer 或 Optimizer 覆盖,用于定义规则批次序列,每个规则批次都包含一个 Catalyst 规则列表。
RuleExecutor 批量执行 Catalyst 规则。可能会对一个批次进行多次运行,直到它到达树停止进化的固定点。仅当当前批处理完成所有运行时,RuleExecutor 才会移动到下一个批处理。批处理到一个固定点的多次运行允许规则设计为简单且自包含,但最终仍会对树产生更大的全局影响。
每个 RuleExecutor 都提供了一个executeAndTrack 方法,可以调用它来运行规则批处理。在 QueryExecution 中调用了 Analyzer 和 Optimizer 的 executeAndTrack 方法,用于启动逻辑计划转换。
executeAndTrack方法触发 RuleExecutor 的execute方法,该方法包含批处理运行循环。
对于批处理运行中的每个规则执行,Catalyst 将使用模式匹配来测试规则应用于查询计划树的哪个部分,并应用规则来转换该部分。
涤生大数据往期精彩推荐
8.SQL之优化篇:一文搞懂如何优化线上任务性能,增效降本!
10.基于FlinkSQL +Hbase在O2O场景营销域实时数仓的实践
12.涤生大数据实战:基于Flink+ODPS历史累计计算项目分析与优化(一)
13.涤生大数据实战:基于Flink+ODPS历史累计计算项目分析与优化(二)
14.5分钟了解实时车联网,车联网(IoV)OLAP 解决方案是怎样的?
15.企业级Apache Kafka集群策略:Kakfa最佳实践总结
20.大数据实战:基于Flink+ODPS进行最近N天实时标签构建
25.玩转大厂金融风控体系建设