查看原文
其他

Trino 在哔哩哔哩湖仓一体化平台中的实践

张明磊 DataFunSummit 2023-12-21

导读 今天给大家带来的分享是 Trino 在 Bilibili 湖仓一体的实践,

聚焦三个方面:

1. 计算引擎

2. 稳定性

3. 容器化
分享嘉宾|张明磊 哔哩哔哩 高级开发工程师

编辑整理|赵超

内容校对|李瑶

出品社区|DataFun

01
计算引擎
1. Trino 在湖仓一体的位置

首先简单介绍一下整体的架构,如下图所示。

  • 数据输入方面,支持实时、离线以及 SDK 形式的接入。

  • 数据存储方面,数据实际存储在 HDFS 上,在 HDFS 上引入数据湖组件 Iceberg 作为 Table format。

  • 左上方的部分是 Trino 在湖仓引擎中的位置,部署在容器化环境中。

  • 右上方部分是位于 Iceberg 表之上的存储服务。比如 Optimize 针对小文件的合并,Sorting 可以对文件内部数据进行排序,OpenAPI 提供了表元信息的展示、直接调用 SDK 写 iceberg 等功能、Index Building 和 Cube Building 可以在  Iceberg 表上创建索引和预计算、Auto Analyze 方便用户透明的使用湖仓一体化引擎,无需关心索引和 Cube 的创建以及其他的优化操作,通过自动优化减少了用户的学习成本。

由于 Trino 在大部分情况下无法满足在毫秒内返回查询结果的需求,所以我们会建议对时延要求比较高的用户使用 ElasticSearch/ClickHouse 等时延小的组件。

2. 我们对 Trino 做的主要工作

  • 第一部分是索引,基本思想是尽可能的只访问查询需要的数据。

  • 第二部分是预计算,主要是针对 Aggregation 和 Join 的优化。

  • 第三部分是优化器和算子,围绕 iceberg 的统计信息来优化某些查询。

1)索引

我们支持的索引包括 Min/Max、BloomFilter、BitMap、BloomRF (Range Filter)、BITMAP、TOKEN-BloomFilter、TOKEN-BitMap 索引。其中 BloomRF 支持范围查询并且可以有效减少存储空间,主要解决 BitMap 索引对于高基数字段存储空间较大的问题。

在日志场景下,我们提供了针对中文分词的 TokenBloomFilter、TokenBitMap 和英文分词的 NgramBloomFilter、NGramBitMap 索引。

值得注意的是,我们的索引分为轻量级和重量级。轻量级索引存储在 Iceberg 的 manifest 文件中,而重量级索引(如 BitMap 索引)存储在单独的索引文件里面,并且我们的索引文件和数据文件是一一对应的。

计算引擎适配

索引文件需要在计算引擎层面做适配进而在查询过程中跳过不需要读的文件,下面讲解如何实现。

Trino 调度时首先会获取 split 并会将索引信息填充到 split 对象中。这样在下发到worker 的时候,split 中就已经包含有索引的相关信息了。在真正读取数据之前,算子会利用 split 的索引信息来判断用户输入的查询表达式是否真正需要读取该 split。如果该 split 需要被读取,则无法跳过这个文件;如果不需要被读取,则直接返回EmptyPageSource。

我们对 Trino Iceberg 相关的 UI 也做了一些增强,并且索引是表级别的,由下图可以看到,使用索引来跳过数据文件分为两种,一种是在 coordinator 端跳过的(轻量级索引,如 Min/Max 索引和 Bloomfilter 索引),一种是在 worker 端跳过的(重量级索引,如 BitMap 索引)。

上图中的左上方的查询在使用 Min/Max 索引过滤文件时跳过了所有的数据文件,即在读取文件阶段时没有对任何文件进行读取。右下方的查询(group_id 与左上方查询不同)中,在 MinMax 索引中无法跳过所有的数据文件,剩下的数据文件继续使用 BitMap 索引或 BloomFilter 索引进行过滤。

2)预计算

若 join 的左右两张表的数据表数量都很大,显然 shuffle join 会对参与表的数据进行 repartition,这样导致的网络开销会很大。另外多字段做 aggregation 计算量也很大。aggregation 一般又分为两个阶段,partial 和 final。若是在 aggregation 的partial 阶段没有预聚合效果的话,那么 partial 阶段就可以被省略。

  • 概念:关联列

关联列是针对 Join 场景做的优化。SSB (Star Schema Benchmark) 场景大部分查询的过滤条件在右表上,dynamic filter 生成左表 join key 的 Min/Max 值区间范围大,此时用 join key 做索引过滤效果较差。

在这种背景下我们引入了关联列。它是一个定义在事实表上的虚拟列且通过关联关系得到。我们可以基于关联列做数据组织优化并创建索引,计算引擎把基于关联列的 filter 下推到事实表进行过滤数据即可适配。

预计算并没有这样去使用关联列,只是使用了关联列的语法定义。对关联列的添加语法如下图所示。

  • 星型模型如何使用预计算

三张表做 join 的原始查询如下,其中 orderdate 和 regionkey 是维度字段。

定义关联列的语句如下

除了定义维度字段,还需要定义聚合值


  • 单表场景如何使用预计算

原始查询如下

定义如下

我们拓展了 Iceberg 中的 Action 操作生成预计算文件

原始查询在集群上运行超过了 SLA 的时间限制。通过预计算处理之后,该查询可以在一秒钟之内跑出结果。

  • 避免存储爆炸

如果为每一种查询均生成 CUBE, 那么很容易造成维度上的存储爆炸。那么是否只生成一次 CUBE 就可以响应不同的查询。比如 A⋈B⋈C 生成的 CUBE 是否可以响应A⋈B 或者 B⋈C 或者 A⋈C。

为了解决这个问题,我们引入 Record Preserved Join(RPJ)这个概念,对于表 A 和表 B,如果 A 的每一条记录都出现在 A⋈B 的结果中,并且没有其他的记录,那么称该 Join 为 Record Preserved Join。即 join 之后,表 A 内的数据在 join 结果中出现且仅出现一次。

以下面的四张表为例:

这四张表的 join 结果如下所示


可以发现左表的数据没有增加也没有减少,那么该 join 就是一个 RPJ。RPJ 对数据是有要求的,那么什么样的数据才能满足 Record Preserved Join?通过对 JoinKey 做限制,这些限制包括了:

  • Unique 约束:右表的 join key 满足唯一性且 join type 为 left join,则为 RPJ。

  • PK_FK 约束:左表的 join key 是来自于右表的主键,则 left join 和 inner join 等价,都是 RPJ。

  • None:不符合 RPJ。

注意,RPJ 是创建 CUBE 的非必要条件。如果 Join 不是 RPJ,则单独为查询创建CUBE 即可,但理论上是存在维度爆炸的风险。比如说,维度字段有10个,则有2的10次方的组合。理论上这么多的 CUBE 会耗费很大的存储空间。

  • 集成 Trino,支持 CUBE 查询

预计算生成 CUBE 的阶段已经把 Join 和针对每个文件 aggregation 的工作完成了,所以在查询阶段就已经不需要额外的 Join 操作。下图是查询改写的过程,我们会对这个逻辑计划树进行 Cube 定义的匹配操作,若是能够匹配上,则将左侧等价地修改为右侧针对 Cube 的查询并会将读取数据文件修改为 Cube 的模式,不再读取原始数据。

要注意到 Cube 的生成是异步的,因此需要支持部分文件没有生成 Cube 的情况。这种情况下,会将执行计划修改为如下图所示。图中右侧是已经生成 Cube,左侧是还未生成 Cube 的原始数据,对这些数据进行 partial aggregation,然后对两侧的查询结果进行 union 操作就得到了最终的结果。

在 Runtime 阶段, 我们需要注意在调度 split 时,要包含 cube 的信息(如文件路径),以保证 worker 的 TableScanOperator 能够从 cubefile 中读取文件,而不是从 datafile 中读取文件。显然,如果预聚合程度很高的话,聚合文件是非常小的。当聚合效果很好时,甚至可以将1GB 的原始数据聚合为1KB 的 cubefile。由于读取的数据量小并且没有进行 join 操作,因此查询性能会非常高。

下图为预计算的性能测试结果。可见预计算场景下,性能会有5到40倍的提升;而在关联列场景下,性能会有1到5倍的提升。

3)如何利用Iceberg的统计信息来优化查询

这里主要有两部分的工作,第一部分是优化 Tirno 的 Min/Max/Count 函数,第二部分是优化 Trino 逻辑计划树上的 Sort 和 TopN 节点。

下图为 Iceberg 的 Manifest 文件存储的统计信息。其中 lowe_bounds 和 upper_bounds 存储了各列的最大和最小值;若文件已经按照某列进行了排序,则sort_order_id 不为0,这时可以用该列来优化 sort 和 topN 节点。

  • 优化 MIN/Max/Count 函数

显然,直接将 datafile 文件读入内存进行聚合是很耗时的。我们可以通过直接读取manifest 文件内保存的 min/max/count 这些基本统计信息来避免 datafile 的读取。

对应地,逻辑计划树就可以由对 datafile 的 tablescan 和 aggregation 操作修改为对 manifest 的读取。

下图三张图表示了优化效果。可以看到,2200亿的数据量内性能提升相当明显。


  • 日志查询

第二个优化是基于日志场景的,一般典型的日志格式为:[2023−08−05T11:40:39] [ip=192.168.1.1], XXX,YYY,ZZZ。包含有时间、IP、日志内容等信息。

下图为一个线上查询,可以看到这个查询通过_timestamp进行了排序。

以7月5号这一天的分区为例,统计信息如下:

Size: 15.0TB

Record Counts: 67,327,459,233

Total File Counts: 49,847

Total Splits: 506,942

针对这样一条查询,会将所有的日志数据载入内存,然后进行排序以获取 topN。即使我们为 timestamp、ip 列创建索引以加速日志查询,有时还不足以能够满足用户期望的延时。尤其当数据量特别大的时候,TopN 算子会进行大量的计算,导致性能低下,那么我们是否可以利用 Iceberg 统计信息对 Sort、TopN 算子进行优化。

我们下推了 Partial Sort。下图中的每一部分都是查询计划树的不同优化阶段。我们在 Add local exchange 之后改写了查询计划树。选择在 Add local exchange 之后的一个显而易见的原因是可以尽可能地利用优化后的查询计划树,在匹配到Exchange 节点之后,我们把 Exchange 的 Source 节点,即 Partial SortNode 节点移除并且只在最终 final 阶段做一次归并排序就可以了。

TopNNode 情况会稍微复杂一些,因为要考虑 SortOrder 方向以及 Null Ordering的次序。有时查询的顺序(比如 DESC)与用户定义 SortOrder 字段时的顺序(比如 ASC)不一致。我们扩展了 Trino 的逻辑计划树,增加了一个 LastNNode 节点,对应的算子是 LastNOperator。如果查询的顺序与定义的顺序一致,那么直接生成 LimitNode 节点即可。

其次是 Null Ordering 的处理, SQL 2011标准对 Null Ordering 的次序没有做出明确的限制,具体的实现取决于不同的数据库软件。以 Iceberg 为例,如果按照 ASC 顺序写入,则默认的 Null Ordering 实现是 Null FIRST。但是,Trino 查询中默认的是 Null LAST。另外,由于 datafile 文件已经排序,所以在某些场景下将数据文件再切分为 split 是一个不需要的优化。比如,前面的例子中,我们查询的是前200行,那么我们只需要读取49847个文件的前200行就可以了。若是为了提高并发将这些文件切分为506942个 split,那就需要读取506942个 split,这显然是没有意义的。

我们支持了读和写的 SortOrder 完全相同或完全相反的场景,而这个场景恰好是我们的日志场景下的需求。我们的日志场景下写模式为 ASC NULL FIRST,读模式为 DESC NULL LAST LIMIT 200。

下图描述了 Partial TopN 下推后的性能对比。

  • 读序和写序不一致的思考

我们希望一种写入的顺序(ASC)能够支持两种不同的读取顺序(ASC,DESC),目前我们确实也支持了这样两种读取顺序。但是,若写入的数据为1、2、3、4、5,而读取时希望读取到4和5,这时,我们会将所有的数据读入,但是抛弃了1、2、3,时间复杂度还是 O(N)。第一个直观的想法是,我们是否可以从文件末尾开始读取。这从操作系统层面讲,由于读取局部性的原因,操作系统会将5后面的数据读取到内存,但实际上你又不需要这部分数据。所以,当读写顺序不一致时,目前LastNOperator 算子的性能提升不明显,只有1到2倍的性能提升。另一个直观的想法是,改造 OrcReader,从内存中逆向读取 row group,时间复杂度就可以从 O(n) 下降到 O(ceiling(count / row group) * row group)。比如,当 row group 的大小为10000行时,读取200条数据的话,只需要读取一个 row group 即可达到目标,而不是读取所有的 row group。

02
稳定性

我们在稳定性优化方面做的工作工作比较多,这里仅介绍一些比较重要的点。

1. 优化器阶段限制

我们通过以下限制增加了查询的稳定性:

  • 禁止 Cross Join。

  • 禁止只有 SortNode 查询。

  • 禁止对 Iceberg 分区表的查询不包含分区字段

2. 透明升级

Yuuni 是一个基于 HTTP 协议的,用于统一访问 Trino/ ClickHouse 集群的代理网关,对所有分析查询请求进行认证、监控、路由、缓存、限流等控制。

若我们有 cluster-cs-01和 cluster-cs-02两个集群,当我们需要对cluster-cs-01集群的 Trino 进行升级时,借用 Yuuni 的代理能力,将cluster-cs-01的查询无缝转移到 cluster-cs-02 上去,待 cluster-cs-01 升级完毕,再将查询切回 cluster-cs-01上来,全部过程对用户透明无感知。

03
容器化

我们 Trino 集群目的有2套部署方式,一套部署在物理机上,另一套部署在容器化环境,长远看来会全部迁移到容器化环境中。

物理机部署的过程是:在代码仓库 gitlab 上构建,手动将 tar 包下载到本地,然后将该 tar 包分发到所有的机器上,最后利用公司的脚本执行平台将该 tar 包启动起来。这个过程的缺点是:回滚困难且全程几乎人肉操作,CPU 利用率比较低。优点是:运维工具多,基于物理机可以随意下载想要的运维工具,排查问题比较方便,操作简单。

容器化一站式部署环境

优点是:

  • 资源池化。物理机上线时可以直接将该物理机加入到资源池中并且可视化CPU和内存的增长。

  • 自定义 CPU 核数和内存大小。

  • 基准核,避免慢节点。公共集群内的物理机型号和配置不同,有的机器 CPU 很快,有的就很慢。同样的 CPU 核数,计算能力可能不同。Trino 在计算层面不可能知道机器的 CPU 能力,而基准核能力可以合理分配 work 的 split 下发。

  • 混合部署。可以调度一些其他的任务,提高 CPU 利用率。

  • 方便回滚。

  • CPU 平均利用率提高到80%以上。

缺点是:

  • 运维工具少。目前的集成不是很充分,很多运维工具不能使用。

  • 排查问题不方便。很多重保业务没有在容器化环境中部署。

  • 需要额外的开发以支持上述功能。

容器化的部署流程如下图所示。代码在 gitlab 上,会自动关联平台进行构建,在选择任意的 tag /分支/commit_id 自动构建后,会自动关联计算平台,在计算平台上选择 CPU 和内存规格后,就直接发布了。而且,回滚也是非常方便的。

以上就是本次分享的内容,谢谢大家。


分享嘉宾

INTRODUCTION


张明磊

哔哩哔哩

高级开发工程师

专注于超大规模分布式系统的研发与应用实践, 之前在阿里云数据库OLAP团队参与ADB产品的研发, 现在在哔哩哔哩从事交互式分析产品的研发。

继续滑动看下一个

您可能也对以下帖子感兴趣

文章有问题?点此查看未经处理的缓存