10月26日,字节跳动技术沙龙 | 大数据架构专场 在上海字节跳动总部圆满结束。我们邀请到字节跳动数据仓库架构负责人郭俊,Kyligence大数据研发工程师陶加涛,字节跳动存储工程师徐明敏,阿里云高级技术专家白宸和大家进行分享交流。
以下是字节跳动数据仓库架构负责人郭俊的分享主题沉淀,《字节跳动在Spark SQL上的核心优化实践》。
数据仓库架构团队负责数据仓库领域架构设计,支持字节跳动几乎所有产品线(包含但不限于抖音、今日头条、西瓜视频、火山视频)数据仓库方向的需求,如Spark SQL / Druid的二次开发和优化。
今天的分享分为三个部分,第一个部分是SparkSQL的架构简介,第二部分介绍字节跳动在SparkSQL引擎上的优化实践,第三部分是字节跳动在Spark Shuffle稳定性提升和性能优化上的实践与探索。
我们先简单聊一下Spark SQL的架构。下面这张图描述了一条SQL提交之后需要经历的几个阶段,结合这些阶段就可以看到在哪些环节可以做优化。
很多时候,做数据仓库建模的同学更倾向于直接写SQL而非使用Spark的 DSL。一条SQL提交之后会被Parser解析并转化为Unresolved Logical Plan。它的重点是Logical Plan也即逻辑计划,它描述了希望做什么样的查询。Unresolved是指该查询相关的一些信息未知,比如不知道查询的目标表的Schema以及数据位置。上述信息存于Catalog内。在生产环境中,一般由Hive Metastore提供Catalog服务。Analyzer 会结合Catalog将Unresolved Logical Plan转换为Resolved Logical Plan。到这里还不够。不同的人写出来的SQL不一样,生成的Resolved Logical Plan也就不一样,执行效率也不一样。为了保证无论用户如何写SQL都可以高效的执行,Spark SQL需要对Resolved Logical Plan进行优化,这个优化由Optimizer完成。Optimizer包含了一系列规则,对Resolved Logical Plan进行等价转换,最终生成Optimized Logical Plan。该Optimized Logical Plan不能保证是全局最优的,但至少是接近最优的。上述过程只与SQL有关,与查询有关,但是与Spark无关,因此无法直接提交给Spark执行。Query Planner负责将Optimized Logical Plan转换为Physical Plan,进而可以直接由Spark执行。由于同一种逻辑算子可以有多种物理实现。如Join有多种实现,ShuffledHashJoin、BroadcastHashJoin、BroadcastNestedLoopJoin、SortMergeJoin等。因此 Optimized Logical Plan可被Query Planner转换为多个Physical Plan。如何选择最优的Physical Plan成为一件非常影响最终执行性能的事情。一种比较好的方式是,构建一个Cost Model,并对所有候选的Physical Plan应用该Model并挑选Cost最小的Physical Plan作为最终的Selected Physical Plan。Physical Plan可直接转换成RDD由Spark执行。我们经常说“计划赶不上变化”,在执行过程中,可能发现原计划不是最优的,后续执行计划如果能根据运行时的统计信息进行调整可能提升整体执行效率。这部分动态调整由Adaptive Execution完成。后面介绍字节跳动在Spark SQL上做的一些优化,主要围绕这一节介绍的逻辑计划优化与物理计划优化展开。
在Spark里,实际并没有Bucket Join算子。这里说的Bucket Join泛指不需要Shuffle的SortMergeJoin。下图展示了SortMergeJoin的基本原理。用虚线框代表的Table 1和Table 2 是两张需要按某字段进行Join的表。虚线框内的partition 0到partition m是该表转换成RDD后的Partition,而非表的分区。假设Table 1与Table 2转换为RDD后分别包含m和k个Partition。为了进行Join,需要通过Shuffle保证相同Join Key的数据在同一个Partition内且Partition内按Key排序,同时保证Table 1与 Table 2经过Shuffle后的RDD的Partition数相同。如下图所示,经过Shuffle后只需要启动n个Task,每个Task处理Table 1与Table 2中对应Partition的数据进行Join即可。如Task 0只需要顺序扫描Shuffle后的左右两边的partition 0即可完成Join。
该方法的优势是适用场景广,几乎可用于任意大小的数据集。劣势是每次Join都需要对全量数据进行Shuffle,而Shuffle是最影响Spark SQL性能的环节。如果能避免Shuffle往往能大幅提升Spark SQL性能。对于大数据的场景来讲,数据一般是一次写入多次查询。如果经常对两张表按相同或类似的方式进行Join,每次都需要付出Shuffle的代价。与其这样,不如让数据在写的时候,就让数据按照利于Join的方式分布,从而使得Join时无需进行Shuffle。如下图所示,Table 1与 Table 2内的数据按照相同的Key进行分桶且桶数都为n,同时桶内按该Key排序。对这两张表进行Join时,可以避免Shuffle,直接启动n个Task进行Join。字节跳动对Spark SQL的BucketJoin做了四项比较大的改进。在过去一段时间,字节跳动把大量的Hive作业迁移到了SparkSQL。而Hive与Spark SQL的Bucket表不兼容。对于使用Bucket表的场景,如果直接更新计算引擎,会造成Spark SQL写入Hive Bucket表的数据无法被下游的Hive作业当成Bucket表进行Bucket Join,从而造成作业执行时间变长,可能影响 SLA。为了解决这个问题,我们让Spark SQL支持Hive兼容模式,从而保证Spark SQL写入的Bucket表与Hive写入的Bucket表效果一致,并且这种表可以被Hive和Spark SQL当成Bucket表进行Bucket Join而不需要Shuffle。通过这种方式保证Hive向Spark SQL的透明迁移。第一个需要解决的问题是,Hive的一个Bucket一般只包含一个文件,而Spark SQL的一个Bucket可能包含多个文件。解决办法是动态增加一次以Bucket Key为Key并且并行度与Bucket个数相同的Shuffle。
第二个需要解决的问题是,Hive 1.x的哈希方式与Spark SQL 2.x的哈希方式(Murmur3Hash)不同,使得相同的数据在Hive中的Bucket ID与Spark SQL中的Bucket ID不同而无法直接Join。在Hive兼容模式下,我们让上述动态增加的Shuffle使用Hive相同的哈希方式,从而解决该问题。Spark SQL要求只有Bucket相同的表才能(必要非充分条件)进行Bucket Join。对于两张大小相差很大的表,比如几百GB的维度表与几十TB(单分区)的事实表,它们的Bucket个数往往不同,并且个数相差很多,默认无法进行Bucket Join。因此我们通过两种方式支持了倍数关系的Bucket Join,即当两张Bucket表的Bucket数是倍数关系时支持Bucket Join。第一种方式,Task个数与小表Bucket个数相同。如下图所示,Table A包含3 个Bucket,Table B包含6个Bucket。此时Table B的bucket 0与bucket 3的数据合集应该与Table A的bucket 0进行Join。这种情况下,可以启动3个 Task。其中Task 0对Table A的bucket 0与Table B的bucket 0 + bucket 3 进行Join。在这里,需要对Table B的bucket 0与bucket 3的数据再做一次merge sort从而保证合集有序。
如果Table A 与Table B的Bucket个数相差不大,可以使用上述方式。如果Table B的Bucket个数是Bucket A Bucket个数的10倍,那上述方式虽然避免了Shuffle,但可能因为并行度不够反而比包含Shuffle的SortMergeJoin速度慢。此时可以使用另外一种方式,即Task个数与大表Bucket个数相等,如下图所示。
在该方案下,可将Table A的3个Bucket 读多次。在上图中,直接将Table A 与Table A进行Bucket Union(新的算子,与Union类似,但保留了Bucket特性),结果相当于6个Bucket,与Table B的Bucket个数相同,从而可以进行Bucket Join。公司内部过去使用Bucket的表较少,在我们对Bucket做了一系列改进后,大量用户希望将表转换为Bucket表。转换后,表的元信息显示该表为Bucket 表,而历史分区内的数据并未按Bucket表要求分布,在查询历史数据时会出现无法识别Bucket的问题。同时,由于数据量上涨快,平均Bucket大小也快速增长。这会造成单Task需要处理的数据量过大进而引起使用Bucket后的效果可能不如直接使用基于Shuffle的Join。为了解决上述问题,我们实现了支持降级的Bucket表。基本原理是,每次修改Bucket信息(包含上述两种情况——将非Bucket表转为Bucket表,以及修改Bucket个数)时,记录修改日期。并且在决定使用哪种Join方式时,对于Bucket表先检查所查询的数据是否只包含该日期之后的分区。如果是,则当成Bucket表处理,支持Bucket Join;否则当成普通无Bucket的表。对于一张常用表,可能会与另外一张表按User字段做Join,也可能会与另外一张表按User和App字段做Join,与其它表按User与Item字段进行Join。而Spark SQL原生的Bucket Join 要求Join Key Set与表的Bucket KeySet完全相同才能进行Bucket Join。在该场景中,不同Join的Key Set不同,因此无法同时使用Bucket Join。这极大的限制了Bucket Join的适用场景。针对此问题,我们支持了超集场景下的Bucket Join。只要Join Key Set包含了Bucket Key Set,即可进行Bucket Join。如下图所示,Table X与Table Y,都按字段A分Bucket。而查询需要对Table X与Table Y进行Join,且Join Key Set为A与B。此时,由于A相等的数据,在两表中的Bucket ID 相同,那A与B各自相等的数据在两表中的Bucket ID 肯定也相同,所以数据分布是满足Join要求的,不需要Shuffle。同时,Bucket Join还需要保证两表按Join Key Set即A和B排序,此时只需要对Table X与Table Y进行分区内排序即可。由于两边已经按字段A排序了,此时再按A与B排序,代价相对较低。
Spark SQL处理嵌套类型数据时,存在以下问题:- 读取大量不必要的数据:对于Parquet/ORC等列式存储格式,可只读取需要的字段,而直接跳过其它字段,从而极大节省IO。而对于嵌套数据类型的字段,如下图中的Map类型的people字段,往往只需要读取其中的子字段,如people.age。却需要将整个Map类型的people 字段全部读取出来然后抽取出people.age字段。这会引入大量的无意义的IO开销。在我们的场景中,存在不少Map类型的字段,而且很多包含几十至几百个Key,这也就意味着IO被放大了几十至几百倍。
- 无法进行向量化读取:而向量化读能极大的提升性能。但截止到目前(2019年10月26日),Spark不支持包含嵌套数据类型的向量化读取。这极大的影响了包含嵌套数据类型的查询性能
- 不支持Filter下推:目前(2019年10月26日)的Spark不支持嵌套类型字段上的Filter的下推
- 重复计算:JSON字段,在Spark SQL中以String类型存在,严格来说不算嵌套数据类型。不过实践中也常用于保存不固定的多个字段,在查询时通过JSON Path抽取目标子字段,而大型JSON字符串的字段抽取非常消耗CPU。对于热点表,频繁重复抽取相同子字段非常浪费资源。
对于这个问题,做数仓的同学也想了一些解决方案。如下图所示,在名为base_table的表之外创建了一张名为sub_table的表,并且将高频使用的子字段people.age设置为一个额外的Integer类型的字段。下游不再通过base_table查询people.age,而是使用sub_table上的age字段代替。通过这种方式,将嵌套类型字段上的查询转为了Primitive 类型字段的查询,同时解决了上述问题。- 额外维护了一张表,引入了大量的额外存储/计算开销。
- 无法在新表上查询新增字段的历史数据(如要支持对历史数据的查询,需要重跑历史作业,开销过大,无法接受)。
- 运营成本高:如果高频子字段变化,需要删除不再需要的独立子字段,并添加新子字段为独立字段。删除前,需要确保下游无业务使用该字段。而新增字段需要通知并推进下游业务方使用新字段。
为解决上述所有问题,我们设计并实现了物化列。它的原理是:- 新增一个Primitive类型字段,比如 Integer类型的age字段,并且指定它是people.age的物化字段。
- 插入数据时,为物化字段自动生成数据,并在Partition Parameter内保存物化关系。因此对插入数据的作业完全透明,表的维护方不需要修改已有作业。
- 查询时,检查所需查询的所有Partition,如果都包含物化信息(people.age到age的映射),直接将select people.age自动重写为select age,从而实现对下游查询方的完全透明优化。同时兼容历史数据。
在OLAP领域,经常会对相同表的某些固定字段进行Group By和Aggregate/Join等耗时操作,造成大量重复性计算,浪费资源,且影响查询性能,不利于提升用户体验。如上图所示,查询历史显示大量查询根据user进行group by,然后num进行sum或count计算。此时可创建一张物化视图,且对user进行gorup by,对num进行avg(avg会自动转换为count和sum)。用户对原始表进行select user, sum(num) 查询时,Spark SQL自动将查询重写为对物化视图的select user, sum_num查询。
下图展示了我们在Spark SQL上进行的其它部分优化工作:Shuffle的原理,很多同学应该已经很熟悉了。鉴于时间关系,这里不介绍过多细节,只简单介绍下基本模型。如上图所示,我们将Shuffle上游Stage称为Mapper Stage,其中的Task称为Mapper。Shuffle下游Stage称为Reducer Stage,其中Task称为Reducer。每个Mapper会将自己的数据分为最多N个部分,N为Reducer个数。每个Reducer需要去最多M(Mapper个数)个 Mapper获取属于自己的那部分数据。
稳定性问题:Mapper的Shuffle Write数据存于Mapper本地磁盘,只有一个副本。当该机器出现磁盘故障,或者IO满载,CPU满载时,Reducer无法读取该数据,从而引起FetchFailedException,进而导致Stage Retry。Stage Retry 会造成作业执行时间增长,直接影响SLA。同时,执行时间越长,出现Shuffle数据无法读取的可能性越大,反过来又会造成更多Stage Retry。如此循环,可能导致大型作业无法成功执行。
- 性能问题:每个Mapper的数据会被大量Reducer读取,并且是随机读取不同部分。假设Mapper的Shuffle输出为512MB,Reducer有10万个,那平均每个Reducer读取数据512MB/100000=5.24KB。并且,不同Reducer并行读取数据。对于Mapper输出文件而言,存在大量的随机读取。而HDD的随机IO性能远低于顺序IO。最终的现象是,Reducer读取Shuffle数据非常慢,反映到Metrics上就是Reducer Shuffle Read Blocked Time较长,甚至占整个Reducer执行时间的一大半,如下图所示。
经观察,引起Shuffle失败的最大因素不是磁盘故障等硬件问题,而是CPU满载和磁盘IO满载。
如上图所示,机器的CPU使用率接近100%,使得Mapper侧的Node Manager内的Spark External Shuffle Service无法及时提供Shuffle 服务。
下图中Data Node占用了整台机器IO资源的84%,部分磁盘IO完全打满,这使得读取Shuffle数据非常慢,进而使得Reducer侧无法在超时时间内读取数据,造成FetchFailedException。
无论是何种原因,问题的症结都是Mapper侧的Shuffle Write数据只保存在本地,一旦该节点出现问题,会造成该节点上所有Shuffle Write数据无法被Reducer读取。解决这个问题的一个通用方法是,通过多副本保证可用性。
最初始的一个简单方案是,Mapper侧最终数据文件与索引文件不写在本地磁盘,而是直接写到HDFS。Reducer不再通过Mapper侧的External Shuffle Service读取Shuffle数据,而是直接从HDFS上获取数据,如下图所示。
快速实现这个方案后,我们做了几组简单的测试。结果表明:
在上面的实验过程中,HDFS发出了报警信息。如下图所示,HDFS Name Node Proxy的QPS峰值达到60万。(注:字节跳动自研了Node Name Proxy,并在Proxy层实现了缓存,因此读QPS可以支撑到这个量级)。原因在于,总共10000 Reducer,需要从10000个Mapper处读取数据文件和索引文件,总共需要读取 HDFS 10000 * 1000 * 2 = 2亿次。如果只是Name Node的单点性能问题,还可以通过一些简单的方法解决。例如在Spark Driver侧保存所有Mapper的Block Location,然后Driver将该信息广播至所有Executor,每个Reducer可以直接从Executor处获取Block Location,然后无须连接Name Node,而是直接从Data Node读取数据。但鉴于Data Node的线程模型,这种方案会对Data Node造成较大冲击。最后我们选择了一种比较简单可行的方案,如下图所示。
Mapper的Shuffle输出数据仍然按原方案写本地磁盘,写完后上传到HDFS。Reducer仍然按原始方案通过Mapper侧的External Shuffle Service读取Shuffle数据。如果失败了,则从HDFS读取。这种方案极大减少了对HDFS的访问频率。该方案旨在提升Spark Shuffle稳定性从而提升作业稳定性,但最终没有使用方差等指标来衡量稳定性的提升。原因在于每天集群负载不一样,整体方差较大。Shuffle稳定性提升后,Stage Retry大幅减少,整体作业执行时间减少,也即性能提升。最终通过对比使用该方案前后的总的作业执行时间来对比性能的提升,用于衡量该方案的效果。
如上文所分析,Shuffle性能问题的原因在于,Shuffle Write由Mapper完成,然后Reducer需要从所有Mapper处读取数据。这种模型,我们称之为以Mapper为中心的Shuffle。它的问题在于:- Mapper侧会有M * N * 2次随机读IO(这是最大的性能瓶颈)。
- Mapper侧的External Shuffle Service必须与Mapper位于同一台机器,无法做到有效的存储计算分离,Shuffle服务无法独立扩展。
针对上述问题,我们提出了以Reducer为中心的,存储计算分离的Shuffle方案,如下图所示。
该方案的原理是,Mapper直接将属于不同Reducer的数据写到不同的Shuffle Service。在上图中,总共2个Mapper,5个Reducer,5个 Shuffle Service。所有Mapper都将属于Reducer 0的数据远程流式发送给Shuffle Service 0,并由它顺序写入磁盘。Reducer 0只需要从Shuffle Service 0顺序读取所有数据即可,无需再从M个Mapper取数据。该方案的优势在于:- Shuffle Service可以独立于Mapper或者 Reducer部署,从而做到独立扩展,做到存储计算分离。
- Shuffle Service可将数据直接存于HDFS等高可用存储,因此可同时解决Shuffle稳定性问题。
提问:如果用户的请求同时包含新数据和历史数据,如何处理?回答:一般而言,用户修改数据都是以Partition为单位。所以我们在Partition Parameter上保存了物化列相关信息。如果用户的查询同时包含了新Partition与历史Partition,我们会在新Partition上针对物化列进行SQL Rewrite,历史Partition不Rewrite,然后将新老Partition进Union,从而在保证数据正确性的前提下尽可能充分利用物化列的优势。
提问:你好,你们针对用户的场景,做了很多挺有价值的优化。像物化列、物化视图,都需要根据用户的查询Pattern进行设置。目前你们是人工分析这些查询,还是有某种机制自动去分析并优化?回答:目前我们主要是通过一些审计信息辅助人工分析。同时我们也正在做物化列与物化视图的推荐服务,最终做到智能建设物化列与物化视图。
提问:刚刚介绍的基于HDFS的Spark Shuffle稳定性提升方案,是否可以异步上传Shuffle数据至HDFS?回答:这个想法挺好,我们之前也考虑过,但基于几点考虑,最终没有这样做。第一,单Mapper的Shuffle输出数据量一般很小,上传到 HDFS耗时在2秒以内,这个时间开销可以忽略;第二,我们广泛使用External Shuffle Service和Dynamic Allocation,Mapper执行完成后可能Executor就回收了,如果要异步上传,就必须依赖其它组件,这会提升复杂度,ROI较低。
推荐文章:
Spark SQL/Hive实用函数大全自适应查询执行:在运行时提升Spark SQL执行性能
SparkSQL中产生笛卡尔积的几种典型场景以及处理策略
SparkSQL与Hive metastore Parquet转换
Spark SQL | 目前Spark社区最活跃的组件之一