查看原文
其他

干货|字节跳动流式数据集成基于Flink Checkpoint两阶段提交的实践和优化(1)

开发套件团队 字节跳动数据平台
2024-09-12


字节跳动开发套件数据集成团队(DTS ,Data Transmission Service)在字节跳动内基于 Flink 实现了流批一体的数据集成服务。其中一个典型场景是 Kafka/ByteMQ/RocketMQ -> HDFS/Hive 。Kafka/ByteMQ/RocketMQ -> HDFS/Hive(下面均称之为 MQ dump,具体介绍可见 字节跳动基于Flink的MQ-Hive实时数据集成 ) 在数仓建设第一层,对数据的准确性和实时性要求比较高。



文 | 字节跳动数据平台开发套件数据集成团队

目前字节跳动中国区 MQ dump 例行任务数巨大,日均处理流量在 PB 量级。巨大的任务量和数据量对 MQ dump 的稳定性以及准确性带来了极大的挑战。

本文主要介绍 DTS MQ dump 在极端场景中遇到的数据丢失问题的排查与优化,最后介绍了上线效果。 本文分两次连载,第一篇主要介绍Flink Checkpoint 以及 MQ dump 写入流程。

HDFS 集群某个元数据节点由于硬件故障宕机。在该元数据节点终止半小时后,HDFS 手动运维操作将 HDFS 切主到 backup 节点后,HDFS 恢复服务。故障恢复后用户反馈 MQ dump 在故障期间有数据丢失,产出的数据与 MQ 中的数据不一致。
收到反馈后我们立即进行故障的排查。下面先简要介绍一下 Flink Checkpoint 以及 MQ dump 写入流程 。

DataLeap

Flink Checkpoint 简介

Flink 基于 Chandy-Lamport 分布式快照算法实现了 Checkpoint 机制,能够提供 Exactly Once 或者 At Least Once 语义。
Flink 通过在数据流中注入 barriers 将数据拆分为一段一段的数据,在不终止数据流处理的前提下,让每个节点可以独立创建 Checkpoint 保存自己的快照。每个 barrier 都有一个快照 ID ,在该快照 ID 之前的数据都会进入这个快照,而之后的数据会进入下一个快照。

Checkpoint 对 Operator state 进行快照的流程可分为两个阶段:
  • Snapshot state 阶段:对应 2PC 准备阶段。Checkpoint Coordinator 将 barries 注入到 Source Operator 中。Operator 接收到输入 Operator 所有并发的 barries 后将当前的状态写入到 state 中,并将 barries 传递到下一个 Operator。
  • Notify Checkpoint 完成阶段:对应 2PC 的 commit 阶段。Checkpoint Coordinator 收到 Sink Operator 的所有 Checkpoint 的完成信号后,会给 Operator 发送 Notify 信号。Operator 收到信号以后会调用相应的函数进行 Notify 的操作。

而在任务失败后,任务会从上一个 Checkpoint state 中进行恢复,进而实现 Exactly Once 或者 At Least Once 语义。

DataLeap

MQ dump 写入流程梳理

MQ dump 利用 Flink Checkpoint 机制和 2PC(Two-phase Commit) 机制实现了 Exactly Once 语义,数据可以做到不重不丢。

根据 Flink Checkpoint 的流程,MQ dump 整个写入过程可以分为四个不同的流程:
  • 数据写入阶段
  • SnapshotState 阶段
  • Notify Checkpoint 完成阶段
  • Checkpoint 恢复阶段
整个流程可以用下面的流程图表示:
下面详细介绍上面各个阶段的主要操作。假设 Flink 任务当前 Checkpoint id 为 n,当前任务的 task id 为x。

数据写入阶段

写入阶段就主要有以下两个操作:
  • 如果是当前 Checkpoint 第一次写入(transaction),先清理要写入临时文件夹 /tmp/cp-n/task-x
  • 在临时文件夹中建立文件并写入数据
注意在写入数据之前我们会先清理临时目录。执行这个操作的原因是我们需要保证最终数据的准确性:
假设任务 x 在 Checkpoint n 写入阶段失败了(将部分数据写入到临时文件夹/tmp/cp-n/task-x),那么任务会从上一个 Checkpoint n-1 恢复,下一个写入的 Checkpoint id 仍然为 n。如果写入前不清理临时目录,失败前遗留的部分脏文件就会保留,在 Checkpoint  阶段就会将脏文件移到正式目录中。

SnapshotState 阶段

SnapshotState 阶段对应 2PC 的两个阶段中的第一个阶段。主要操作是关闭正在写入的文件,并将任务的 state (主要是当前的 Checkpoint id 和 task id)存储起来。

Notify Checkpoint 完成阶段

该阶段对应 2PC 两个阶段中的第二个阶段。主要操作如下:
  • List 临时目录文件夹 /tmp/cp-n/task-x
  • 将临时目录文件夹下的所有文件 rename 到正式目录
  • 删除临时目录文件夹 /tmp/cp-n/task-x

    Checkpoint 恢复阶段

    Checkpoint 恢复阶段是任务在异常场景下,从轻量级的分布式快照恢复阶段。主要操作如下:
    • 从 Flink state 中恢复出任务的 Checkpoint id n 和 任务的 task id x
    • 根据 Checkpoint id 和 任务的 task id x 获取到临时目录文件夹 /tmp/cp-n/task-x
    • 将临时目录文件夹下的所有文件 rename 到正式目录
    • 删除临时目录文件夹/tmp/cp-n/task-x

    了解完相关写入流程后,我们回到故障的排查。用户任务配置的并发为 8,也就是说执行过程中有 8 个task在同时执行。 在下一次连载中,我们会重点介绍故障排查和优化方案。

    开发套件团队正在招人,点击阅读原文了解

    产品介绍

    火山引擎大数据研发治理套件DataLeap

    一站式数据中台套件,帮助用户快速完成数据集成、开发、运维、治理、资产、安全等全套数据中台建设,帮助数据团队有效的降低工作成本和数据维护成本、挖掘数据价值、为企业决策提供数据支撑。后台回复数字“2”了解产品

    - End -

    修改于
    继续滑动看下一个
    字节跳动数据平台
    向上滑动看下一个

    您可能也对以下帖子感兴趣

    文章有问题?点此查看未经处理的缓存