实战 | 利用Delta Lake使Spark SQL支持跨表CRUD操作
供稿 | eBay ADI-Carmel Team
作者 | 金澜涛
编辑 | 顾欣怡本文7309字,预计阅读时间22分钟更多干货请关注“eBay技术荟”公众号导读
本文介绍eBay Carmel团队利用Delta Lake,使Spark SQL支持Teradata的Update/Delete语法。主要从源码角度介绍了CRUD操作的具体实现和优化,以及delta表的管理工作。希望对同业人员有所启发和帮助。
摘要
大数据处理技术朝传统数据库领域靠拢已经成为行业趋势,目前开源的大数据处理引擎,如Apache Spark、Apache Hadoop、Apache Flink等等都已经支持SQL接口,且SQL的使用往往占据主导地位。各个公司使用以上开源软件构建自己的ETL框架和OLAP技术,但在OLTP技术上,仍然是传统数据库的强项。其中的一个主要原因是传统数据库对ACID的支持。具有ACID能力的传统商用数据库基本都实现了完整的CRUD操作。而在大数据技术领域,由于缺少ACID的支持,基本只实现了C/R操作,对U/D操作很少涉及。
eBay数据仓库的部分基础设施是构建在商用数据产品Teradata之上的,近年来,随着公司整体朝开源技术迁移,数据仓库的基础设施已基本迁移到Apache Hadoop、Apache Spark平台。但要完全从Teradata上迁移下来,必须构建具有相同能力的SQL处理引擎。在Teradata上的分析型SQL,有超过5%的查询使用Update/Delete操作,目前Apache Spark并不具备这个能力。
本文介绍eBay Carmel团队利用Delta Lake,使Spark SQL支持Teradata的Update/Delete语法。对比标准SQL的Update/Delete语法,以及目前尚未正式发布的Apache Spark 3.0 提供的语法(不含实现),我们还实现了Teradata的扩展语法,可以进行跨表更新和删除的SQL操作。
1.
简介
Carmel Spark是Carmel团队基于Apache Spark进行魔改的SQL-on-Hadoop引擎。主要改善了交互式分析的使用体验,提供即席查询(ad-hoc)服务。Carmel Spark是“Teradata退出”项目的重要组成部分,在功能性和性能上,都做了大量开发和优化。例如全新的CBO、并发调度、物化视图、索引、临时表、Extended Adaptive Execution、Range Partition、列级访问权限控制,以及各类监控和管理功能等,目前已经在线上使用且满足业务需求。
但由于Apache Spark缺少ACID事务能力,并没有提供Update/Delete语法。去年年初,Databricks开源了存储层Delta Lake,为Apache Spark 提供可伸缩的 ACID 事务,提供事务管理、统一流批、元数据管理、版本回溯等数据库领域常见功能。一年过去了,Delta Lake的版本也更新到了0.5.0,但开源版本始终没有提供Update/Delete的SQL实现。目前只提供Dataframe API,用户需通过编写代码来对数据进行更新和删除等操作。此外,根据Apache Spark 3.0分支上提供的SQL语法接口,也只支持基本的单表Update/Delete操作,对于复杂的带有join语义的跨表操作,则完全不支持。而Teradata用户已经在广泛使用扩展的SQL语法对数据进行更新和删除操作。
基于Delta Lake存储层提供的ACID事务能力,Carmel Spark实现了Update/Delete的SQL语法,且该语法完全兼容Teradata的扩展语义,即能进行跨表的更新和删除。同时,我们拓展了delta表的数据分布,支持bucket delta表,并对其进行了bucket join等优化。此外,由于Carmel Spark集群部署是多租户的,所以同一套代码会长期运行在YARN的不同队列中。虽然Delta Lake存储层提供了良好的事务隔离性,但仍会出现重复操作的风险(非同一事务)。因此,我们使用delta表本身来治理delta表,即将所有delta表的元信息存储在一张delta表中,通过对该元数据表的增删改查操作,来对用户使用的所有delta表进行管理。
本文的组织结构如下:第二节介绍相关技术和产品;第三节阐述项目的整体架构和实现;第四节详细介绍如何利用Delta Lake使SparkSQL支持CRUD操作;第五节介绍delta表的bucket优化;第六节介绍delta表的自治和管理;最后两节分别谈一下未来的工作和对本文的总结。
2.
相关工作
Apache Spark 3.0开始,SQL模块提供了Update/Delete的语法定义,定义在Antlr4的语法文件里,但并没有具体实现,而是交由第三方实现。如图1所示:
Teradata[2]是Teradata Corp.开发的可横向扩展的关系型数据库管理系统,设计用于分析型查询,主要用于数据仓库领域,采用大规模并行处理(MPP)架构。Teradata对Update/Delete等语法支持非常完备,除了ANSI SQL: 2011定义的标准Update/Delete语法,Teradata还做了大量扩展,如跨表更新和删除。其所提供的丰富的语法也给我们迁移到Spark带来了挑战。图2所示为Teradata支持的更新和删除语法:
图2 Teradata的单表以及跨表Update/Delete语法(点击可查看大图)
2018年初,Databricks开源了存储层Delta Lake[3],为Apache Spark 提供可伸缩的 ACID 事务,提供事务管理、统一流批、元数据管理、版本回溯等数据库领域常见功能。Delta Lake将其数据存储在Parquet文件中,并提供ACID事务。它使用名为Delta Log的事务日志来跟踪对表所做的所有更改。
与开源的Delta Lake相比,Databricks内部版本可以通过SQL来进行Update/Delete操作,而目前开源版本只支持DataFrame的API,只能通过Parquet[4]文件推断表的Schema信息,对Hive Metastore[5]的支持较弱,且不支持bucket表等等。Apache Iceberg[6]和Apache Hudi[7]虽然实现形式与Delta Lake不同,但在Update/Delete的SQL语法支持上,目前都不完善。表1给出了这三个系统的对比(截止2019年11月)。
3.
项目概述
有了Delta Lake在存储层提供ACID事务保障,我们的主要工作就是利用Delta Lake,在我们的Spark版本上实现和Teradata相同的Update/Delete功能。要达到这个目标,有以下任务有待完成:
1. Delta Lake目前只支持Apache Spark 2.4+版本,而Carmel团队使用的Spark版本是基于2.3版本的,所以我们改了Delta Lake的部分实现并为我们的Spark版本打了一些补丁。
2. Spark 3.0中虽然没有Update/Delete语法的具体实现,但仍然在Catalyst[8]中加入了相关的逻辑计划节点。不过这些新增的接口都是基于DataSourceV2的,我们需要将这部分代码在DataSourceV1上进行重写:(点击可查看大图)
3. Teradata支持跨表的Update/Delete语法,目前Delta Lake和Spark都不支持,我们需要自己实现带join的跨表连接更新和删除操作。
4. Delta Lake目前对Catalog[9]的访问还不成熟,delta表的schema是通过Parquet文件推断出来的,通过Catalog访问Hive Metastore是使用SQL访问delta表的重要一环。5. 由于上述原因,delta表无法识别bucket信息,更没有考虑读写bucket表时的分布(distribution)。6. 在以上3,4,5步骤完成之后,还要对跨表操作进行优化,这里将主要介绍bucket join的优化。7. 开源版本的Delta Lake缺少一定的管理机制,需要实现一些自动化管理功能,如自动清理和合并文件等。4.
CRUD的实现
首先,要在我们的Spark 2.3内部版本中使用Delta Lake,就需要从社区打一些补丁。这里重点说一下SPARK-28303。
Delta Lake目前不支持Update/Delete SQL的解析,我们增加了两个类:DeltaSqlResolution和PreprocessTableUpdateDelete,通过SparkSessionExtensions注入到Analyzer:
DeltaSqlResolution主要是用于解析condition和assignments表达式:
再由PreprocessTableUpdateDelete生成RunnableCommand。如果是delta表的话,这里可以从LogicalRelation中拿出delta表的TahoeFileIndex(在DataSource.scala的resolveRelation中添加的),如果是非delta表,则会抛出AnalysisException。
UpdateCommand是Delta Lake自带的类,我们对其改动不多,主要改了如下几个地方:
目前Spark3.0定义的Update/Delete语法不支持跨表操作,而跨表更新和删除操作却十分普遍,比如更新目标表中具有(在inner join情况下)或可能没有(在left outer join情况下)另一个表匹配行的行。
许多数据库都提供跨表更新和删除的语法。下面给出了几种常用数据库的跨表更新的例子。
(点击可查看大图)
和单表Update一样,首先对condition和SET子句进行解析。不同的是,除了被更新的target是一个LogicalRelation以外,这里的source可以是一个LogicalRelation,也可以是多张表连接在一起的join plan。
我们从WHERE条件的condition中分离出哪些是target和source之间的join criteria,哪些是source中自身的join criteria(source可以是多表join的plan),以及哪些是分别作用在target或source上的普通Filter。同样地,再由PreprocessTableUpdateDelete生成Runnable Command:
上述代码中,跨表更新和单表更新的区别是多构建了一个DeltaMergeAction。可见跨表更新的实现参考了MergeInto。
UpdateWithJoinCommand是跨表更新的主要执行类,一共分为三步:
1. 通过将需要被更新的target表和source(可以是一个带join的plan)进行内连接(inner join)找出所有会被更新的行所涉及的文件,标记为removeFiles。这一步还能简化后续的步骤,例如不涉及任何文件或者只涉及partition目录时,不用全表执行第2步。
2. 将target和source使用左外连接(left outer join),对于join条件匹配的行,使用build side iterator的数据(右表),不匹配的行使用stream side iterator的数据(左表)。将数据写出到target表,写出的数据文件标记为addedFiles。3. 将1中removeFiles和2中的addedFiles写入transaction log中,即delta log。删除操作和更新操作基本类似,可以视为更新操作的简化版,这里就不展开了。
这里我们把catalogTable对象传入到DeltaDataSource的createRelation方法里。补充一点,之所以这个case可以匹配到DeltaDataSource,是因为我们在ConvertToDelta Command里,通过alterTable,把provider从parquet改成了delta:
回到createRelation。通过传入的catalogTable对象,我们在DeltaLog.scala里将表的信息填到HadoopFsRelation里面:
Delta表的INSERT操作也很简单。在DataSourceStrategy中添加InsertIntoData SourceCommand:
普通delta表的insert我们没有进行修改,这里就不展开了,下一节讲bucket表的insert时再详细阐述这部分的改动。
创建delta表(CREATE操作)目前完全复用了普通Parquet表的CREATE,只是需要在建完表后执行CONVERT TO DELTA命令。我们简单做了一些修改,使其可以CONVERT一张空的Parquet表,目前社区版是不支持的。其他的修改主要是针对管理上的,在第六节会详细介绍。
到此,CRUD功能的SQL实现已经基本完成。在这一节里,我们引入了跨表更新操作,但是跨表更新涉及到join算子,这在大表之间进行更新操作时会有性能问题。在下一节中会介绍如何针对bucket表进行优化。
5.
Bucket优化
然而目前delta表并不支持分桶表,相关代码的BucketSpec都被默认填了None,对更新和删除的操作也没有考虑数据的分布(Distribution)。那么该如何实现bucket表的数据分布呢?
在4.3小节中我们提到了在ResolveRelation时将CatalogTable对象传入了HadoopFsRelation。有了这个CatalogTable对象,就可以帮我们在后续的各类操作中识别bucket表了。
上一步只是告诉Spark,这是一张bucket表,真正写入数据的时候发现数据并没有分桶分布。这是因为Insert操作在delta表上是走InsertIntoDataSource -> InsertIntoDataSourceCommand的,而不是通过DataWritingCommand,所以也就走不到ensureDistributionAndOrdering的逻辑。以下代码是社区版InsertIntoDataSourceCommand的实现:
如上代码所示,它的实现非常简单,将需要insert的逻辑计划“query”封装成一个data frame,然后传入到实现类的insert方法里。在Delta Lake中这个data frame会被传入到TransactionalWrite的writeFiles方法中。最终从这个data frame中取出physical plan并传入DataFormatWriter的write方法。之后就是真正的生成job并分发执行了。
从整个流程可以看出,从一开始的逻辑计划对象“query”到最后的物理计划,并没有机会进行数据分布的实现。所以不管在建表时是否指定分桶,插入数据时都不会满足数据分布。
鉴于目前DataSource并没有考虑数据分布的问题,我们在resolution阶段就需要进行处理。大体就是在Catalyst里增加一个InsertIntoDataSource的逻辑计划节点和一个InsertIntoDataSourceExec的物理计划节点。在InsertIntoDataSourceExec这个物理计划中实现了requiredChildDistribution和requiredChildOrdering方法(代码可以参考InsertIntoHadoopFsRelationCommand的requiredDistribution和requiredOrdering方法)。
这里说一下整体流程。首先,DataSourceStrategy原本是匹配到了InsertIntoTable就会将逻辑计划“query”原封不动地传入InsertIntoDataSource Command。我们现在做出如下改变:增加一个新的逻辑计划节点InsertIntoDataSource,为其添加partition,bucket等信息,并将“query”作为该新节点的child:然后在SparkStrategy.scala的BasicOperators里将InsertIntoDataSource节点转成物理计划节点InsertIntoDataSourceExec,通过planLater(i.query)得到物理计划作为该物理节点的child。这样InsertIntoDataSourceExec的requiredChildDistribution和requiredChildOrdering方法就可以对数据进行分布了:
到目前为止,对delta表的改造已经使其具有了bucketSpec字段和数据分布的特性。在跨表更新或删除时,无论是inner join还是left outer join,只要target和source都是bucket表且满足bucket join条件,就能走bucket join而不是SortMergeJoin。这就解决了大表之间join产生大量shuffle带来的性能问题。
下面这个例子是跨表更新一张3.9TB的表,source则是一张5.2TB的表。图3所示是left outer join阶段,右表虽然有一个Filter,但是仍然不满足broadcast join阈值,这个更新操作在非bucket join的情况下,会造成大量Executor OOM,最终导致job失败。通过引入bucket join,该job在2分钟左右就能顺利完成。从图3可以看到在SortMergeJoin的前后,已经没有ShuffleExchange了。
6.
Delta的自治和管理
介绍完CRUD的功能和相关优化,这一节讲一下我们是如何管理delta表的,主要包括:如何统计delta的使用情况,如何自动进行文件清理,如何管理TimeTraval[13]等。
在这之前我们需要简单介绍一下eBay Carmel Spark的基本架构。eBay的Carmel Spark平台是计算存储分离的。数据存储有一个专门的Hadoop集群(Apollo),Carmel Spark集群(Hermes)主要是由大内存加SSD的计算节点组成,通过YARN[14]进行调度。除了本地SSD以外,也有一部分存储容量搭建了一个小容量的HDFS,主要是拿来做Relation Cache和物化视图,这部分以后有机会另起一篇文章进行介绍。
我们使用Spark Thriftserver来提供JDBC和ODBC服务,但所有的Thriftserver并不是固定在某个机器上的,而是通过YARN进行调度,通过cluster mode将Spark Thriftserver提交到集群内部。同时,根据Budget Group对YARN集群分queue,不同的Budget Group有一个YARN的queue,例如广告部门有一个queue,数据部门有一个queue,每个queue可以有多个Spark Thriftserver。Carmel Spark对scheduler模块做过大量并发优化,经过压测,一个Driver调度起来的任务能把200台物理机的所有CPU压满。所以Driver调度并不是瓶颈,目前最大的一个queue仅使用一个Thriftserver就可以调度近7000个executors。
图4 Carmel Spark集群部署
(点击可查看大图)
目前有多少个queue,就有多少个Thriftserver,也就有多少个Application。但不同的Thriftserver仍然共享了一些组件,例如HDFS,Hive Metastore等。这就要求我们对所有的queue做一些管理。例如在物化视图功能中,当对一张基础表构建物化视图后,所有的queue都需要在内存里构建一些逻辑计划树。delta表的管理也类似,不过相比物化视图简单的多。例如我们要对所有的delta表进行自动化的文件清理工作,一种方式是起一个后台线程遍历Hive Metastore的所有表,对provider是delta的表进行处理。这样的好处是不需要跨Thriftserver进行任何消息的同步,坏处自然是不断遍历Hive Metastore带来的压力(多集群公用的Hive Metastore压力已经比较大了)。所以我们使用了一种更加直观的方式进行管理,即用delta表来管理delta表。
我们创建了一张名为carmel_system.carmel_ delta_meta的表,记录了如表名、owner、deltalog路径、是否自动清理、清理周期等元信息,并将其CONVERT成一张delta表。所以carmel_delta_meta表的第一条记录就是自己的信息。然后我们提供了一套操作这张表的API,以调用静态方法的方式放在DeltaTableMetadata类的半生对象中:DeltaValidate线程会自动生成Vacuum任务,并丢到Vacuum线程池调度执行。这里就不贴代码了。整个架构如图5所示:
图5 delta表的自治管理
(点击可查看大图)
此外,我们还增加了TimeTravel的SQL语义,用户可以通过在SELECT命令里增加AT关键字,单次读取delta表某个version的快照。也可以通过ROLLBACK命令永久回到某个版本:通过carmel_delta_meta中记录的一些表的血缘信息,可以实现delta表的及联回滚。在某个delta表rollback后,触发器根据carmel_delta_meta的血缘信息,自动回滚其他相关表(这需要事先定义在carmel_delta_meta的rollback依赖树和触发器条件,该功能目前还未上线)。
上面介绍了通过delta表来管理delta表的方式,这一方法能很好地帮我们解耦队列同步和外部系统依赖的问题,既方便灵活,又快捷安全。
7.
未来的工作
Carmel Spark项目经过两年的技术迭代,已经具备非常多的功能和优化,例如Range Partation、Optimized Bucket Join、Broadcast/Local Cache、Extended Adaptive Execution、Parquet File Index、Materialized View、ACL、Volcano CBO、Adaptive Runtime Filter、Mutiple Files Scan等,如何让新的功能如CRUD复用以上优化和特性,也变得越来越富有挑战了。例如我在测试时发现Broadcast Cache和Mutiple Files Scan两个功能在和CRUD功能集成时存在bug,又或者目前的Volcano CBO和Parquet File Index还不能应用在delta表上等。
除了性能的优化,Carmel Spark作为Teradata战略代替品,需要尽可能兼容Teradata的语义,后续如果有用户需要MERGE INTO或者UPSERT操作,这部分还要继续扩展。此外,目前UPDATE和DELETE的WHERE条件还不支持子查询,CONVERT TO DELTA不支持Parquet Format的Hive表,这些都将是后续的工作。
8.
实施和总结
最后说一下用户支持,其实做一个项目最复杂也是最耗时的并不是编码阶段,而是上线后接受用户的考验。该功能的第一批用户是来自eBay瑞士的财务部门分析师团队,因为不在同一个时区,春节假期里几乎每晚都会通过Zoom和我沟通。这种在用户和开发者之间的持续交流,使得一些隐藏的问题即时浮现出来,用户也得到了较好的使用体验。我们的Carmel Spark每周都会有半个小时的例行发布窗口,用户遇到的bug几乎都在下次发布窗口时得到了修复。在这一周中,我们也会找出workaround方式,帮助用户进度的推进。目前该功能已经在所有队列上启用,越来越多的用户开始参与试用。
参考文献
[14]https://hadoop.apache.org/docs/current/hadoop-yarn/hadoop-yarn-site/YARN.html