Flink性能调优小小总结
1 配置内存
操作场景Flink是依赖内存计算,计算过程中内存不够对Flink的执行效率影响很大。可以通过监控GC(Garbage Collection),评估内存使用及剩余情况来判断内存是否变成性能瓶颈,并根据情况优化。监控节点进程的YARN的Container GC日志,如果频繁出现Full GC,需要优化GC。GC的配置:在客户端的"conf/flink-conf.yaml"配置文件中,在“env.java.opts”配置项中添加参数:-XX:+PrintGCDetails
-XX:-OmitStackTraceInFastThrow
-XX:+PrintGCTimeStamps
-XX:+PrintGCDateStamps
-XX:+UseGCLogFileRotation
-XX:NumberOfGCLogFiles=20
-XX:GCLogFileSize=20M
优化GC
开发Flink应用程序时,优化DataStream的数据分区或分组操作。
2 设置并行度
操作场景并行度控制任务的数量,影响操作后数据被切分成的块数。调整并行度让任务的数量和每个任务处理的数据与机器的处理能力达到最优。查看CPU使用情况和内存占用情况,当任务和数据不是平均分布在各节点,而是集中在个别节点时,可以增大并行度使任务和数据更均匀的分布在各个节点。增加任务的并行度,充分利用集群机器的计算能力,一般并行度设置为集群CPU核数总和的2-3倍。操作步骤任务的并行度可以通过以下四种层次(按优先级从高到低排列)指定,用户可以根据实际的内存、CPU、数据以及应用程序逻辑的情况调整并行度参数。算子层次
DataStream<String> text = [...]
DataStream<Tuple2<String, Integer>> wordCounts = text
.flatMap(new LineSplitter())
.keyBy(0)
.timeWindow(Time.seconds(5))
.sum(1).setParallelism(5);
wordCounts.print();
env.execute("Word Count Example");
执行环境层次
env.setParallelism(3);
DataStream<String> text = [...]
DataStream<Tuple2<String, Integer>> wordCounts = [...]
wordCounts.print();
env.execute("Word Count Example");
客户端层次
系统层次
3.配置进程参数
操作场景Flink on YARN模式下,有JobManager和TaskManager两种进程。在任务调度和运行的过程中,JobManager和TaskManager承担了很大的责任。因而JobManager和TaskManager的参数配置对Flink应用的执行有着很大的影响意义。用户可通过如下操作对Flink集群性能做优化。操作步骤1.配置JobManager内存JobManager负责任务的调度,以及TaskManager、RM之间的消息通信。当任务数变多,任务平行度增大时,JobManager内存都需要相应增大。您可以根据实际任务数量的多少,为JobManager设置一个合适的内存。在使用yarn-session命令时,添加“-jm MEM”参数设置内存。
在使用yarn-cluster命令时,添加“-yjm MEM”参数设置内存。
在使用yarn-session命令时,添加“-n NUM”参数设置TaskManager个数。
在使用yarn-cluster命令时,添加“-yn NUM”参数设置TaskManager个数。
在使用yarn-session命令时,添加“-s NUM”参数设置SLOT数。
在使用yarn-cluster命令时,添加“-ys NUM”参数设置SLOT数。
将在使用yarn-sesion命令时,添加“-tm MEM”参数设置内存。
将在使用yarn-cluster命令时,添加“-ytm MEM”参数设置内存。
设计分区方法
操作场景合理的设计分区依据,可以优化task的切分。在程序编写过程中要尽量分区均匀,这样可以实现每个task数据不倾斜,防止由于某个task的执行时间过长导致整个任务执行缓慢。操作步骤以下是几种分区方法随机分区:将元素随机地进行分区。dataStream.shuffle();
Rebalancing (Round-robin partitioning):基于round-robin对元素进行分区,使得每个分区负责均衡。对于存在数据倾斜的性能优化是很有用的。dataStream.rebalance();
Rescaling:以round-robin的形式将元素分区到下游操作的子集中。如果你想要将数据从一个源的每个并行实例中散发到一些mappers的子集中,用来分散负载,但是又不想要完全rebalance 介入(引入rebalance()),这会非常有用。dataStream.rescale();
广播:广播每个元素到所有分区。dataStream.broadcast();
自定义分区:使用一个用户自定义的Partitioner对每一个元素选择目标task,由于用户对自己的数据更加熟悉,可以按照某个特征进行分区,从而优化任务执行。简单示例如下所示:
DataStream<Tuple2<String, Integer>> dataStream = env.fromElements(Tuple2.of("hello",1), Tuple2.of("test",2), Tuple2.of("world",100));
// 定义用于分区的key值,返回即属于哪个partition的,该值加1就是对应的子任务的id号
Partitioner<Tuple2<String, Integer>> strPartitioner = new Partitioner<Tuple2<String, Integer>>() {
@Override
public int partition(Tuple2<String, Integer> key, int numPartitions) {
return (key.f0.length() + key.f1) % numPartitions;
}
};
// 使用Tuple2进行分区的key值
dataStream.partitionCustom(strPartitioner, new KeySelector<Tuple2<String, Integer>, Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> getKey(Tuple2<String, Integer> value) throws Exception {
return value;
}
}).print();
配置netty网络通信
操作场景Flink通信主要依赖netty网络,所以在Flink应用执行过程中,netty的设置尤为重要,网络通信的好坏直接决定着数据交换的速度以及任务执行的效率。操作步骤以下配置均可在客户端的“conf/flink-conf.yaml”配置文件中进行修改适配,默认已经是相对较优解,请谨慎修改,防止性能下降。•“taskmanager.network.netty.num-arenas”:默认是“taskmanager.numberOfTaskSlots”,表示netty的域的数量。 •“taskmanager.network.netty.server.numThreads”和“taskmanager.network.netty.client.numThreads”:默认是“taskmanager.numberOfTaskSlots”,表示netty的客户端和服务端的线程数目设置。 •“taskmanager.network.netty.client.connectTimeoutSec”:默认是120s,表示taskmanager的客户端连接超时的时间。“taskmanager.network.netty.sendReceiveBufferSize”:默认是系统缓冲区大小(cat /proc/sys/net/ipv4/tcp _ [rw]mem) ,一般为4MB,表示netty的发送和接收的缓冲区大小。
“taskmanager.network.netty.transport”:默认为“nio”方式,表示netty的传输方式,有“nio”和“epoll”两种方式。
解决数据倾斜
当数据发生倾斜(某一部分数据量特别大),虽然没有GC(Gabage Collection,垃圾回收),但是task执行时间严重不一致。需要重新设计key,以更小粒度的key使得task大小合理化。
修改并行度。
调用rebalance操作,使数据分区均匀。
由于task在执行过程中存在数据通过网络进行交换,数据在不同服务器之间传递的缓冲区超时时间可以通过setBufferTimeout进行设置。
当设置“setBufferTimeout(-1)”,会等待缓冲区满之后才会刷新,使其达到最大吞吐量;当设置“setBufferTimeout(0)”时,可以最小化延迟,数据一旦接收到就会刷新;当设置“setBufferTimeout”大于0时,缓冲区会在该时间之后超时,然后进行缓冲区的刷新。示例可以参考如下:env.setBufferTimeout(timeoutMillis); env.generateSequence(1,10).map(new MyMapper()).setBufferTimeout(timeoutMillis);
Checkpoint 调优
1.什么是 checkpoint 简单地说就是 Flink 为了达到容错和 exactly-once 语义的功能,定期把 state 持久化下来,而这一持久化的过程就叫做 checkpoint ,它是 Flink Job 在某一时刻全局状态的快照。当我们要对分布式系统实现一个全局状态保留的功能时,传统方案会引入一个统一时钟,通过分布式系统中的 master 节点广播出去给每一个 slaves 节点,当节点接收到这个统一时钟时,它们就记录下自己当前的状态即可。Flink 作业的问题定位
1.问题定位口诀“一压二查三指标,延迟吞吐是核心。时刻关注资源量 , 排查首先看GC。”一压是指背压,遇到问题先看背压的情况,二查就是指 checkpoint ,对齐数据的时间是否很长,state 是否很大,这些都是和系统吞吐密切相关的,三指标就是指 Flink UI 那块的一些展示,我们的主要关注点其实就是延迟和吞吐,系统资源,还有就是 GC logs。看反压 :通常最后一个被压高的 subTask 的下游就是 job 的瓶颈之一。
看 Checkpoint 时长 :Checkpoint 时长能在一定程度影响 job 的整体吞吐。
看核心指标 :指标是对一个任务性能精准判断的依据,延迟指标和吞吐则是其中最为关键的指标。
资源的使用率:提高资源的利用率是最终的目的。
在关注背压的时候大家往往忽略了数据的序列化和反序列化,过程所造成的性能问题。
一些数据结构 ,比如 HashMap 和 HashSet 这种 key 需要经过 hash 计算的数据结构,在数据量大的时候使用 keyby 进行操作, 造成的性能影响是非常大的。
数据倾斜 是我们的经典问题,后面再进行展开。
如果我们的下游是 MySQL,HBase这种,我们都会进行一个批处理的操作,就是让数据存储到一个 buffer 里面,在达到某些条件的时候再进行发送,这样做的目的就是减少和外部5. 系统的交互,降低 网络开销 的成本。
频繁GC ,无论是 CMS 也好,G1也好,在进行 GC 的时候,都会停止整个作业的运行,GC 时间较长还会导致 JobManager 和 TaskManager 没有办法准时发送心跳,此时 JobManager 就会认为此 TaskManager 失联,它就会另外开启一个新的 TaskManager
窗口是一种可以把无限数据切割为有限数据块的手段。比如我们知道,使用滑动窗口的时候数据的重叠问题,size = 5min 虽然不属于大窗口的范畴,可是 step = 1s 代表1秒就要进行一次数据的处理,这样就会造成数据的重叠很高,数据量很大的问题。
欢迎点赞+收藏+转发朋友圈素质三连