查看原文
其他

Hudi Spark SQL总结

董可伦 伦少的博客 2023-04-18

前言

总结Hudi Spark SQL的使用,本人仍然以Hudi0.9.0版本为例,也会稍微提及最新版的一些改动。Hudi 从0.9.0版本开始支持Spark SQL,是由阿里的pengzhiwei同学贡献的,pengzhiwei目前已不负责Hudi,改由同事YannByron负责,现在又有ForwardXu贡献了很多功能特性,目前好像主要由ForwardXu负责。
三位都是大佬,都是Apache Hudi Committer,膜拜大佬,向大佬学习!!!大佬的github:

  • 彭志伟(阿里) pengzhiwei  https://github.com/pengzhiwei2018

  • 毕岩(阿里)   YannByron   https://github.com/YannByron

  • 徐前进(腾讯) ForwardXu   https://github.com/XuQianJin-Stars

当然还有很多其他大佬,如Apache member/Hudi PMC Raymond Xu/许世彦 https://github.com/xushiyan,负责整个Spark模块

配置参数

核心参数:

  • --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer'  

  • --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension'

使用

三种方式使用Hudi Spark SQL

Spark Thrift Server

启动hudi-spark-thrift-server

1spark-submit --master yarn --deploy-mode client --executor-memory 2G --num-executors 3 --executor-cores 2 --driver-memory 2G --driver-cores 2 --class org.apache.spark.sql.hive.thriftserver.HiveThriftServer2 --name hudi-spark-thrift-server  --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer'  --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' --principal spark/indata-192-168-44-128.indata.com@INDATA.COM --keytab /etc/security/keytabs/spark.service.keytab --hiveconf hive.server2.thrift.http.port=20003

连接hudi-spark-thrift-server

1/usr/hdp/3.1.0.0-78/spark2/bin/beeline -u "jdbc:hive2://192.168.44.128:20003/default;principal=HTTP/indata-192-168-44-128.indata.com@INDATA.COM?hive.server2.transport.mode=http;hive.server2.thrift.http.path=cliservice"
2

spark-sql脚本

1spark-sql --master yarn --deploy-mode client --executor-memory 2G --num-executors 3 --executor-cores 2 --driver-memory 2G --driver-cores 2 --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer'  --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' --principal spark/indata-192-168-44-128.indata.com@INDATA.COM --keytab /etc/security/keytabs/spark.service.keytab

Spark 程序

配置好参数后,直接使用spark.sql(sql)即可

建表

1create table test_hudi_table (
2  id int,
3  name string,
4  price double,
5  ts long,
6  dt string
7using hudi
8 partitioned by (dt)
9 options (
10  primaryKey = 'id',
11  preCombineField = 'ts',
12  type = 'cow'
13 )
14location '/tmp/test_hudi_table'
  • using hudi 表示我们要建的表是Hudi表

  • primaryKey 主键,不设置的话,则表示该表没有主键,0.9.0版本以后必须设置

  • preCombineField 预合并字段

  • type 表类型

也支持其他hudi参数:hoodie开头的一些配置参数,表参数优先级较高,可以覆盖其他SQL默认参数,慎用,因为有些参数可能有bug,比如 hoodie.table.namehoodie.datasource.write.operation,详情参考PR:https://github.com/apache/hudi/pull/5495

  • location 指定了外部路径,那么表默认为外部表,如果不指定则使用数据库路径,为内部表

0.9.0版本以后 options 建议用 tblproperties, options可以继续使用

执行完建表语句,会在对应的表路径下初始化Hudi表,生成.hoodie元数据目录,并且会将Hudi表元数据信息同步到Hive表中,可以自行在Hive中验证内部表外部表的逻辑,Spark SQL目前不能验证,即使为外部表也不显示,不知道是否为bug

insert

1insert into test_hudi_table values (1,'hudi',10,100,'2021-05-05'),(2,'hudi',10,100,'2021-05-05')

1insert into test_hudi_table
2select 1 as id'hudi' as name10 as price, 100 as ts, '2021-05-05' as dt union
3select 2 as id'hudi' as name10 as price, 100 as ts, '2021-05-05' as dt 

insert完查询验证一下,数据是否成功插入

1select * from test_hudi_table
2
3+----------------------+-----------------------+---------------------+-------------------------+--------------------------------------------------------------------------+-----+-------+--------+------+-------------+--+
4| _hoodie_commit_time  | _hoodie_commit_seqno  | _hoodie_record_key  | _hoodie_partition_path  |                            _hoodie_file_name                             | id  | name  | price  |  ts  |     dt      |
5+----------------------+-----------------------+---------------------+-------------------------+--------------------------------------------------------------------------+-----+-------+--------+------+-------------+--+
6|
 20220513110302       | 20220513110302_0_1    | id:2                | dt=2021-05-05           | 083f7be7-ecbd-4c60-a5c2-51b04bb5a3d5-0_0-27-2009_20220513110302.parquet  | 2   | hudi  | 10.0   | 100  | 2021-05-05  |
7| 20220513110302       | 20220513110302_0_2    | id:1                | dt=2021-05-05           | 083f7be7-ecbd-4c60-a5c2-51b04bb5a3d5-0_0-27-2009_20220513110302.parquet  | 1   | hudi  | 10.0   | 100  | 2021-05-05  |
8+----------------------+-----------------------+---------------------+-------------------------+--------------------------------------------------------------------------+-----+-------+--------+------+-------------+--+
9

另外备注一下:insert默认是会随机更新的,随机指某些情况下,这和Hudi合并小文件有关,原理这里不详细解释,可以自行查看源码(以后可能会单独总结一篇相关的文章,和Hudi写文件、合并文件有关)。
要想是insert操作不更新,可以使用以下配置:

1hoodie.merge.allow.duplicate.on.inserts = true

相关PR:https://github.com/apache/hudi/pull/3644,这个PR是在Java客户端支持这个参数的,Spark客户端本身(在这之前)就支持这个参数

update

1update test_hudi_table set price = 20.0 where id = 1

price字段已经成功更新

1+----------------------+-----------------------+---------------------+-------------------------+--------------------------------------------------------------------------+-----+-------+--------+------+-------------+--+
2| _hoodie_commit_time  | _hoodie_commit_seqno  | _hoodie_record_key  | _hoodie_partition_path  |                            _hoodie_file_name                             | id  | name  | price  |  ts  |     dt      |
3+----------------------+-----------------------+---------------------+-------------------------+--------------------------------------------------------------------------+-----+-------+--------+------+-------------+--+
4|
 20220513110302       | 20220513110302_0_1    | id:2                | dt=2021-05-05           | 083f7be7-ecbd-4c60-a5c2-51b04bb5a3d5-0_0-27-2009_20220513110302.parquet  | 2   | hudi  | 10.0   | 100  | 2021-05-05  |
5| 20220513143459       | 20220513143459_0_1    | id:1                | dt=2021-05-05           | 083f7be7-ecbd-4c60-a5c2-51b04bb5a3d5-0_0-57-3422_20220513143459.parquet  | 1   | hudi  | 20.0   | 100  | 2021-05-05  |
6+----------------------+-----------------------+---------------------+-------------------------+--------------------------------------------------------------------------+-----+-------+--------+------+-------------+--+
7

delete

1delete from test_hudi_table where id = 1

id为1的记录被成功删除

1+----------------------+-----------------------+---------------------+-------------------------+--------------------------------------------------------------------------+-----+-------+--------+------+-------------+--+
2| _hoodie_commit_time  | _hoodie_commit_seqno  | _hoodie_record_key  | _hoodie_partition_path  |                            _hoodie_file_name                             | id  | name  | price  |  ts  |     dt      |
3+----------------------+-----------------------+---------------------+-------------------------+--------------------------------------------------------------------------+-----+-------+--------+------+-------------+--+
4|
 20220513110302       | 20220513110302_0_1    | id:2                | dt=2021-05-05           | 083f7be7-ecbd-4c60-a5c2-51b04bb5a3d5-0_0-27-2009_20220513110302.parquet  | 2   | hudi  | 10.0   | 100  | 2021-05-05  |
5+----------------------+-----------------------+---------------------+-------------------------+--------------------------------------------------------------------------+-----+-------+--------+------+-------------+--+

merge

HUDI 支持MERGE语句,有merge into 、merge update和merge delte。可以把增删改统一为:

1merge into test_hudi_table as t0
2using (
3  select 1 as id'hudi' as name112 as price, 98 as ts, '2021-05-05' as dt,'INSERT' as opt_type union
4  select 2 as id'hudi_2' as name10 as price, 100 as ts, '2021-05-05' as dt,'UPDATE' as opt_type union
5  select 3 as id'hudi' as name10 as price, 100 as ts, '2021-05-05' as dt ,'DELETE' as opt_type
6 ) as s0
7on t0.id = s0.id
8when matched and opt_type!='DELETE' then update set *
9when matched and opt_type='DELETE' then delete
10when not matched and opt_type!='DELETE' then insert *
1+----------------------+-----------------------+---------------------+-------------------------+---------------------------------------------------------------------------+-----+---------+--------+------+-------------+--+
2| _hoodie_commit_time  | _hoodie_commit_seqno  | _hoodie_record_key  | _hoodie_partition_path  |                             _hoodie_file_name                             | id  |  name   | price  |  ts  |     dt      |
3+----------------------+-----------------------+---------------------+-------------------------+---------------------------------------------------------------------------+-----+---------+--------+------+-------------+--+
4|
 20220513143914       | 20220513143914_0_1    | id:2                | dt=2021-05-05           | 083f7be7-ecbd-4c60-a5c2-51b04bb5a3d5-0_0-137-7255_20220513143914.parquet  | 2   | hudi_2  | 10.0   | 100  | 2021-05-05  |
5| 20220513143914       | 20220513143914_0_2    | id:1                | dt=2021-05-05           | 083f7be7-ecbd-4c60-a5c2-51b04bb5a3d5-0_0-137-7255_20220513143914.parquet  | 1   | hudi    | 112.0  | 98   | 2021-05-05  |
6+----------------------+-----------------------+---------------------+-------------------------+---------------------------------------------------------------------------+-----+---------+--------+------+-------------+--+
7

这样的好处有:
1、把所有的类型统一为一个SQL,避免产生过多的job
2、避免产生异常,对于insert,如果新insert的主键已经存在会产生异常
3、减少程序复杂度,对于update和delete,不用判断where条件和要修改哪些字段,再去拼接sql(从binlog里可能不能获取这些内容)
4、提升性能,对于批量DELETE, merge的性能要比使用in 好。
但是需要注意,当没有设置preCombineField = 'ts'时,新来的数据会直接覆盖掉历史数据,这种情况存在于新数据的到达时间早于旧数据的到达时间的情况。

merge values

在merge的基础上对源码进行优化(代码目前没有提交到社区,想使用的话可以查看:https://gitee.com/dongkelun/hudi/commits/0.9.0),使Hudi SQL 支持 merge values形式,示例如下:

1merge into test_hudi_table as t0
2using 
3(1'hudi'11298'2021-05-05','INSERT'),
4(2'hudi_2'1100'2021-05-05','UPDATE'),
5(3'hudi'10100'2021-05-05','DELETE')
6as s0 (id,name,price,ts,dt,opt_type)
7on t0.id = s0.id
8when matched and opt_type!='DELETE' then update set *
9when matched and opt_type='DELETE' then delete
10when not matched and opt_type!='DELETE' then insert *

对于想要SQL实现数据同步:这样修改的原因是merge是merge subQuery的形式,当拼接SQL很长时,如7000条记录,这样等于7000个select的语句,程序用递归的形式解析SQL很慢,仅解析subQuery的时间就要10分钟,这样不能满足我们的分钟级事务的需求。而通过修改源码支持 merge values的形式,通过values传值 ,这样解析时间从10分钟降低到几秒,后面在用程序将values转成表,直接upsert,大大提升了每个批次的事务时间。经过测试,千万级历史数据,千万级日增量,即平均每分钟7000条,可以实现分钟级事务的需求。

测评

记录用merge values语句测试的性能结果

Spark Thrift Server配置参数

1--executor-memory 4G --num-executors 15 --executor-cores 2 --driver-memory 6G --driver-cores 2 

历史数据

本次测评数据采用TPC-DS的web_sales表,历史数据一千万,模拟日增量一千万,需要注意的是,源数据表的小数类型同样为double类型,不能是decimal,否在后面增量数据同步时会有异常(Hudi Spark SQL 对于decimal类型有bug)

增量数据

程序读取增量数据拼接SQL,jdbc连接Spark Thrift Server实现增量同步,拼接SQL性能:一万条记录1秒之内完成

测评结果

Spark Server

Streaming

批次批次数据量时间(s)批次批次数据量时间(s)
160001191689859
26000792321127
36000703599936
46000684599933



5590232



6599535

Spark Server为Java程序通过JDBC连接Spark Thrift Server,在第一次没有缓存的情况下时间为120秒,在有缓存的情况下为70秒。
Streaming 为用Structured Streaming在每个批次中拼接Merge SQL,然后调用spark.sql()实现,从结果上看Streaming要比Spark Server快30秒,主要原因是Spark Server 的延迟调度时间比Streaming 的时间长,目前还没有找到解决方案使Spark Server的时间缩减到和Streaming的时间相当。

本次测评模拟的增量数据每分钟包含所有分区,没有起到分区过滤的效果。实际生产数据只包含部分少量分区,可以起到分区过滤的效果,增量同步的性能优于本次测评。

总结

本文主要总结了Hudi0.9.0版本Spark SQL常用的SQL命令的使用以及一些注意事项,其实还支持其他SQL语义,并且新版本支持的更多,大家可以自己学习测试。


您可能也对以下帖子感兴趣

文章有问题?点此查看未经处理的缓存