spark sql 源码剖析 PushDownPredicate:谓词不是想下推,想推就能推
spark sql 的优化框架 Catalyst 博大精深,里面的精华是很多大牛一个pr一个pr积累起来的,仔细琢磨琢磨相关源码也是一件痛并快乐的事情,今天我来抛砖引玉,讲讲 逻辑优化里面 谓词下推的实现,如果对 spark sql 总体架构不了解的,可以先看这篇文章 是时候学习真正的spark技术了 了解全貌。
谓词下推, 顾名思义,就是把过滤算子(就是你在 sql语句里面写的 where语句),尽可能地放在执行计划靠前的地方,好处就是尽早地过滤到不必要的数据,后续流程都节省了计算量,从而优化了性能。
举个最简单的例子:
我们对整个执行计划 explain 一下,就能清晰看到 spark sql 做的优化,Filter operator 从 Project operator 后面挪到了前面。
你可能觉得这也没有啥,不过对有些数据库,是直接可以把这个过滤下沉到 数据库层面,这样加载的数据量就少了很多,省了网络带宽,不过这个跟spark sql 没啥关系,就不提这个了。
上文说,要把 过滤算子 尽可能地放在执行计划靠前的地方, 这篇文章就是要把这个 尽可能掰扯清楚,哪些情况是可以挪动的,哪些情况是不可以挪动的。
spark sql 到了逻辑优化这一步就是利用scala强大的case正则匹配,对一个由各种operator组成的AST树尽其所能的匹配和修改,下面我们看下PushDownPredicate 优化策略都对哪些情况做了匹配优化
1 Filter 有个Project类型的子节点
这里匹配到的就是 Filter 算子,有个Project类型子节点的情况,就是我们上文例子中给的情况,然后后面又加了两个限制条件, 一个是 project 里面的要取的字段都是确定性的(deterministic),这个是啥意思呢,我举个例子
这里的 monotonicallyIncreasingId 就是不确定性的一个 expression, 这个表达式会生成一个64位的id,这个id 是唯一和单调递增的,多个分区的开始值不同,作用就是生成一个递增的唯一Id, 我看了下这个 expression 的实现, 前 31位是有分区ID组成,后33位是在这个分区里面累加上去的,问题就出在这里, 因为这个值是一个有状态的值,后一行的值依赖前一行的值,这就导致如果你把Filter 下推了,我们的例子中,对于第二个分区的两行数据 id: 2 和 id:3 ,其中 id 为2的行被过滤掉了,Long_id 没有经过累加1,然后id为3的Long_id就成了 8589934592 而不是8589934593,而如果先执行 monotonicallyIncreasingId 再过滤,这个值是 8589934593 。这个Filter下推,导致了结果的不同,所以在谓词下推的时候,只有operator 包含的所有expression都是确定性的时候才可以下推, 同理不能下推的还包括 rand 表达式。这个bug影响的版本和修复pr参考 https://issues.apache.org/jira/browse/SPARK-13473。
2 Filter 有个Aggregate 类型的子节点
这种情况, Filter 的有个Aggregate 类型子节点的情况(也就是你写的一些聚合操作), 同样的,aggregate 包含的表达式也必须是确定性的,还有一个条件是你Filter 的字段必须要在 group by 的维度字段里面,举个例子:
1 下面的聚合是可以 谓词下推的:
select a, count(*) as c from t1 group by a where a ==“1"
2 下面的聚合是不可以谓词下推的:
select count(*) as c from t1 where c == “10”
select a, count(b) as c from t1 group by a where c == “10"
这个其实是很好理解的,2 这种情况类似我们在 sql 里面写的 having 语句一样,是为了过滤分组聚合后的结果用的,如果把这个过滤下推,就相当于你把 count(*) 的别名 c 下推当成成了原始表中的 c字段,那么统计的结果就是错的, 而 1 的情况因为 a 字段在分组的字段里面, 这种经过 having 过滤后,其他 a 不为1 的分组肯定会被过滤掉, 所以 聚合后过滤 和 聚合前过滤,两者是等价的,可以谓词下推,相关的issue 和修复pr 参考 https://issues.apache.org/jira/browse/SPARK-22983
3 Filter 有个Window 窗口类型的子节点
针对 sql 中的窗口聚合,需要2种条件,才可以谓词下推:
谓词下推的表达式必须是窗口聚合的分区key
谓词必须是确定性的。
我们知道,spark 中窗口聚合操作,和普通聚合操作的不同的就在于,对于每个分组,前者对每一行都会算出来新的一行,后者对每个分组只会算出来一行,我们举个例子。
我们来看下执行计划:
你会发现
这个原因其实跟 group 聚合操作类似,假如我把 Filter (empNo#16L = 8 ) 下推了,那么过滤后就剩这一个员工了,那他肯定排第一,这个结果不就错了么,而 Filter(depName#15 = develop) 谓词可以下推是因为depName是分组的key,其实我后面只需要在这一个分组里面去窗口聚合,其他分组的数据拿了也是浪费,在前面过滤和在后面过滤在正确性上来讲是等价的。
4 Filter 的子节点只有部分类型才可以谓词下推
也就是说在 AST 树上多次应用谓词下推策略的时候,只有子节点是这些类型的 Operator的时候,才可以下推,其他的无法下推,比如Limit 类型,就不能谓词下推,这个也很好理解,举个例子:
一种是先过滤,然后再limit ,结果有10条数据,一种是先limit 在过滤,只有一条数据,很明显后者是对的。
好,抛砖引玉就到此为止,源码里面还有其他情况,比如碰到 EventTimeWatermark 应该怎么处理才能保证sql语义不错,这个等后面介绍过 Watermark 之后再讲。后面打算出一个 spark sql 源码解析的系列,这个就算是第一篇,大家如果有什么疑问直接留言或者发送我邮箱 1319027852@qq.com,欢迎交流。
大家都在看
▼
spark sql源码系列:
是时候学习真正的spark技术了 丨 从0到1认识 spark sql
structured streaming 系列:
structured streaming 原理剖析 丨 structured streaming 碰上kafka 丨structured streaming 是如何搞定乱序时间的
spark streaming 系列:
spark streaming 读取kafka各种姿势详解 丨 spark streaming流式计算中的困境与解决之道
spark core 系列:
彻底搞懂spark shuffle过程(1) 丨 彻底搞懂spark shuffle过程(2) 丨spark内存管理-Tungsten框架探秘
spark 机器学习系列
关注【spark技术分享】
一起撸spark源码,一起玩spark最佳实践