Kafka源码系列之通过源码分析Producer性能瓶颈
Kafka源码系列之通过源码分析Producer性能瓶颈
本文,kafka源码是以0.8.2.2,原因是浪尖一直没对kafka系统进行升级。主要是java的kafka生产者源码,Broker接收到producer请求之后处理的相关源码。估计源码内容是比较多的,只给出大致逻辑,主类和函数名称。本文的目的是让大家,彻底了解发送消息到kafka的过程及如何对producer进行调优。
一,kafka的producer基本介绍及主要类
1,基本介绍
Kafka的Producer,主要负责将消息发送给kafka集群。主要核心特性有两点:
1),异步 or 同步。
可以通过配置producer.type(1或async和2或sync)
设置为异步的话,其实就是启动了一个后台线程,负责从队列里面取数据发送给kafka,我们的主程序负责往队列了写数据。支持批发送。
2),消息的key决定分区。
可以采用分区器,来决定我们的消息发往哪个partition。实现Partitioner可以实现自定义分区器。默认的分区策略是对key取hash值,然后对总的分区数取余,得到的余数作为我们发送消息到哪个分区的依据。
2,主要类
Producer
kafka.producer.Producer该类异常重要,负责对DefaultEventHandler进行初始化并且在此过程也初始化真正的发送者池ProducerPool。
异步发送消息的策略情况下会对初始化我们后台发送线程。
DefaultEventHandler
该类主要是为消息发送做准备,比如更新broker信息,找到分区的leader等,最终通过SyncProducer将消息发送给Broker。
ProducerSendThread
这个是异步发送情况下才会有的,一个后台发送线程。这种模式下,生产端实际是形成了一个生产消费模型,用户调用Producer.send实际是将消息添加到了一个消息队列里面LinkedBlockingQueue,然后由ProducerSendThread的processEvents,批量处理后交给了DefaultEventHandler。
ProducerPool
维护了Broker数目个SyncProducer,会在每次发送消息前更新。
SyncProducer
每个SyncProducer对象,包含了一个到一个broker链接,我们通过获取该对象来发消息到特定的broker。
BrokerPartitionInfo
该类主要是在获取topic元数据信息的时候会用到。在更新元数据的时候会同时更新我们ProducerPool,避免Borker挂掉之后SyncProducer对象不可用。
Partitioner
主要是分区器。对我们的消息按照key进行分区。
KafkaApis
Kafka的各种请求的逻辑处理类。
Processor
负责应答请求。
二,源码讲解
producer与Broker通信骨干
由上图可以看出,生产者于消费者通讯的骨干分两类:
1,TopicMetadataRequest
RequestId:RequestKeys.MetadataKey。主要是负责请求topic的元数据,更新Producer里面syncProducers链接信息,也获取了topic的具体信息。
在DefaultEventHandler的Handle方法里,开启了元数据的更新
brokerPartitionInfo.updateInfo(topicMetadataToRefresh.toSet, correlationId.getAndIncrement)
def updateInfo(topics: Set[String], correlationId: Int) {
var topicsMetadata: Seq[TopicMetadata] = Nil
val topicMetadataResponse = ClientUtils.fetchTopicMetadata(topics, brokers, producerConfig, correlationId)
在ClientUtils.fetchTopicMetadata方法里,会根据我们给定的broker列表,随机选出一个创建一个SyncProducer,去获取元数据。获取到就会推出(一台Broker会包含所有的元数据)。
val topicMetadataRequest = new TopicMetadataRequest(TopicMetadataRequest.CurrentVersion, correlationId, producerConfig.clientId, topics.toSeq)
var topicMetadataResponse: TopicMetadataResponse = null
var t: Throwable = null
// shuffle the list of brokers before sending metadata requests so that most requests don't get routed to the
// same broker
val shuffledBrokers = Random.shuffle(brokers)
while(i < shuffledBrokers.size && !fetchMetaDataSucceeded) {
val producer: SyncProducer = ProducerPool.createSyncProducer(producerConfig, shuffledBrokers(i))
info("Fetching metadata from broker %s with correlation id %d for %d topic(s) %s".format(shuffledBrokers(i), correlationId, topics.size, topics))
try {
topicMetadataResponse = producer.send(topicMetadataRequest)
fetchMetaDataSucceeded = true
}
catch {
case e: Throwable =>
warn("Fetching topic metadata with correlation id %d for topics [%s] from broker [%s] failed"
.format(correlationId, topics, shuffledBrokers(i).toString), e)
t = e
} finally {
i = i + 1
producer.close()
}
}
ducerPool.createSyncProducer(producerConfig, shuffledBrokers(i))
TopicMetadataRequest被KafkaApis识别为获取topic元数据的请求,最终调用的处理函数是handleTopicMetadataRequest(request)。
2,ProducerRequest
RequestId:RequestKeys.ProduceKey。被KafkaApis识别为消息发送请求。会调用的处理函数handleProducerOrOffsetCommitRequest(request)。
具体过程如下:
根据上图可知,异步通讯这种方式其实涵盖了,同步通讯。所以,这块源码呢,我们主要关注异步通讯。
1),初始化
前面已经说过了在构建kafka.producer.Producer对象的时候会初始化ProducerPool和DefaultEventHandler。
//构建DefaultEventHandler的时候会通过反射构建分区器,和key-value的序列化方式
def this(config: ProducerConfig) =
this(config,
new DefaultEventHandler[K,V](config,
Utils.createObject[Partitioner](config.partitionerClass, config.props),
Utils.createObject[Encoder[V]](config.serializerClass, config.props),
Utils.createObject[Encoder[K]](config.keySerializerClass, config.props),
new ProducerPool(config)))
假如为异步消息发送也会构建一个后台发送线程
config.producerType match {
case "sync" =>
case "async" =>
sync = false
producerSendThread = new ProducerSendThread[K,V]("ProducerSendThread-" + config.clientId,
queue,
eventHandler,
config.queueBufferingMaxMs,
config.batchNumMessages,
config.clientId)
producerSendThread.start()
}
2),具体数据发送过程
A),消息生产到消息队列
producer.send(new KeyedMessage<Integer, String>(topic, messageStr));
调用kafka.producer.Producer对象asyncSend将消息存储到消息队列,自己的。
asyncSend(messages: Seq[KeyedMessage[K,V]])
LinkedBlockingQueue[KeyedMessage[K,V]](config.queueBufferingMaxMessages)
B),后台线程从消息队列取出消息
此过程,牵涉到发送调优的一个策略:
(1)满足消息发送批大小发送
(2)消息等待超时也会将消息发送
首先是构建清空队列的流
Stream.continually(queue.poll(scala.math.max(0, (lastSend + queueTime) - SystemTime.milliseconds), TimeUnit.MILLISECONDS))
.takeWhile(item => if(item != null) item ne shutdownCommand else true).foreach
然后里面会进行判断
// check if the batch size is reached
full = events.size >= batchSize
if(full || expired) {
if(expired)
debug(elapsed + " ms elapsed. Queue time reached. Sending..")
if(full)
debug("Batch full. Sending..")
// if either queue time has reached or batch size has reached, dispatch to event handler
tryToHandle(events)
lastSend = SystemTime.milliseconds
events = new ArrayBuffer[KeyedMessage[K,V]]
}
C),DefaultEventHandler消息发送前的准备
在handle方法里,首先是会判断是否满足topic元数据更新的时间间隔,间隔设置方式:
topicMetadataRefreshIntervalMs = props.getInt("topic.metadata.refresh.interval.ms", 600000)
满足则进行topic元数据更新
brokerPartitionInfo.updateInfo(topicMetadataToRefresh.toSet, correlationId.getAndIncrement)
然后呢?获取leader,按照key进行分区,构建发送的消息块,发送消息。具体由DefaultEventHandler的dispatchSerializedData方法完成
首先,用DefaultEventHandler的partitionAndCollate方法获取
Option[Map[Int, collection.mutable.Map[TopicAndPartition, Seq[KeyedMessage[K,Message]]]]]
其中,其中map的key是分区leader的brokerId
Map的Value,又是一个map,key是TopicAndPartition,value是该分区的具体消息。
接着,正式进入消息封装与发送
val messageSetPerBroker = groupMessagesToSet(messagesPerBrokerMap)
val failedTopicPartitions = send(brokerid, messageSetPerBroker)
在send方法中构建了
val producerRequest = new ProducerRequest(currentCorrelationId, config.clientId, config.requestRequiredAcks,
config.requestTimeoutMs, messagesPerTopic)
从Producer池中获取一个SyncProducer,发送消息
val syncProducer = producerPool.getProducer(brokerId)
val response = syncProducer.send(producerRequest)
Broker接受到消息根据requestId,进行逻辑处理,最终是交给了KafkaApis的方法
case RequestKeys.ProduceKey => handleProducerOrOffsetCommitRequest(request)
三,总结
由于kafka在我们整个生产环境系统中的重要性,主要体现在,kafka集群一旦垮了,会导致真个业务系统断了,系统瘫痪。所以,保证kafka集群的存活,高效运转,是我们大数据工作者一个重要责任。这次主要是从生产者的角度强调我们的优化过程。后面会陆续出文章讲多种消费者角色的源码和优化策略和kafka本身的优化策略。
1,采用异步发送策略
异步策略,增加了一个后台发送线程,增加并发度。
异步策略支持批量发送和超时发送,提升了性能。
2,设置合理的批大小和超时时间(异步处理情况)
配置 | 默认 | 作用 |
queue.buffering.max.ms | 5000 | 异步发送消息超时发送时间 |
batch.num.messages | 200 | 异步消息批量发送的阈值 |
3,设置合适的kafka分区数
分区数一方面决定了我们写消息的并发度,由此也影响着吞吐量。也决定后端处理线程的并发度。
分区并发度决定因素:数据量,后端业务处理复杂度,磁盘数量。
并不是分区数越多就越好,磁盘竞争也很影响性能的。
4,尽量使数据均匀分布
重要等级高,可以使我们后端处理线程负载均匀。
1),key随机或者轮训分区进行发送
2),自定义分区策略
5,如何保证消息顺序性
将需要保证顺序的消息,采用同步的方式发送发送到同一个分区里。
6,高级优化策略
自己使用kafka client的api实现自己的生产者,减少中间环节,尤其针对生产者跟kafka集群在同一台主机的时候,我们可以只发送数据到当前的主机的分区,减少了流量跨主机传输,节省带宽。
此文,乃原创,转载请注明出处。
欢迎大家关注,浪尖的公众号,一起学习,共同进步。