Hadoop平台进阶之路 | eBay Spark测试框架——Woody
供稿 | eBay CCOE ADI Hadoop Team
作者 | 王斐
编辑 | 顾欣怡
本文7253字,预计阅读时间23分钟
更多干货请关注“eBay技术荟”公众号
导读
新版本的Spark拥有更好的性能和稳定性,对于用户来说,如果长期停留在低版本的Spark,不仅会浪费集群资源,还会进一步加大平台管理团队的工作量。如果进行Spark大版本升级,考虑到版本间可能由于计算行为不一致而导致的数据质量问题,用户就要投入大量的精力去对比重要的job在不同版本下的数据质量,加大了版本升级的困难度。
ADI Hadoop team负责管理eBay的Hadoop集群、 Spark的版本升级和bug修复等事务。为了提升Spark版本升级的效率,本团队开发了Spark测试框架——Woody。该测试框架会将线上spark-sql job语句转换为和线上job隔离的测试语句,然后调用不同的Spark版本执行测试语句,最终对比版本间数据质量。Woody不仅可以用于Spark版本升级,也可用于job调优以及job pipeline的端到端测试。本文将分享Spark测试框架Woody的架构,实现以及使用场景,希望能对读者有所帮助。
01
背景
Hadoop team目前管理两个大Spark分支,Spark-2.1和Spark-2.3,目前的版本开发均基于Spark-2.3,而对于Spark-2.1分支已经不再进行维护,未来会升级到Spark-3.0。
Hadoop team从两年前就着手进行从Spark-2.1 到Spark-2.3的迁移工作,用了将近两年时间完成了迁移。
为什么会用这么长时间呢?
因为大版本之间可能会存在不兼容问题,计算行为可能发生改变,也就是说两个版本间的计算结果可能不一致。
数据质量是至关重要的,特别是对于金融数据,业务团队需要在升级之前进行两个版本间的计算结果对比。
而这需要用户去手动修改线上代码,然后利用两个Spark版本进行双跑,最后再去手动对比两个版本的计算结果。eBay内部的spark-sql任务数不胜数,大版本升级会消耗大量的资源和人力。
Spark-2.1到Spark-2.3 已经耗费了这么长时间,那么将来升级到Spark-3.0想必也是一个浩大的工程。
为了解决这个问题,Hadoop team开发了一个Spark测试框架,命名为Woody。Woody的名字取自一个卡通啄木鸟,希望可以帮助找出不同Spark版本之间或者Spark job中的bug(虫子)。
Woody可以将线上的SQL语句进行转换,然后分别启动两个Spark版本运行转换后的SQL,从而对比两个版本的计算结果,判断两个版本计算结果是否一致,也可以用于比较两个版本的性能。
02
Woody的架构
Woody的架构如图2所示,提供restful api,使用mysql存储数据,支持多个集群。用户可以一次提交一批用于测试的job,Woody用一个workflow封装这批job,由workflow调度器进行调度,每个workflow调度时生成一个对应的jobSetManager,进入job调度器,job调度器会限制同时运行job的数量。
将job语句转换为测试语句 测试运行前准备工作
调用Spark版本1运行测试语句
计算Spark版本1结果的校验信息
调用Spark版本2运行测试语句
计算Spark版本2结果的校验信息
给出数据质量报告
关于job语句的转换,Woody为各个集群启动多个长运行的conversion executor,这些conversion executor会向Woody Server进行注册并定期汇报自己的心跳,由Conversion Service Manager来管理。
在job需要运行测试语句阶段,Spark App Submit Service会向相应的集群提交Spark任务, Woody会记录其ApplicationId存入mysql。
Woody使用mysql共享状态数据,支持HA, 是cloud-native的服务。在一台Woody服务关闭时会将其正在运行的workflow标记为游离状态,这些游离状态的workflow会被其他正在运行的Woody服务接管,或者由当前Woody服务重启后重新接管。
03
Spark-sql Job
首先,介绍一下本文中对source表,working表和target表的定义:
source表是作为输入的表,是被select from的表;
working表是在job运行中被创建的表,包括(temporary)view; target表是被写入数据的表,比如被load数据,或者被insert数据等等。
前面提到了用户在测试版本间数据质量的时候,需要手动对两个Spark版本间的计算进行对比,这一操作有以下三个要点:
更改SQL语句,至少需要更改insert数据的表名,避免影响线上job;
保持source表的数据一致; 手动检查两个Spark版本的计算结果。
Woody需要自动化完成以上三方面的工作。
首先,对于如何去自动地更改线上job语句,请参考以下这组简单的Spark-sql语句:
create or replace temporary view working_table as select c1, c2, …, cn from src_table;
insert overwrite target_table select working_table.c1 from working_table;
在这组语句里面,由src_table 作为job的输入,而working_table是在job运行时候生成的,target_table则作为job的输出。
如果要更改线上job语句,不被改变的src_table是不用更改的,作为输出的target_table是必须修改的,临时(temporary)的working表(temporary view)是不用更改的,非临时的working表则必须更改。
那么如何才能找出这些src, working和target表呢?
04
Logical Plan
05
ParserRuleContext
06
语句转换
1. 隔离
insert into gdw_tables.tgt_tbl select * from gdw_tables.src_tbl;
会被转换为insert into WOODY_DB_{UNIQUE_ID}.gdw_tables__tgt_tbl select * from gdw_tables.src_tbl;2. Context
source表以及这些表是否是global的表; 后续当做target表的source表;
语句中创建的working表;
被后续当做source表的working表;
被后续当做target表的working表;
target表以及其被写入的partition信息;
在job中被写入数据的dir信息;
当前的数据库是什么,use database语句可更改当前数据库;
当前被alter(包括被写入数据,alter location等等)的表的信息等。
接下来介绍Woody如何提取信息,并将这些信息存入该context,串联整个job。
3. 提取
CreateViewCommand/CreateViewUsing/CreateTable等创建表语句,将被创建的working表信息存入context,同时记录指定的location信息; LoadDataCommand和insert语句,将被insert数据的表以及partition信息存入context;
AlterTable语句,将alter信息加入context...
Project的project list的每一个元素, Join条件的left表达式,right表达式,以及condition表达式,
Aggregate的grouping 表达式,aggregate表达式,
Sort的order表达式……
而接下来需要替换的target表,working表,source表或者location信息,均已从当前语句中提取出来,结合当前的context即可进行转换。
4. 转换
表名信息; 包含base和field信息的节点,例如语句‘select ta.id from ta’的‘ta.id’, 可能需要替换其中的表名ta部分;
表property节点,比如spark data source表property 里面的path就是表的路径,Woody可能需要转换这个路径;
Location节点。
如果该表是source表且没有被alter过,则继续用原表; 如果该表是source表,但是被alter过,则使用转换后的表名(参考第一小节:隔离)。
这个base表可能是别名不需要转换; 这个base表可能是With语句中的temp view,不用转换;
参考第一条的转换规则。
use database, Woody会在context中改变当前database; set/reset/clear cache语句,保留原语句。
由于view只是将一个查询进行封装,底层仍然可能含有source表信息,Woody为了将所有表信息更好地串联起来,会去查询该view的创建语句,然后转换该view的创建语句,直到所有的source表不存在global view为止。
5. 准备
07
数据质量校验
进行完测试前准备之后,Woody按照要对比的两个Spark版本,顺序启动两个Spark 应用来运行转换后的语句。在每个版本的Spark运行结束之后去拿到job输出的信息(table count和checksum以及sample),然后对这些输出数据清空复原。在两个版本的Spark应用均运行结束之后,对比其结果,如果结果一致,则代表两个Spark版本的数据是一致的,是兼容的。
1. 检验什么
insert 语句写入的表 job中创建但是后续没有被使用到的working表
select语句的结果
前两种场景很好理解,Woody会在语法分析的时候记录这些表,然后在Spark计算任务完成之后,去校验这些表。
2. 关于select
在Spark执行这些select语句的时候,Woody会再次识别出这些select语句;然后拿到该select语句的AnalyzedLogicalPlan,从而拿到select结果的schema信息;再根据schema信息,创建用于保存select结果的表;之后再把select语句的结果保存到这个表中,用于后续的结果校验。
3.如何校验
08
使用场景
1. Spark版本升级测试
后续会支持账号级别的升级测试,只需要选择用户的账号,Woody即可对该账号需要升级的所有job进行升级测试,测试通过即可将该账号迁移至新版本Spark。
2. Spark-sql job调优
参数调优; Sql语句调优。
Woody支持在选择Spark版本运行测试时,指定conf或者修改job语句来进行对比测试,用户可以利用Woody来进行job 调优。
3. 端到端测试
对于端到端的测试,Woody基于用户的账号来创建一个数据库用于存放job的输出结果,数据库命名规则为pre_prod_{preprod_account}_db;其location为/preprod/{preprod_account}/analysis.db。
将job pipeline进行串联,也就是将前面job的输出作为后面job的输入。一个job的输出表达有两种:数据写到了哪些表 数据写到了哪些路径
在转换第二条AlterTableSetLocation语句时,由于这个被set的location是前面写入过数据的路径,因此,Woody也会知道测试表 preprod_b_woody_sub_db.gdw_tables__tba 被set到了一个被写入过数据的路径,也是一个被写入数据的表。另外一种情况,如果第一条insert 语句和第二条AlterTableSetLocation语句分别属于两个job,但是有前后关系。在第一条Insert 语句运行完之后, ‘/preprod/b_woody_sub/analysis.db/sys/edw/gdw_tables/tba_merge/dt=20201019’会被创建出来;对于第二条AlterTableSetLocation语句,Woody判断得到:‘/sys/edw/gdw_tables/tba_merge/dt=20201019’ 对应的测试location ‘/preprod/b_woody_sub/analysis.db/sys/edw/gdw_tables/tba_merge/dt=20201019’ 已经存在,也会将 preprod_b_woody_sub_db.gdw_tables__tba 标记为被写入数据的表。最终Woody会将这个job中所有被写入数据的表跟这个jobPlan 的planId关联起来,存储到mysql数据库中,作为这个jobPlan的context。在这个jobPlan中的其他job运行时,就可以优先读取这些上游产生的测试数据,而不是去读取线上数据,这样,一条pipeline就可以串联起来。2)转换详情
为Woody开发了conversion history server以及runtime的conversions页面。图6是一个runtime的转换详情页面。
09
总结
作者简介
王斐, eBay ADI Hadoop团队Spark平台开发工程师,大数据领域技术爱好者,对Spark有浓厚兴趣和一定的了解,目前主要专注于Spark和Woody的开发工作。如对本文有任何问题或见解,欢迎移步评论区,作者大大在线回复,机不可失失不再来
数据之道 | Akka Actor及其在商业智能数据服务中的应用
eBay大量优质职位,等的就是你