查看原文
其他

Kafka源码系列之通过源码分析Producer性能瓶颈

2017-06-07 浪尖 Spark学习技巧

Kafka源码系列之通过源码分析Producer性能瓶颈

本文,kafka源码是以0.8.2.2,原因是浪尖一直没对kafka系统进行升级。主要是java的kafka生产者源码,Broker接收到producer请求之后处理的相关源码。估计源码内容是比较多的,只给出大致逻辑,主类和函数名称。本文的目的是让大家,彻底了解发送消息到kafka的过程及如何对producer进行调优。

一,kafka的producer基本介绍及主要类

1,基本介绍

Kafka的Producer,主要负责将消息发送给kafka集群。主要核心特性有两点:

1),异步 or 同步。

可以通过配置producer.type(1或async和2或sync)

设置为异步的话,其实就是启动了一个后台线程,负责从队列里面取数据发送给kafka,我们的主程序负责往队列了写数据。支持批发送。

2),消息的key决定分区。

可以采用分区器,来决定我们的消息发往哪个partition。实现Partitioner可以实现自定义分区器。默认的分区策略是对key取hash值,然后对总的分区数取余,得到的余数作为我们发送消息到哪个分区的依据。

2,主要类

Producer

kafka.producer.Producer该类异常重要,负责对DefaultEventHandler进行初始化并且在此过程也初始化真正的发送者池ProducerPool。

异步发送消息的策略情况下会对初始化我们后台发送线程。

DefaultEventHandler

该类主要是为消息发送做准备,比如更新broker信息,找到分区的leader等,最终通过SyncProducer将消息发送给Broker。

ProducerSendThread

这个是异步发送情况下才会有的,一个后台发送线程。这种模式下,生产端实际是形成了一个生产消费模型,用户调用Producer.send实际是将消息添加到了一个消息队列里面LinkedBlockingQueue,然后由ProducerSendThread的processEvents,批量处理后交给了DefaultEventHandler。

ProducerPool

维护了Broker数目个SyncProducer,会在每次发送消息前更新。

SyncProducer

每个SyncProducer对象,包含了一个到一个broker链接,我们通过获取该对象来发消息到特定的broker。

BrokerPartitionInfo

该类主要是在获取topic元数据信息的时候会用到。在更新元数据的时候会同时更新我们ProducerPool,避免Borker挂掉之后SyncProducer对象不可用。

Partitioner

主要是分区器。对我们的消息按照key进行分区。

KafkaApis

Kafka的各种请求的逻辑处理类。

Processor

负责应答请求。

 

二,源码讲解

producer与Broker通信骨干

由上图可以看出,生产者于消费者通讯的骨干分两类:

1,TopicMetadataRequest

RequestId:RequestKeys.MetadataKey。主要是负责请求topic的元数据,更新Producer里面syncProducers链接信息,也获取了topic的具体信息。

在DefaultEventHandler的Handle方法里,开启了元数据的更新

brokerPartitionInfo.updateInfo(topicMetadataToRefresh.toSet, correlationId.getAndIncrement)

def updateInfo(topics: Set[String], correlationId: Int) {
  var topicsMetadata: Seq[TopicMetadata] = Nil
  val topicMetadataResponse = ClientUtils.fetchTopicMetadata(topics, brokers, producerConfig, correlationId)

 

在ClientUtils.fetchTopicMetadata方法里,会根据我们给定的broker列表,随机选出一个创建一个SyncProducer,去获取元数据。获取到就会推出(一台Broker会包含所有的元数据)。

val topicMetadataRequest = new TopicMetadataRequest(TopicMetadataRequest.CurrentVersion, correlationId, producerConfig.clientId, topics.toSeq)
var topicMetadataResponse: TopicMetadataResponse = null
var t: Throwable = null
// shuffle the list of brokers before sending metadata requests so that most requests don't get routed to the
// same broker
val shuffledBrokers = Random.shuffle(brokers)
while(i < shuffledBrokers.size && !fetchMetaDataSucceeded) {
  val producer: SyncProducer = ProducerPool.createSyncProducer(producerConfig, shuffledBrokers(i))
  info("Fetching metadata from broker %s with correlation id %d for %d topic(s) %s".format(shuffledBrokers(i), correlationId, topics.size, topics))
  try {
    topicMetadataResponse = producer.send(topicMetadataRequest)
    fetchMetaDataSucceeded = true
  }
  catch {
    case e: Throwable =>
      warn("Fetching topic metadata with correlation id %d for topics [%s] from broker [%s] failed"
        .format(correlationId, topics, shuffledBrokers(i).toString), e)
      t = e
  } finally {
    i = i + 1
    producer.close()
  }
}

ducerPool.createSyncProducer(producerConfig, shuffledBrokers(i))

TopicMetadataRequest被KafkaApis识别为获取topic元数据的请求,最终调用的处理函数是handleTopicMetadataRequest(request)。

2,ProducerRequest

RequestId:RequestKeys.ProduceKey。被KafkaApis识别为消息发送请求。会调用的处理函数handleProducerOrOffsetCommitRequest(request)。

具体过程如下:

根据上图可知,异步通讯这种方式其实涵盖了,同步通讯。所以,这块源码呢,我们主要关注异步通讯。

1),初始化

前面已经说过了在构建kafka.producer.Producer对象的时候会初始化ProducerPool和DefaultEventHandler。

//构建DefaultEventHandler的时候会通过反射构建分区器,和key-value的序列化方式
def this(config: ProducerConfig) =
  this(config,
       new DefaultEventHandler[K,V](config,
                                    Utils.createObject[Partitioner](config.partitionerClass, config.props),
                                    Utils.createObject[Encoder[V]](config.serializerClass, config.props),
                                    Utils.createObject[Encoder[K]](config.keySerializerClass, config.props),
                                    new ProducerPool(config)))

假如为异步消息发送也会构建一个后台发送线程


config.producerType match {
  case "sync" =>
  case "async" =>
    sync = false
    producerSendThread = new ProducerSendThread[K,V]("ProducerSendThread-" + config.clientId,
                                                     queue,
                                                     eventHandler,
                                                     config.queueBufferingMaxMs,
                                                     config.batchNumMessages,
                                                     config.clientId)
    producerSendThread.start()
}

2),具体数据发送过程

A),消息生产到消息队列

producer.send(new KeyedMessage<Integer, String>(topic, messageStr));

调用kafka.producer.Producer对象asyncSend将消息存储到消息队列,自己的。

asyncSend(messages: Seq[KeyedMessage[K,V]])

LinkedBlockingQueue[KeyedMessage[K,V]](config.queueBufferingMaxMessages)

B),后台线程从消息队列取出消息

此过程,牵涉到发送调优的一个策略:

(1)满足消息发送批大小发送

(2)消息等待超时也会将消息发送

首先是构建清空队列的流

Stream.continually(queue.poll(scala.math.max(0, (lastSend + queueTime) - SystemTime.milliseconds), TimeUnit.MILLISECONDS))
                  .takeWhile(item => if(item != null) item ne shutdownCommand else true).foreach

然后里面会进行判断

// check if the batch size is reached
full = events.size >= batchSize

if(full || expired) {
  if(expired)
    debug(elapsed + " ms elapsed. Queue time reached. Sending..")
  if(full)
    debug("Batch full. Sending..")
  // if either queue time has reached or batch size has reached, dispatch to event handler
  tryToHandle(events)
  lastSend = SystemTime.milliseconds
  events = new ArrayBuffer[KeyedMessage[K,V]]
}

C),DefaultEventHandler消息发送前的准备

在handle方法里,首先是会判断是否满足topic元数据更新的时间间隔,间隔设置方式:

topicMetadataRefreshIntervalMs = props.getInt("topic.metadata.refresh.interval.ms", 600000)

满足则进行topic元数据更新

brokerPartitionInfo.updateInfo(topicMetadataToRefresh.toSet, correlationId.getAndIncrement)

然后呢?获取leader,按照key进行分区,构建发送的消息块,发送消息。具体由DefaultEventHandler的dispatchSerializedData方法完成

首先,用DefaultEventHandler的partitionAndCollate方法获取

Option[Map[Int, collection.mutable.Map[TopicAndPartition, Seq[KeyedMessage[K,Message]]]]]

其中,其中map的key是分区leader的brokerId

Map的Value,又是一个map,key是TopicAndPartition,value是该分区的具体消息。

接着,正式进入消息封装与发送

val messageSetPerBroker = groupMessagesToSet(messagesPerBrokerMap)
val failedTopicPartitions = send(brokerid, messageSetPerBroker)

在send方法中构建了

val producerRequest = new ProducerRequest(currentCorrelationId, config.clientId, config.requestRequiredAcks,
  config.requestTimeoutMs, messagesPerTopic)

从Producer池中获取一个SyncProducer,发送消息

val syncProducer = producerPool.getProducer(brokerId)

val response = syncProducer.send(producerRequest)

Broker接受到消息根据requestId,进行逻辑处理,最终是交给了KafkaApis的方法

case RequestKeys.ProduceKey => handleProducerOrOffsetCommitRequest(request)

 

三,总结

由于kafka在我们整个生产环境系统中的重要性,主要体现在,kafka集群一旦垮了,会导致真个业务系统断了,系统瘫痪。所以,保证kafka集群的存活,高效运转,是我们大数据工作者一个重要责任。这次主要是从生产者的角度强调我们的优化过程。后面会陆续出文章讲多种消费者角色的源码和优化策略和kafka本身的优化策略。

1,采用异步发送策略

异步策略,增加了一个后台发送线程,增加并发度。

异步策略支持批量发送和超时发送,提升了性能。

2,设置合理的批大小和超时时间(异步处理情况)

配置

默认

作用

queue.buffering.max.ms

5000

异步发送消息超时发送时间

batch.num.messages

200

异步消息批量发送的阈值

 

3,设置合适的kafka分区数

分区数一方面决定了我们写消息的并发度,由此也影响着吞吐量。也决定后端处理线程的并发度。

分区并发度决定因素:数据量,后端业务处理复杂度,磁盘数量。

并不是分区数越多就越好,磁盘竞争也很影响性能的。

4,尽量使数据均匀分布

重要等级高,可以使我们后端处理线程负载均匀。

1),key随机或者轮训分区进行发送

2),自定义分区策略

5,如何保证消息顺序性

将需要保证顺序的消息,采用同步的方式发送发送到同一个分区里。

6,高级优化策略

自己使用kafka client的api实现自己的生产者,减少中间环节,尤其针对生产者跟kafka集群在同一台主机的时候,我们可以只发送数据到当前的主机的分区,减少了流量跨主机传输,节省带宽。


此文,乃原创,转载请注明出处。

    欢迎大家关注,浪尖的公众号,一起学习,共同进步。

 

 

 


您可能也对以下帖子感兴趣

文章有问题?点此查看未经处理的缓存