查看原文
其他

华为云基于Apache Hudi实时数据湖的查询优化

孟涛 ApacheHudi 2023-07-28


导读 本文将介绍华为云在实时数据湖查询方面的优化以及一些建议。主要内容包括四大部分:


1. 华为云数据湖介绍

2. Hudi查询能力介绍

3. 华为云基于Hudi的性能优化

4. 未来规划

分享嘉宾|孟涛 华为 高级工程师

编辑整理|唐洪超 hotata

出品社区|DataFun


01

华为云数据湖介绍

首先对华为云数据湖进行一下简要的介绍。

华为云数据湖一直在不断演进。从上图中可以看到,华为云底层是HDFS以及华为云自己的OBS。历史数据批量入库,采用 CDM 方式,通过DGC调度把数据导入到Hudi里面,当贴源表使用。增量任务通过CDL 去实时抓取数据源的binlog日志,实时同步到贴源层。从贴源层到汇总层,再到集市层,都是以Hudi表为基准。对于增量ETL,我们提供了以Flink SQL 为基础的流式计算能力。对于批量计算,我们提供了 Spark 以及 SparkSQL两种方式。对外暴露查询接口采用的是HetuEngine,它基于社区的PrestoDB做了一些增强。本文中将着重介绍在交互式分析方面,我们做了哪些增强。Hudi本身属于Hadoop体系,因此Hetu或者Presto对接Hudi有着一些天然的优势。我们希望尝试优化HetuEngine,以达到一个专业 MPP 数据库的水平。

02

Hudi查询能力介绍

这一部分介绍Hudi开源社区已经具备的一些查询能力。

1. Hudi介绍

我们之所以选择 Hudi作为数据湖的一个基准,是因为Hudi每年都在不断地变化,功能越来越强大,越来越丰富。最开始Hudi仅仅把自己定义成一个数据文件组织层,包装ADF、包装 pipeline文件,对外提供一些仓的能力、ACID的能力。但是随着社区的慢慢发展,它现在已经逐渐演化成一个流批一体的真正的 server 形态了。
Hudi提供流式挖掘、增量查询,以及丰富的仓的能力,比如高效的更新删除能力,可插拔的索引。还有关系型数据库常见的事务、MVCC、并发管理以及 schema evolution和time travel等能力。Hudi还可以帮我们去解决一些比较麻烦的小文件合并的问题。并且提供了clustering 等一系列功能去使数据的布局更加紧凑,从而提升查询效率。Hudi还有着丰富的生态支持,支持Flink/Spark 写入,以及Presto、Hive、Spark、Flink以及Doris的查询。可见Hudi具备丰富的流处理能力和仓的能力。所以我们选择了Hudi作为实时数仓的基础。

2. Clustering

Hudi在查询方面的优化,主要有两点,一点是clustering,另一点是MDT。首先来看一下clustering。 Clustering从字面翻译是聚合的意思,它是一种数据布局优化。数据在写入表中时,如果按照某些规则做一些聚合性的处理,就可以在查询的时候达到比较好的效果。比如最简单的就是order,我们直接对表的某一列做排序,最终存入的数据是有序的,我们在查询的时候就可以利用每个存储文件的Min-max信息对文件进行裁剪,大幅度减少扫描数据的量,从而提升查询性能。上图中给出了一个简单的例子,如果对数据a列做排序,假如生成了 3 个文件,我们会发现每个parquet文件的minValue_a和maxValue_a都是错开的,不会有交叉。比如a<10,只可能存在 parquet1 里面,这样查询时就可以忽略掉另外两个文件,查询数据量就少了66%。对性能提升有很大帮助。另外在0.10版本,Hudi有z-order和hilbert这种高阶的聚类算法的加入,引入这类算法主要是为了解决sort无法多列排序的问题。如果只排一列,sort排得很均匀,每个文件的 Min/Max 是不交叉的;但如果是排多列,sort只会按第一列来排,后面那几列就随机了。所以空间曲线z-order/hilbert就相当于一个加权,最终排序的结果是整体有序的,在多维查询的效果是比较好。在实际生产环境中,hilbert的聚合效果要比z-order好,当然其构建成本也会相对较大,在选择多列排序的时候,可以优先选择hilbert算法,当然如果只有单列还是建议直接用 sort 的方式。综上,Clustering的目的就是为了使数据具有更好的聚合性,从而在查询时过滤掉无用数据,提升查询性能。

3.MDT

Hudi第二大查询优化是MDT。MDT 是Hudi的一个内表,对用户是不暴露的,Hudi自己维护管理。它是一张hudi mor分区表,里面有三个分区,第一个分区放的是文件和分区信息,第二个分区放的是列统计信息,第三个分区放的是主键的 bloomfilter信息。当然统计信息和bloomfilter信息这两个分区只有当开启配置选项的时候才会产生,默认只会产生文件和分区信息。首先来看一下文件与分区信息,一个查询引擎,要查一张大表的时候,首先肯定要确定要扫描哪些文件,从文件系统中list出要读的文件。扫描的过程对文件系统的list性能有一定压力,HDFS list 性能相对来说还是比较高的;但是对于对象存储,list 性能是灾难性的。如图中社区的一张对比图,从图中可以看到使用MDT的性能要远远超过直接在S3 上做list。所以 MDT的第一个分区,主要是解决构建扫描文件过程中开销过大的问题。表越大,S3 的 list 性能越差,而MDT的优势就越明显。第二,列统计信息。主要是配合上面讲过的clustering。Clustering一般要指定排序 order 或者是z-order。经过排序之后,每个文件的 Min/Max值是不会冲突的,或者冲突很小。我们把每个文件的统计信息存起来,执行引擎可以利用列统计信息,去裁掉一些不需要的文件。第三,bloomfilter,当前社区主要是用它来做快速的更新,而非查询,而且只有主键的bloomfilter。这里不做详细探讨,因为bloomfilter的表存储很大,把它加载进内存中,耗时还是比较长的。以上就是当前Hudi社区在查询方面所做的优化。

03

华为云基于Hudi的性能优化

这一章节将介绍华为云基于开源Hudi所做的性能优化。1. Hudi索引优化索引是为了加快检索速度而创建的一种存储结构,是一种空间换时间的设计思想,作用可以理解为书的目录,通过目录我们可以快速检索到需要的内容。常见的索引类型有数据索引、二级索引、前缀索引等。数据索引,比如对数据做分区,或者是sort、z-order等,这些操作不会产生额外的存储信息,不会产生索引文件。但是由于做了排序或者分区,可以去做分区裁剪,或者是文件级别的FileSkipping,从而提升查询效率。Lucene和bitmap 这种二级索引对数据排序是没要求的,可以在任意表上去构建。但它们是典型的空间换时间的思想,会产生比较大的索引,在查询的时候由查询引擎去加载这些索引,再去做相应的文件裁剪。不同索引都有各自的优缺点,引入这些索引可以提升查询效果。但并不是某一种索引就可以适用各种场景,不同索引适用于不同的场景。接下来将具体介绍数据索引Min-max、Lucene索引和Bitmap索引,并对比各种索引的优缺点,给出使用建议。2. 基于MDT的Min-max索引

首先来解释数据索引Min-max。前文提到 MDT的第二个分区里面存的是每个文件的Min-max索引。数据通过 Spark /Flink 入库,且开启Hudi的MDT时,会自动在Hudi表里面去生成一个MDT内表,这里面放了一个分区,即Col_stat_index,存放的是每个文件对于某一列/某几列的Min-max值。查询的时候Hetu或者Spark 会加载MDT的列统计信息对文件做裁剪。Col_stat_index加载到内存汇总是有一定耗时的,为了加快MDT数据的加载,所以社区针对这一过程设计了一个非常巧妙的方法,用Hfile去保存这些统计信息,而Hfile 具有非常高效的KV检查能力,当需要查询某个文件的统计信息时,可以直接查询对应的Hfile,基本可以在几十毫秒内返回。这比直接把数据全加载到内存里面要省时省力不少。需要再次强调的是,Min-max索引有效的前提是:要么在入库前数据已经按照常用条件排序完成;要么就是入库后采用Clustering指定排序列。只有这样使得数据有序,Min-max值才有意义,查询的时候才能更好地去裁剪数据。

3. Lucene

接下来介绍二级索引中的lucene。Lucene是apache的一个开源搜索工具,它的检索效率很高,solr和 es都是基于此开发的。Lucene具有非常强大的倒排索引能力,可以赋予Hudi更高效的多维查询和文件检索能力。Lucene强大的检索能力来源于倒排。如上图中所示的例子,首先是正向索引,我们将文档ID作为每个文档的唯一标识。接着用lucene倒排,lucene会对字段内容做分词, 这里会把西安、大唐不夜城和回民街这三个词分开;然后记录每个分词和其所在的文档的映射关系。西安对应的文档ID 是 1和 2,也就是在文档 1 和2 中出现过西安这个词。当要查询西安时,就要扫描文档1 和 2;要查大唐不夜城,就只需要扫描文档1就行了。有了基本概念,我们来看一下如何将其引入到大数据中。其实文档ID可以按照业务的实际情况去做定义,上述案例是定义成文档,如果把它定义成行号,那么第一行的内容就是西安大唐不夜城,第二行的内容就是西安回民街。下面来看一下这样定义的好处。如上图中右侧的例子所示,假如一张表某一列,值是1、9、4、3、8,分布在10行中,通过倒排之后,会产生如上图的一个结果。举个例子1这个值,lucene会精确的告诉你这个值出现在表的1,6,7行。有了这种列值和行号给映射关系,查询过程中我们可以快速跳到目标行,把数据检索出来。

前面讲了lucene非常强,但它也有着一些缺点。其实在大数据领域,二级索引相对来说用得还是比较少的。我们来看一下构建lucene需要注意的点。
第一点,lucene不能根据全表去构建,前文提到是根据行号去构建的,但实际在真正构建的过程中,不能拿全表去做构建。而是应该按文件级别,选择每个文件自己的行号作为它的 Doc ID,生成文件级别的lucene索引。全表级别生成行号是不现实的,因为表数据在持续地写入,之前的行号就会发生变化。第二点,lucene索引构建是一个比较耗时的任务,我们选用异步构建的方式,异步构建需要防止构建任务阻塞入库流程。当然,天级别大任务可以选择同步构建。Lucene 并不是完美的,第一个缺陷就是会生成很多小文件,如上图所示,每个文件构建lucene索引的时候,都会产生13 个文件,也就意味着文件数量膨胀了 13 倍。这对HDFS是不可接受的,小表还能这样做,大表的话HDFS基本就被打爆了。第二个缺陷是,lucene对 string 类型可以做分词,string 越长,产生的分词数就越多,这样产生的索引文件就会很大,甚至构建出来的 lucene索引文件比整个表都大。对于数值类型,它的lucene索引文件比较小,可以直接放在内存中,加快查询。综上所述,这就是lucene,它很好但并不完美,在实际应用中需要配合一些解决方案。后文中会介绍我们是如何解决这个问题的。

4. bitmap

Bitmap,使用bit数组存储,可以很好的节省存储空间。同样的,我们还是选择用行号映射到bitmap的对应的bit位。如图,对age列构建bitmap,该表一共有5行,因此对应bitmap 有5位,21 出现在第1行。所以 bitmap 第0位是1,22出现在第二行和第五行,所以 22 对应的bitmap, 1 bit和4 bit位设成为1。同理,每一个age值,都需要去构建一个bitmap。这里我们发现一个问题,如果一个表里面 age 的值很多,几十万个,这就意味着要构建几十万的bitmap,它的大小也是很可观的,把它加载到内存中是不现实的。当然一旦索引构建完之后,我们就可以利用索引去裁剪了。假如要查询age 等于21的数据,就直接把21所对应的 bitmap拿出来,发现存在第0行,那么扫数据的时候扫到第0行就直接返回了,没必要再往下扫了。如果要查age等于20,即查一些不存在的bitmap,可以直接将整个文件skip掉,如果文件级别skip不掉,后续文件内可做rowGroup/page级别的skipping。

Bitmap索引的创建执行也是通过 commit 去执行的。构建bitmap其实跟构建 lucene是一样的。需要注意的点包括:第一点也是要选择文件级别去构建,因为 bitmap 也是以行号为基准的。凡是以行号为基准的,从理论上来说,都不应该以全表为基准去构建,因为全表的行号是不固定的。第二点,bitmap构建也是比较耗时的,可以采用异步的方式,防止阻塞入库流程。第三点是bitmap占用空间是很大的。范围查询时不太友好。在刚才的例子中,如果要查询 age 大于1,只能把四个 bitmap 全部取出做并集,才知道数据是否有存在,查询范围越大,拉取的 bitmap 越多,就会导致查 bitmap 的时间比最终查询的时间都要多。要解决这个问题,可以通过bit-Slice Range encoded Bitmap 的方式构建bitmap,去压缩一下存储。这种方式能极大地减少 bitmap 所占用的空间。第四点就是bitmap入参需要是整数的,对string、double、float这些类型,没办法直接塞到 bitmap ,只能先把这些值设成字典。

下面介绍一下Hudi到底是如何构建索引的。首先Hudi会引入一个新的 timeline 类型。整个二级索引的生成是惰性的,不会去占用真正的入库时间,拖累入库效率。首先它会生成一个 index.request,主要是去扫一下全表,了解到底有哪些文件需要去构建索引。Inflight状态表示索引计划正在执行。Commit表示最终索引数据已经提交完成了,也就意味着数据索引已经可用了。整个流程按照先调度后执行的异步模式去执行。如上图示例。当然示例中有一个index server,不过没有index server也一样,Hudi内部会帮你去做的。我们在 T1,T2 时刻插入数据,T3时插入了index,告诉Hudi表,要生成index。这个时候 C3-index会产生一个request并 push到 index server 上,index server对其进行调度执行,执行过程中会把C3-index状态变成inflight,最后执行完后把C3-index变成可用状态,整个过程不会影响表实际运行。

再来讲一下二级索引的集成。前文中Min-max 集成,裁剪是在coordinator上去做的,因为min-max索引很小,几万个文件对应的索引最多就十几兆,完全可以加载到内存,但是lucene和bitmap占用的空间过大,不可能加载到coordinator上或者 Spark 的 driver上,所以我们采取的方式是,二级索引的裁剪工作交给work 去做。coordinator第一步min-max裁剪之后生成splits下发到 work上;每个work 拉取自己对应的文件级别的lucene和bitmap索引去做data skipping。我们采用了 CK 的SSB数据集来验证效果,数据规模是 1. 5 TB 120亿条数据。性能比原生的方式可以提升3倍到 11倍。

5. 各种索引对比和使用建议

使用sort 或者 z-order代价是比较大的,因为入库的时候需要排序,执行效率会比较低。Bitmap与lucene类似, lucene的能力更强大一点。它们的缺点也非常明显,索引太大了,需要想办法缓存这些索引数据。bloomfilter,只适合等值查询,而且有假阳性问题无法解决,自身存储也比较大。6. 统计信息优化

还有一个优化点是统计信息优化,主要是为CBO服务。在用 Presto去跑 TPCS 的时候发现Hudi性能下降了百分之几百。经过分析Presto on hudi不支持统计信息优化,导致性能下降严重。统计信息对于多表join是非常重要的。所以我们对 MDT 做了一个加强,给MDT 再加一个分区存放统计信息,这样在引擎查询的时候,hudi会把统计信息加载上去,提供给Presto或者Spark 的优化器去做 CBO优化。这么做的一个好处就是Hudi的实时性较高,收集统计信息可以由服务来托管,不需要用户自己收集维护,大大加强了易用性。7.查询瓶颈分析+索引缓存

最后介绍一下查询性能的瓶颈分析以及索引缓存。当我们开启MDT的时候,查询引擎第一步是要 list file, list file后要裁文件。裁完文件之后下推到work端或者是executor端去扫数据,扫数据的过程中又涉及到如何跳块,最终就是要尽可能减少扫描的数据量。这里就出现了第一个瓶颈点,即使我们有 MDT,能尽可能的减少要扫描的数据量,但是 MDT 并不是完美的,因为MDT 的大小会随着表的增涨而增涨。当表中文件数量过万后,单次访问MDT 的还是比较耗时,大概有几百毫秒,这在秒级查询的场景下,是难以接受的。第二点,在裁剪文件的过程中,需要利用一些索引信息,把索引信息存在 MDT 里面,索引信息越大加载耗时越长。有时候甚至加载耗时十几秒,这样整个索引信息毫无意义。第三点就是MDT查询每次都是冷启动,实际上你会发现查询引擎相同的查询语句会越查越快,因为查询引擎都会做一些缓存,所以Hudi MDT 也是需要缓存的。第四点,在读取parquet数据的时候,第一步要读它的元数据信息,读取元数据信息也是比较耗时的,我们可以把parquet文件元数据信息也缓存起来,可以进一步提升效果。

上图是一个基于Spark的缓存图。Spark 增量数据入库任务和Spark历史数据入库任务,Spark历史数据入库任务通过批量入库的形式写入Hudi表,写入之后会去刷新索引文件,同时刷新索引缓存,Spark增量数据入库也会做同样的操作。整个索引缓存缓存的内容就是前文提到的MDT、统计信息、索引信息,parquet元数据信息。同样的executor端也会为了加快查询做一些预热,提前加载好parquet文件的元数据信息。通过多种缓存机制以及查询预热等方法,我们可以达到日增几十 TB 的Hudi表多维查询耗时稳定在1 秒到 2 秒,99% 以上的查询能在 1 秒内。

04

未来规划

最后,简要介绍一下后续工作。第一点还是一些缓存的问题,我们要考虑怎么去缓存热点数据文件,而不仅仅是缓存索引。第二点我们要去构建实时物化视图,动态收集用户的查询条件,入库时实时构建物化视图。第三点,我们会尝试去优化一下 Mor表的读性能。可能会参考 DeltaLake中一个比较新颖的方式叫delete vectors,平衡Hudi读写开销。因为当前的Mor表索引是没法直接用上的,即使Min-max化索引对它也是无效的,因为log基本是无序的。以上就是本次分享的内容,谢谢大家。推荐阅读华为基于Hudi构建的实时数据湖架构与实践
Apache Hudi 1.x 版本重磅功能展望与讨论
Zoom 基于Apache Hudi 的流式日志处理实践
提升 Apache Hudi  Upsert 性能的三个建议
日增数据超10PB!揭秘沃尔玛Lakehouse架构选型之路


您可能也对以下帖子感兴趣

文章有问题?点此查看未经处理的缓存