查看原文
其他

spark streaming 读取 kafka 的各种姿势解析

spark技术分享 张江打工人 2021-09-05

姐妹篇

spark streaming 如何控制 拉取 kafka 的速度

spark streaming 处理 kafka中数据如何做到 exact once

使用  spark-shell 探索kafka中的数据

spark streaming 数据源获取的两种方式

我们需要先有一点背景知识,一个是我们需要搞清楚 spark streaming 数据源获取的两种方式

  • 接收方式, 接收方式的数据源 DStream 一般都继承了 ReceiverInputDStream, spark streaming 的启动的时候,会分发一个spark job, 目的就是到 各个executor 上去启动数据接收器, 例如  SocketInputDStream 就是要到每个executor 上去启动一个线程监听端口去接收数据, 然后把数据一块块放在每个 executor 的BlockManager(不懂的可以参考  spark 自己的分布式存储系统 - BlockManager) 并且把元信息汇报到 driver 上面,  最终 ReceiverInputDStream 计算的时候会产生 一个可以根据元信息去 BlockManager中拿数据的BlockRDD。



  • 拉取方式,  这种就比较简单了, 就是直接去数据源拉取数据,  举例,FileInputDStream 每个batch都去 hdfs 的某个目录下去轮询有没有新文件,如果发现新文件,就去使用 hdfs 的接口拉取文件内容, DirectKafkaInputDStream 这个DStream 计算的时候就是直接去kafka中拉取数据。

kafka 的consumer 版本

还有一个背景知识是,kafka 的consumer客户端最开始是使用 scala实现的, 这个是0.8 的版本

  • ”high-level”的消费者API,可以支持消费组和故障处理,但是不支持更多更复杂的场景需求

  • 简单的消费者客户端(SimpleConsumer,即low-level),可以支持自定义的控制,但是需要应用程序自己管理故障和错误处理.

还有一个 0.10 使用 java 实现的 kafka consumer 客户端, 它不再依赖scala运行时环境和zookeeper.在你的项目中可以作为一个轻量级的库,

spark 集成 kafka 中的版本

所以在 spark 的源文件中, 对 kafka 的数据集成部分为 kafka-0-8(针对 kafka 0.8.2.1 or higher ) 和 kafka-0-10(针对kafka 0.10.0 or higher) 两个版本。

spark的kafka-0-8版本消费kafka 数据有两种方式,一种 DirectKafkaInputDStream(拉取方式), 一种是 KafkaInputDStream(接受方式),两种方式各有优缺点,

  • 拉取方式,可以根据自己处理的速率进行拉取数据, 因为按需拉数据,所以不存在缓冲区,就不用担心缓冲区把内存撑爆了,可以创建跟 kafka partition 一样多的 RDD partition, 并行从 kafka中读取数据,kafka partition 和 RDD partition 一一对应,  周期性使用 Simple consumer api 来获取指定  kafka offset 范围的数据。

  • 接受方式, 一旦你的Batch Processing 被delay了,或者被delay了很多个batch,那估计你的Spark Streaming程序离奔溃也就不远了。 Direct Approach (No Receivers) 则完全不会存在类似问题。就算你delay了很多个batch time,你内存中的数据只有这次处理的。需要开启数据冷热备份机制来保证数据可靠性,浪费性能。

spark的kafka-0-10 版本中,就只保留了 DirectKafkaInputDStream (拉取方式), 废弃了接受方式。

今天我们着重介绍下 kafka-0-8版本 的 KafkaInputDStream(接受方式) 和  kafka-0-10 版本中  DirectKafkaInputDStream (拉取方式)

KafkaInputDStream(接受方式)

官方给的一个例子

import org.apache.spark.streaming.kafka._ val kafkaStream = KafkaUtils.createStream(streamingContext,     [ZK quorum], [consumer group id], [per-topic number of Kafka partitions to consume])

由于KafkaInputDStream 继承了 ReceiverInputDStream , 属于接收方式, 所以启动的时候会到各个executor 上去启动数据接收器 KafkaReceiver, KafkaReceiver 创建一个线程池,每个线程对应一个 stream, 然后使用 high level 的api 去brocker 上去拉取数据然后放在local的 blockmanager 上, 所有 executor 的 KafkaReceiver 对应的 kafka consumer 属于同一个 consumer group, consumer group是kafka提供的可扩展且具有容错性的消费者机制。既然是一个组,那么组内必然可以有多个消费者或消费者实例(consumer instance),它们共享一个公共的ID,即group ID。组内的所有消费者协调在一起来消费订阅主题(subscribed topics)的所有分区(partition)。当然,每个分区只能由同一个消费组内的一个consumer来消费, 并且在分区改变获取某些消费者crash的时候会进行 Rebalance。

这种方式有很多弊端,

  • 如果数据延迟了,会有多个 batch 的数据都堆积在 executor 的 blockmanager 内存中,这样会对内存造成很大的压力, 估计你的Spark Streaming程序离奔溃也就不远了。

  • 由于 数据接收器 KafkaReceiver 是一个常驻在 executor的线程, 数据接收和数据处理,不是在一块的, 因为数据接收要一直进行,使用的是高层 api, 没有直接维护每个 batch 的 offset range, 所以没有办法保存每个 batch对应的kafka offset range 等元信息,来进行容错回溯, 所以只能自己拉取到数据后再进行冷备份,存储在hdfs,  或者多个 executor 直接拷贝热备份, 会浪费很大的性能, 大大降低数据处理的速度。

  • Topic partitions in Kafka does not correlate to partitions of RDDs generated in Spark Streaming. So increasing the number of topic-specific partitions in the KafkaUtils.createStream() only increases the number of threads using which topics that are consumed within a single receiver. It does not increase the parallelism of Spark in processing the data 这个是官方给的一个说明, 意思是 kafka 分区跟 rdd partition 不是一一对应的, 所以增加每个 topic 指定的分区数目,只是增加单个接受器的线程数目, 不是增加处理的并行度。

我个人不推荐使用这种方式。

DirectKafkaInputDStream (拉取方式)

官方给的一个例子

import org.apache.kafka.clients.consumer.ConsumerRecord import org.apache.kafka.common.serialization.StringDeserializer import org.apache.spark.streaming.kafka010._ import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe val kafkaParams = Map[String, Object](  "bootstrap.servers" -> "localhost:9092,anotherhost:9092",  "key.deserializer" -> classOf[StringDeserializer],  "value.deserializer" -> classOf[StringDeserializer],  "group.id" -> "use_a_separate_group_id_for_each_stream",  "auto.offset.reset" -> "latest",  "enable.auto.commit" -> (false: java.lang.Boolean) ) val topics = Array("topicA", "topicB") val stream = KafkaUtils.createDirectStream[String, String](  streamingContext,  PreferConsistent,  Subscribe[String, String](topics, kafkaParams) ) stream.map(record => (record.key, record.value))

DirectKafkaInputDStream 这种方式在 kafka-0-10 版本中, Subscribe 方法可以传入topic 和其他配置, 当然也可以传入起始offset数组, DirectKafkaInputDStream compute 计算的时候会从Subscribe这个类中获取到 kafka consumer, 这个新的消费者结合了旧的API中”simple”和”high-level”消费者客户端两种功能,能够同时提供消费者协调(高级API)和lower-level的访问,来构建自定义的消费策略。这个新的消费者完全使用java编写,它不再依赖scala运行时环境和zookeeper。 spark streaming 定时动态根据 DAG 静态模板 DStreamGraph生成 job,  Job 动态生成的时候,针对每个 batch,都将根据这个“模板”生成一个 RDD DAG 的实例。 对 spark streaming 处理流程不太清楚的可以参考 Spark streaming 设计与实现剖析 针对每个batch,都会调用 DirectKafkaInputDStream 的 compute方法,最终生成一个KafkaRDD, 这个方法会做以下几件事情

  • 调用 kafka consumer 的 assignment 方法获取到订阅的所有topic的所有分区的集合 Set

    , 如果在处理过程中kafka topic里面动态增加分区数目了,需要加到 batch对应的kafka offset range 等元信息 currentOffsets 数据结构中。这个数组是用来记录当前 batch处理的数据的起始位置数组。
  • 调用  kafka consumer 的 partitionsFor 方法获取 partition leader 所在的host 用来确定位置偏好,尽量让计算追着数据走,把task调度到数据所在的host上的executor上去处理。 如果你的 spark跟kafka是混部的,是可以提高很大性能的。

  • 调用  kafka consumer 的 seekToEnd 方法对当前kafka 每个分区最新的数据位置做一个快照,也就是这个batch 我就处理到这个位置了, 并且把这个数组当做下个batch的 起始位置数组。

  • 然后把每个分区 起始位置数组和 最新位置数组组成一个 消费的数据范围,并且包装为一个 KafkaRDD 返回。

KafkaRDD 在 compute的时候会吧 kafka中的分区和rdd分区一一对应, 最为一个 KafkaRDDPartition, 并且对 所有的 kafka 连接客户端进行了缓存,如果消费了以前消费过的分区,会取出缓存中的 KafkaConsumer client。 值得注意的是,这里新建的KafkaConsumer client都是调用了 assign方法手动确定自己要消费的分区,

Manual topic assignment through this method does not use the consumer’s group management  functionality. As such, there will be no rebalance operation triggered when group membership or cluster and topic  metadata change

KafkaConsumer 源文件中给了说明, 这种使用方式相当于没有使用 consumer group。 也不会触发 rebalance 操作。然后就seek到指定位置,poll kafka中的数据。

欢迎关注 spark技术分享:

                                     

: . Video Mini Program Like ,轻点两下取消赞 Wow ,轻点两下取消在看

您可能也对以下帖子感兴趣

文章有问题?点此查看未经处理的缓存