查看原文
其他

spark sql 源码剖析 PushDownPredicate:谓词不是想下推,想推就能推

spark君 张江打工人 2021-09-05

spark sql 的优化框架 Catalyst 博大精深,里面的精华是很多大牛一个pr一个pr积累起来的,仔细琢磨琢磨相关源码也是一件痛并快乐的事情,今天我来抛砖引玉,讲讲 逻辑优化里面 谓词下推的实现,如果对 spark sql 总体架构不了解的,可以先看这篇文章 是时候学习真正的spark技术了 了解全貌。

谓词下推, 顾名思义,就是把过滤算子(就是你在 sql语句里面写的 where语句),尽可能地放在执行计划靠前的地方,好处就是尽早地过滤到不必要的数据,后续流程都节省了计算量,从而优化了性能。


举个最简单的例子:


我们对整个执行计划 explain 一下,就能清晰看到 spark sql 做的优化,Filter  operator 从 Project operator 后面挪到了前面。



你可能觉得这也没有啥,不过对有些数据库,是直接可以把这个过滤下沉到 数据库层面,这样加载的数据量就少了很多,省了网络带宽,不过这个跟spark sql 没啥关系,就不提这个了。

上文说,要把 过滤算子 尽可能地放在执行计划靠前的地方, 这篇文章就是要把这个 尽可能掰扯清楚,哪些情况是可以挪动的,哪些情况是不可以挪动的。

spark sql 到了逻辑优化这一步就是利用scala强大的case正则匹配,对一个由各种operator组成的AST树尽其所能的匹配和修改,下面我们看下PushDownPredicate 优化策略都对哪些情况做了匹配优化


1 Filter 有个Project类型的子节点



这里匹配到的就是 Filter 算子,有个Project类型子节点的情况,就是我们上文例子中给的情况,然后后面又加了两个限制条件,  一个是 project 里面的要取的字段都是确定性的(deterministic),这个是啥意思呢,我举个例子





这里的 monotonicallyIncreasingId 就是不确定性的一个 expression, 这个表达式会生成一个64位的id,这个id 是唯一和单调递增的,多个分区的开始值不同,作用就是生成一个递增的唯一Id,  我看了下这个 expression 的实现, 前 31位是有分区ID组成,后33位是在这个分区里面累加上去的,问题就出在这里, 因为这个值是一个有状态的值,后一行的值依赖前一行的值,这就导致如果你把Filter 下推了,我们的例子中,对于第二个分区的两行数据 id: 2 和 id:3 ,其中 id 为2的行被过滤掉了,Long_id 没有经过累加1,然后id为3的Long_id就成了 8589934592 而不是8589934593,而如果先执行 monotonicallyIncreasingId 再过滤,这个值是 8589934593 。这个Filter下推,导致了结果的不同,所以在谓词下推的时候,只有operator 包含的所有expression都是确定性的时候才可以下推, 同理不能下推的还包括 rand 表达式。这个bug影响的版本和修复pr参考 https://issues.apache.org/jira/browse/SPARK-13473


2  Filter 有个Aggregate 类型的子节点



这种情况, Filter 的有个Aggregate 类型子节点的情况(也就是你写的一些聚合操作), 同样的,aggregate 包含的表达式也必须是确定性的,还有一个条件是你Filter 的字段必须要在 group by 的维度字段里面,举个例子:


1 下面的聚合是可以 谓词下推的:

  • select a, count(*) as c from t1 group by a  where a ==“1"


2 下面的聚合是不可以谓词下推的:

  • select  count(*) as  c  from t1 where c ==  “10”

  • select a, count(b) as c   from t1  group by a  where c == “10"


这个其实是很好理解的,2 这种情况类似我们在 sql 里面写的 having 语句一样,是为了过滤分组聚合后的结果用的,如果把这个过滤下推,就相当于你把 count(*) 的别名 c 下推当成成了原始表中的 c字段,那么统计的结果就是错的, 而 1 的情况因为 a 字段在分组的字段里面, 这种经过 having 过滤后,其他 a 不为1 的分组肯定会被过滤掉, 所以 聚合后过滤   和 聚合前过滤,两者是等价的,可以谓词下推,相关的issue 和修复pr 参考  https://issues.apache.org/jira/browse/SPARK-22983


3  Filter 有个Window 窗口类型的子节点


针对 sql 中的窗口聚合,需要2种条件,才可以谓词下推:


  • 谓词下推的表达式必须是窗口聚合的分区key

  •  谓词必须是确定性的。

我们知道,spark 中窗口聚合操作,和普通聚合操作的不同的就在于,对于每个分组,前者对每一行都会算出来新的一行,后者对每个分组只会算出来一行,我们举个例子。



这个例子是按照 部门进行分组,每个部门按照业绩 对员工进行降序排名,业绩相同的员工,rank 并列,假如这时候我们需要统计develop 部门 8 号员工的名次,可以在后面加上过滤条件


我们来看下执行计划:



你会发现 


这个原因其实跟 group 聚合操作类似,假如我把  Filter (empNo#16L = 8 ) 下推了,那么过滤后就剩这一个员工了,那他肯定排第一,这个结果不就错了么,而 Filter(depName#15 = develop) 谓词可以下推是因为depName是分组的key,其实我后面只需要在这一个分组里面去窗口聚合,其他分组的数据拿了也是浪费,在前面过滤和在后面过滤在正确性上来讲是等价的。


4 Filter 的子节点只有部分类型才可以谓词下推





也就是说在 AST 树上多次应用谓词下推策略的时候,只有子节点是这些类型的 Operator的时候,才可以下推,其他的无法下推,比如Limit 类型,就不能谓词下推,这个也很好理解,举个例子:





一种是先过滤,然后再limit ,结果有10条数据,一种是先limit 在过滤,只有一条数据,很明显后者是对的。

好,抛砖引玉就到此为止,源码里面还有其他情况,比如碰到 EventTimeWatermark 应该怎么处理才能保证sql语义不错,这个等后面介绍过 Watermark 之后再讲。后面打算出一个 spark sql 源码解析的系列,这个就算是第一篇,大家如果有什么疑问直接留言或者发送我邮箱 1319027852@qq.com,欢迎交流。




大家都在看

spark sql源码系列:

是时候学习真正的spark技术了  从0到1认识 spark sql

structured streaming 系列:

structured streaming 原理剖析   structured streaming 碰上kafka structured streaming 是如何搞定乱序时间的

spark streaming 系列:

spark streaming 读取kafka各种姿势详解   spark streaming流式计算中的困境与解决之道 

spark core 系列:

彻底搞懂spark shuffle过程(1)  彻底搞懂spark shuffle过程(2)  spark内存管理-Tungsten框架探秘

spark 机器学习系列





关注【spark技术分享】


一起撸spark源码,一起玩spark最佳实践




听说新版微信这有个好看


视频 小程序 ,轻点两下取消赞 在看 ,轻点两下取消在看

您可能也对以下帖子感兴趣

文章有问题?点此查看未经处理的缓存