数据之道 | 进阶版Spark执行计划图
供稿 | eBay DSS Team
作者 | 田川晓阳
编辑 | 顾欣怡本文4490字,预计阅读时间14分钟更多干货请关注“eBay技术荟”公众号导读
新一代数据开发分析平台Zeta由eBay DSS(Data Services and Solutions) 团队自主研发,主要针对在Spark SQL运行过程中可能存在的性能隐患及Spark执行计划图的缺陷,提出相应解决方案,旨在降低Spark SQL优化门槛,助力eBay用户解放分析效能,也希望对同业人员有所启发和帮助。
1. 背景介绍
eBay的大部分数据仓库之前一直构建于商业数据仓库系统Teradata之上,从2017年开始,eBay决定完全基于Hadoop平台来构建数据仓库,并开始了恢弘的数据搬迁之旅。然而随着以Spark为主要计算引擎的新开源平台的落户,复杂的数据访问模式和调优分析以及未经结构化的数据展现等问题开始变成全量投产的瓶颈。
为了解决这个困境,eBay DSS(Data Services & Solutions) 部门挺身而出,自主研发了Zeta——新一代数据开发分析平台,旨在为数据工程师、数据分析师和数据科学家提供跨平台且涵盖全生命周期的数据服务。平台支持基于元数据的数据探索、大数据开发、数据测试、数据分析以及终端的数据可视化等功能。目前,平台已累计服务超过2000多名用户,现有1000多名数据工程师、分析师及产品经理高度依赖Zeta平台来完成日常的数据开发处理和分析工作。
2. 痛点分析
随着大量分析场景的涌入和使用的激增,目前Hadoop平台每天要运行上万个Job,处理超过500PB的数据。与此同时,低质量Spark SQL的大量提交导致性能问题屡屡发生,这严重影响了平台性能,导致了资源浪费。然而,让用户自己对Spark SQL进行优化存在很高的技术门槛。对于缺少Spark经验的使用者来说,进行自主排查并解决问题无疑是一项痛苦甚至不可能完成的任务。
为了能够尽早地发现性能隐患、降低SQL优化的门槛、助力eBay用户解放分析效能,Zeta团队基于以往的实践对这些问题进行了一些探索并研发出了独有的解决方案,接下来本文将对此进行详细阐述。
3. 行业分析
3.1 关于性能分析的行业解决方案
3.2 现有Spark执行计划图的缺陷
其实Spark执行计划图的主要缺陷在于用户很难直接从图上提供的信息中找出性能瓶颈。在执行Spark SQL的时候,Spark UI中存在两个可以用来观察分析自己Job运行情况的Tab,分别是Task level的运行状态图(图1)和SQL 的Physical Plan DAG(图2):
图1 Task Level
(点击可查看大图)(点击可查看大图)
Task level的Tab内容更加偏底层一些,我们可以获取到很多重要的数据信息,比如当前已处理的数据数量以及和JVM相关的重要数据。但问题是,在SQL特别复杂的时候很难将状态信息和自己的SQL的逻辑计划对应起来。SQL Tab 展示了物理执行计划图,更贴近用户的逻辑计划图,但是缺少Spark执行过程中偏底层的状态信息,比如task粒度的信息。如果发生了数据倾斜,无法直接在这张图上看出问题所在。
因此,在现有的Spark UI 布局下,对于用户来说,想要debug就必须在不同的图之间反复切换,同时还要结合Spark内核原理来分析状态信息从而诊断自己Job的问题所在,这对大部分数据分析师和产品经理来说具有一定的学习成本。而且即便发现了问题,这部分状态信息也不一定能给用户提供解决问题的具体方向,Spark当前所暴露出来的内部信息满足不了解决问题的需求。
4. Zeta 解决方案
4.1 核心问题
一句 SQL从解析到实际被Spark进程执行的过程中会历经如下几个阶段:
如图3所示,SQL最终会被转换成底层的RDD,整个任务执行的DAG图会被分解成一个或者多个具有依赖关系的stage并最终以task为执行单元发送到Spark Executor的进程中去执行。大部分情况下,到了这个阶段就已经无法再从task执行的上下文中找出这个task到底是在执行SQL上对应的哪块逻辑,因为经过Codegen等一系列优化之后理论上已经无法在实际的物理执行过程和最初的逻辑计划上建立映射关系。
对于数据倾斜这种情况,我们可以通过观察Spark UI来进行判定。如果某个stage执行了很长时间,其中少部分task处理的数据又比其他task多很多,那么就证明出现了数据倾斜。以多张表做join为例,如果在shuffle的过程中产生了数据倾斜,为了尽可能将数据分散到不同的进程中进行处理,从而达到平衡工作负载的目的,比较通用的有以下几个方法:修改逻辑,将shuffle时的key尽可能打散;加大shuffle的分区数量从而使数据分散到更多的分区中去;单独找出产生了极大倾斜的key,在逻辑中单独处理最后再和其他部分union起来。
在准备开始解决这个问题之前,我们必须要回答两个核心问题:
1)如何找出SQL逻辑中发生了倾斜的那个部分?
2)如果发生了倾斜,又该如何知道到底倾斜在了哪一些key上呢?因此我们还是决定从物理算子入手,通过修改部分物理算子的算法,来达到在做原有计算逻辑的同时也对数据做统计计算。于是我们在这当中对少部分物理算子的算法做了较大的重构。由于做了重构设计,因此当动态计算key的功能启动之时,这少部分原本支持Codegen的物理算子将无法支持Codegen。
在实际的计算过程中,如果开启动态计算key的功能,将会为每一个TaskSet创建一个定制化的AdvancedTaskSetManager,主要作用一是执行原有物理算子的逻辑,二是当发现某些task存在数据倾斜的时候,会额外启动一个TaskSet’来执行统计计算的逻辑,如图4所示:
图4(点击可查看大图)
这个TaskSet’的执行逻辑和正常TaskSet的执行逻辑一样,都是修改过算法后的执行逻辑。不同的是AdvancedTaskSetManager会为这两个TaskSet分别注入不通的TaskContext,从而控制实际Task 在Runtime中执行不同的逻辑分支,一部分进行正常的计算,另外一部分进行统计计算并将结果返回到Driver端进行聚合,从而达到统计汇总的目的。这个改动对Spark原有代码有一定的侵入性。
好了,这下我们需要的运行时的数据都拿到了,接下来要做的就是构建这幅图了。Spark在Driver的初始化进程中会创建一个Spark UI对象,Spark UI会启动一个Jetty的web服务来供外部访问,Driver内部的状态存储对象AppStatusStore会为不同的Tab提供后端Render页面的数据,运行时Spark UI内部的状态如图5所示:
图5(点击可查看大图)
构建的第一步便是记录构建图形所需要的数据。Spark会在运行过程的某些逻辑中构建对应的事件,以便记录上下文并异步发送到Spark Driver内部的消息总线LiveListenerBus中。而且会有特定的Listener在总线的队列中监听特定的事件,当SQL被解析完毕并且准备开始执行的时候,会发出SparkListenerSQLExecutionStart事件。该事件中包含了SQL的物理计划执行图,像SparkPlanInfo,Driver内部的SQLAppStatusListener会监听这个事件并根据SparkPlanInfo准备将来做SQL Physical Plan后端渲染的数据。
因此,我们在SQL执行前物理计划树的遍历阶段记录下每个算子和对应RDD的上下文信息(图6),并以事件的形式发送到消息总线中,再由我们定制化的Listener监听捕捉并和已有的物理计划图进行整合即可(图7)。
图7(点击可查看大图)
而在每个stage开始执行和执行完毕的时候,也会发出相应的事件,这些事件中就包含了上文提到的各种统计信息和额外被注入的诊断信息。因此这些事件也会被监听,并用来update当前我们定制化的DAG图的状态。以一个实际生产中的案例作为参考,原始DAG图(图8)和经过定制化后的DAG图(图9)分别如下所示:图8 社区版DAG图
(点击可查看大图)
图9 经过定制后的DAG图
(点击可查看大图)
此案例是在实际生产中两张表进行join的时候产生了数据倾斜,可见社区版原始的DAG图中只展示了少量的和内存以及数据量等相关的信息,而这些数据并不足以帮助我们观察出内在的性能问题。反观经过定制化的DAG图,可以看到在这个阶段运行过程中检测到了数据倾斜,并且显示倾斜发生在join操作上,还提示了join的字段为user_id,并同时计算出了倾斜最严重的值为1。而这个倾斜最严重的值在相关业务场景中其实是脏数据,于是用户根据这个信息更改了SQL,将user_id为1的数据filter掉了,从而大大减少了shuffle时候的数据量,大幅缩短了整体的运行时间,解决了内存溢出的问题。5. 线上效果
5.1 减少事故发生率
6. 未来优化方向及社区跟进
实战 | 利用Delta Lake使Spark SQL支持跨表CRUD操作
一探究竟 | eBay流量管理之DSR在基础架构中的运用及优化
👇点击阅读原文,一键投递