Kafka源码系列之Consumer高级API性能分析
一,kafka的Consumer的高级API内部模型介绍
1,一个主线程,入口,也是数据迭代的出口
用户主线程使用KafkaStream(ConsumerIterator)迭代获取ConsumerFetcherThread取得的数据。
2,一个定时调度线程
实际是ScheduledThreadPoolExecutor,负责按照用户设置的自动提交偏移的时间间隔定时将偏移信息提交到zookeeper。
3,一个FindLeader线程
主要负责,从zookeeper获取Broker信息,然后随机选举一台Broker,并通过创建一个同步的生产者SyncProducer去获取topic的元数据。然后会创建分区leader所在Broker数目个(每个Broker只会创建一个),ConsumerFetcherThread。
4,若干取数据的线程
负责获取数据,将消息加入队列,供用户主线程使用KafkaStream(ConsumerIterator)迭代获取数据。
5,两个重点
一处是,消费者的消费偏移等信息如何被定时线程获取,并提交到zookeeper的。
kafka.conusmer.ZookeeperConsumerConnector的topicRegistry中存储的partitionTopicInfo相关的引用,经过整理传递给了ConsumerFetcherThread,每次获取数据后都会更新其offset,然后,就可以在方法commitOffsets使用topicRegistry,将偏移提交到zookeeper(可手动调用也可开启自动提交其实就是一个ScheduledThreadPoolExecutor在定时调用commitOffsets)。
另一处,就是ConsumerFetcherThread线程获取的数据是如何被得迭代器KafkaStream(ConsumerIterator)得到并将数返回的。
在kafka.conusmer.ZookeeperConsumerConnector的conusme方法中创建了一个链表阻塞对列,并将引用传递给了KafkaStream。
new LinkedBlockingQueue[FetchedDataChunk](config.queuedMaxMessages)
在reinitializeConsumer中,对列被加入到topicThreadIdAndQueues,然后在addPartitionTopicInfo方法中被加入到了topicRegistry,再在updateFetcher方法中经整理传递给了ConsumerFetcherManager的partitionMap变量,最终传递给ConsumerFetcherthread,进而可被其获取,并向其中添加消费到的消息,然后我们的主线程就可以使用KafkaStream从队列里取消息并进行具体处理了。是一个典型的生产消费模型。
二,主要类介绍
KafkaStream
该类主要封装了ConsumerIterator。主要是负责从BlockingQueue获取,消费者线程ConsumerFetcherthread写入队列的消息。
ZookeeperConsumerConnector
kafka.conusmer.ZookeeperConsumerConnector主要协调消费者于zookeeper的交互。
ConsumerFetcherManager
继承了AbstractFetcherManager。主要作用可以归纳为两点,一是创建了查找partition的leader的线程LeaderFinderThread,然后在其中创建了对于分布于leader的Broker数个消费者线程ConsumerFetcherThread。
LeaderFinderThread
主要是作用是获取partition的leader。可以分成两个步骤:
1.先从zookeeper上获取Broker的信息。
2.构建随机选择一台Broker,构建SyncProducer获取topic的元数据。
ConsumerFetcherThread
继承了AbstractFetcherThread。主要职责是从同一个Broker上获取订阅的partition的数据。获取数据最终使用的对象是SimpleConsumer。
主要是做了两步更新:1,更新了偏移,2,往消息队列里添加消息,供KafkaStream获取。
三,具体源码介绍
1,消费者消费数据的过程
(1),获取Broker信息
在构建ZookeeperConsumerConnector的时候创建了,ConsumerFetcherManager,在调用其rebalance,会重新构建消费者和分区的分配关系,并将调用updateFetcher,重新构建LeaderFinderThread在其doworker方法中获取所有Broker信息
//获取集群的所有broker
val brokers = getAllBrokersInCluster(zkClient)
(2),随机选择一个Broker,然后获取topic的元数据
//获取topic所有元数据信息
val topicsMetadata = ClientUtils.fetchTopicMetadata(noLeaderPartitionSet.map(m => m.topic).toSet,
brokers,
config.clientId,
config.socketTimeoutMs,
correlationId.getAndIncrement).topicsMetadata
构建SyncProducer,去获取topic的元数据信息
val topicMetadataRequest = new TopicMetadataRequest(TopicMetadataRequest.CurrentVersion, correlationId, producerConfig.clientId, topics.toSeq)
al producer: SyncProducer = ProducerPool.createSyncProducer(producerConfig, shuffledBrokers(i))
topicMetadataResponse = producer.send(topicMetadataRequest)
(3)构建有订阅partition leader的Broker总数个ConsumerFetcher
def addFetcherForPartitions(partitionAndOffsets: Map[TopicAndPartition, BrokerAndInitialOffset]) {
mapLock synchronized {
val partitionsPerFetcher = partitionAndOffsets.groupBy{ case(topicAndPartition, brokerAndInitialOffset) => //分组求和
BrokerAndFetcherId(brokerAndInitialOffset.broker, getFetcherId(topicAndPartition.topic, topicAndPartition.partition))}
for ((brokerAndFetcherId, partitionAndOffsets) <- partitionsPerFetcher) {
var fetcherThread: AbstractFetcherThread = null
fetcherThreadMap.get(brokerAndFetcherId) match {
case Some(f) => fetcherThread = f
case None =>
fetcherThread = createFetcherThread(brokerAndFetcherId.fetcherId, brokerAndFetcherId.broker)
fetcherThreadMap.put(brokerAndFetcherId, fetcherThread)
fetcherThread.start
}
fetcherThreadMap(brokerAndFetcherId).addPartitions(partitionAndOffsets.map { case (topicAndPartition, brokerAndInitOffset) =>
topicAndPartition -> brokerAndInitOffset.initOffset
})
}
}
(4),获取数据
在构建ConsumerFetcherThread后,在其父类AbstractFetcherThread中构建了SimpleConsumer最终使用其去获取数据
在doWork中构建了FetcherRequest并调用processFetchRequest方法获取数据。
response = simpleConsumer.fetch(fetchRequest)
(5),将数据添加到队列中
def processPartitionData(topicAndPartition: TopicAndPartition, fetchOffset: Long, partitionData: FetchResponsePartitionData) {
val pti = partitionMap(topicAndPartition)
if (pti.getFetchOffset != fetchOffset)
throw new RuntimeException("Offset doesn't match for partition [%s,%d] pti offset: %d fetch offset: %d"
.format(topicAndPartition.topic, topicAndPartition.partition, pti.getFetchOffset, fetchOffset))
pti.enqueue(partitionData.messages.asInstanceOf[ByteBufferMessageSet])
}
(6),然后在kafkaStream(ConsumerIterator)迭代输出
主要是在makeNext方法中
currentDataChunk = channel.poll(consumerTimeoutMs, TimeUnit.MILLISECONDS)
localCurrent = currentDataChunk.messages.iterator
var item = localCurrent.next()
然后我们用户代码
while(it.hasNext())
System.out.println(new String(it.next().message()));
2,将偏移提交至zookeeper的过程
主要区别在步骤7和6
步骤6,获取已经消费的偏移
val offsetsToCommit = immutable.Map(topicRegistry.flatMap { case (topic, partitionTopicInfos) =>
partitionTopicInfos.map { case (partition, info) =>
TopicAndPartition(info.topic, info.partitionId) -> OffsetAndMetadata(info.getConsumeOffset())
}
}.toSeq:_*)
步骤7,将偏移提交至Zookeeper
offsetsToCommit.foreach { case(topicAndPartition, offsetAndMetadata) =>
commitOffsetToZooKeeper(topicAndPartition, offsetAndMetadata.offset)
}
def commitOffsetToZooKeeper(topicPartition: TopicAndPartition, offset: Long) {
if (checkpointedZkOffsets.get(topicPartition) != offset) {
val topicDirs = new ZKGroupTopicDirs(config.groupId, topicPartition.topic)
updatePersistentPath(zkClient, topicDirs.consumerOffsetDir + "/" + topicPartition.partition, offset.toString)
checkpointedZkOffsets.put(topicPartition, offset)
zkCommitMeter.mark()
}
}
3,消费者rebalance的过程
主要有以下几个步骤
A),在ZookeeperConusmerConnector方法中,构建ZKRebalancerListener,并指定其监控的目录为:/consumers/groupid/ids
B),进行rebalance
首次,启动的话直接
loadBalancerListener.syncedRebalance()
每当有消费者加入或者退出
if (doRebalance)
syncedRebalance
C),获取所有的Broker
val brokers = getAllBrokersInCluster(zkClient)
D),解除掉原有的所属关系并释放掉原有的fetcher
//关闭之前所有的Fetcher
closeFetchers(cluster, kafkaMessageAndMetadataStreams, myTopicThreadIdsMap)
//释放掉原有的关系
releasePartitionOwnership(topicRegistry)
E),获取当前消费者和分区的对应关系
val assignmentContext = new AssignmentContext(group, consumerIdString, config.excludeInternalTopics, zkClient)
//重新根据获取分配策略,得到分配后的关系
val partitionOwnershipDecision = partitionAssignor.assign(assignmentContext)
F),更新topicRegistry
val offsetFetchResponse = offsetFetchResponseOpt.get
topicPartitions.foreach(topicAndPartition => {
val (topic, partition) = topicAndPartition.asTuple
val offset = offsetFetchResponse.requestInfo(topicAndPartition).offset
val threadId = partitionOwnershipDecision(topicAndPartition)
addPartitionTopicInfo(currentTopicRegistry, partition, topic, offset, threadId)
})
/**
* move the partition ownership here, since that can be used to indicate a truly successful rebalancing attempt
* A rebalancing attempt is completed successfully only after the fetchers have been started correctly
*/
if(reflectPartitionOwnershipDecision(partitionOwnershipDecision)) {
allTopicsOwnedPartitionsCount = partitionOwnershipDecision.size
partitionOwnershipDecision.view.groupBy { case(topicPartition, consumerThreadId) => topicPartition.topic }
.foreach { case (topic, partitionThreadPairs) =>
newGauge("OwnedPartitionsCount",
new Gauge[Int] {
def value() = partitionThreadPairs.size
},
ownedPartitionsCountMetricTags(topic))
}
topicRegistry = currentTopicRegistry
G),构建LeaderFinderThread和ConsumerFetcherThread
updateFetcher(cluster)
至此结束。
四,总结
Consumer高级API底层获取kafka的数据使用的依然是SimpleConusmer,只是在上层进行了封装,使我们能更方便的构建自己的消费者,主要特点如下:
1,帮助我们完成了partition leader的查找。
2,Conusmer上下线partition跟cousumer关系的重分配。(目前两种重分配策略,后面讲)。该特征是分组消费的根本。
分组消费,消费者跟分区对应的关系:
A),一个分区只能被同一个组的一个消费者消费
B),同一个组的消费者可以同时消费多个分区
C),假如消费者数目大于分区数,会有消费者无法消费而空闲。
3,消费偏移自动管理。消费偏移被自动维护到集群
那么这种消费者的缺点是什么呢?
1,创建了过多的线程
A,调度线程,自动提交偏移
B,LeaderFinderThread线程
C,订阅分区leader分布所在Broker总数个ConsumerFetcherThread
2,维护偏移到Zookeeper,并未实现一次消费语义
本文乃原创,不足之处希望大家指正批评。
欢迎大家关注浪尖的公众号,一起开启分布式学习之旅。