Hudi查询类型/视图总结
前言
上面文章Hive增量查询Hudi表提到Hudi表有读优化视图和实时视图,其实当时并没有完全掌握,所以现在单独学习总结。Hudi官网文档中文:https://hudi.apache.org/cn/docs/0.9.0/concepts/称之为视图,其实英文:https://hudi.apache.org/cn/docs/concepts/#views为query types翻译过来为查询类型
Query types
Hudi 支持下面三种视图
Snapshot Queries 快照查询/实时视图 Queries see the latest snapshot of the table as of a given commit or compaction action. In case of merge on read table, it exposes near-real time data(few mins) by merging the base and delta files of the latest file slice on-the-fly. For copy on write table, it provides a drop-in replacement for existing parquet tables, while providing upsert/delete and other write side features. 在此视图上的查询可以查看给定提交或压缩操作时表的最新快照。对于读时合并表(MOR表) 该视图通过动态合并最新文件切片的基本文件(例如parquet)和增量文件(例如avro)来提供近实时数据集(几分钟的延迟)。对于写时复制表(COW表),它提供了现有parquet表的插入式替换,同时提供了插入/删除和其他写侧功能。
Incremental Queries 增量查询/增量视图,也就是上篇文章讲的增量查询 Queries only see new data written to the table, since a given commit/compaction. This effectively provides change streams to enable incremental data pipelines. 对该视图的查询只能看到从某个提交/压缩后写入数据集的新数据。该视图有效地提供了更改流,来支持增量数据管道。
Read Optimized Queries 读优化查询/读优化视图 : Queries see the latest snapshot of table as of a given commit/compaction action. Exposes only the base/columnar files in latest file slices and guarantees the same columnar query performance compared to a non-hudi columnar table. 在此视图上的查询将查看给定提交或压缩操作中数据集的最新快照。该视图仅将最新文件切片中的基本/列文件暴露给查询,并保证与非Hudi列式数据集相比,具有相同的列式查询性能。
表类型
Table Type | Supported Query types |
---|---|
Copy On Write | Snapshot Queries + Incremental Queries |
Merge On Read | Snapshot Queries + Incremental Queries + Read Optimized Queries |
也就是读优化视图只有在MOR表中存在,这点在上篇文章中也提到过,这次会从源码层面分析两种表类型的区别以及如何实现的。
另外关于这一点官网中文文档写错了,大家注意别被误导,估计是因为旧版本,且中文文档没有人维护贡献,就没人贡献修改了~,稍后我有时间会尝试提个PR修复一下,错误截图:
2022.06.30更新:已提交PR https://github.com/apache/hudi/pull/6008
源码
简单从源码层面分析同步Hive表时两种表类型的区别,Hudi同步Hive元数据的工具类为HiveSyncTool
,如何利用HiveSyncTool
同步元数据,先进行一个简单的示例,这里用Spark进行示例,因为Spark有获取hadoopConf的API,代码较少,方便示例,其实纯Java也是可以实现的
1val basePath = new Path(pathStr)
2val fs = basePath.getFileSystem(spark.sessionState.newHadoopConf())
3val hiveConf: HiveConf = new HiveConf()
4hiveConf.addResource(fs.getConf)
5val tableMetaClient = HoodieTableMetaClient.builder.setConf(fs.getConf).setBasePath(pathStr).build
6val recordKeyFields = tableMetaClient.getTableConfig.getRecordKeyFields
7var keys = ""
8if (recordKeyFields.isPresent) {
9keys = recordKeyFields.get().mkString(",")
10}
11var partitionPathFields: util.List[String] = null
12val partitionFields = tableMetaClient.getTableConfig.getPartitionFields
13if (partitionFields.isPresent) {
14 import scala.collection.JavaConverters._
15 partitionPathFields = partitionFields.get().toList.asJava
16}
17val hiveSyncConfig = getHiveSyncConfig(pathStr, hiveDatabaseName, tableName, partitionPathFields, keys)
18new HiveSyncTool(hiveSyncConfig, hiveConf, fs).syncHoodieTable()
19
20def getHiveSyncConfig(basePath: String, dbName: String, tableName: String,
21 partitionPathFields: util.List[String] = null, keys: String = null): HiveSyncConfig = {
22 val hiveSyncConfig: HiveSyncConfig = new HiveSyncConfig
23 hiveSyncConfig.syncMode = HiveSyncMode.HMS.name
24 hiveSyncConfig.createManagedTable = true
25 hiveSyncConfig.databaseName = dbName
26 hiveSyncConfig.tableName = tableName
27 hiveSyncConfig.basePath = basePath
28 hiveSyncConfig.partitionValueExtractorClass = classOf[MultiPartKeysValueExtractor].getName
29 if (partitionPathFields != null && !partitionPathFields.isEmpty) hiveSyncConfig.partitionFields = partitionPathFields
30 if (!StringUtils.isNullOrEmpty(keys)) hiveSyncConfig.serdeProperties = "primaryKey = " + keys //Spark SQL 更新表时需要该属性确认主键字段
31 hiveSyncConfig
32 }
这里利用tableMetaClient来获取表的主键和分区字段,因为同步元数据时Hudi表文件肯定已经存在了,当然如果知道表的主键和分区字段也可以自己指定,这里自动获取会更方便一些。
其实主要是获取配置文件,构造同步工具类HiveSyncTool
,然后利用syncHoodieTable
同步元数据,建Hive表
接下来看一下源码,首先new HiveSyncTool
时,会根据表类型,当表类型为COW时,this.snapshotTableName = cfg.tableName
,snapshotTableName 也就是实时视图等于表名,而读优化视图为空,当为MOR表示,实时视图为tableName_rt
,而对于读优化视图,默认情况下为tableName_ro
,
当配置skipROSuffix=true时
,等于表名,这里可以发现当skipROSuffix=true时
,MOR表的读优化视图为表名而COW表的实时视图为表名,感觉这里有点矛盾,可能是因为MOR表的读优化视图和COW表的实时视图查询均由HoodieParquetInputFormat
实现,具体看后面的源码分析
1 private static final Logger LOG = LogManager.getLogger(HiveSyncTool.class);
2 public static final String SUFFIX_SNAPSHOT_TABLE = "_rt";
3 public static final String SUFFIX_READ_OPTIMIZED_TABLE = "_ro";
4
5 protected final HiveSyncConfig cfg;
6 protected HoodieHiveClient hoodieHiveClient = null;
7 protected String snapshotTableName = null;
8 protected Option<String> roTableName = null;
9
10 public HiveSyncTool(HiveSyncConfig cfg, HiveConf configuration, FileSystem fs) {
11 super(configuration.getAllProperties(), fs);
12
13 try {
14 this.hoodieHiveClient = new HoodieHiveClient(cfg, configuration, fs);
15 } catch (RuntimeException e) {
16 if (cfg.ignoreExceptions) {
17 LOG.error("Got runtime exception when hive syncing, but continuing as ignoreExceptions config is set ", e);
18 } else {
19 throw new HoodieHiveSyncException("Got runtime exception when hive syncing", e);
20 }
21 }
22
23 this.cfg = cfg;
24 // Set partitionFields to empty, when the NonPartitionedExtractor is used
25 if (NonPartitionedExtractor.class.getName().equals(cfg.partitionValueExtractorClass)) {
26 LOG.warn("Set partitionFields to empty, since the NonPartitionedExtractor is used");
27 cfg.partitionFields = new ArrayList<>();
28 }
29 if (hoodieHiveClient != null) {
30 switch (hoodieHiveClient.getTableType()) {
31 case COPY_ON_WRITE:
32 // 快照查询/实时视图等于表名
33 this.snapshotTableName = cfg.tableName;
34 // 读优化查询/读优化视图为空
35 this.roTableName = Option.empty();
36 break;
37 case MERGE_ON_READ:
38 // 快照查询/实时视图等于 表名+SUFFIX_SNAPSHOT_TABLE即 tableName_rt
39 this.snapshotTableName = cfg.tableName + SUFFIX_SNAPSHOT_TABLE;
40 // 读优化查询/读优化视图 skipROSuffix默认为false 默认情况下 tableName_ro
41 // 当配置skipROSuffix=true时,等于表名
42 this.roTableName = cfg.skipROSuffix ? Option.of(cfg.tableName) :
43 Option.of(cfg.tableName + SUFFIX_READ_OPTIMIZED_TABLE);
44 break;
45 default:
46 LOG.error("Unknown table type " + hoodieHiveClient.getTableType());
47 throw new InvalidTableException(hoodieHiveClient.getBasePath());
48 }
49 }
50 }
接下来再看一下,上篇文章中提到的两个视图的实现类HoodieParquetInputFormat
和HoodieParquetRealtimeInputFormat
1 @Override
2 public void syncHoodieTable() {
3 try {
4 if (hoodieHiveClient != null) {
5 doSync();
6 }
7 } catch (RuntimeException re) {
8 throw new HoodieException("Got runtime exception when hive syncing " + cfg.tableName, re);
9 } finally {
10 if (hoodieHiveClient != null) {
11 hoodieHiveClient.close();
12 }
13 }
14 }
15 protected void doSync() {
16 switch (hoodieHiveClient.getTableType()) {
17 case COPY_ON_WRITE:
18 // COW表只有snapshotTableName,也就是实时视图,查询时是由`HoodieParquetInputFormat`实现
19 syncHoodieTable(snapshotTableName, false, false);
20 break;
21 case MERGE_ON_READ:
22 // sync a RO table for MOR
23 // MOR 表的读优化视图,以`_RO`结尾,`READ_OPTIMIZED`的缩写,查询时由`HoodieParquetInputFormat`实现
24 syncHoodieTable(roTableName.get(), false, true);
25 // sync a RT table for MOR
26 // MOR 表的实时视图,以`_RT`结尾,`REAL_TIME`的缩写,查询时由`HoodieParquetRealtimeInputFormat`实现
27 syncHoodieTable(snapshotTableName, true, false);
28 break;
29 default:
30 LOG.error("Unknown table type " + hoodieHiveClient.getTableType());
31 throw new InvalidTableException(hoodieHiveClient.getBasePath());
32 }
33 }
可以看到,两个表的区别为:1、COW只同步1个表的元数据:实时视图,MOR表同步两个表的元数据,读优化视图和实时视图 2、除了表名外,参数也不一样,这也就决定了查询时用哪个实现类来实现
由于这篇文章不是主要讲解同步Hive元数据的源码,所以这里只贴主要实现部分,以后会单独总结一篇同步Hive元数据源码的文章。
1 protected void syncHoodieTable(String tableName, boolean useRealtimeInputFormat,
2 boolean readAsOptimized) {
3 syncSchema(tableName, tableExists, useRealtimeInputFormat, readAsOptimized, schema);
4 }
5
6 private void syncSchema(String tableName, boolean tableExists, boolean useRealTimeInputFormat,
7 boolean readAsOptimized, MessageType schema) {
8
9 Map<String, String> sparkSerdeProperties = getSparkSerdeProperties(readAsOptimized);
10
11 String inputFormatClassName = HoodieInputFormatUtils.getInputFormatClassName(baseFileFormat, useRealTimeInputFormat);
12
13 }
14
15 public static String getInputFormatClassName(HoodieFileFormat baseFileFormat, boolean realtime) {
16 switch (baseFileFormat) {
17 case PARQUET:
18 if (realtime) {
19 return HoodieParquetRealtimeInputFormat.class.getName();
20 } else {
21 return HoodieParquetInputFormat.class.getName();
22 }
23 case HFILE:
24 if (realtime) {
25 return HoodieHFileRealtimeInputFormat.class.getName();
26 } else {
27 return HoodieHFileInputFormat.class.getName();
28 }
29 case ORC:
30 return OrcInputFormat.class.getName();
31 default:
32 throw new HoodieIOException("Hoodie InputFormat not implemented for base file format " + baseFileFormat);
33 }
可以看到对于存储类型为PARQUET时,当useRealtimeInputFormat
为true时,那么inputFormat的实现类为HoodieParquetRealtimeInputFormat
,当为false时,实现类为HoodieParquetInputFormat
,至于另外一个参数readAsOptimized
,是否为读优化,这个参数是Spark SQL读取时用来判断该表为实时视图还是读优化视图,相关源码
1// 同步元数据建表时添加参数:`hoodie.query.as.ro.table=true/false`
2sparkSerdeProperties.put(ConfigUtils.IS_QUERY_AS_RO_TABLE, String.valueOf(readAsOptimized));
3
4Spark读取Hive表时,用来判断,在类`org.apache.hudi.DataSourceOptionsHelper`
5
6 def parametersWithReadDefaults(parameters: Map[String, String]): Map[String, String] = {
7 // First check if the ConfigUtils.IS_QUERY_AS_RO_TABLE has set by HiveSyncTool,
8 // or else use query type from QUERY_TYPE.
9 val queryType = parameters.get(ConfigUtils.IS_QUERY_AS_RO_TABLE)
10 .map(is => if (is.toBoolean) QUERY_TYPE_READ_OPTIMIZED_OPT_VAL else QUERY_TYPE_SNAPSHOT_OPT_VAL)
11 .getOrElse(parameters.getOrElse(QUERY_TYPE.key, QUERY_TYPE.defaultValue()))
12
13 Map(
14 QUERY_TYPE.key -> queryType
15 ) ++ translateConfigurations(parameters)
16 }
体现在建表语句里则为:
1WITH SERDEPROPERTIES ( |
2| 'hoodie.query.as.ro.table'='false',
inputFormat的语句:
1STORED AS INPUTFORMAT |
2| 'org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat'
完整的建表语句在后面的示例中
示例
DF
这里利用Apache Hudi 入门学习总结中写Hudi并同步到Hive表的程序来验证
COW表
由于之前的文章中已经有COW表的建表语句了,这里直接copy过来
1+----------------------------------------------------+
2| createtab_stmt |
3+----------------------------------------------------+
4| CREATE TABLE `test_hudi_table_1`( |
5| `_hoodie_commit_time` string, |
6| `_hoodie_commit_seqno` string, |
7| `_hoodie_record_key` string, |
8| `_hoodie_partition_path` string, |
9| `_hoodie_file_name` string, |
10| `id` int, |
11| `name` string, |
12| `value` int, |
13| `ts` int) |
14| PARTITIONED BY ( |
15| `dt` string) |
16| ROW FORMAT SERDE |
17| 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' |
18| WITH SERDEPROPERTIES ( |
19| 'hoodie.query.as.ro.table'='false', |
20| 'path'='/tmp/test_hudi_table_1', |
21| 'primaryKey'='id') |
22| STORED AS INPUTFORMAT |
23| 'org.apache.hudi.hadoop.HoodieParquetInputFormat' |
24| OUTPUTFORMAT |
25| 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' |
26| LOCATION |
27| 'hdfs://cluster1/tmp/test_hudi_table_1' |
28| TBLPROPERTIES ( |
29| 'last_commit_time_sync'='20220512101500', |
30| 'spark.sql.sources.provider'='hudi', |
31| 'spark.sql.sources.schema.numPartCols'='1', |
32| 'spark.sql.sources.schema.numParts'='1', |
33| 'spark.sql.sources.schema.part.0'='{"type":"struct","fields":[{"name":"_hoodie_commit_time","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_commit_seqno","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_record_key","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_partition_path","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_file_name","type":"string","nullable":true,"metadata":{}},{"name":"id","type":"integer","nullable":false,"metadata":{}},{"name":"name","type":"string","nullable":true,"metadata":{}},{"name":"value","type":"integer","nullable":false,"metadata":{}},{"name":"ts","type":"integer","nullable":false,"metadata":{}},{"name":"dt","type":"string","nullable":true,"metadata":{}}]}', |
34| 'spark.sql.sources.schema.partCol.0'='dt', |
35| 'transient_lastDdlTime'='1652320902') |
36+----------------------------------------------------+
可以看到'hoodie.query.as.ro.table'='false'
,对于COW表的视图为实时视图,inputFormat为org.apache.hudi.hadoop.HoodieParquetInputFormat
MOR表
我们将之前的save2HudiSyncHiveWithPrimaryKey
方法中加个表类型的参数option(TABLE_TYPE.key(), MOR_TABLE_TYPE_OPT_VAL)
,将表名库名修改一下:
1 val databaseName = "test"
2 val tableName1 = "test_hudi_table_df_mor"
3 val primaryKey = "id"
4 val preCombineField = "ts"
5 val partitionField = "dt"
6 val tablePath1 = "/tmp/test_hudi_table_df_mor"
同步Hive表成功后,show tables
,发现建了两张表test_hudi_table_df_mor_ro
和test_hudi_table_df_mor_rt
,通过上面的源码分析部分,我们知道_ro
为读优化表,_rt
为实时表,我们再看一下建表语句:
1+----------------------------------------------------+
2| createtab_stmt |
3+----------------------------------------------------+
4| CREATE TABLE `test_hudi_table_df_mor_ro`( |
5| `_hoodie_commit_time` string, |
6| `_hoodie_commit_seqno` string, |
7| `_hoodie_record_key` string, |
8| `_hoodie_partition_path` string, |
9| `_hoodie_file_name` string, |
10| `id` int, |
11| `name` string, |
12| `value` int, |
13| `ts` int) |
14| PARTITIONED BY ( |
15| `dt` string) |
16| ROW FORMAT SERDE |
17| 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' |
18| WITH SERDEPROPERTIES ( |
19| 'hoodie.query.as.ro.table'='true', |
20| 'path'='/tmp/test_hudi_table_df_mor', |
21| 'primaryKey'='id') |
22| STORED AS INPUTFORMAT |
23| 'org.apache.hudi.hadoop.HoodieParquetInputFormat' |
24| OUTPUTFORMAT |
25| 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' |
26| LOCATION |
27| 'hdfs://cluster1/tmp/test_hudi_table_df_mor' |
28| TBLPROPERTIES ( |
29| 'last_commit_time_sync'='20220629145934', |
30| 'spark.sql.sources.provider'='hudi', |
31| 'spark.sql.sources.schema.numPartCols'='1', |
32| 'spark.sql.sources.schema.numParts'='1', |
33| 'spark.sql.sources.schema.part.0'='{"type":"struct","fields":[{"name":"_hoodie_commit_time","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_commit_seqno","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_record_key","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_partition_path","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_file_name","type":"string","nullable":true,"metadata":{}},{"name":"id","type":"integer","nullable":false,"metadata":{}},{"name":"name","type":"string","nullable":true,"metadata":{}},{"name":"value","type":"integer","nullable":false,"metadata":{}},{"name":"ts","type":"integer","nullable":false,"metadata":{}},{"name":"dt","type":"string","nullable":true,"metadata":{}}]}', |
34| 'spark.sql.sources.schema.partCol.0'='dt', |
35| 'transient_lastDdlTime'='1656486059') |
36+----------------------------------------------------+
37
38+----------------------------------------------------+
39| createtab_stmt |
40+----------------------------------------------------+
41| CREATE TABLE `test_hudi_table_df_mor_rt`( |
42| `_hoodie_commit_time` string, |
43| `_hoodie_commit_seqno` string, |
44| `_hoodie_record_key` string, |
45| `_hoodie_partition_path` string, |
46| `_hoodie_file_name` string, |
47| `id` int, |
48| `name` string, |
49| `value` int, |
50| `ts` int) |
51| PARTITIONED BY ( |
52| `dt` string) |
53| ROW FORMAT SERDE |
54| 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' |
55| WITH SERDEPROPERTIES ( |
56| 'hoodie.query.as.ro.table'='false', |
57| 'path'='/tmp/test_hudi_table_df_mor', |
58| 'primaryKey'='id') |
59| STORED AS INPUTFORMAT |
60| 'org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat' |
61| OUTPUTFORMAT |
62| 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' |
63| LOCATION |
64| 'hdfs://cluster1/tmp/test_hudi_table_df_mor' |
65| TBLPROPERTIES ( |
66| 'last_commit_time_sync'='20220629145934', |
67| 'spark.sql.sources.provider'='hudi', |
68| 'spark.sql.sources.schema.numPartCols'='1', |
69| 'spark.sql.sources.schema.numParts'='1', |
70| 'spark.sql.sources.schema.part.0'='{"type":"struct","fields":[{"name":"_hoodie_commit_time","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_commit_seqno","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_record_key","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_partition_path","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_file_name","type":"string","nullable":true,"metadata":{}},{"name":"id","type":"integer","nullable":false,"metadata":{}},{"name":"name","type":"string","nullable":true,"metadata":{}},{"name":"value","type":"integer","nullable":false,"metadata":{}},{"name":"ts","type":"integer","nullable":false,"metadata":{}},{"name":"dt","type":"string","nullable":true,"metadata":{}}]}', |
71| 'spark.sql.sources.schema.partCol.0'='dt', |
72| 'transient_lastDdlTime'='1656486059') |
73+----------------------------------------------------+
可以看到_ro
和_rt
有两个区别,一个是hoodie.query.as.ro.table
,另外一个是INPUTFORMAT,对于Hive查询来说,只有INPUTFORMAT有用,hoodie.query.as.ro.table
是Spark查询时用来判断是否为读优化表的,因为MOR表只有一次写入,所以只有parquet文件,没有增量文件.log
,所以两个表查询出来的结构是一样的,后面用Spark SQL示例两者的区别
Spark SQL
Hudi Spark SQL建表,不了解的可以参考:Hudi Spark SQL总结,之所以再提一下Spark SQL建表,是因为我发现他和DF写数据再同步建表有些许差别
COW表
1create table test_hudi_table_cow (
2 id int,
3 name string,
4 price double,
5 ts long,
6 dt string
7) using hudi
8 partitioned by (dt)
9 options (
10 primaryKey = 'id',
11 preCombineField = 'ts',
12 type = 'cow'
13 );
建表完成后,在Hive里查看Hive表的建表语句
1show create table test_hudi_table_cow;
2
3+----------------------------------------------------+
4| createtab_stmt |
5+----------------------------------------------------+
6| CREATE TABLE `test_hudi_table_cow`( |
7| `_hoodie_commit_time` string, |
8| `_hoodie_commit_seqno` string, |
9| `_hoodie_record_key` string, |
10| `_hoodie_partition_path` string, |
11| `_hoodie_file_name` string, |
12| `id` int, |
13| `name` string, |
14| `price` double, |
15| `ts` bigint) |
16| PARTITIONED BY ( |
17| `dt` string) |
18| ROW FORMAT SERDE |
19| 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' |
20| WITH SERDEPROPERTIES ( |
21| 'path'='hdfs://cluster1/warehouse/tablespace/managed/hive/test.db/test_hudi_table_cow', |
22| 'preCombineField'='ts', |
23| 'primaryKey'='id', |
24| 'type'='cow') |
25| STORED AS INPUTFORMAT |
26| 'org.apache.hudi.hadoop.HoodieParquetInputFormat' |
27| OUTPUTFORMAT |
28| 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' |
29| LOCATION |
30| 'hdfs://cluster1/warehouse/tablespace/managed/hive/test.db/test_hudi_table_cow' |
31| TBLPROPERTIES ( |
32| 'last_commit_time_sync'='20220628152846', |
33| 'spark.sql.create.version'='2.4.5', |
34| 'spark.sql.sources.provider'='hudi', |
35| 'spark.sql.sources.schema.numPartCols'='1', |
36| 'spark.sql.sources.schema.numParts'='1', |
37| 'spark.sql.sources.schema.part.0'='{"type":"struct","fields":[{"name":"_hoodie_commit_time","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_commit_seqno","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_record_key","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_partition_path","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_file_name","type":"string","nullable":true,"metadata":{}},{"name":"id","type":"integer","nullable":true,"metadata":{}},{"name":"name","type":"string","nullable":true,"metadata":{}},{"name":"price","type":"double","nullable":true,"metadata":{}},{"name":"ts","type":"long","nullable":true,"metadata":{}},{"name":"dt","type":"string","nullable":true,"metadata":{}}]}', |
38| 'spark.sql.sources.schema.partCol.0'='dt', |
39| 'transient_lastDdlTime'='1656401195') |
40+----------------------------------------------------+
我们发现,Spark SQL建的表中没有hoodie.query.as.ro.table
,我看了一下源码发现(上面有提到),Spark查询时
1val queryType = parameters.get(ConfigUtils.IS_QUERY_AS_RO_TABLE)
2 .map(is => if (is.toBoolean) QUERY_TYPE_READ_OPTIMIZED_OPT_VAL else QUERY_TYPE_SNAPSHOT_OPT_VAL)
3 .getOrElse(parameters.getOrElse(QUERY_TYPE.key, QUERY_TYPE.defaultValue()))
QUERY_TYPE
的默认值为QUERY_TYPE_SNAPSHOT_OPT_VAL
,也就是快照查询,COW只有快照查询也就是默认值没有问题,QUERY_TYPE
有三种类型:QUERY_TYPE_SNAPSHOT_OPT_VAL
, QUERY_TYPE_READ_OPTIMIZED_OPT_VA
L, QUERY_TYPE_INCREMENTAL_OPT_VAL
,分别对应实时查询,读优化查询,增量查询,至于怎么利用Spark实现这些查询,这里不涉及
MOR表
1create table test_hudi_table_mor (
2 id int,
3 name string,
4 price double,
5 ts long,
6 dt string
7) using hudi
8 partitioned by (dt)
9 options (
10 primaryKey = 'id',
11 preCombineField = 'ts',
12 type = 'mor'
13 );
我们用Spark创建MOR表后,show tables看一下发现只有test_hudi_table_mor表,没有对应的_rt
、_ro
表,其实SparkSQL建表的时候还没用到Hive同步工具类HiveSyncTool
,SparkSQL有自己的一套建表逻辑,而只有在写数据时才会用到HiveSyncTool
,这也就是上面讲到的SparkSQL和DF同步建出来的表有差异的原因,接下来我们插入一条数据,来看一下结果
1insert into test_hudi_table_mor values (1,'hudi',10,100,'2021-05-05');
我们发现多了两张表,因为这两张表,是insert 数据然后利用同步工具类HiveSyncTool
创建的表,所以和程序中用DF写数据同步建的表是一样的,区别是内部表和外部表的区别,其实SparkSQL的逻辑如果表路径不等于库路径+表名,那么为外部表,这是合理的,而我们用DF建的表是因为我们程序中指定了内部表的参数,这样我们drop其中一张表就可以删掉数据,而用SparkSQL建的表,其实多了一张表内部表test_hudi_table_mor
,我们可以通过drop这张表来删除数据。
1+----------------------------------------------------+
2| createtab_stmt |
3+----------------------------------------------------+
4| CREATE EXTERNAL TABLE `test_hudi_table_mor_ro`( |
5| `_hoodie_commit_time` string COMMENT '', |
6| `_hoodie_commit_seqno` string COMMENT '', |
7| `_hoodie_record_key` string COMMENT '', |
8| `_hoodie_partition_path` string COMMENT '', |
9| `_hoodie_file_name` string COMMENT '', |
10| `id` int COMMENT '', |
11| `name` string COMMENT '', |
12| `price` double COMMENT '', |
13| `ts` bigint COMMENT '') |
14| PARTITIONED BY ( |
15| `dt` string COMMENT '') |
16| ROW FORMAT SERDE |
17| 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' |
18| WITH SERDEPROPERTIES ( |
19| 'hoodie.query.as.ro.table'='true', |
20| 'path'='hdfs://cluster1/warehouse/tablespace/managed/hive/test.db/test_hudi_table_mor') |
21| STORED AS INPUTFORMAT |
22| 'org.apache.hudi.hadoop.HoodieParquetInputFormat' |
23| OUTPUTFORMAT |
24| 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' |
25| LOCATION |
26| 'hdfs://cluster1/warehouse/tablespace/managed/hive/test.db/test_hudi_table_mor' |
27| TBLPROPERTIES ( |
28| 'last_commit_time_sync'='20220629153816', |
29| 'spark.sql.sources.provider'='hudi', |
30| 'spark.sql.sources.schema.numPartCols'='1', |
31| 'spark.sql.sources.schema.numParts'='1', |
32| 'spark.sql.sources.schema.part.0'='{"type":"struct","fields":[{"name":"_hoodie_commit_time","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_commit_seqno","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_record_key","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_partition_path","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_file_name","type":"string","nullable":true,"metadata":{}},{"name":"id","type":"integer","nullable":true,"metadata":{}},{"name":"name","type":"string","nullable":true,"metadata":{}},{"name":"price","type":"double","nullable":true,"metadata":{}},{"name":"ts","type":"long","nullable":true,"metadata":{}},{"name":"dt","type":"string","nullable":true,"metadata":{}}]}', |
33| 'spark.sql.sources.schema.partCol.0'='dt', |
34| 'transient_lastDdlTime'='1656488248') |
35+----------------------------------------------------+
36
37+----------------------------------------------------+
38| createtab_stmt |
39+----------------------------------------------------+
40| CREATE EXTERNAL TABLE `test_hudi_table_mor_rt`( |
41| `_hoodie_commit_time` string COMMENT '', |
42| `_hoodie_commit_seqno` string COMMENT '', |
43| `_hoodie_record_key` string COMMENT '', |
44| `_hoodie_partition_path` string COMMENT '', |
45| `_hoodie_file_name` string COMMENT '', |
46| `id` int COMMENT '', |
47| `name` string COMMENT '', |
48| `price` double COMMENT '', |
49| `ts` bigint COMMENT '') |
50| PARTITIONED BY ( |
51| `dt` string COMMENT '') |
52| ROW FORMAT SERDE |
53| 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' |
54| WITH SERDEPROPERTIES ( |
55| 'hoodie.query.as.ro.table'='false', |
56| 'path'='hdfs://cluster1/warehouse/tablespace/managed/hive/test.db/test_hudi_table_mor') |
57| STORED AS INPUTFORMAT |
58| 'org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat' |
59| OUTPUTFORMAT |
60| 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' |
61| LOCATION |
62| 'hdfs://cluster1/warehouse/tablespace/managed/hive/test.db/test_hudi_table_mor' |
63| TBLPROPERTIES ( |
64| 'last_commit_time_sync'='20220629153816', |
65| 'spark.sql.sources.provider'='hudi', |
66| 'spark.sql.sources.schema.numPartCols'='1', |
67| 'spark.sql.sources.schema.numParts'='1', |
68| 'spark.sql.sources.schema.part.0'='{"type":"struct","fields":[{"name":"_hoodie_commit_time","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_commit_seqno","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_record_key","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_partition_path","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_file_name","type":"string","nullable":true,"metadata":{}},{"name":"id","type":"integer","nullable":true,"metadata":{}},{"name":"name","type":"string","nullable":true,"metadata":{}},{"name":"price","type":"double","nullable":true,"metadata":{}},{"name":"ts","type":"long","nullable":true,"metadata":{}},{"name":"dt","type":"string","nullable":true,"metadata":{}}]}', |
69| 'spark.sql.sources.schema.partCol.0'='dt', |
70| 'transient_lastDdlTime'='1656488248') |
71+----------------------------------------------------+
我们再插入一条数据和更新一条数据,目的是为了生成log文件,来看两个表的不同
1insert into test_hudi_table_mor values (2,'hudi',11,110,'2021-05-05');
2update test_hudi_table_mor set name='hudi_update' where id =1;
1select * from test_hudi_table_mor_ro;
2+----------------------+-----------------------+---------------------+-------------------------+----------------------------------------------------+-----+-------+--------+------+-------------+
3| _hoodie_commit_time | _hoodie_commit_seqno | _hoodie_record_key | _hoodie_partition_path | _hoodie_file_name | id | name | price | ts | dt |
4+----------------------+-----------------------+---------------------+-------------------------+----------------------------------------------------+-----+-------+--------+------+-------------+
5| 20220629153718 | 20220629153718_0_1 | id:1 | dt=2021-05-05 | bc415cdb-2b21-4d09-a3f6-a779357aa819-0_0-125-7240_20220629153718.parquet | 1 | hudi | 10.0 | 100 | 2021-05-05 |
6| 20220629153803 | 20220629153803_0_2 | id:2 | dt=2021-05-05 | bc415cdb-2b21-4d09-a3f6-a779357aa819-0_0-154-8848_20220629153803.parquet | 2 | hudi | 11.0 | 110 | 2021-05-05 |
7+----------------------+-----------------------+---------------------+-------------------------+----------------------------------------------------+-----+-------+--------+------+-------------+
1select * from test_hudi_table_mor_rt;
2
3+----------------------+-----------------------+---------------------+-------------------------+----------------------------------------------------+-----+--------------+--------+------+-------------+
4| _hoodie_commit_time | _hoodie_commit_seqno | _hoodie_record_key | _hoodie_partition_path | _hoodie_file_name | id | name | price | ts | dt |
5+----------------------+-----------------------+---------------------+-------------------------+----------------------------------------------------+-----+--------------+--------+------+-------------+
6| 20220629153816 | 20220629153816_0_1 | id:1 | dt=2021-05-05 | bc415cdb-2b21-4d09-a3f6-a779357aa819-0 | 1 | hudi_update | 10.0 | 100 | 2021-05-05 |
7| 20220629153803 | 20220629153803_0_2 | id:2 | dt=2021-05-05 | bc415cdb-2b21-4d09-a3f6-a779357aa819-0_0-154-8848_20220629153803.parquet | 2 | hudi | 11.0 | 110 | 2021-05-05 |
8+----------------------+-----------------------+---------------------+-------------------------+----------------------------------------------------+-----+--------------+--------+------+-------------+
我们发现_ro
只能将新插入的查出来,而没有将更新的那条数据查出来,而_rt
是将最新的数据都查出来,我们再插入和更新时看一下存储文件
1hadoop fs -ls hdfs://cluster1/warehouse/tablespace/managed/hive/test.db/test_hudi_table_mor/dt=2021-05-05
2Found 4 items
3-rw-rw----+ 3 spark hadoop 975 2022-06-29 15:38 hdfs://cluster1/warehouse/tablespace/managed/hive/test.db/test_hudi_table_mor/dt=2021-05-05/.bc415cdb-2b21-4d09-a3f6-a779357aa819-0_20220629153803.log.1_0-186-10660
4-rw-rw----+ 3 spark hadoop 93 2022-06-29 15:37 hdfs://cluster1/warehouse/tablespace/managed/hive/test.db/test_hudi_table_mor/dt=2021-05-05/.hoodie_partition_metadata
5-rw-rw----+ 3 spark hadoop 435283 2022-06-29 15:37 hdfs://cluster1/warehouse/tablespace/managed/hive/test.db/test_hudi_table_mor/dt=2021-05-05/bc415cdb-2b21-4d09-a3f6-a779357aa819-0_0-125-7240_20220629153718.parquet
6-rw-rw----+ 3 spark hadoop 434991 2022-06-29 15:38 hdfs://cluster1/warehouse/tablespace/managed/hive/test.db/test_hudi_table_mor/dt=2021-05-05/bc415cdb-2b21-4d09-a3f6-a779357aa819-0_0-154-8848_20220629153803.parquet
发现,insert时是生成新的parquet文件,而更新时是生成.log文件,所以_ro
表将新插入的数据也出来了,因为_ro
只能查parquet文件(基本文件)中的数据,而_rt
表可以动态合并最新文件切片的基本文件(例如parquet)和增量文件(例如avro)来提供近实时数据集(几分钟的延迟),至于MOR表的写入逻辑(什么条件下写增量文件)和合并逻辑(什么情况下合并增量文件为parquet),这里不深入讲解,以后我会单独总结。