查看原文
其他

Flink关键技术解析与优化实战

刘建刚 大数据学习与分享 2022-07-09
作者刘建刚老师

刘建刚

快手软件工程师


  • 现就职于快手数据架构部,当前主要工作为快手Flink平台的研发和维护;

  • 曾就职于百度基础架构部。


本次分享主要分为三部分。首先介绍流式计算的基本概念,然后介绍Flink的关键技术,最后讲讲Flink在快手生产实践中的一些应用,包括实时指标计算和快速failover。

一、流式计算的介绍

流式计算的定义:流式计算主要针对unbounded data(无界数据流)进行实时的计算,将计算结果快速的输出或者修正。

这部分将分为三个小节来介绍:

第一,介绍大数据系统发展史,包括初始的批处理到现在比较成熟的流计算
第二,为大家简单对比下批处理和流处理的区别

第三,介绍流式计算里面的关键问题,这是每个优秀的流式计算引擎所必须面临的问题。



1、大数据系统发展史



上图是2003年到2018年大数据系统的发展史,看看是怎么一步步走到流式计算的。

2003年,Google的MapReduce横空出世,通过经典的Map&Reduce定义和系统容错等保障来方便处理各种大数据。很快就到了Hadoop,被认为是开源版的MapReduce,带动了整个apache开源社区的繁荣。再往后是谷歌的Flume,通过算子连接等pipeline的方式解决了多个MapReduce作业连接处理低效的问题。

流式系统的开始以Storm来介绍。Storm在2011年出现,具备延时短、性能高等特性,在当时颇受喜爱。但是Storm没有提供系统级别的failover机制,无法保障数据一致性。那时的流式计算引擎是不精确的,lamda架构组装了流处理的实时性和批处理的准确性,曾经风靡一时,后来因为难以维护也逐渐没落。

接下来出现的是Spark Streaming,可以说是第一个生产级别的流式计算引擎。Spark Streaming早期的实现基于成熟的批处理,通过mini batch来实现流计算,在failover时能够保障数据的一致性。

Google在流式计算方面有很多探索,包括MillWheel、Cloud Dataflow、Beam,提出了很多流式计算的理念,对其他的流式计算引擎影响很大。

再来看Kafka。Kafka并非流式计算引擎,但是对流式计算影响特别大。Kafka基于log机制、通过partition来保存实时数据,同时也能存储很长时间的历史数据。流式计算引擎可以无缝地与kafka进行对接,一旦出现Failover,可以利用Kafka进行数据回溯,保证数据不丢失。另外,Kafka对table和stream的探索特别多,对流式计算影响巨大。

Flink的出现也比较久,一直到2016年左右才火起来的。Flink借鉴了很多Google的流式计算概念,使得它在市场上特别具有竞争力。后面我会详细介绍Flink的一些特点。


2、批处理与流计算的区别


批处理和流计算有什么样的区别,这是很多同学有疑问的地方。我们知道MapReduce是一个批处理引擎,Flink是一个流处理引擎。我们从四个方面来进行一下对比:

1)使用场景

MapReduce是大批量文件处理,这些文件都是bounded data,也就是说你知道这个文件什么时候会结束。相比而言,Flink处理的是实时的unbounded data,数据源源不断,可能永远都不会结束,这就给数据完备性和failover带来了很大的挑战。


2)容错

MapReduce的容错手段包括数据落盘、重复读取、最终结果可见等。文件落盘可以有效保存中间结果,一旦task挂掉重启就可以直接读取磁盘数据,只有作业成功运行完了,最终结果才对用户可见。这种设计的哲理就是你可以通过重复读取同一份数据来产生同样的结果,可以很好的处理failover。

Flink的容错主要通过定期快照和数据回溯。每隔一段时间,Flink就会插入一些barrier,barrier从source流动到sink,通过barrier流动来控制快照的生成。快照制作完就可以保存在共享引擎里。一旦作业出现问题,就可以从上次快照进行恢复,通过数据回溯来重新消费。

3)性能

MapReduce主要特点是高吞吐、高延时。高吞吐说明处理的数据量非常大;高延时就是前面说到的容错问题,它必须把整个作业处理完才对用户可见。

Flink主要特点是高吞吐、低延时。在流式系统里,Flink的吞吐是很高的。同时,它也可以做到实时处理和输出,让用户快速看到结果。

4)计算过程

MapReduce主要通过Map和reduce来计算。Map负责读取数据并作基本的处理, reduce负责数据的聚合。用户可以根据这两种基本算子,组合出各种各样的计算逻辑。

Flink为用户提供了pipeline的API和批流统一的SQL。通过pipeline的API,用户可以方便地组合各种算子构建复杂的应用;Flink SQL是一个更高层的API抽象,极大地降低了用户的使用门槛。 


3、流式计算的关键问题


这部分主要通过四个问题给大家解答流式计算的关键问题,也是很多计算引擎需要考虑的问题。

1)What

What是指通过什么样的算子来进行计算。主要包含三个方面的类型,element-wise表示一对一的计算,aggregating表示聚合操作,composite表示多对多的计算。

2)Where

aggregating会进行一些聚合的计算,主要是在各种window里进行计算。窗口包含滑动窗口、滚动窗口、会话窗口。窗口会把无界的数据切分成有界的一个个数据块进行处理,后面我们会详细介绍这点。

3)When

When就是什么时候触发计算。窗口里面有数据,由于输入数据是无穷无尽的,很难知道一个窗口的数据是否全部到达了。流式计算主要通过watermark来保障数据的完备性,通过trigger来决定何时触发。当接收到数值为X的Watermark时,可以认为所有时间戳小于等于X的事件全部到达了。一旦watermark跨过窗口结束时间,就可以通过trigger来触发计算并输出结果。

4)How

How主要指我们如何重新定义同一窗口的多次触发结果。前面也说了trigger是用来触发窗口的,一个窗口可能会被触发多次,比如1分钟的窗口每10秒触发计算一次。处理方式主要包含三种:

  • Discarding,丢弃之前的状态重新计算。这种方式每次的触发结果都是互不关联的,多次触发结果的组合反映了全部的窗口内容,下游一般会再次聚合

  • Accumulating,这个就是一个聚合的状态,比如说第二次触发的时候是在第一次的结果上进行计算的,下游只需要保存最新的结果即可

  • Accumulating和retracting,这个主要在Accumulating的基础上加了一个retracting,retracting的意思就是撤销。窗口再次触发时,会告诉下游撤销上一次的计算结果,并告知最新的结果。Flink SQL的聚合就使用了这种retract的模式

二、Flink关键技术


1、Flink简介


Flink是一款分布式计算引擎, 既可以进行流式计算,也可以进行批处理。下图是官网对Flink的介绍:


Flink可以运行在k8s、yarn、mesos等资源调度平台上,依赖hdfs等文件系统,输入包含事件和各种其他数据,经过Flink引擎计算后再输出到其他中间件或者数据库等。

Flink有两个核心概念:

  • State:Flink可以处理有状态的数据,通过自身的state机制来保障作业failover时数据不丢失

  • Event Time:允许用户按照事件时间来处理数据,通过watermark来推动时间前进,这个后面还会详细介绍。主要是系统的时间和事件的时间

Flink主要通过上面两个核心技术来保证exactly-once,比如说作业Failover的时候状态不丢失,就好像没发生故障一样。


2、快照机制


Flink的快照机制主要是为了保障作业failover时不丢失状态。Flink提供了一种轻量级的快照机制,不需要停止作业就可以帮助用户持久化内存中的状态数据。 


上图中的markers(与barrier语义相同)通过流动来触发快照的制作,每一个编号都代表了一次快照,比如编号为n的markers从最上游流动到最下游就代表了一次快照的制作过程。简述如下:

  • 系统发送编号为n的markers到最上游的算子,markers随着数据往下游流动

  • 当下游算子收到marker后,就开始将自身的状态保存到共享存储中

  • 当所有最下游的算子接收到marker并完成算子快照后,本次作业的快照制作完成

一旦作业失败,重启时就可以从快照恢复。

下面为一个简单的demo说明(barrier等同于marker)。


  • barrier到达Source,将状态offset=7存储到共享存储

  • barrier到达Task,将状态sum=21存储到共享存储

  • barrier到达Sink,commit本次快照,标志着快照的成功制作


这时候突然间作业也挂掉,重启时Flink会通过快照恢复各个状态。Source会将自身的offset置为7,Task会将自身的sum置为21。现在我们可以认为1、2、3、4、5、6这6个数字的加和结果并没有丢失。这个时候,offset从7开始消费,跟作业失败前完全对接了起来,确保了exactly-once。 


3、事件时间


时间类型分为两种:

  • Event time(事件时间),指事件发生的时间,比如采集数据时的时间

  • Processing time(系统时间),指系统的时间,比如处理数据时的时间

如果你对数据的准确性要求比较高的话,采用Event time能保障exactly-once。Processing Time一般用于实时消费、精准性要求略低的场景,主要是因为时间生成不是deterministic。

我们可以看下面的关系图,X轴是Event time,Y轴是Processing time。理想情况下Event time和Processing time是相同的,就是说只要有一个事件发生,就可以立刻处理。但是实际场景中,事件发生后往往会经过一定延时才会被处理,这样就会导致我们系统的时间往往会滞后于事件时间。这里它们两个的差 Processing-time lag 表示我们处理事件的延时。


事件时间常用在窗口中,使用watermark来确保数据完备性,比如说watermarker值大于window末尾时间时,我们就可以认为window窗口所有数据都已经到达了,就可以触发计算了。


比如上面[0-10]的窗口,现在watermark走到了10,已经到达了窗口的结束,触发计算SUM=21。如果要是想对迟到的数据再进行触发,可以再定义一下后面late data的触发,比如说后面来了个9,我们的SUM就等于30。


4、窗口机制


窗口机制就是把无界的数据分成数据块来进行计算,主要有三种窗口。

  • 滚动窗口:固定大小的窗口,相邻窗口没有交集

  • 滑动窗口:每个窗口的大小是一样的,但是两个窗口之间会有重合

  • 会话窗口:根据活跃时间聚合而成的窗口,比如活跃时间超过3分钟新起一个窗口。窗口之间留有一定的间隔


窗口会自动管理状态和触发计算,Flink提供了丰富的窗口函数来进行计算。主要包括以下两种:

  • ProcessWindowFunction,全量计算会把所有数据缓存到状态里,一直到窗口结束时统一计算。相对来说,状态会比较大,计算效率也会低一些

  • AggregateFunction,增量计算就是来一条数据就算一条,可能我们的状态就会特别的小,计算效率也会比ProcessWindowFunction高很多,但是如果状态存储在磁盘频繁访问状态可能会影响性能


三、快手Flink实践


1、应用概括


快手应用概括主要是分为数据接入、Flink实时计算、数据应用、数据展示四个部分。各层各司其职、衔接流畅,为用户提供一体化的数据服务流程。



2、实时指标计算


常见的实时指标计算包括uv、pv和sum。这其中uv的计算最为复杂也最为经典。下面我将重点介绍uv。

uv指的是不同用户的个数,我们这边计算的就是不同deviceld的个数,主要的挑战来自三方面:

  • 用户数多,数据量大。活动期间的QPS经常在千万级别,实际计算起来特别复杂

  • 实时性要求高,通常为几秒到分钟结果的输出

  • 稳定性要求高,比如说我们在做春晚活动时候要求故障时间需要低于2%或更少

针对各种各样的uv计算,我们提供了一套成熟的计算流程。主要包含了三方面:

  • 字典方案:将string类型的deviceld转成long类型,方便后续的uv计算

  • 倾斜处理:比如某些大V会导致数据严重倾斜,这时候就需要打散处理

  • 增量计算:比如计算1天的uv,每分钟输出一次结果

字典方案需要确保任何两个不同的deviceId不能映射到相同的long类型数字上。快手内部主要使用过以下三种方案:


  • HBase,基于partition分区建立deviceld到id的映射,通过缓存和批量访问来加速

  • Redis,这种方案严格来说不属于字典,主要通过key-value来判断数据是否首次出现,基于首次数据来计算uv,这样就会把pv和uv的计算进行统一

  • 最后就是一个Flink内部自建的全局字典实现deviceld到id的转换,之后计算UV

这三种方案里面,前两种属于外部存储的字典方案,优点是可以做到多个作业共享1份数据,缺点是外部访问慢而且不太稳定。最后一种Flink字典方案基于state,不依赖外部存储, 性能高但是无法多作业共享。

接下来我们重点介绍基于Flink自身的字典方案,下图主要是建立一个deviceld到id的映射:


主要分成三步走:

1)建立Partition分区,指定一个比较大的Partition分区个数,该个数比较大并且不会变,根据deviceld的哈希值将其映射到指定partition

2)建立id映射。每个Partition都有自己负责的id区间,确保Partition之间的long类型的id不重复,partition内部通过自增id来确保每个deviceId对应一个id

3)使用keyed state保存id映射。这样我们的作业出现并发的大改变时,可以方便的rescale,不需要做其他的操作

除了id转换,后面就是一个实时指标计算的常见问题,就是数据倾斜。业界常见的解决数据倾斜处理方案主要是两种:

  • 打散再聚合:先将倾斜的数据打散计算,然后再聚合计算结果

  • Local-aggregate:先在本地计算预聚合,这样会大大减少下游的数据压力

二者的本质是一样的,都是先预聚合再汇总,从而避免单点性能问题。


上图为计算最小值的热点问题,红色数据为热点数据。如果直接将它们打到同一个分区,会出现性能问题。为了解决倾斜问题,我们通过hash策略将数据分成小的partition来计算,如上图的预计算,最后再将中间结果汇总计算。

当一切就绪后,我们来做增量的UV计算,比如计算1天uv,每分钟输出1次结果。计算方式既可以采用API,也可以采用SQL。

针对API,我们选择了global state+bitmap的组合,既严格遵循了Event Time又减少了state大小:


下面为计算流程(需要注意时区问题):

  • 定义跟触发间隔一样大小的window(比如1分钟)

  • Global state用来保存跨窗口的状态,我们采用bitmap来存储状态

  • 每隔一个window触发一次,输出起始至今的UV

  • 当前作用域(比如1天)结束,清空状态重新开始

针对SQL,增量计算支持的还不是那么完善,但是可以利用early-fire的参数来提前触发窗口。

配置如下:


table.exec.emit.early-fire.enabled:truetable.exec.emit.early-fire.delay:60 s

early-fire.delay就是每分钟输出一次结果的意思。

SQL如下:


SELECT TUMBLE_ROWTIME(eventTime, interval ‘1’ day) AS rowtime, dimension, count(distinct id) as uvFROM personGROUP BY TUMBLE(eventTime, interval '1' day), dimension

如果遇到倾斜,可以参考上一步来处理。


3、快速failover


最后看下我们部门最近发力的一个方向,如何快速failover。

Flink作业都是long-running的在线作业,很多对可用性的要求特别高,尤其是跟公司核心业务相关的作业,SLA要求4个9甚至更高。当作业遇到故障时,如何快速恢复对我们来说是一个巨大的挑战。

下面分三个方面来展开:

  • Flink当前已有的快速恢复方案

  • 基于container宕掉的快速恢复

  • 基于机器宕掉的快速恢复

1)Flink当前已有的快速恢复方案

Flink当前已有的快速恢复方案主要包括以下两种:

  • region failover。如果流式作业的DAG包含多个子图或者pipeline,那么task失败时只会影响其所属的子图或者pipeline,而不用整个DAG都重新启动

  • local recovery。在Flink将快照同步到共享存储的同时,在本地磁盘也保存一份快照。作业失败恢复时,可以调度到上次部署的位置,并从local disk进行快照恢复

2)基于container宕掉的快速恢复

实际环境中,container宕掉再申请有时会长达几十秒,比如因为hdfs慢、yarn慢等原因,严重影响恢复速度。为此,我们做了如下优化:

  • 冗余资源。维持固定个数的冗余container,一旦container宕掉,冗余container立刻候补上来,省去了繁杂的资源申请流程

  • 提前申请。一旦发现作业因为container宕掉而失败,立刻申请新的container


以上优化覆盖了很大一部分场景,恢复时间从30s-60s降到20s以内。

3)基于机器宕掉的快速恢复

机器宕掉时,flink on yarn的恢复时间超过3分钟,这对重要作业显然是无法容忍的!为了做到快速恢复,我们需要做到快速感知和恢复:

  • 冗余资源并打散分配,确保两个冗余资源不在一个host,redundantContainerNum=max(containerNumOfHost) + 1

  • 作业宕机,Hawk监测系统5秒内发现

  • 冗余资源快速候补,免去申请资源的流程


通过这种方案,我们可以容忍任意一台机器的宕机,并将宕机恢复时间由原先的3分钟降低到30秒以内。

四、总结

本文从大数据系统的发展入手,进而延伸出流式系统的关键概念,之后介绍了Flink的关键特性,最后讲解了快手内部的实时指标计算和快速failover,希望对大家有所帮助。

>>>>

Q&A


Q1:打算做实时计算,可以跳过storm spark直接上手flink吗?

A:可以直接使用Flink。Storm在failover时会丢失数据,无法做到exactly-once;spark streaming是Flink的竞争者,是在批处理的基础上实现流计算,相比而言,Flink的底层是流处理,更加适合流计算。

Q2:一般怎么处理taskmanager heartbeat timeout?

A:默认10秒汇报一次心跳,心跳超时为50秒,这个时候作业会失败,如果配置了高可用那么会重启。

Q3:如何保证2天大时间跨度延迟消息的窗口计算?

A:这里主要的挑战在于时间长、状态大,建议stateBakend使用rocksdb(可以利用磁盘存储大状态),窗口计算建议使用增量计算来减少状态的大小。

Q4:flink on yarn. yarn重启会自动拉起flink任务吗,说不能拉起怎么处理,手动启动吗?

A:如果配置了高可用(依赖zookeeper),作业失败了就可以自动拉起。

Q5:kafka目前多用作数据中转平台,flink相当于替代了kafka stream吗?
  
A:Kafka的核心功能是消息中间件,kafka stream可以跟kafka很好的集成,但并不是一个专业的计算引擎。相比而言,flink是一个分布式的流式计算引擎,功能上更加强大。

Q6:你们怎么看待apache beams?

A:Apache beam在上层进行了抽象,可以类比SQL,只定义规范,底层可以接入各种计算引擎。   

推荐文章:

实用篇 | 一次Java内存泄露的排查

从 Spark Streaming 到 Apache Flink:bilibili 实时平台的架构与实践

知乎实时数仓架构演进

菜鸟供应链实时数仓的架构演进及应用场景

实时离线一体化助力渠道分析系统

美团点评实时数仓平台演进与实践

如何设计实时数据平台 —— 技术选型与架构设计

数仓大法好!跨境电商 Shopee 的实时数仓之路

OPPO 实时数仓揭秘:从顶层设计实现离线与实时的平滑迁移

海量数据实时分析服务技术架构演进

关注大数据学习与分享,获取更多技术干货

您可能也对以下帖子感兴趣

文章有问题?点此查看未经处理的缓存