查看原文
其他

面向流批一体的 Flink Runtime 新进展

高赟@阿里 Apache Flink 2023-05-01

▼ 关注「ApacheFlink」,看更多大咖 ▼

摘要:本文整理自阿里巴巴技术专家高赟 (云骞) 在 Flink Forward Asia 2021 核心技术专场的演讲。主要内容包括:
  1. 流批一体
  2. 语义完善与增强
  3. 性能优化
  4. Remote Shuffle
  5. 总结与展望

Tips:点击「阅读原文」查看原文视频 & 演讲PDF~

一、流批一体



流批一体的目标是希望能够为有限数据和无限数据提供一套统一的处理 API,包括 Datastream API 与 Table/SQL API,其中有限数据的处理对应离线处理,而无限数据的处理则对应在线处理。


之所以需要这么一套流批一体的处理 API,主要有以下两个原因:


  • 首先,随着实时计算的不断发展,大多数企业数据处理的 pipeline 都是由离线处理和在线处理组成的,使用同一套开发 API 就可以减少流处理和批处理作业开发过程中的学习与维护的成本;

  • 另外,在许多场景下,用户的流处理作业可能受限于延迟数据或者线上逻辑变更的情况,例如用户可能会过很长时间才会发起评论或线上处理的逻辑可能需要进行升级,这种情况下就需要使用离线作业对之前处理的结果进行修正,即 backfill 的情况。

这种情况下如果使用两套不同的 API 会很难维护处理结果的一致性,因此用户需要一套统一的 API 来解决上述问题。


Flink 流批一体的 API 由 Datastream API 与 Table/SQL API 两部分组成,其中 Table/SQL API 是相对比较 high level 的 API,主要提供标准的 SQL 以及与它等价的 table 操作,datastream 则相对 low level,用户可以显式地操作算子 time 和 state,这两套 API 可以互相转换结合使用。


针对两套 API,Flink 提供了两种不同的执行模式:


  • 其中流执行模式是通过保留算子的 state 等新数据到达的时候进行增量计算来实现的。它可以同时用于有限数据集和无限数据集的处理,可以支持任意处理逻辑,比如 salt 操作,它允许保留所有历史数据并支持 retraction,当一条新数据到达时,就可以更新保留的历史数据并对历史上收到的所有数据进行重新排序,最后对之前发送的结果进行 retraction 来实现。比如 reduce 算子,可以进一步优化来避免实际存储无限大的历史状态。另外增量计算中,由于数据到达时是无序的,因此它对 sql 的访问也是无序的,从而可能导致随机 io。最后,流处理模式依赖于定时 checkpoint 来支持 failover,这种情况也会导致一定的处理开销。

  • 因此对于有限数据集的处理,我们也提供了专用的批处理模式,算子通过逐级运算的方式来处理,因此只能用于有限数据的处理。这种情况下算子实现可以进行特定的优化,比如对数据先进行排序,然后按 key 进行逐个处理,从而避免无限大的状态随机 io 的问题。

Flink 能够保证,在两种执行模式下,相同的有限输入数据的处理结果可以保持一致。此外,它对两种不同的模式也提供了统一的 pipelined region 调度器、统一的 shuffle service 插件接口以及统一的 connector 接口,为两种接口提供了统一的支持。


目前 Flink 的架构如上图所示,无论是在 API 上还是在具体实现上,已经整体做到了流批一体的状态。

二、语义增强与完善


对于上述流批一体的定义,我们在最近几个版本中也进行了持续的完善和优化,第一部分是关于流批一体语义的增强与完善。


首先是在流模式下支持部分 task 结束后继续做 checkpoint 工作。


目前流程之下作业的结束可以分为两种情况:


  • 如果 source 是有限的,作业最终会执行结束;

  • 在无限 source 的情况下,用户可以通过 stop-with-savepoint--drain 命令来终止作业,并保留一个 savepoint。如果不指定 drain 参数,就不会进行 drain 操作,这种情况一般是为了保留 savepoint 来重启作业,不属于作业终止的情况。

之前的 Flink 不支持部分 task 结束后进行 checkpoint,因为这会导致两个问题:

  • 第一,两阶段提交的 sink 在流模式下依赖于 checkpoint 实现数据端到端的一致性。这种情况下,两阶段提交的 sink 会首先将数据写入临时文件或外部事务中,只有当 Flink 内部的 checkpoint 成功之后,在保证 checkpoint 之前的数据不会进行重放的前提下,两阶段提交的 sink 才可以放心地通过重命名文件或提交事务的方式来进行事务实际的提交。如果部分 task 结束之后不能做 checkpoint,那么最后一部分数据总是无法进行提交,也就无法保证流模式与批模式处理结果的一致性。

  • 第二,对于同时包括有限数据 source 和无限数据 source 的混合作业,如果部分执行结束后不能进行 checkpoint,那么后续执行一旦发生 failover 就会由于回退导致较大的开销。


为了解决上述问题,我们需要支持部分 task 结束之后也可以进行 checkpoint 的功能,并且修改作业结束的流程,使得使用两阶段提交的 task 最后可以等待一个 checkpoint 完成之后再退出。对于正常结束的情况,可以等待下一个 checkpoint 完成后退出;对于 drain 的情况,可以等待 savepoint 完成后退出。


为了实现部分 task 结束之后能进行 checkpoint,我们对 checkpoint 的流程进行了修改。首先重新识别新的 source task,就是那些前序任务都已经终止但本身尚未终止的 task,然后从这些 task 开始发送 barrier 进行正常的 checkpoint 操作。由于 checkpoint 中 state 是以 jobvertext 为单位进行记录的,因此如果一个 jobvertext 中所有 task 都已结束,会在它的状态中记录一个特殊的标记 ver,如果是部分 task 结束,会保留所有正在运行的 task state 作为 jobvertext state,而所有其他 jobvertext 的处理流程与正常 checkpoint 保持一致。作业发生 failover 重启之后,会跳过完全终止的 jobvertext,对其他的 task 的处理逻辑与正常的处理逻辑保持一致的。


基于上述工作,我们对作业结束后的流程和语义也进行了重新的梳理。为了保证两阶段提交的sink能够在提交最后一部分数据后再退出,必须使这些 task 能够等待最后一个 checkpoint 之后再退出。目前的逻辑下,作业在自然结束的时候,首先会发送 max watermark,然后发送 EndOfPadtitionEvent。一个 task 收到 endofPadtitionEvent 之后会分别调用算子的 endOfEInput()、close() 和 dispose() 操作。如果要在最后插入一个 checkpoint,那么最好的方式插入到 close 方法之后。因为在这里,作业已经完成了所有工作。

但是在实际场景下却有所区别,因为实际场景下会触发一个 savepoint,savepoint 成功之后,source 会调用分析式方法来结束执行,并发送 max watermark EndOfPadtitionEvent,后续逻辑和 checkpoint 情况下一致。由于前面已经进行了 savepoint,如果在 close 之后再进行 checkpoint,会显得非常冗余。在这种情况下更合适的方式是先进行作业的结束,然后再进行 savepoint 操作,并且在 savepoint 操作的同时提交最后一部分数据。

但是这也存在一个问题,如果要保留最后一个 savepoint,那么所有 task 就必须等待同一个 savepoint 才能结束,在自然结束的情况下,不同的 task 可以等待不同的 checkpoint 来退出。但是在 savepoint 情况下,作业结束之前已经发送过 EndOfPadtitionEvent,它会关闭 task 之间的网络通信,因此在作业终止之后无法再从最开始的 source 做 savepoint。


为了解决这些问题,必须能够先通知所有 task 进行结束但不关闭网络链接,等所有 task 结束之后再发起一个 savepoint 操作,并且在成功之后再关闭网络链接,就能实现所有 task 等待最后同一个 savepoint 状态而结束。

为了支持这一修改,我们引入了一条新的 EndOfDataEvent。任务收到 EndOfDataEvent 后会调用之前在 EndOfPadtitionEvent 进行的处理。之后 source 会立刻发送一个 barrier 来触发 savepoint 操作,算子会它结束之后再执行退出逻辑。

此外,我们也对之前比较有歧义的 close() 和 dipose() 操作进行了重命名,分别改成了 finish() 和 close(),其中 finish() 只会在任务正常结束时进行调用,而 close() 会在作业正常结束和异常结束的时候都进行调用。


在语义部分,我们进行的另外一个工作是 Hybrid source。

Hybrid source 支持用户读取历史批数据之后,再切回到有限流数据上进行处理,它适用于处理逻辑一致的情况下进行流批互转的操作。也就是在实时数据已经落盘用户需要进行 backfill,且流批处理逻辑一致的情况下,用户可以方便地使用 hybrid source 来实现作业。

三、性能优化


除了在语义方面进行的工作之外,我们在 runtime 层也进行了一些性能方面的优化。

3.1 调度部署性能优化



首先是关于调度部分的性能优化。Flink 由于存在 all to all 的连接关系,两个并发为 n 的算子之间会有 n² 条边,这 n² 条边显式地存在 jm 的内存中,并且许多调度和部署逻辑也会直接依赖于它进行处理,从而导致 jm 内存空间和许多计算的时间和空间复杂度都是 on²。由于 batch 作业一般具有更大的规模,并且调度更加细粒度,因此这会加重调度和部署的性能问题。


为了解决这一问题,我们利用 all to all 边的对称性,对内存中的数据结构和计算逻辑进行了重构,引入了 comsumergroup 的数据结构来代替之前的 excutionEdge 对算子之间的连接关系进行统一描述。这种方式不再重复描述堆对称的信息,从而避免了 n² 的复杂度。基于这一新的描述方式,我们不再在内存中维护 excutionEdge。

此外我们调整了许多调度的算法,比如计算 pipeline region、在一个 task 结束之后计算后续需要调度的 task 等,将它们的时间复杂度也降低到了 O(n)。


计算 pipeline region 过程中还有一部分特殊逻辑,Flink 在作业 dag 图中包含两种边, pipeline 边和 blocking 边。前者要求上下游的任务必须同时启动并通过网络传输数据,后者则要求上下游任务依次启动并通过文件来传输数据。在调度前首先需要计算 pipeling region,一般来说可以按照 blocking 边进行打断,将所有通过 pipeline 边相连的 task 放到同一个region里,但这种逻辑存在一个问题,如上图所示可以看出,因为并发 1 的 task 和并发 2 的 task 之间是通过 blocking 边分成两个 region 的,如果直接通过 blocking 边打断将它分为两个 region。而因为 task1 和 task2 之间存在 all to all 的 shuffle 关系,最后在 region 组成的图上会存在环形依赖的问题,在调度的时候会产生死锁。

在之前的实践中,我们通过 tarjan 强联通分支算法来识别这种环境依赖,彼时的识别是直接在 excutiongraph 上进行,所以它的时间复杂度是 O(n²),因为 all to all 的边存在 n² 的连接。通过进一步分析发现,如果在 jobgraph 中直接进行 pipeline 的认证识别,只要图中有 all to all 的边就一定存在环形依赖,因此可以直接在 jobgraph 上先进行判断,识别出所有 all to all 的边,然后在 excutiongraph 上再对非 all to all 的边进行处理。

通过这种方式,可以将环形依赖识别的复杂度降低到 O(n)。

3.2 部署性能优化



另一部分优化是关于部署性能。

Flink 在部署 task 的时候会携带它的 shuffle descriptors。对于上游来说,shuffle descriptors 描述了数据产出的位置,而对于下游来说,它描述了需要拉取数据的位置。shuffle descriptors 与 ExcutionEdge 的数量是相等的,因此这个数量级也是 O(n²)。在内存中进行计算序列化存储的时候,shuffle descriptors 会消耗大量 CPU 和内存,卡死主线程,导致 TM 及耗尽内存的问题。

但是由于上下游存在对称性,因此有很多 shuffle descriptors 其实是重复的,我们可以通过缓存 shuffle descriptors 的方式来降低维护它的数量。

另外为了进一步防止并发量过大导致 shuffle descriptors 过大,导致内存 oom,我们改用 BlobServer 来传输 shuffle descriptors。


实现了上述优化以后,我们采用一个 10000×10000 的 all to all 两级作业进行测试,可以看出调度和内存占用缩减了 90% 以上,部署时间缩减 65% 以上,极大提高了调度和部署的性能。


流执行模式调度和部署的优化极大地减少作业 failover 时重新启动的时间,但是一旦发生 failover,仍然需要花费一定的时间来进行重新部署以及初始化、加载 state 等工作。为了进一步减少这个时间,我们正在尝试在作业发生 failover 的时候,只重启出错节点。其中的难点在于如何保证数据的一致性,我们目前正在探索中。


另外一部分 runtime 的优化是在流模式下通过 Buffer Debloating 来动态调整 buffer 的大小,从而在反压的情况下减少 checkpoint 的 buffer 对齐所需要的时间,避免 checkpoint 超时。如果产生反压,当作业中间缓存的数据量过大时,可以及时减少 buffer 的大小来控制中间缓存的数据大小,从而避免因为处理数据而阻塞 barrier 的情况。

四、Remote Shuffle



shuffle 是批处理作业执行中非常重要的一部分,由于云原生可以提供统一的运维 API、减少运维开销,以及在离线混部和动态伸缩的情况下提供更好的支持,Flink 在最近的几个版本里也在积极拥抱云原生,比如提供了 Flink on k8s 的完整的实现,以及支持动态伸缩的 schedule。但是由于 Flink shuffle 需要使用本地磁盘,如果要支持云原生的 Flink,我们也需要实现存储计算分离的 shuffle。存储计算分离的架构可以使得计算资源与存储资源单独伸缩,避免 task manager 无法在计算完成后立刻退出,从而提高整个资源的利用率。同时也可以避免 task 执行失败导致 TM 退出而影响 shuffle 文件服务的稳定性,从而对下游的执行造成影响。

针对上述存储计算分离 shuffle 的需求,我们在内部也研发了 remote shuffle service,这一功能已经于今年年初在内部上线。经过一段时间的试用,我们在前段时间对这一系统进行了开源,下面将对这一系统进行详细介绍。


Flink 可以支持多种不同的场景,不同场景下的 shuffle 在存储介质传输方式和部署方式方面是存在较大差异的。比如流处理模式下,Flink 一般采用的基于网络的在线传输方式,数据缓存在上游 TM 的内存中,并在下游 task 有空闲 buffer 的时候及时进行发送。而分析处理模式下,为了支持算子的逐级运行,Flink 还需要支持基于文件的离线传输方式,先写入离线文件中,下游 task 启动之后再通过网络发送给下游的 task,离线文件可以存在本地的 TM 中,也可以存在远程的服务中。

另外,不同的 shuffle 在生命周期管理、元数据管理和数据分发策略方面也存在许多共同需求,所有 shuffle 都需要调度器在启动上游 task 的时候,申请相应的 shuffle 资源,并对其进行记录。还需要调度器在部署下游 task 的同时,携带 shuffle 的资源描述符,从而使下游 task 可以顺利读取相应的数据。最后 shuffle 还依赖调度器在特定的生命周期比如结束或者执行失败的时候,对它的资源进行清理。

为了对不同的 shuffle 提供统一的支持,Flink 从 1.9 版本开始引入了插件化的 shuffle 架构。一个 shuffle 的插件主要由两部分组成,shuffle master 负责在 jm 端与调度器进行交互,从而实现申请和释放 shuffle 资源的功能;而 result partition 和 input gate 分别作为数据的 write 和 read 端,根据调度器提供的 shuffle 资源描述符,将数据输出到特定位置或从数据位置进行读取。而所有 shuffle 实现中,共性的部分则由 Flink 统一实现,调度器会通过 partition track 对已经申请的 shuffle 资源进行记录,并根据作业的执行模式维护 shuffle 资源的生命周期。

通过统一的插件化 shuffle 接口,Flink 可以简化不同 shuffle 实现的复杂度,且允许不同的 shuffle 在实际存储与传输的方式上进行自由选择。


基于 Flink 统一的插件化 shuffle 接口,Flink remote shuffle 的整体架构如上图所示。它的 shuffle 服务由一个单独集群提供,其中 shuffle manager 作为整个集群的 master 节点,负责管理 worker 节点,并对 shuffle 数据集进行分配和管理。Shuffle worker 作为整个集群的 slave 节点,负责读写和清理数据集,Shuffle manager 还通过心跳对 Shuffle worker 和 Shuffle master 进行监听,在心跳超时的时候做数据删除和同步,从而使得集群中数据集的状态保持一致。


我们对传输过程也进行了大量优化。网络部分基于 credit-based 协议来实现,它与 Flink 目前的网络传输协议类似,我们还在其中实现了 tcp 连接复用,以及压缩可控内存管理和零拷贝等一系列优化。io 部分我们提供了支持 io 调度优化的 mapPartition 存储格式。通过 io 调度优化,它在 hdd 上的访问速度达到 140M/s 以上。

此外我们目前也正在开发基于预先 merge 的 reducepartition 的存储格式,它会将数据根据下游预先进行 merge,并存储到特定的 worker 上,下游不能全部同时启动时,可以取得比 mapPartition 更好的效果。


在部署上,Remote shuffle 可以支持多种不同的部署方式,此外我们也提供了版本间的协议兼容,使得当服务器端进行升级的时候,无需升级客户端。最后我们还在系统中提供了常用 metric 的操作,更多的运维工具也正在积极开发中。

五、总结与展望



总的来说,Flink 目前已经具备可以上线的流批一体的数据处理能力,未来我们也将进一步提升这项能力,并在更多的流批融合场景下提供支持,例如相对 Hybrid source,在 backfill 的场景下,如果流批处理的逻辑不一致,我们也在考虑支持批处理结束后保留状态用于启动流处理作业的方式。

另外我们也将进一步提高整个系统的稳定性与性能,并更深入地考虑流模式和批处理模式的本质差异,以及流批一体更深刻的内涵。

往期精选


▼ 关注「Apache Flink」,获取更多技术干货 ▼
更多 Flink 相关技术问题,可扫码加入社区钉钉交流群~


   戳我,查看原文视频&演讲PDF~

您可能也对以下帖子感兴趣

文章有问题?点此查看未经处理的缓存