手把手教你使用 Arctic 自动优化 Apache Iceberg
Arctic 是一个开放式架构下的湖仓管理系统,在开放的数据湖格式之上, 提供更多面向流和更新场景的优化,以及一套可插拔的数据自优化机制和管理服务。
背景
Apache Iceberg 作为一种面向超大型数据集的开放表格式,受到了越来越多的关注,它在设计抽象、引擎兼容性和性能优化等方面备受开源开发者和企业用户认可。如今,越来越多的企业将 Iceberg 应用于数据湖管理和数据仓库建设中,以应对传统湖仓在元数据管理、数据时效性等方面的瓶颈。
但是在使用 Iceberg 的过程中,也伴随着不少痛点,如碎片文件合并和过期文件清理等。为解决这些问题,Arctic 作为一套开放式架构下的湖仓管理系统,提供了可插拔的数据自优化机制和管理服务,而自动优化 Iceberg 表就是其中的一个重要功能,它包括了最常见的文件合并和文件清理两种场景。本文将帮助你了解 Arctic 的文件自动优化功能,并演示如何使用 Arctic 自动优化 Iceberg 表。
原理简介
在数据湖应用中,文件合并是一个常见的需求,尤其是对于时效性要求较高的表,数据的实时写入和频繁提交会导致元数据文件、碎片文件、冗余文件的快速膨胀,严重影响查询性能,并给文件系统带来压力。为此,Hudi、Iceberg 等都提供了多种文件合并的方案,大致可以分为两类:
一类是在数据写入之后,单独调度任务进行文件合并,称为 offline compaction。这种方式依赖于用户手动调度,有较高的维护成本,执行时机、策略、资源消耗等需要在实践中不断摸索。
另一类是在数据写入的同时进行文件合并,称为 inline compaction。这种方式不需要额外调度合并任务,但也存在很多不足:
延迟:合并容易对数据写入的性能产生影响,进而带来数据延迟
成本:合并增大了写入任务的资源消耗,且每张表独占合并资源,容易造成资源浪费,资源用量也不好评估
复杂度:写入任务的复杂度增大,影响了数据写入的稳定性
不同于业界的常见方案,Arctic 的文件合并具有两个鲜明特点,一个是文件的合并是异步持续进行的,不干扰数据写入过程,也不需要用户手动触发;另一个是合并的资源是多表共享的,支持用户做集中的资源管理,减少资源浪费。更多原理性的介绍请参考:Arctic 自动优化湖仓原理解析
借助 Arctic 平台,用户可以摆脱手动优化 Iceberg 表的方式,大大简化运维负担;同时 Arctic 对 Iceberg 表的优化是无侵入性的,各个引擎使用 Iceberg 表不受影响,无需额外适配。
接下来,我们将按照步骤,演示如何用最简单的方法,将已有的 Iceberg 表接入 Arctic 环境,开启表的自动优化,查看优化效果,并对优化过程进行调控。跟随这些步骤,相信你也可以轻松完成 Arctic 的接入!
详细指引
Iceberg 表接入 Arctic
第一步,我们将已有的 Iceberg 集群作为单独的 Catalog 接入 Arctic 环境,当然,在这之前,需要用户先部署 AMS。
AMS 的部署方式请参考官方文档 Deployment 部分https://arctic.netease.com/ch/guides/deployment/
部署好 AMS 环境之后,就可以开始 Iceberg Catalog 的配置啦。
配置 Iceberg Catalog
→ 步骤 1:打开 Catalogs 页面
首先,在 AMS 的 Web 页面的侧边栏点击 Catalogs,进入 Catalogs 管理页面,在这里可以查看、新建和修改已有的 Catalog。
→ 步骤 2:新建 Catalog
接着,我们点击 + 按钮,开始新建 Catalog 的流程,首先配置 Catalog 基本信息(Basic),先在 Name 栏为 Catalog 任意起一个名字。
→ 步骤 3:选择 Catalog 类型
接下来选择 Catalog 类型,这里需要选择合适的 Metastore 和 TableFormat,为了接入已有的 Iceberg 表,TableFormat 需要选择 Iceberg ,Metastore 根据自己使用 Iceberg 表的方式进行配置,我们以 Hive Metastore 为例进行演示,更多 Metastore 的选择场景参考下表:
Metastore 类型 | 场景 |
Hive Metastore | 使用 Iceberg HiveCatalog 时选择 |
Hadoop | 使用 Iceberg HadoopCatalog 时选择 |
Custom | 使用其他 Iceberg Catalog 时选择(如Glue/Rest/Jdbc),需要在参数里指定对应的 catalog-impl |
Tips:
如果选择 Hadoop 类型的 Metastore,需要额外在 Properties 中配置 warehouse 属性,值为 iceberg 环境的根目录,根目录的下级目录被识别为 database,再下级目录识别为表的路径。
→ 步骤 4:上传配置文件
下一步,配置存储信息(Storage),依次点击 Upload 按钮,将准备好的 core-site、hdfs-site、hive-site 文件上传到 AMS。
→ 步骤 5:选择认证方式
接下来,配置认证信息(Authentication),认证方式可选 SIMPLE 和 KERBEROS 两种方式,如果开启了 Kerberos 认证,还需要将 Keytab、Krb5 文件依次上传。
→ 步骤 6:配置属性并提交
配置其他属性(Properties),比如我们在这里配置
table.self-optimizing.enabled = fasle
table.table-expire.enabled = false
分别表示默认关闭 Catalog 下所有表的自动合并和文件过期。
Tips:
表级别的属性(TableProperties) 可以配置在 CatalogProperties 中,配置的方式是在 TableProperties 属性名之前加上 table. 作为前缀;CatalogProperties 对 Catalog 下所有表都生效,如果一个属性在表级别和 Catalog 级别都配置了,则以表级别的配置为准。
最后,点击底部的 Save,就完成了 Iceberg Catalog 的创建,之后,我们可以随时修改这些配置。
查看 Iceberg 表
Catalog 创建成功后,点击侧边栏的 Tables,就可以看到创建的 Catalog,并且,AMS 已经自动发现了 Catalog 下的库和表,选择某张 Iceberg 表后,我们可以查看表的详情页(Details):
详情中包括 Iceberg 的表结构(Schema)、分区(Partition Key)信息,以及一些统计信息(Metrics)和表级别的属性(Properties)。
另外还有 Transactions 页面以及 Operations 页面,分别展示了 Iceberg 表的快照(Snapshot)历史和元数据修改历史。
这些信息将 Iceberg 表的状态直观地展现出来,给 Iceberg 表的维护带来很大便利。
使用自动的文件合并
Iceberg 表接入 Arctic 后,就可以利用 Arctic 进行文件自动合并,将文件大小和数量控制在合适的水平,接下来我们演示如何开启一张表的自动合并功能。
启动 Optimizer
首先,开启自动合并之前,需要启动 Optimizer 资源。
Arctic 自动合并采用多表共享 Optimizer 资源的方式,持续不断地进行文件合并。各个表通过 Optimize Group 实现资源隔离。一旦在某个 Optimize Group 中启动了 Optimizer,该 Optimize Group 下的表都能利用这些资源进行自动合并。
启动 Optimizer 有多种方法(请参考官网以获取更多启动方式:https://arctic.netease.com/ch/guides/managing-optimizers/),生产上,可以用你最熟悉的方式,手动将 Optimizer 直接部署在现有的 Flink 集群上,步骤如下:
→ 步骤 1:配置 Optimize Group
首先配置 container 类型是 external 的 Optimize Group,表示该 group 下的 Optimizer 是外部手动启动的,编辑 AMS 的 conf/config.yaml 文件,在 containers 和 optimize_group 项目下分别添加以下配置,新建名为 test_group
的 Optimize Group :
containers:
- name: externalContainer
type: external
properties:
optimize_group:
- name: test_group
container: externalContainer
→ 步骤 2:重启 AMS
重启 AMS,执行 bin/ams.sh restart,使配置生效。在 Setting 页面中,可以看到配置成功的 Optimize Group。
→ 步骤 3:启动 Flink Optimizer
我们这里演示用 flink 自带的方式启动一个 Flink Optimizer 任务到 yarn 集群:在 flink-conf.yaml 中配置好 yarn 的信息之后,到 flink/bin 目录下,执行下列命令,即可完成一个 Optimizer 任务的启动。
flink run -m yarn-cluster -ytm {EXECUTOR_TASKMANAGER_MEMORY} -yjm {EXECUTOR_JOBMANAGER_MEMORY} -c com.netease.arctic.optimizer.flink.FlinkOptimizer {ARCTIC_HOME}/plugin/optimize/OptimizeJob.jar -a {AMS_THRIFT_SERVER_URL} -qn {OPTIMIZE_GROUP_NAME} -p {EXECUTOR_PARALLELISM} -m {EXECUTOR_MEMORY} --heart-beat 60000
参数 | 说明 |
-ytm EXECUTOR_TASKMANAGER_MEMORY | Flink 任务 TM 内存大小(MB) |
-yjm EXECUTOR_JOBMANAGER_MEMORY | Flink 任务 JM 内存大小(MB) |
ARCTIC_HOME | AMS 根目录 |
-a AMS_THRIFT_SERVER_URL | AMS 的 thrift 服务地址, 比如:thrift://127.0.0.1:1260, 可从 config.yaml 的配置中获取 |
-qn OPTIMIZE_GROUP_NAME | optimize group 的名称, 在 config.yaml 有配置,也可在前端 Optimizer-->Optimizer group里查看, 这个 group 里的 container 必须是 external 类型的 |
-p EXECUTOR_PARALLELISM | Flink 任务并行度 |
-m EXECUTOR_MEMORY | 执行内存(MB),包含 Flink 任务的 JM 的内存和 TM 的内存的总和, 用来上报给 AMS 来统计 optimizer 的资源占用 |
--heart-beat | optimizer 心跳上报间隔 |
启动后的 Optimizer Flink 任务如下所示,可以看到,该 Optimizer 有 4 个并发度。
在 AMS 的 Optimizer 页面中,我们也能看到成功启动的 Optimizer。
启动表的 Optimizing
在启动 Optimizer 资源后,只需执行一条命令,即可轻松开启指定表的自动合并功能。使用你熟悉的 Iceberg 表配置方式,或者直接使用 AMS Terminal 提供的 Spark SQL 环境,执行下面的 SQL 为表新增两个属性:
alter table db_xxx.table_xxx set tblproperties (
'self-optimizing.enabled' = 'true',
'self-optimizing.group' = 'test_group');
这样,指定表的 Optimizing 就开启了,刚才在 test_group
下启动的 Optimizer 资源,会自动对这张表进行持续的文件合并。
同样地,我们可以为更多表开启 Optimizing。这些表将共享 Optimizer 资源,并同时进行自动合并。
查看 Optimizing 的状态和效果
在 Optimizers 页面中,我们可以查看所有表当前的 Optimizing 状态,共有 idle/pending/minor/major/full 五种状态。
idle 表示表不需要进行 Optimizing pending 表示表在等待 Optimizer 资源进行Optimizing minor/major/full 表示表正在进行 Optimizing
而在表的 Optimized 页面中,展示了持续做文件合并的历史。
Optimizing 的调优
在详情(Details)页中,我们可以看到,经过 Optimizing 之后,表的平均文件大小有了显著提升,从最开始的 178.11KB 提升到 3.94MB,文件数量也降低了,如果用户对文件情况仍然不满意,可以继续调整参数来达到进一步提升文件平均大小、减少文件数量的效果。
常用的调整手段有:
1.增大 target 文件大小
参数名 self-optimizing.target-size,默认值,128M
表示 Full Optimizing 之后文件的目标大小,增大这个指标,可以提升文件平均大小。
2.增大 fragment 文件大小
参数名 self-optimizing.fragment-ratio,默认值,8
fragment-size 是 Minor Optimizing 之后文件的目标大小,参数 self-optimizing.fragment-ratio 表示 target-size 与 fragment-size 的比值,减小这个参数(最低可以减小到1),可以增大 fragment-size。
当然,调整上述指标可能导致文件合并的写放大问题加剧,从而增加资源消耗。因此,用户需要权衡其中的利弊。有关更多调节指标的信息,请参阅官方文档:https://arctic.netease.com/ch/configurations/#self-optimizing
使用自动的过期和孤儿文件清理:
快照自动过期
由于文件只有在快照过期(Expire)时才会真正从文件系统删除,因此持续对 Iceberg 表进行过期是十分必要的。
Arctic 提供了这样的能力,开启自动过期的操作同样十分简单,只需要执行:
alter table db_xxx.table_xxx set tblproperties (
'table-expire.enabled' = 'true',
'snapshot.base.keep.minutes' = '720');
snapshot.base.keep.minutes 表示快照保留时间,上述配置表示保留最近 12 个小时的快照。
由于每张表的快照过期每隔 1 小时执行一次,因此,开启快照过期功能后,需要等待一段时间才会生效。我们可以在 Transactions 页面中看到快照过期的效果,页面中展示的就是所有未过期的快照。
自动清理孤儿文件
由于孤儿文件产生的条件相对苛刻,孤儿文件清理,相比快照过期使用的场景更少一些,因此在 Arctic 中该功能默认是关闭的。如果用户需要定期清理孤儿文件,可以手动开启,开启方式仍然是:
alter table db_xxx.table_xxx set tblproperties (
'clean-orphan-file.enabled' = 'true');
为了避免误删文件,在默认情况下,最近修改时间在两天之内的文件不会被删除。由于孤儿文件清理的代价相对较高,即使开启了清理功能,也只会每隔 7 天触发一次。
孤儿文件清理的过程的和效果可以在 AMS 的日志中,通过搜索 OrphanFilesCleanService 进行跟踪。
总结
本文旨在介绍 Arctic 自动优化 Iceberg 表的使用方式,包括文件合并、快照过期和孤儿文件清理。我们期望通过 Arctic 的自动优化,不仅能为用户减轻 Iceberg 表的运维负担,也能带来切实的性能提升。在性能测试中,我们对 Iceberg 表进行了持续写入,当开启了自动优化功能时,查询耗时都能维持在一个较低的水平,而关闭自动优化后,Iceberg 表查询性能急剧下降。具体的测试场景可以参考: Arctic Benchmark:https://arctic.netease.com/ch/benchmark/benchmark/
未来,Arctic 将持续迭代 Iceberg 表的自动优化功能,例如提供更丰富的文件统计信息、资源利用率监控,更灵活的触发指标和资源分配策略等,帮助用户在资源消耗和优化效果之间做出更好的权衡。Arctic 将持续提供更多特性,欢迎大家关注社区进展。
END
看到这里 记得关注、点赞、转发 一键三连哦~
精彩回顾:
关于 Arctic 的更多资讯可查看:
【GitHub】 地址:https://github.com/NetEase/arctic 【Arctic】 文档地址:https://arctic.netease.com/ch/
【社群】:扫描下方二维码加入 Arctic 社群↓ (也可以添加小助手微信:kllnn999 邀你进群)