实时日志ETL服务,需要在日志的格式、字段发生变化时保证正常解析;
实时NLP服务,需要及时识别新添加的领域词与停用词;
实时风控服务,需要根据业务情况调整触发警告的规则。
那么问题来了:配置每次变化都得手动修改代码,再重启作业吗?答案显然是否定的,毕竟实时任务的终极目标就是7 x 24无间断运行。Spark Streaming和Flink的广播机制都能做到这点,本文分别来简单说明一下。
Spark Streaming的场合
Spark Core内部的广播机制: 广播变量(broadcast variable)的设计初衷是简单地作为只读缓存,在Driver与Executor间共享数据,Spark文档中的原话如下:
Broadcast variables allow the programmer to keep a read-only variable cached on each machine rather than shipping a copy of it with tasks. They can be used, for example, to give every node a copy of a large input dataset in an efficient manner.
也就是说原生并未支持广播变量的更新,所以我们得自己稍微hack一下。直接贴代码吧。
public class BroadcastStringPeriodicUpdater { private static final int PERIOD = 60 * 1000; private static volatile BroadcastStringPeriodicUpdater instance; private Broadcast<String> broadcast; private long lastUpdate = 0L; private BroadcastStringPeriodicUpdater() {} public static BroadcastStringPeriodicUpdater getInstance() { if (instance == null) { synchronized (BroadcastStringPeriodicUpdater.class) { if (instance == null) { instance = new BroadcastStringPeriodicUpdater(); } } } return instance; } public String updateAndGet(SparkContext sc) { long now = System.currentTimeMillis(); long offset = now - lastUpdate; if (offset > PERIOD || broadcast == null) { if (broadcast != null) { broadcast.unpersist(); } lastUpdate = now; String value = fetchBroadcastValue(); broadcast = JavaSparkContext.fromSparkContext(sc).broadcast(value); } return broadcast.getValue(); } private String fetchBroadcastValue() { }}
这段代码将字符串型广播变量的更新包装成了一个单例类,更新周期是60秒。在Streaming主程序中,就可以这样使用了:
dStream.transform(rdd -> {
String broadcastValue = BroadcastStringPeriodicUpdater.getInstance().updateAndGet(rdd.context());
rdd.mapPartitions(records -> {
});
});
这种方法基本上解决了问题,但不是十全十美的,因为广播数据的更新始终是周期性的,并且周期不能太短(得考虑外部存储的压力),从根本上讲还是受Spark Streaming微批次的设计理念限制的。接下来看看Flink是怎样做的。
Flink的场合
Flink中也有与Spark类似的广播变量,用法也几乎相同。但是Flink在1.5版本引入了更加灵活的广播状态(broadcast state),可以视为operator state的一种特殊情况。它能够将一个流中的数据(通常是较少量的数据)广播到下游算子的所有并发实例中,实现真正的低延迟动态更新。
下图来自Data Artisans(被阿里收购了的Flink母公司)的PPT,其中流A是普通的数据流,流B就是含有配置信息的广播流(broadcast stream),也可以叫控制流(control stream)。流A的数据按照keyBy()算子的规则发往下游,而流B的数据会广播,最后再将这两个流的数据连接到一起进行处理。
既然它的名字叫“广播状态”,那么就一定要有与它对应的状态描述符StateDescriptor。Flink直接使用了MapStateDescriptor作为广播的状态描述符,方便存储多种不同的广播数据。示例:
MapStateDescriptor<String, String> broadcastStateDesc = new MapStateDescriptor<>( "broadcast-state-desc", String.class, String.class );
接下来在控制流controlStream上调用broadcast()方法,将它转换成广播流BroadcastStream。controlStream的产生方法与正常数据流没什么不同,一般是从消息队列的某个特定topic读取。
BroadcastStream<String> broadcastStream = controlStream
.setParallelism(1)
.broadcast(broadcastStateDesc);
然后在DataStream上调用connect()方法,将它与广播流连接起来,生成BroadcastConnectedStream。
BroadcastConnectedStream<String, String> connectedStream =
sourceStream.connect(broadcastStream);
最后就要调用process()方法对连接起来的流进行处理了。如果DataStream是一个普通的流,
需要定义BroadcastProcessFunction,反之,如果该DataStream是一个KeyedStream,
就需要定义KeyedBroadcastProcessFunction。
并且与之前我们常见的ProcessFunction不同的是,它们都多了一个专门处理广播数据的方法
processBroadcastElement()。类图如下所示。
connectedStream.process(new BroadcastProcessFunction<String, String, String>() {
private static final long serialVersionUID = 1L;
@Override
public void processElement(String value, ReadOnlyContext ctx, Collector<String> out) throws Exception {
ReadOnlyBroadcastState<String, String> state = ctx.getBroadcastState(broadcastStateDesc);
for (Entry<String, String> entry : state.immutableEntries()) {
String bKey = entry.getKey();
String bValue = entry.getValue();
// 根据广播数据进行原数据流的各种处理
}
out.collect(value);
}
@Override
public void processBroadcastElement(String value, Context ctx, Collector<String> out) throws Exception {
BroadcastState<String, String> state = ctx.getBroadcastState(broadcastStateDesc);
// 如果需要的话,对广播数据进行转换,最后写入状态
state.put("some_key", value);
}
});
如果觉得文章对你有帮助,请转发朋友圈、点在看,让更多人获益,感谢您的支持!
END
关注我
公众号(zhisheng)里回复 面经、ES、Flink、 Spring、Java、Kafka、监控 等关键字可以查看更多关键字对应的文章。
Flink 实战
1、《从0到1学习Flink》—— Apache Flink 介绍
2、《从0到1学习Flink》—— Mac 上搭建 Flink 1.6.0 环境并构建运行简单程序入门
3、《从0到1学习Flink》—— Flink 配置文件详解
4、《从0到1学习Flink》—— Data Source 介绍
5、《从0到1学习Flink》—— 如何自定义 Data Source ?
6、《从0到1学习Flink》—— Data Sink 介绍
7、《从0到1学习Flink》—— 如何自定义 Data Sink ?
8、《从0到1学习Flink》—— Flink Data transformation(转换)
9、《从0到1学习Flink》—— 介绍 Flink 中的 Stream Windows
10、《从0到1学习Flink》—— Flink 中的几种 Time 详解
11、《从0到1学习Flink》—— Flink 读取 Kafka 数据写入到 ElasticSearch
12、《从0到1学习Flink》—— Flink 项目如何运行?
13、《从0到1学习Flink》—— Flink 读取 Kafka 数据写入到 Kafka
14、《从0到1学习Flink》—— Flink JobManager 高可用性配置
15、《从0到1学习Flink》—— Flink parallelism 和 Slot 介绍
16、《从0到1学习Flink》—— Flink 读取 Kafka 数据批量写入到 MySQL
17、《从0到1学习Flink》—— Flink 读取 Kafka 数据写入到 RabbitMQ
18、《从0到1学习Flink》—— 你上传的 jar 包藏到哪里去了
19、大数据“重磅炸弹”——实时计算框架 Flink
20、《Flink 源码解析》—— 源码编译运行
21、为什么说流处理即未来?
22、OPPO数据中台之基石:基于Flink SQL构建实时数据仓库
23、流计算框架 Flink 与 Storm 的性能对比
24、Flink状态管理和容错机制介绍
25、原理解析 | Apache Flink 结合 Kafka 构建端到端的 Exactly-Once 处理
26、Apache Flink 是如何管理好内存的?
27、《从0到1学习Flink》——Flink 中这样管理配置,你知道?
28、《从0到1学习Flink》——Flink 不可以连续 Split(分流)?
29、Flink 从0到1学习—— 分享四本 Flink 的书和二十多篇 Paper 论文
30、360深度实践:Flink与Storm协议级对比
31、Apache Flink 1.9 重大特性提前解读
32、如何基于Flink+TensorFlow打造实时智能异常检测平台?只看这一篇就够了
33、美团点评基于 Flink 的实时数仓建设实践
34、Flink 灵魂两百问,这谁顶得住?
35、一文搞懂 Flink 的 Exactly Once 和 At Least Once
36、你公司到底需不需要引入实时计算引擎?
37、Flink 从0到1学习 —— 如何使用 Side Output 来分流?
38、一文让你彻底了解大数据实时计算引擎 Flink
39、基于 Flink 实现的商品实时推荐系统(附源码)
40、如何使用 Flink 每天实时处理百亿条日志?
41、Flink 在趣头条的应用与实践
42、Flink Connector 深度解析
43、滴滴实时计算发展之路及平台架构实践
44、Flink Back Pressure(背压)是怎么实现的?有什么绝妙之处?
45、Flink 实战 | 贝壳找房基于Flink的实时平台建设
46、如何使用 Kubernetes 部署 Flink 应用
47、一文彻底搞懂 Flink 网络流控与反压机制
48、Flink中资源管理机制解读与展望
49、Flink 实时写入数据到 ElasticSearch 性能调优
50、深入理解 Flink 容错机制
51、吐血之作 | 流系统Spark/Flink/Kafka/DataFlow端到端一致性实现对比
Flink 源码解析
知识星球里面可以看到下面文章
文章不错?点个【在看】吧! 👇