导读 近年来,湖仓一体化在业界备受瞩目,不论是单点优化还是整体迁移,都取得了显著成果。本文将介绍腾讯游戏数据分析的湖仓一体化实践。(本文整理自 2023 年 7 月 21 日 DataFunCon2023(北京站)(线下)周威老师关于《腾讯游戏数据分析的湖仓一体化实践》的演讲,文章内容均切合于当时时间点)
主要包含以下内容:1. 项目背景
2. 存算分离
3. 数据分层
4. 湖仓一体化
5. 性能优化
6. 效果展示和后续规划
7. 问答环节
分享嘉宾|周威 腾讯游戏 腾讯游戏分析引擎Tech Lead
编辑整理|刘明城
内容校对|李瑶
出品社区|DataFun
01
1. 项目背景
腾讯游戏公共数据平台部在开始制定新数据平台方案前,是基于已经运行了大约十年的原始架构,是一个典型的 Lambda 架构,其中涉及众多组件,流程复杂。典型的 Lambda 架构下主要存在三方面问题:- 数据平台架构复杂,需要维护两个数据处理流程,数据一致性难以保证。
- 离线计算会产生大量的中间结果表,造成数据急速膨胀,加大服务器存储压力。
- 使用结果表展示数据,灵活性比较差,如果需要对数据进行上卷和下钻,则需要单独开发程序,计算新的结果表。
为了解决上述问题,我们考虑在同时兼顾效率与成本的情况下采用新的开发模式或架构,实现以下几个目标:- 简化架构,合并实时离线两条处理流程,减少组件数量。
- 我们需要一个更灵活的方案,能够在性能方面提升,并且在资源管理、计算和存储方面具备更弹性的特点,从而降低成本。
- 使用同一套架构,通过不同的组件和配置支持不同场景的数据分析服务。
2. 平台架构
为了实现上述目标,我们对业界的数据产品和组件进行了一系列调研。最终,选择了基于 StarRocks 的方案来构建我们的湖仓一体化产品。在新的架构中,StarRocks 分析引擎是核心组件。结合了 Iceberg 数据湖和腾讯云的对象存储COS 来实现。实时数据通过 Kafka 和 Routine load 方式导入,离线数据则通过 Spark load 方式从 Hive 表中导入。除此之外,还通过这样一个统一的方案,实现了数据科学的 Spark 任务、在线 BI 报表以及 ETL 任务的支持。
存算分离
1. 弹性计算节点
在转向新方案时,原 StarRocks 的架构分为两层,上层是 FE 节点,下层是 BE 节点。FE 节点的主要作用是生成查询计划和管理原始数据,而具体的执行由下方的 BE 节点完成。然而,BE 节点不仅执行计算任务,还负责存储。BE 节点存算一体结构,会导致存储和计算资源不匹配的问题。在很多情况下,集群中存储的数据量并不大,但计算任务却非常繁重,提升计算能力只能通过扩充机器,但这会导致 BE 节点存储的浪费。另一个问题是,由于 BE 需要负责存储,扩展机器后的均衡和搬迁操作较慢,且在均衡时会消耗性能,影响查询性能。为了解决这些问题,我们与社区合作提出了一个方案:将 BE 的计算能力分离,建立 CN 节点(Computer Node)。将 BE 的存储引擎替换为新组件,并放置在 CN 节点上。在调度过程中,通过 exchange 算子将数据从 BE 拉到 CN 上本地进行计算,实现了加速或扩展集群计算资源的目的。
2. 隔离与弹性
将 BE 的计算能力分离,建立 CN 节点(Computer Node)时,可以看到 CN 已经演变成一个无状态的节点。为了解决 CN 的扩容和弹性需求,我们充分利用了 K8s 的特性,并开发了一个 K8s 的 operator。通过将 CN 运行在容器中,能够利用 K8s 的 HPA 机制动态监控 CN 的负载,并自动执行扩容和缩容操作。此外,我们在 CN 上引入了一个属性,实现了 CN 的分组能力。通过将不同的业务负载分配到不同的 CN 组中运行,实现了不同业务之间的隔离。同时,可以根据不同的 CN 组制定不同的弹性任务策略,例如在白天高负载时扩容,在晚上低负载时缩容。通过这种方式,能够灵活地进行计算资源的弹性伸缩,不论是大规模还是小规模的计算都可以在同一个集群中满足。
03
数据分层
1. 数据下沉
除了计算部分,我们还需要解决存储方面的弹性需求。目前我们的 StarRocks 集群使用 SSD 存储,为了高可用性,我们的所有数据都存储了三份副本。然而,由于游戏周期较长,远古时期的数据很少被访问,如五年前或三年前的数据。将这些数据直接存储在本地成本很高。为了解决这一问题,我们考虑与数据湖结合,将数据存储到数据湖中。为此我们提出了两种下沉到数据湖的方案:- 实时流+离线数据先导入 BE 本地存储;FE 按照“时间分区”定时生成下沉任务(export 命令),BE 执行数据下沉到数据湖的操作。
- 实时流数据同时写入 BE 和 Iceberg 数据湖(实现 IcebergTableSink);定期对实时入湖的数据进行 compact,排序并合并来提升访问性能。
2. 统一查询
在查询数据时也面临一些问题。在我们的游戏场景中,业务通常是隔离的,因此我们为每个业务建立了一个集群。在数据分析时会遇到数据需要共同处理的情况,由于这些数据存在于不同的 StarRocks 集群中,无法进行关联查询。为了解决这个问题,我们通过数据湖的方式将这些数据联动起来,实现下沉后进行分析。由于我们使用 Iceberg,并使用相同的 MetaStore 进行元数据管理,因此所有集群都能直接查看这些下沉后的数据,实现关联分析。
3. Agg 下推优化
在对湖仓数据进行合并测试时,发现其性能存在问题。我们进行了分析,并查看了执行计划。原先的执行计划比较简单,它首先读取 BE 表的数据,再读取 CN 表的数据,接着对这些数据进行联合运算,最终得出结果。然而,如果只需要进行简单的 count 操作,按照原先的执行计划执行的话,实际上会读取符合条件的所有数据并将其传输到另一个 CN 节点上,在那个节点上进行 count 计算,然后再返回结果。显然,这个过程中传输了大量不必要的数据,实际上我们只需要传输 count 的结果就可以。为了解决这个问题,我们可以避免将原始数据传输 shuffle,而是在原地进行 count 计算,然后再进行 sum 操作。基于这样的思想,借助于 StarRocks 具备的一个多阶段聚合的特性。我们可以将第一阶段的聚合下推到 union 操作的下面,如此一来数据传输量就会大大减少。通过这种计算拆分的方式,数据的读取性能提升了约六倍。
4. 批量数据读取
在我们的场景下,还涉及到一些数据科学的任务,即运行 Spark 任务并计算出一个模型。在数据科学的任务中,通常需要读取最近一个月甚至一年的所有明细数据。然而,使用 JDBC 的方式显然效率很低,可能会导致集群崩溃或者非常慢。为解决这个问题,StarRocks 社区提供了一种解决方案,即使用 Spark 来读取数据。然而,社区方案只能读取存在 BE 的数据。为了实现查询的统一,我们对StarRocks 原生的 Spark connector 和 Iceberg Spark connector 进行了融合,并在上面实现了数据的路由。通过原始数据的信息,可以确定数据实际上存储在BE 上,然后直接从 BE 读取数据。如果发现数据存储在湖中,就从对象存储中读取数据,然后直接返回。
湖仓一体化
1. DDL 一体化
在完成存算分离和数据分层这两步,我们已经实现了任务计算和存储的弹性。然而,要更全面地利用它们,仅有这两项弹性还不够,我们需要考虑更简洁、更易用的方面。例如,StarRocks 对数据湖的原生支持只涉及读取,但在我们的场景中,有时会通过运行 etl 任务生成数据并将其写入数据湖。对于大型数据,人们希望能够直接将临时数据写入数据湖而不是下沉的方式,然后在 StarRocks 上使用。然而,StarRocks 原生不支持建立 Iceberg 表,这就导致了体验不连贯,同时 ETL 任务可能无法自动化。因此,我们需要实现建表功能。理论上创建 Iceberg 表只需要获取两种信息:MetaStore 的访问信息,用于获取原始数据信息和对象存储路径,以及对象存储的权限信息。一旦有了这两种信息,就可以在 StarRocks 中创建表,我们将这些信息都存储在 Resource 对象中。通过这种方式,用户在编写脚本时只需指定 Resource 对象,就可以创建表。此外,在执行其他 DDL 语句时,我们也将这些信息同步到 Iceberg 中。
2. Delete 一体化
在实际操作中,我们经常会遇到数据的补录或修正的场景。例如,用户可能发现半年前的数据存在错误,需要进行修正。在这种情况下,我们的处理逻辑很简单,即删除数据并重新上传。然而,StarRocks 之前并不支持 Iceberg 数据删除操作。Iceberg 的数据删除有两种方法:第一种方法是 Copy on Write。这种方法需要先读取所有数据,然后删除不需要的数据,生成新的数据并存储。虽然这种方案实现简单,读写性能较好,但写入速度较慢,因为需要一次全量读取和写入。另一种方案是 Merge on Read,即在读取数据后标记不需要的数据,并将删除文件写入,需要对读取部分做改造以过滤不需要的数据。考虑到补录业务通常是紧急需求,我们选择了 Merge on Read 方案。然而,Merge on Read 方案对查询性能并不友好,因为每次查询都需要过滤一遍删除文件。我们借助了 compact 任务,定期合并数据来提升查询性能。
3. 统一导入
在大规模数据导入的场景中,数据基于 Hive 架构的存储,需要迁移至另一存储地点,且数据量庞大。过去使用的 steam load 方案存在问题,需要在 BE 节点进行序列化、反序列化、数据分发、任务提交和压缩。而且单次 steam load 数据量有限,需要多次提交,给 BE 节点带来巨大压力。针对此问题,社区提出了 Spark load 方案。该方案在数据读取后,使用 Spark 进行数据重组,根据 BE 节点所需的 StarRocks 数据格式进行分区、统计和排序,存储到外部存储中,然后请求 BE 节点将外部存储数据一次性导入。此流程能减少多余步骤,BE 节点只需执行一次压缩就可导入数据。结合湖仓一体场景,可考虑将外部存储更换成数据湖,直接将数据写入对象组中。具体来说,Spark 任务完成数据构建后,将数据写入湖仓。若数据为热数据,即一年内使用的数据,可向 BE 请求将数据加载回本地;若数据为冷数据,则可直接使用。通过此方式简化数据导入,减少对负载的影响。
性能优化
1. 总体视图
我们在实践中发现,读取数据湖近 90% 甚至更多的时间都花费在 IO 操作上,也就是将数据从湖中读取出来的过程。因此,优化数据湖的方案主要聚焦于减少数据读取量。从湖中直接读取数据文件时,根据用户的 where 条件,使用条件进行排序和重组数据,还会调整文件的大小。另外,压缩算法也有一定作用。不同的数据可使用不同的压缩算法,其压缩比也会有所不同。当然,这些参数需要结合数据格式进行验证才能得到合理的结果。
2. 数据湖元数据访问优化
此外,执行计划的生成也是一个重要的方面,执行计划的生成包括语法解析、语义解析、优化器、逻辑计划和物理计划。在 CBO 优化器层面,为了获取数据的统计信息,通常至少需要一个最大值和一个最小值进行筛选。在一个典型的场景中,假设一天会产生一万个数据文件,而表有 400 列,这个是一个比较平均的表大小。经简单计算,每天都要做 800 万次的最小值比较,这个代价是相当高的,其实没有必要每次都这样做。所以我们进行了优化,将所有的统计数据先计算出来,放到内存中,下次取数据时直接从缓存中获取结果。通过这种方式,可以减少统计信息计算的耗时。
优化物理执行和物理计划时,需要从 Iceberg 获取所有的 data file 文件列表,在获取 metadata.json 之后通过 MetaStore 获取所有 snapshot,再获取 manifest 文件。除了第一个步骤需要通过数据库读取外,其余文件都需要从对象存储读取。由于数据量大,因此读取过程非常耗时。最初的想法是将所有内容存入内存,每次读取时直接从内存中取出,但测试结果发现内存无法存储如此庞大的数据,有可能高达上百 GB。因此,我们将数据持久化到本地存储,以防止因 FE 重启而丢失数据。此外,我们还采取了一些查询优化手段,主要集中在减少文件数量和大小方面。
效果展示和后续规划
1. 某游戏业务效果展示
因为我们的游戏都是相对独立的,所以每个游戏都有自己的数据集群。例如,某头部游戏业务,数据部分涉及大约 300 多个 BE 节点,以及 100 多个 CN 节点。因此,该游戏的本地数据量大约为 2PB,同时数据湖中的数据量约为 3PB。该业务每天会新增大约 50TB 的数据,单次查询的总量约为 3 万条,同时支持 200 个并发查询,查询 P90 的耗时为 2 秒。
2. 后续规划
下面是后续规划。尽管我们已经对数据湖进行了一些优化,但仍然有一些业务抱怨速度太慢,尤其是对于某些活动数据的频繁查询,比如游戏中的活动,可能会关注去年或前年的活动数据。因此,我们希望通过物化视图的能力,将这部分数据临时缓存到本地。另外,我们对原始数据进行了优化,但解析过程仍然耗时,因为所有解析都在 FE 上进行,而 FE 是一个单点,且只有一台机器,会有性能上限。我们计划将原始数据解析后的结果存储到 OLAP 表中。这样可以通过 BE 或 CN 的引擎能力提升筛选速度,减少序列化反序列化的时间。另外,我们还计划使用 CN 集群做预构建导入,并减少 compact 消耗,以提升大批量数据导入速度。
07
Q1:我注意到您之前提到在 StarRocks 上实现了存算分离,并引入了 CN 节点。对于社区来说,您认为这种设计有什么贡献意义?社区如何看待这个方案?A1:这个方案已经融入到社区,并可以在 2.4 以上版本使用,因此它已成为社区的标准特性。Q2:关于数据湖,在您的场景中,数据存储在对象存储上,您能详细介绍一下“对象存储”在数据库方面的优劣势吗?您是否与其他存储进行过比较?A2:对象存储的好处在于它是一个通用且云原生的方案。对于我们的业务来说,在腾讯内部是相对特殊的,因为我们有很多海外场景,而腾讯的其他部门可能没有这种需求。因此,我们更关注我们的架构是否能够在海外或公有云环境中部署。由于很多情况下我们的游戏是代理游戏,海外的合作伙伴倾向于使用亚马逊或谷歌的云服务。因此,我们需要考虑一些通用性,选择对象存储作为我们的云布局方案。当然,使用对象存储可能会有一些劣势,比如在性能方面,由于数据量较大,可能会存在带宽瓶颈。为了解决这个问题,我们与腾讯云合作,他们提供了一些加速方案来提升访问性能。Q3:如果使用 StarRocks 的湖仓一体化架构,是否更加高效?为什么选择现在的架构而不是直接使用 StarRocks 整体化架构?A3:这个问题的关键是时间因素。我们的架构方案早在 StarRocks 3.0 的湖仓一体化方案提出之前就已经开始了。所以社区的方案也借鉴了我们的一些早期经验,比如 CN 节点就是我们和社区共同开发的。另外一个考虑是在我们公司中,我们更倾向于使用存储格式更加开放的方案。因此,我们选择了一个相对较开放的Iceberg 的数据格式来进行实现我们的架构。Q4:MetaStore 未来会集成在 StarRocks 上,还是会成为独立组件?A4:首先,MetaStore 肯定是一个独立的组件,不会集成到 StarRocks 中。因为这个组件比较独立,整个数据湖环境是一个独立的架构,我们会有一个团队来维护数据湖,并与 OLAP 团队合作。因此,我们不会将这个组件完全集成到 StarRocks 中。但我们实现了这种同步机制,当在 StarRocks 中做了更改时,会自动同步到 MetaStore,包括建表和 DDL 操作都会被同步。
分享嘉宾
INTRODUCTION
周威
腾讯游戏
腾讯游戏分析引擎 Tech Lead
担任腾讯游戏分析引擎团队 Tech Lead,负责游戏数据分析引擎技术研发和规划工作;近期主要从事数据分析引擎的研发以及数仓一体化架构在腾讯游戏数据分析场景下的落地;在开源社区有诸多贡献,StarRocks 开源社区 Committer;同时在存储计算底层技术领域,如 MySQL,Ceph,K8s,Linux Kernel 等领域均有丰富的研发和运营经验。