查看原文
其他

基于 Log 的通用增量 Checkpoint 在美团的进展

王非凡@美团 Apache Flink 2023-06-03
摘要:本文整理自美团计算引擎工程师王非凡,在 Flink Forward Asia 2022 核心技术专场的分享。本篇内容主要分为四个部分:
    1. Log based Checkpoint 基本原理介绍    2. 美团应用场景及测试效果    3. Changelog Restore 性能优化    4. Changelog 存储选型探索


Tips:点击「阅读原文」查看原文视频&演讲 ppt


01

Log based Checkpoint 基本原理介绍


1.1 普通 Checkpoint 制作方式


Log based Checkpoint 又被称为通用的增量 Checkpoint,那就要先看一下在此之前的增量 Checkpoint 是怎么制作的,以及存在什么样的问题。这里主要是指 RocksDB 的增量 Checkpoint。为了简化描述,后面部分也都以 RocksDBStateBackend Checkpoint 为例。


如上图所示,RocksDBStateBackend Checkpoint 在制作时直接触发底层的 RocksDB 快照。在此过程中 RocksDB 需要将内存中的 MemTable Flush 到磁盘上,形成 L0 层的 SST 文件,以保证 SST 文件中保存了全量一致的状态数据。


RocksDBStateBackend 的增量 Checkpoint 就是在制作新的 Checkpoint 时不再重复上传上次 Checkpoint 已经传过的 SST 文件,通过减少重复上传文件来提高制作速度。但由于每次仍需要 RocksDB 将 MemTable 刷盘,导致高频制作 Checkpoint 时会存在一些问题。


首先是 Memtable 频繁刷盘会导致 L0 文件产生的过快,进而导致频繁的 Compaction。其次是 Checkpoint 制作时,所有 RocksDB 实例同步执行快照会导致 CPU 使用率、磁盘 IO 和网络流量的尖刺。


上图为 Log based Checkpoint 的制作方式。为了支持该功能,Flink 引入了 ChangelogStateBackend 组件。这个组件在代理状态写操作的过程中,通过背后的 Changelog Writer 组件将 State Change 数据写入到 Changelog Storage 中。在 Checkpoint 制作时,只需要 Changelog Writer 保证 Flush State Change 即可。而底层 RocksDB 的快照被称为物化,并将以较低的频率异步执行。


我们结合例子来具体看下 Checkpoint 是如何制作的,上图中也描述了随时间推进发生的事件:


  • T0 时刻作业启动,随之开始处理数据。数据处理过程中不断地有 State Change 发生。T1 时刻 Checkpoint-1 触发,此时从 T0 到 T1 所有 State Change 就是 Checkpoint-1 中的所有状态数据,也就是图中的 Change-Set-1。
  • T2 时刻触发了一次物化,即异步制作 RocksDB 快照并上传,我们简称为 m1。同时在 T2 时刻将在 State Changelog 中记录同步点,使得 T1 到 T2 之间的 State Change 构成 Change-Set-2。
  • T3 时刻触发了 Checkpoint-2,由于此时 m1 还没有完成,Checkpoint-2 仍然只由 Changelog 构成,即包含了 Change-Set-1、Change-Set-2 和 Change-Set-3。在 Checkpoint-2 制作完成之后,m1 也制作完成了。
  • T4 时刻触发了 Checkpoint-3,这时候可以发现存在一个最近完成的 m1,Checkpoint-3 可以直接由 m1 及其之后的 Change-Set-3 和 Change-Set-4 构成。
  • 通过这种方式,Flink 的 Checkpoint 与底层的状态存储快照进行了解耦,使得 Flink Checkpoint 能够以较高的频率和速度执行。


1.2 Log based Checkpoint 优势


优势有四点:


  • 第一,可以带来更轻量的 Recovery。Checkpoint 的间隔越短,Recovery 时需要回溯的数据就越少,作业恢复的速度也就会越快。
  • 第二,可以减少事务化 Sink 的端到端延迟。事务化 Sink 在 Checkpoint 完成时进行 commit,更快的 Checkpoint 意味着可以进行更频繁的 commit。
  • 第三,更可预测的 Checkpoint 间隔。不需要等 DB Flush,compaction 的影响也更小,Checkpoint 制作时长仅取决于需要持久化到 Durable Storage 上的 Changelog 数据的多少。
  • 第四,更友好的资源使用。比较重的 DB 快照操作被分散执行,可以避免 CPU 使用率、磁盘 IO 和网卡流量 随 Checkpoint 制作而产生的尖刺。


02

美团应用场景和测试效果


我们想要尝试使用 Log based Checkpoint 支持的业务需求,是流量数据天级回溯不超过千分之五。这种类型作业的特点是数据量大、作业规模大、单作业的规模有 5000 slot 左右。当前大部分作业能够稳定运行的 Checkpoint 间隔为十分钟,一天内 Failover 一次,回溯的数据量就会打破千分之五以内的要求。


针对这种需求,我们最初有两种解决思路:一个是减少数据回溯的时间,另一个是减少回溯的 Kafka 分区数。减少回溯数据时间可以通过缩短 Checkpoint 间隔来实现。这方面有 Flink 社区的 Flip-158 即我们上面介绍的 Log based Checkpoint 来支持。而减少回溯分区数可以通过只重启故障的节点来实现,这方面有 Flink 社区的 Flip-135 即 Approximate Task Local Recovery 来提供支持。


考虑到 Approximate Task Local Recovery 的方案会导致数据丢失,而这是我们的业务所不能接受的,因此我们选择了 Log based Checkpoint。由于当时该功能还是实验室阶段,因此我们首先对其进行了测试验证。


测试表明,使用 Log based Checkpoint 可以将 Checkpoint 的稳定制作间隔从十分钟下降到十秒,完全能够满足流量业务对天级数据回溯不超过千分之五的要求。


同时如上图三张资源使用率的监控所示,也验证了 Log based Checkpoint 能够优化资源使用。图中蓝线是使用传统的 Checkpoint 方式,黄线是使用了 Log based Checkpoint。具体的:CPU 使用率的峰值下降了 40%,磁盘 IO 使用率峰值下降了 29%,网卡出流量峰值下降了 61%。基本消除了传统 Checkpoint 制作过程中的资源使用尖刺。


但同时测试也暴露出 Flink 社区基于 DFS 实现的 Changelog 存储的一些问题。


首先是 Changelog Restore 重复下载文件问题。为了避免产生过多小文件,同一 TaskManager 内的 State Changelog 会尽量聚合到同一个文件中。而在 Restore 时这些文件会被同一个 TM 内的 operator 重复下载,导致 Restore 性能差。


其次是即使经过 TM 粒度的聚合,小文件问题仍然严重,HDFS NN 压力巨大。以我们一个 4800 并发的作业为例,默认配置下会产生 130 万左右个 Changelog 文件,单个作业带来的 NN 请求高达 18000 次/秒左右。


最后是 Changelog 文件写延迟太高,影响 Checkpoint 制作速度。还是以我们的 4800 并发作业为例,写 Changelog 文件 p99 延迟在 3 秒左右,最大延迟甚至达到 2 分钟,导致 Checkpoint 的制作时间非常不稳定。


针对这些问题,我们分别进行了分析和解决,这将会在后面的篇幅展开。


03

Changelog Restore 性能优化


3.1 Restore 时 Changelog 重复下载的问题


Flink 社区基于 DFS 的 Changelog Storage 在实现 Changelog 上传时,为了减少小文件问题,选择在同一 TM 内对所有 Operator Changelog 进行聚合和压缩,然后再上传到 DFS 上的文件中。而发生 Restore 时每个 Operator 实例的 Changelog Reader 都需要重复地读一次 Changelog 文件,造成严重的读放大,进而导致 Restore 的速度过慢。


减少同一个 TM 内的 slot 数可以减轻这个问题,但是又会加重 DFS 小文件的问题。


其实每个 Operator 实例的 Changelog 数据在文件中是连续的一段,并且 Checkpoint 元数据中记录了 Offset,为什么不能直接 Seek 到相应的 Offset 后再开始读操作呢,这样就可以做到每个 Operator 只读属于自己的部分,不是么?


问题就在于,为了减少 Changelog 文件的体积,Changelog 文件是经过压缩后再上传到 DFS 的。压缩导致 Offset 对压缩后的文件失效了,只能从头开始解压缩后再 Seek 到相应的 Offset 上。这样就导致了同一个 Changelog 文件的反复下载和解压缩。


针对这个问题,我们提出在 TM 上增加一个 Changelog File Cache 组件,代理 Changelog 文件的下载。Cache 组件会在需要的时候将 Changelog 文件下载并直接解压缩后存储到本地,当 Changelog Reader 发起请求时,可以直接在本地缓存的文件上 Seek 到相应的 Offset 后读取。


这样在 Restore 的过程中,每个 Changelog 文件都只需要下载和解压各一次。Changelog File Cache 组件会在内部对每个本地缓存文件记录引用计数,通过引用计数和可配置的 TTL 来清理本地缓存。这个优化已经在 Flink-1.16 上发布了。


04

Changelog 存储选型探索


上图总结了在美团业务场景下 State Changelog 数据对存储的需求。基本的功能性需求是不丢、不重和保序,以保证数据的正确性。同时在性能方面需要满足较低的写入延迟,以保证 Checkpoint 的快速的完成。最后结合美团的 Flink 作业现状,还需要能够支撑百万级的并发写入。


针对以上的各项需求,我们对候选的存储进行了对比。首先我们将存储分为两类,一类是批式上传的 HDFS 和 S3,这一类是当前实现已经支持的;另一类是流式上传,例如 BookKeeper、Kafka 和 Pulsar 等。具体细节见如下表格:


补充说明下 BookKeeper,由于在设计之初就考虑到为 WAL Log 服务,从功能保障、延迟和并发规模上来看,都比较适合用来存储 Flink 的 State Changelog。而 HDFS 和 S3 这种 DFS 类的存储,延迟和并发支持能力均不太满足需求。Kafka 由于面向分区的设计,在分区数过多的情况下表现不佳。Puslar 在能力上是满足需求的,但是相对于 BookKeeper 来看没有明显优势,又比 BookKeeper 多了 1 层 proxy 的开销。因此我们初步选择使用 BookKeeper 作为新的 Changelog 存储。


在实现 BookKeeper Changelog Storage 之前,我们先来看一下 Changelog 涉及的组件和他们的作用。如上图所示,Changelog 中组织了三个组件,分别是:


  • 负责代理状态读写的 ChangelogBackend。
  • 负责 Changelog 读写的 Changelog Storage。
  • 负责协调以上两个组件的物化 Manager。


我们要实现的就是 BookKeeper 的 Changelog Storage,其中包含 Writer 和 Reader 两个主要组件。


结合 Checkpoint 的制作流程来看一下 Changelog Writer 的角色:


  • Coordinator 触发 Checkpoint 之后,Changelog Writer 需要将已经传给自己的 State Change 数据都 Flush 到 BookKeeper 上。然后向 Coordinator ACK 一个 BookKeeper Changelog State Handle。这个 Handle 中会包含 BookKeeper 的地址、数据所在的 Ledger ID 和 Offset 等信息。
  • Coordinator 在收到 ACK 之后,将 Metadata 写入 Durable Storage 上,Checkpoint 就制作完成了。


再来看一下 BookKeeper Changelog Writer 对三个核心方法的实现:


首先是 Append 方法,用于向 Changelog Writer 传递 State Change。考虑到要减少到 BookKeeper 的请求次数,我们会在 Writer 中去攒 Batch,Batch 满之后再将 Batch 中的数据发送给 BookKeeper,并递增 Sequence Number。nextSequence 方法用来获取物化的同步点,因此需要直接将 Batch 发送给 BookKeeper,同时也要递增 Sequence Number。


Persist 的方法用于在 Checkpoint 制作时,保证 Changelog 数据被持久化到 BookKeeper 上。因此除了需要将 Batch 发送给 BookKeeper 并递增 Sequence Number 之外,由于前面的操作考虑到优化 BookKeeper 写入性能而开启了 Deferred Sink,没有要求 BookKeeper 刷盘,Persist 时还需要调用 Force 方法强制 BookKeeper 将前面收到的 entry 刷盘。最后,Persist 方法还需要收集并整理 Ledger ID 和相关的 Offset 组装成 BookKeeper Changelog State Handle 返回给 Checkpoint Coordinator。


BookKeeper Changelog State Handle 中记录了存储在 BookKeeper 上的 Changelog 数据的元信息:一部分是 BookKeeper 的地址、数据摘要类型和数据加密密钥,另一部分包含了 Ledger ID 和相关 Offset。


如上图所示,阴影部分表示了此次 Checkpoint 需要包含的 State Change,覆盖了两个 Ledger,因此需要分别记录两个 Ledger ID、Start Offset 和 End Offset。


介绍完 Changelog 的写操作,再结合 Checkpoint Restore 的流程来看下读操作。


首先 Checkpoint Coordinator 会从 Durable Storage 上读取 Metadata 并解析出其中的 BookKeeper Changelog State Handle,然后将这些 Handle 分发给不同的 BookKeeper Changelog Reader,由 Reader 负责发起读请求从 BookKeeper 中读取数据并 Apply 到 State Table 上。


如何实现 BookKeeper Changelog State Handle 在 Checkpoint Metadata 中的序列化和反序列化呢?


由于目前所有的 State Handle 类型的序列化和反序列化都是硬编码的,非常不便于实现第三方的 Changelog Storage,我们当前的方案是增加一个 Customer Keyed State Handle 的接口,允许 Keyed State Handle 自己实现序列化和反序列化方法。


Customer Keyed State Handle 在序列化时会首先将使用的序列化器的类名写入到 Metadata。再使用序列化器将 Handle 写入 Metadata。反序列化时,首先从 Metadata 中读取出序列化器类名,再根据类名使用 ClassLoader Load 出序列化器,最后使用序列化器读出 Customer Handle。


上图是 BookKeeper Changelog Storage 的配置项。除 BookKeeper 的地址外分为三类,分别用于控制 Ledger 的副本配置、Ledger 的滚动配置和批量上传配置,并且都提供了默认值。


需要说明一下,由于当前的 Shared State Registry 不支持 BookKeeper Changelog State Handle 这类并非基于 Stream State Handle 的实现,因此我们暂时通过 TTL+外部服务的方式去清理 Ledger。


我们在 State/Checkpoint 方向上的未来规划如下:


  • 继续完善 BookKeeper Changelog Storage,补充相关的指标,实现引擎内部的 Ledger 清理,并完成 Benchmark 测试和性能的分析,掌握能力边界。
  • 继续推动 Changelog 功能的落地,推动 POC 业务线上落地 Changelog 功能,在事务化 Sink 场景推广 Changelog 功能。


往期精选




▼ 活动推荐

▼ 关注「Apache Flink」,获取更多技术干货 ▼

   点击「阅读原文」,查看原文视频&演讲 PPT

您可能也对以下帖子感兴趣

文章有问题?点此查看未经处理的缓存