查看原文
其他

当 Structured Streaming 碰到 kafka

spark君 张江打工人 2021-09-05

本文中,我们来看下用  Structured Streaming 怎么处理 kafka中的复杂json 数据流,Structured Streaming 强项我们都知道,可以使用 event-time  进行聚合,可以使用  watermark处理延迟数据, 保证 exactly-once, 可以输出各种外部系统,spark 和 kafka 一起可以让你放飞自我

  • 可以让你流式处理和批处理的代码一样一样的

  • 整合各种外部存储系统,比如 S3,HDFS, MySQL

  • 对每个批次的增使用 Catalyst 优化器 进行优化

我们先来了解下 kafka, 然后举几个  Structured Streaming 读写 kafka的例子,然后看一个 真实使用场景。


 / kafka 了解一下?/ 


kafka 是现在最流行的一个分布式的实时流消息系统,给下游订阅消费系统提供了并行处理和可靠容错机制,现在大公司在流式数据的处理场景, kafka 基本是标配。

kafka 把生产者发送的数据放在不同的分区里面,这样就可以并行进行消费了。每个分区里面的数据都是递增有序的,跟 structured commit log 类似,生产者和消费者使用 kafka 进行解耦,消费者不管你生产者发送的速率如何,我只要按照我们的节奏进行消费就可以了。每条消息在一个分区里面都有一个唯一的序列号 offset, kafka 会对内部存储的消息设置一个过期时间,如果过期了,就会标记删除,不管这条消息有没有被消费。


kafka 消费策略

kafka 可以被看成一个无限的流,里面的流数据是短暂存在的,如果不消费,消息就过期滚动没了,这就涉及一个问题,就是你如果开始消费,就要定一下从什么位置开始。

  • earliest 从最起始位置开始消费,当然不一定是从 0 开始,因为如果数据过期了,就清掉了,所以可以理解为从现存的数据里最小位置开始消费。

  • latest 这个好理解,从最末位置开始消费。

  • per-partition assignment , 对每个分区都指定一个 offset, 然后从这组 offset 位置开始消费。


当你第一次开始消费一个 kafka 流的时候,上述策略任选其一,如果你之前已经消费了,而且做了 checkpoint ,比如你消费程序升级了, 这时候就会从你上次结束的位置开始继续消费。


 / Structured streaming 对kafka的支持 / 


Structured Streaming 对 批处理 和流处理 提供统一的 API, 这一点在我看来是很重要的,我们一套相同的处理逻辑,没必要写出来两套代码,减少了维护成本。 Structured Streaming 从 kafka 拉取消息,然后就可以把 流数据看做 一个 DataFrame , 一张无限增长的大表,在这个大表上做查询,Structured Streaming 给你保证了端到端的 exactly-once,你只需要关心你的业务即可,不用费心去关心底层是怎么做的。

从 kafka topics 中读取消息

首先需要你指定你的数据源,kafka 集群的连接地址,你需要消费的 topic, 指定topic 的时候,可以使用正则来指定,也可以指定一个 topic 的集合,是不是很灵活?我们这里只消费一个 topic


这个例子里面我们指定消费 topic1, 在 spark 内部 DataStreamReader 会根据你的 配置来 拉取指定的 topic,  kafka.bootstrap.servers (i.e. host:port) 地址   和 topic 这两个参数是必填的。startingOffsets 可以选填,默认是 从 latest 位置开始消费

df.printSchema() , 我们来打印一些 DataFrame 的 schema

我们可以看到,schema 里面包含了 kafka record 里面的key value 和 相关 元信息字段,到这时候,我们就可以使用 DataFrame 和 Dataset 的方式来进行转换处理了,有人问,如果 kafka 消息的 value 字节数组里面包含的 是 json 数据怎么办,我们在这里拿到的 key 和 value 都是  字节数组,需要解析一下格式,当然  Spark SQL 内嵌了一些类型处理方法了。

Data Stored as a UTF8 String

把字节数据转为 strings 类型

df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

Data Stored as JSON

如果你在kafka 里面保存的是 json 格式,我们可以使用内嵌的 from_json 去解析,你需要传入一个 schema, 然后就变成了 Spark SQL  的类型了。


使用UDF注册反序列化类


你也可以注册一个 udf 进行反序列化,你可以实现一个 MyDeserialzer ,只要实现了 Kafka Deserializer 的接口即可


 / 输出数据到 kafka / 


往 kafka 里面写数据也类似,我们可以在 DataFrame 上调用 writeStream  来写入 kafka, 指定value, key 是可选的,如果你不指定,就是null。如果 key 为null, 有时候可能导致 分区数据不均匀,需要注意一下。

需要打到哪个 topic, 可以在  DataStreamWriter 上指定 option 配置, 也可以操作 DataFrame 的时候 在每条 record 上加一列 topic 字段指定。


这个例子中,DataFrame 中是用户信息数据,我们把 序列化后的 userId 作为 key,然后把所有字段 都转成 json string 格式当做 value

: . Video Mini Program Like ,轻点两下取消赞 Wow ,轻点两下取消在看

您可能也对以下帖子感兴趣

文章有问题?点此查看未经处理的缓存