查看原文
其他

字节跳动基于Flink的MQ-Hive实时数据集成

The following article is from 字节跳动技术团队 Author 李畅

背景

在数据中台建设过程中,一个典型的数据集成场景是将MQ(Message Queue,例如Kafka、RocketMQ等)的数据导入到Hive中,以供下游数仓建设以及指标统计。由于MQ-Hive是数仓建设第一层,因此对数据的准确性以及实时性要求比较高。

本文主要围绕MQ-Hive场景,针对目前字节跳动内已有解决方案的痛点,提出基于Flink的实时解决方案,并介绍新方案在字节跳动内部的使用现状。

已有方案及痛点

字节跳动内已有解决方案如下图所示,主要分了两个步骤:

  1. 通过Dump服务将MQ的数据写入到HDFS文件

  2. 再通过Batch ETL将HDFS数据导入到Hive 中,并添加Hive分区


痛点

  1. 任务链较长,原始数据需要经过多次转换最终才能进入Hive

  2. 实时性比较差,Dump Service、Batch ETL延迟都会导致最终数据产出延迟

  3. 存储、计算开销大,MQ数据重复存储和计算

  4. 基于原生Java打造,数据流量持续增长后,存在单点故障和机器负载不均衡等问题

  5. 运维成本较高,架构上无法复用公司内Hadoop/Flink/Yarn等现有基础设施

  6. 不支持异地容灾

基于 Flink 实时解决方案

优势

针对目前公司传统解决方案的痛点,我们提出基于Flink的实时解决方案,将MQ的数据实时写入到Hive,并支持事件时间以及Exactly Once语义。相比老方案,新方案优势如下所示:

  1. 基于流式引擎Flink开发,支持Exactly Once语义

  2. 实时性更高,MQ数据直接进入Hive,无中间计算环节

  3. 减少中间存储,整个流程数据只会落地一次

  4. 支撑Yarn部署模式,方便用户迁移

  5. 资源管理弹性,方便扩容以及运维

  6. 支持双机房容灾

整体架构

整体架构如下图所示,主要包括DTS(Data Transmission Service) Source、DTS Core、DTS Sink三大模块,具体功能如下:

  1. DTS Source接入不同MQ数据源,支持Kafka、RocketMQ等

  2. DTS Sink将数据输出到目标数据源,支持HDFS、Hive等

  3. DTS Core贯穿整个数据同步流程,通过Source读取源端数据,经过DTS Framework处理,最后通过Sink将数据输出到目标端。

  4. DTS Framework集成类型系统、文件切分、Exactly Once、任务信息采集、事件时间、脏数据收集等核心功能

  5. 支持Yarn部署模式,资源调度、管理比较弹性


DTS Dump架构图


Exactly Once

Flink框架通过Checkpoint机制,能够提供Exactly Once或者At Least Once语义。为了实现MQ-Hive全链路支持Exactly-once语义,还需要MQ Source、Hive Sink端支持Exactly Once语义。本文通过Checkpoint + 2PC协议实现,具体过程如下:

  1. 数据写入时,Source端从上游MQ拉取数据并发送到Sink端;Sink端将数据写入到临时目录中

  2. Checkpoint Snapshot阶段,Source端将MQ Offset保存到State 中;Sink端关闭写入的文件句柄,并保存当前Checkpoint ID到State中;

  3. Checkpoint Complete阶段,Source端Commit MQ Offset;Sink端将临时目录中的数据移动到正式目录下

  4. Checkpoint Recover阶段,加载最新一次成功的Checkpoint目录并恢复State信息,其中Source端将State中保存的MQ Offset作为起始位置;Sink端恢复最新一次成功的Checkpoint ID,并将临时目录的数据移动到正式目录下

实现优化

在实际使用场景中,特别是大并发场景下,HDFS写入延迟容易有毛刺,因为个别Task Snapshot超时或者失败,导致整个Checkpoint失败的问题会比较明显。因此针对Checkpoint失败,提高系统的容错性以及稳定性就比较重要。

这里充分利用Checkpoint ID严格单调递增的特性,每一次做Checkpoint时,当前Checkpoint ID一定比以前大,因此在Checkpoint Complete阶段,可以提交小于等于当前Checkpoint ID的临时数据。具体优化策略如下:

  1. Sink端临时目录为{dump_path}/{next_cp_id},这里next_cp_id的定义是当前最新的cp_id + 1

  2. Checkpoint Snapshot阶段,Sink端保存当前最新cp_id到State,同时更新next_cp_id为cp_id + 1

  3. Checkpoint Complete阶段,Sink端将临时目录中所有小于等于当前cp_id的数据移动到正式目录下

  4. Checkpoint Recover阶段,Sink端恢复最新一次成功的cp_id,并将临时目录中小于等于当前cp_id的数据移动到正式目录下

类型系统

由于不同数据源支持的数据类型不一样,为了解决不同数据源间的数据同步以及不同类型转换兼容的问题,我们支持了DTS类型系统,DTS类型可细化为基础类型和复合类型,其中复合类型支持类型嵌套,具体转换流程如下:

  1. 在Source端,将源数据类型,统一转成系统内部的DTS类型

  2. 在Sink端,将系统内部的DTS类型转换成目标数据源类型

  3. 其中DTS类型系统支持不同类型间的相互转换,比如String类型与Date类型的相互转换

DTS Dump架构图


Rolling Policy

Sink端是并发写入,每个Task处理的流量不一样,为了避免生成太多的小文件或者生成的文件过大,需要支持自定义文件切分策略,以控制单个文件的大小。目前支持三种文件切分策略:文件大小、文件最长未更新时间、Checkpoint。

优化策略

Hive支持Parquet、Orc、Text等多种存储格式,不同的存储格式数据写入过程不太一样,具体可以分为两大类:

  1. RowFormat:基于单条写入,支持按照Offset进行HDFS Truncate操作,例如Text格式

  2. BulkFormat:基于Block写入,不支持HDFS Truncate操作,例如Parquet、ORC格式

为了保障Exactly Once语义,并同时支持Parquet、Orc、Text等多种格式,在每次Checkpoint时,强制做文件切分,保证所有写入的文件都是完整的,Checkpoint恢复时不用做Truncate操作。

容错处理

理想情况下流式任务会一直运行不需要重启,但实际不可避免会遇到以下几个场景:

  1. Flink计算引擎升级,需要重启任务

  2. 上游数据增加,需要调整任务并发度

  3. Task Failover

并发度调整

目前Flink原生支持State Rescale。具体实现中,在Task做Checkpoint Snapshot时,将MQ Offset保存到ListState中;Job重启后,JobMaster会根据Operator并发度,将ListState平均分配到各个Task上。

Task Failover

由于网络抖动、写入超时等外部因素的影响,Task不可避免会出现写入失败,如何快速、准确的做Task Failover就显得比较重要。目前Flink原生支持多种Task Failover策略,本文使用Region Failover策略,将失败Task所在Region的所有Task都重启。

异地容灾

背景

大数据时代,数据的准确性和实时性显得尤为重要。本文提供多机房部署及异地容灾解决方案,当主机房因为断网、断电、地震、火灾等原因暂时无法对外提供服务时,能快速将服务切换到备灾机房,并同时保障Exactly Once语义。

容灾组件

整体解决方案需要多个容灾组件一起配合实现,容灾组件如下图所示,主要包括MQ、YARN、HDFS,具体如下:

  1. MQ需要支持多机房部署,当主机房故障时,能将Leader切换到备机房,以供下游消费

  2. Yarn集群在主机房、备机房都有部署,以便Flink Job迁移

  3. 下游HDFS需要支持多机房部署,当主机房故障时,能将Master切换到备机房

  4. Flink Job运行在Yarn上,同时任务State Backend保存到HDFS,通过HDFS的多机房支持保障State Backend的多机房



容灾过程

整体容灾过程如下所示:

  1. 正常情况下,MQ Leader以及HDFS Master部署在主机房,并将数据同步到备机房。同时Flink Job运行在主机房,并将任务State写入到HDFS中,注意State也是多机房部署模式

  2. 灾难情况下,MQ Leader以及HDFS Master从主机房迁移到备灾机房,同时Flink Job也迁移到备灾机房,并通过State恢复灾难前的Offset信息,以提供Exactly Once语义


事件时间归档

背景

在数仓建设中,处理时间(Process Time)和事件时间(Event Time)的处理逻辑不太一样,对于处理时间会将数据写到当前系统时间所对应的时间分区下;对于事件时间,则是根据数据的生产时间将数据写到对应时间分区下,本文也简称为归档。

在实际场景中,不可避免会遇到各种上下游故障,并在持续一段时间后恢复,如果采用Process Time的处理策略,则事故期间的数据会写入到恢复后的时间分区下,最终导致分区空洞或者数据漂移的问题;如果采用归档的策略,会按照事件时间写入,则没有此类问题。

由于上游数据事件时间会存在乱序,同时Hive分区生成后就不应该再继续写入,因此实际写入过程中不可能做到无限归档,只能在一定时间范围内归档。归档的难点在于如何确定全局最小归档时间以及如何容忍一定的乱序。

全局最小归档时间

Source端是并发读取,并且一个Task可能同时读取多个MQ Partition的数据,对于MQ的每一个Parititon会保存当前分区归档时间,取分区中最小值作为Task的最小归档时间,最终取Task中最小值,作为全局最小归档时间。


乱序处理

为了支持乱序的场景,会支持一个归档区间的设置,其中Global Min Watermark为全局最小归档时间,Partition Watermark为分区当前归档时间,Partition Min Watermark为分区最小归档时间,只有当事件时间满足以下条件时,才会进行归档:

  1. 事件时间大于全局最小归档时间

  2. 事件时间大于分区最小归档时间



Hive 分区生成

原理

Hive分区生成的难点在于如何确定分区的数据是否就绪以及如何添加分区。由于Sink端是并发写入,同时会有多个Task写同一个分区数据,因此只有当所有Task分区数据写入完成,才能认为分区数据是就绪,本文解决思路如下:

  1. 在Sink端,对于每个Task保存当前最小处理时间,需要满足单调递增的特性

  2. 在Checkpoint Complete 时,Task上报最小处理时间到JM端

  3. JM拿到所有Task的最小处理时间后,可以得到全局最小处理时间,并以此作为Hive分区的最小就绪时间

  4. 当最小就绪时间更新时,可判断是否添加Hive分区



动态分区

动态分区是根据上游输入数据的值,确定数据写到哪个分区目录,而不是写到固定分区目录,例如date={date}/hour={hour}/app={app}的场景,根据分区时间以及app字段的值确定最终的分区目录,以实现每个小时内,相同的app数据在同一个分区下。

在静态分区场景下,每个Task每次只会写入一个分区文件,但在动态分区场景下,每个Task可能同时写入多个分区文件。对于Parque格式的写入,会先将数据写到做本地缓存,然后批次写入到Hive,当Task同时处理的文件句柄过多时,容易出现OOM。为了防止单Task OOM,会周期性对文件句柄做探活检测,及时释放长时间没有写入的文件句柄。



Messenger

Messenger模块用于采集Job运行状态信息,以便衡量Job健康度以及大盘指标建设。

元信息采集

元信息采集的原理如下所示,在Sink端通过Messenger采集Task的核心指标,例如流量、QPS、脏数据、写入Latency、事件时间写入效果等,并通过Messenger Collector汇总。其中脏数据需要输出到外部存储中,任务运行指标输出到Grafana,用于大盘指标展示。



脏数据收集

数据集成场景下,不可避免会遇到脏数据,例如类型配置错误、字段溢出、类型转换不兼容等场景。对于流式任务来说,由于任务会一直运行,因此需要能够实时统计脏数据流量,并且将脏数据保存到外部存储中以供排查,同时在运行日志中采样输出。

大盘监控

大盘指标覆盖全局指标以及单个Job指标,包括写入成功流量和QPS、写入Latency、写入失败流量和QPS、归档效果统计等,具体如下图所示:



未来规划

基于Flink实时解决方案目前已在公司上线和推广,未来主要关注以下几个方面:

  1. 数据集成功能增强,支持更多数据源的接入,支持用户自定义数据转换逻辑等

  2. Data Lake打通,支持CDC数据实时导入

  3. 流批架构统一,支持全量、增量场景数据集成

  4. 架构升级,支持更多部署环境,比如K8S

  5. 服务化完善,降低用户接入成本

总结

随着字节跳动业务产品逐渐多元化快速发展,字节跳动内部一站式大数据开发平台功能也越来越丰富,并提供离线、实时、全量、增量场景下全域数据集成解决方案,从最初的几百个任务规模增长到数万级规模,日处理数据达到PB级,其中基于Flink实时解决方案目前已在公司内部大力推广和使用,并逐步替换老的MQ-Hive链路。
参考文献:
1.Real-time Exactly-once ETL with Apache Flink
2.http://shzhangji.com/blog/2018/12/23/real-time-exactly-once-etl-with-apache-flink/
3.Implementing the Two-Phase Commit Operator in Flink
4.https://flink.apache.org/features/2018/03/01/end-to-end-exactly-once-apache-flink.html
5.A Deep Dive into Rescalable State in Apache Flink
6.https://flink.apache.org/features/2017/07/04/flink-rescalable-state.html
7.Data Streaming Fault Tolerance
8.https://ci.apache.org/projects/flink/flink-docs-release-1.9/internals/stream_checkpointing.html

推荐文章:
基于Hive进行数仓建设的资源元数据信息统计
深度详解Apache Hive
Kafka作为消息系统的详细解析
海量小文件问题综述和解决攻略

关注大数据学习与分享,获取更多技术干货

您可能也对以下帖子感兴趣

文章有问题?点此查看未经处理的缓存