Kafka in Practice (12): KafkaProducer Source Code Walkthrough and Debugging

yzg 数据社 2022-07-29

This installment walks through the producer source code to get familiar with how data is sent. For compiling and debugging the Kafka source in IDEA, see https://blog.csdn.net/yezonggang/article/details/98212267. The version analyzed here is kafka-1.0.0.

I. Environment Preparation

Earlier posts already covered running ZooKeeper (3.4.12) on Windows and compiling the Kafka source; see: compiling and debugging the Kafka source locally. In IDEA, add a new configuration under Run --> Debug to create the topic yzg (3 partitions, 1 replica), then start it locally.
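Instead of an IDEA run configuration, the same topic can also be created programmatically. Here is a minimal sketch using the AdminClient API that ships with kafka-1.0.0; the class name is our own, and the broker address matches the local setup used throughout this post:

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

import java.util.Collections;
import java.util.Properties;

// Hypothetical helper: creates topic "yzg" with 3 partitions and replication factor 1,
// matching the IDEA run configuration described above.
public class CreateTopicDemo {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            admin.createTopics(Collections.singleton(new NewTopic("yzg", 3, (short) 1)))
                 .all()
                 .get(); // block until the broker confirms topic creation
        }
    }
}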

II. The Production Flow and the KafkaProducer Class

KafkaProducer lives in the org.apache.kafka.clients.producer package (all producer-side source sits under this package). To use the producer you instantiate KafkaProducer, which defines the whole sending mechanism; KafkaProducer implements the Producer interface, and a producer instance sends data by calling its send() method. The flow breaks down as follows (a condensed sketch of the call chain follows the list):
1. The record first passes through an interceptor.
2. send() delegates to doSend(), which first serializes the key and value with the configured serializers.
3. partition() receives the record together with the serialized key and value and assigns a partition.
4. RecordAccumulator.append() is called, placing the processed record into the accumulator's cache and returning a RecordAppendResult.
5. Inside append() the record is queued into a Deque whose elements are ProducerBatch objects.
6. Once the above completes, this.sender.wakeup() wakes the sender thread, whose single job is sending data.
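Before diving into the real source, here is a condensed paraphrase of KafkaProducer.doSend() in kafka-1.0.0 tying the six steps together. This is not the verbatim source: error handling, the transaction manager, and the interceptor-wrapped callback are omitted, and the fields referenced are KafkaProducer's own:

// Condensed paraphrase of KafkaProducer.doSend() -- a sketch, not the verbatim source.
private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) throws InterruptedException {
    // make sure metadata for the topic is available (see the real snippet further below)
    ClusterAndWaitTime clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), maxBlockTimeMs);
    Cluster cluster = clusterAndWaitTime.cluster;
    // (2) serialize key and value with the configured serializers
    byte[] serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key());
    byte[] serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value());
    // (3) choose a partition
    int partition = partition(record, serializedKey, serializedValue, cluster);
    TopicPartition tp = new TopicPartition(record.topic(), partition);
    long timestamp = record.timestamp() == null ? time.milliseconds() : record.timestamp();
    // (4)(5) queue the record into a ProducerBatch inside the RecordAccumulator
    RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp,
            serializedKey, serializedValue, record.headers().toArray(), callback, maxBlockTimeMs);
    // (6) wake the sender thread if a batch filled up or a new one was created
    if (result.batchIsFull || result.newBatchCreated)
        this.sender.wakeup();
    return result.future;
}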

The KafkaProducer constructor is shown below. Once the producer is given the cluster config and the serializers (no topic name yet), the constructor instantiates every related component; judging from the code, the main objects are the partitioner, the key/value serializers, the interceptors, the Metadata, the RecordAccumulator, the NetworkClient, and the Sender thread:
private KafkaProducer(ProducerConfig config, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
    try {
        Map<String, Object> userProvidedConfigs = config.originals();
        this.producerConfig = config;
        this.time = Time.SYSTEM;
        String clientId = config.getString(ProducerConfig.CLIENT_ID_CONFIG);
        if (clientId.length() <= 0)
            clientId = "producer-" + PRODUCER_CLIENT_ID_SEQUENCE.getAndIncrement();
        this.clientId = clientId;
        String transactionalId = userProvidedConfigs.containsKey(ProducerConfig.TRANSACTIONAL_ID_CONFIG) ?
                (String) userProvidedConfigs.get(ProducerConfig.TRANSACTIONAL_ID_CONFIG) : null;
        LogContext logContext;
        if (transactionalId == null)
            logContext = new LogContext(String.format("[Producer clientId=%s] ", clientId));
        else
            logContext = new LogContext(String.format("[Producer clientId=%s, transactionalId=%s] ", clientId, transactionalId));
        log = logContext.logger(KafkaProducer.class);
        log.trace("Starting the Kafka producer");
        Map<String, String> metricTags = Collections.singletonMap("client-id", clientId);
        MetricConfig metricConfig = new MetricConfig().samples(config.getInt(ProducerConfig.METRICS_NUM_SAMPLES_CONFIG))
                .timeWindow(config.getLong(ProducerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG), TimeUnit.MILLISECONDS)
                .recordLevel(Sensor.RecordingLevel.forName(config.getString(ProducerConfig.METRICS_RECORDING_LEVEL_CONFIG)))
                .tags(metricTags);
        List<MetricsReporter> reporters = config.getConfiguredInstances(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG, MetricsReporter.class);
        reporters.add(new JmxReporter(JMX_PREFIX));
        this.metrics = new Metrics(metricConfig, reporters, time);
        ProducerMetrics metricsRegistry = new ProducerMetrics(this.metrics);
        this.partitioner = config.getConfiguredInstance(ProducerConfig.PARTITIONER_CLASS_CONFIG, Partitioner.class);
        long retryBackoffMs = config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG);
        if (keySerializer == null) {
            this.keySerializer = ensureExtended(config.getConfiguredInstance(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, Serializer.class));
            this.keySerializer.configure(config.originals(), true);
        } else {
            config.ignore(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG);
            this.keySerializer = ensureExtended(keySerializer);
        }
        if (valueSerializer == null) {
            this.valueSerializer = ensureExtended(config.getConfiguredInstance(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, Serializer.class));
            this.valueSerializer.configure(config.originals(), false);
        } else {
            config.ignore(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG);
            this.valueSerializer = ensureExtended(valueSerializer);
        }
        // load interceptors and make sure they get clientId
        userProvidedConfigs.put(ProducerConfig.CLIENT_ID_CONFIG, clientId);
        List<ProducerInterceptor<K, V>> interceptorList = (List) (new ProducerConfig(userProvidedConfigs, false)).getConfiguredInstances(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, ProducerInterceptor.class);
        this.interceptors = interceptorList.isEmpty() ? null : new ProducerInterceptors<>(interceptorList);
        ClusterResourceListeners clusterResourceListeners = configureClusterResourceListeners(keySerializer, valueSerializer, interceptorList, reporters);
        this.metadata = new Metadata(retryBackoffMs, config.getLong(ProducerConfig.METADATA_MAX_AGE_CONFIG), true, true, clusterResourceListeners);
        this.maxRequestSize = config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG);
        this.totalMemorySize = config.getLong(ProducerConfig.BUFFER_MEMORY_CONFIG);
        this.compressionType = CompressionType.forName(config.getString(ProducerConfig.COMPRESSION_TYPE_CONFIG));
        this.maxBlockTimeMs = config.getLong(ProducerConfig.MAX_BLOCK_MS_CONFIG);
        this.requestTimeoutMs = config.getInt(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG);
        this.transactionManager = configureTransactionState(config, logContext, log);
        int retries = configureRetries(config, transactionManager != null, log);
        int maxInflightRequests = configureInflightRequests(config, transactionManager != null);
        short acks = configureAcks(config, transactionManager != null, log);
        this.apiVersions = new ApiVersions();
        this.accumulator = new RecordAccumulator(logContext,
                config.getInt(ProducerConfig.BATCH_SIZE_CONFIG),
                this.totalMemorySize,
                this.compressionType,
                config.getLong(ProducerConfig.LINGER_MS_CONFIG),
                retryBackoffMs,
                metrics,
                time,
                apiVersions,
                transactionManager);
        List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG));
        this.metadata.update(Cluster.bootstrap(addresses), Collections.<String>emptySet(), time.milliseconds());
        ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config);
        Sensor throttleTimeSensor = Sender.throttleTimeSensor(metricsRegistry.senderMetrics);
        NetworkClient client = new NetworkClient(
                new Selector(config.getLong(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG), this.metrics, time, "producer", channelBuilder, logContext),
                this.metadata,
                clientId,
                maxInflightRequests,
                config.getLong(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG),
                config.getLong(ProducerConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG),
                config.getInt(ProducerConfig.SEND_BUFFER_CONFIG),
                config.getInt(ProducerConfig.RECEIVE_BUFFER_CONFIG),
                this.requestTimeoutMs,
                time,
                true,
                apiVersions,
                throttleTimeSensor,
                logContext);
        this.sender = new Sender(logContext,
                client,
                this.metadata,
                this.accumulator,
                maxInflightRequests == 1,
                config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG),
                acks,
                retries,
                metricsRegistry.senderMetrics,
                Time.SYSTEM,
                this.requestTimeoutMs,
                config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG),
                this.transactionManager,
                apiVersions);
        String ioThreadName = NETWORK_THREAD_PREFIX + " | " + clientId;
        this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
        this.ioThread.start();
        this.errors = this.metrics.sensor("errors");
        config.logUnused();
        AppInfoParser.registerAppInfo(JMX_PREFIX, clientId, metrics);
        log.debug("Kafka producer started");
    } catch (Throwable t) {
        // call close methods if internal objects are already constructed this is to prevent resource leak. see KAFKA-2121
        close(0, TimeUnit.MILLISECONDS, true);
        // now propagate the exception
        throw new KafkaException("Failed to construct kafka producer", t);
    }
}

1. Data Pre-processing (Interceptor, Serializer, Partitioner, Cache)

① After building a KafkaProducer from the props, the producer calls send(), possibly from multiple threads. If no interceptors are configured (interceptors is an instance of ProducerInterceptors), the record passes through unchanged; otherwise ProducerInterceptors.onSend() is invoked to transform or filter it. The interceptor is purely a user extension point: the built-in source ships no filtering logic (a sketch of a custom interceptor follows the snippet below);
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
    // intercept the record, which can be potentially modified; this method does not throw exceptions
    ProducerRecord<K, V> interceptedRecord = this.interceptors == null ? record : this.interceptors.onSend(record);
    return doSend(interceptedRecord, callback);
}
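As a minimal sketch of such a user extension (the class name and the prefixing rule are our own invention, assuming the kafka-1.0.0 ProducerInterceptor interface), a custom interceptor could look like this, registered via the interceptor.classes config:

import java.util.Map;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

// Hypothetical example: prefix every value before it is serialized and partitioned.
// Registered with: props.put("interceptor.classes", "demo.PrefixInterceptor");
public class PrefixInterceptor implements ProducerInterceptor<Integer, String> {
    @Override
    public ProducerRecord<Integer, String> onSend(ProducerRecord<Integer, String> record) {
        // runs on the user thread inside KafkaProducer.send(), before doSend()
        return new ProducerRecord<>(record.topic(), record.partition(), record.timestamp(),
                record.key(), "intercepted-" + record.value(), record.headers());
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        // called once the record is acknowledged, or the send fails
    }

    @Override
    public void close() { }

    @Override
    public void configure(Map<String, ?> configs) { }
}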

② send() then enters doSend(). After waiting on the cluster metadata and serializing the key and value, the method calls partition(), whose inputs are the original record plus the serialized key and value;
// make sure the topic and cluster metadata are correct
ClusterAndWaitTime clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), maxBlockTimeMs);
Cluster cluster = clusterAndWaitTime.cluster;
// run the partitioner
int partition = partition(record, serializedKey, serializedValue, cluster);
tp = new TopicPartition(record.topic(), partition);

// if the record does not specify a partition, fall back to the built-in partitioner.partition()
private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) {
    Integer partition = record.partition();
    return partition != null ?
            partition :
            partitioner.partition(record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);
}
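For reference, the default DefaultPartitioner hashes the serialized key (murmur2) modulo the partition count, and round-robins records that carry a null key. A custom strategy only needs to implement the same Partitioner interface; a minimal sketch (class name and routing rule are ours), registered via the partitioner.class config:

import java.util.Map;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

// Hypothetical example: send every record with a null key to partition 0,
// and hash all other keys across the topic's partitions.
public class FixedFallbackPartitioner implements Partitioner {
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        int numPartitions = cluster.partitionsForTopic(topic).size();
        if (keyBytes == null)
            return 0;
        // simple non-negative hash; DefaultPartitioner uses murmur2 instead
        return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
    }

    @Override
    public void close() { }

    @Override
    public void configure(Map<String, ?> configs) { }
}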
③ doSend() then calls append() on the RecordAccumulator instance held by KafkaProducer, which returns a RecordAppendResult object;
RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey, serializedValue, headers, interceptCallback, remainingWaitMs);

// the append() implementation, returning a RecordAppendResult object
public RecordAppendResult append(TopicPartition tp,
                                 long timestamp,
                                 byte[] key,
                                 byte[] value,
                                 Header[] headers,
                                 Callback callback,
                                 long maxTimeToBlock) throws InterruptedException {
    appendsInProgress.incrementAndGet();
    ByteBuffer buffer = null;
    if (headers == null)
        headers = Record.EMPTY_HEADERS;
    try {
        Deque<ProducerBatch> dq = getOrCreateDeque(tp);
        synchronized (dq) {
            if (closed)
                throw new IllegalStateException("Cannot send after the producer is closed.");
            RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq);
            if (appendResult != null)
                return appendResult;
        }
        byte maxUsableMagic = apiVersions.maxUsableProduceMagic();
        int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression, key, value, headers));
        log.trace("Allocating a new {} byte message buffer for topic {} partition {}", size, tp.topic(), tp.partition());
        buffer = free.allocate(size, maxTimeToBlock);
        synchronized (dq) {
            if (closed)
                throw new IllegalStateException("Cannot send after the producer is closed.");
            RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq);
            if (appendResult != null) {
                return appendResult;
            }
            MemoryRecordsBuilder recordsBuilder = recordsBuilder(buffer, maxUsableMagic);
            ProducerBatch batch = new ProducerBatch(tp, recordsBuilder, time.milliseconds());
            FutureRecordMetadata future = Utils.notNull(batch.tryAppend(timestamp, key, value, headers, callback, time.milliseconds()));
            dq.addLast(batch);
            incomplete.add(batch);
            buffer = null;
            return new RecordAppendResult(future, dq.size() > 1 || batch.isFull(), true);
        }
    } finally {
        if (buffer != null)
            free.deallocate(buffer);
        appendsInProgress.decrementAndGet();
    }
}

④ Continuing from the code above: the producer maintains a local cache pool of not-yet-sent records, drained by a background I/O thread that turns records into network requests; that cache is the RecordAccumulator. RecordAppendResult is a static inner class of RecordAccumulator, and its future is what the whole producer.send() call ultimately returns;
public final static class RecordAppendResult {
    public final FutureRecordMetadata future;
    public final boolean batchIsFull;
    public final boolean newBatchCreated;

    public RecordAppendResult(FutureRecordMetadata future, boolean batchIsFull, boolean newBatchCreated) {
        this.future = future;
        this.batchIsFull = batchIsFull;
        this.newBatchCreated = newBatchCreated;
    }
}
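Since this future is what producer.send() ultimately hands back to the caller, a caller can block on it to learn where the record landed; a short usage sketch (topic name is illustrative):

Future<RecordMetadata> future = producer.send(new ProducerRecord<>("yzg", 1, "Message_1"));
try {
    RecordMetadata metadata = future.get(); // blocks until the broker acknowledges the batch
    System.out.println("stored in partition " + metadata.partition() + " at offset " + metadata.offset());
} catch (InterruptedException | ExecutionException e) {
    e.printStackTrace();
}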

RecordAccumulator obtains the per-partition queue via getOrCreateDeque(tp); the deque holds ProducerBatch objects, the smallest unit of data actually sent. RecordAccumulator estimates the byte size, allocates buffer memory accordingly, and keeps appending ProducerBatch objects to the deque;
Deque<ProducerBatch> dq = getOrCreateDeque(tp);
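How quickly those ProducerBatch objects fill and get sent is governed mainly by batch.size and linger.ms, plus buffer.memory, the total pool the RecordAccumulator may allocate from. A configuration sketch with purely illustrative values:

Properties props = new Properties();
props.put("bootstrap.servers", "127.0.0.1:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// illustrative values: each ProducerBatch holds up to 32 KB,
// and a not-yet-full batch waits at most 5 ms for more records
props.put("batch.size", 32768);
props.put("linger.ms", 5);
// total memory the RecordAccumulator may use for buffering (64 MB here)
props.put("buffer.memory", 67108864);
KafkaProducer<Integer, String> producer = new KafkaProducer<>(props);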

That completes the logic of KafkaProducer.send(): the original record, after the transformations above, now sits in a local Deque.

2. Sender Thread Processing
The sender is instantiated together with KafkaProducer; doSend() sets it to work via this.sender.wakeup(). The sender holds a NetworkClient instance, and the sender's run() method contains the logic that drives that NetworkClient.
// constructing the network client
NetworkClient client = new NetworkClient(
        new Selector(config.getLong(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG), this.metrics, time, "producer", channelBuilder, logContext),
        this.metadata,
        clientId,
        maxInflightRequests,
        config.getLong(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG),
        config.getLong(ProducerConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG),
        config.getInt(ProducerConfig.SEND_BUFFER_CONFIG),
        config.getInt(ProducerConfig.RECEIVE_BUFFER_CONFIG),
        this.requestTimeoutMs,
        time,
        true,
        apiVersions,
        throttleTimeSensor,
        logContext);

// the network-request logic inside Sender.run()
void run(long now) {
    if (transactionManager != null) {
        try {
            if (transactionManager.shouldResetProducerStateAfterResolvingSequences())
                // Check if the previous run expired batches which requires a reset of the producer state.
                transactionManager.resetProducerId();
            if (!transactionManager.isTransactional()) {
                // this is an idempotent producer, so make sure we have a producer id
                maybeWaitForProducerId();
            } else if (transactionManager.hasUnresolvedSequences() && !transactionManager.hasFatalError()) {
                transactionManager.transitionToFatalError(new KafkaException("The client hasn't received acknowledgment for " +
                        "some previously sent messages and can no longer retry them. It isn't safe to continue."));
            } else if (transactionManager.hasInFlightTransactionalRequest() || maybeSendTransactionalRequest(now)) {
                // as long as there are outstanding transactional requests, we simply wait for them to return
                client.poll(retryBackoffMs, now);
                return;
            }
            // do not continue sending if the transaction manager is in a failed state or if there
            // is no producer id (for the idempotent case).
            if (transactionManager.hasFatalError() || !transactionManager.hasProducerId()) {
                RuntimeException lastError = transactionManager.lastError();
                if (lastError != null)
                    maybeAbortBatches(lastError);
                client.poll(retryBackoffMs, now);
                return;
            } else if (transactionManager.hasAbortableError()) {
                accumulator.abortUndrainedBatches(transactionManager.lastError());
            }
        } catch (AuthenticationException e) {
            // This is already logged as error, but propagated here to perform any clean ups.
            log.trace("Authentication exception while processing transactional request: {}", e);
            transactionManager.authenticationFailed(e);
        }
    }
    // ... (the remainder of run(), which drains the accumulator and issues produce
    //      requests via the NetworkClient, was cut off in the original excerpt)
}
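Because the sender works asynchronously in the background, a producer that exits immediately after send() can lose still-buffered batches. flush() and close() block until the accumulator has been drained by this sender thread; a short usage sketch:

producer.send(new ProducerRecord<>("yzg", 1, "Message_1"));
// flush() blocks until every previously sent record has completed
producer.flush();
// close() flushes as well, then shuts down the sender/io thread
producer.close();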

III. Producer Demo Usage and Debugging

With the source compiled and running, you effectively have a local Kafka cluster. The producer class under the examples package of the source tree shows the sending flow: first construct the KafkaProducer class Kafka provides, then call its send() method; much of the work has already been done by the time KafkaProducer is instantiated.
The producer class defines the key and value types, the topic name, and whether sending is synchronous or asynchronous; its constructor then takes the cluster address, an optional producer id, and the serializers. The producer thread's run() method is an endless while loop: it checks the async/sync setting (recent versions default to asynchronous) and calls send(). In the async branch, send() takes a ProducerRecord built from the topic, messageNo (the running message counter), and the message string, plus DemoCallBack, which is a callback class rather than a bare function: it carries the start time, the auto-incrementing messageNo, and the messageStr string, and overrides onCompletion() to handle errors.

Below is a slightly modified version of that class for debugging: the cluster is the local one running in IDEA (127.0.0.1:9092), the topic is yezonggang, sending is asynchronous, and producer.send() is invoked inside the thread's run() method:
package demo;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class DemoForProducer extends Thread {

    public static void main(String[] args) {
        DemoForProducer dfp = new DemoForProducer("yezonggang", true);
        dfp.run(); // note: run() is called directly, so everything happens on the main thread
    }

    private final KafkaProducer<Integer, String> producer;
    private final String topic;
    private final Boolean isAsync;

    public DemoForProducer(String topic, Boolean isAsync) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "127.0.0.1:9092");
        props.put("client.id", "DemoForProducer");
        props.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        producer = new KafkaProducer<>(props);
        this.topic = topic;
        this.isAsync = isAsync;
    }

    public DemoForProducer(KafkaProducer<Integer, String> producer, String topic, Boolean isAsync) {
        this.producer = producer;
        this.topic = topic;
        this.isAsync = isAsync;
    }

    // The thread body is an endless loop: depending on the async/sync flag it either fires
    // send() with a DemoForCallBack instance or blocks on the returned Future with get().
    public void run() {
        int messageNo = 1;
        while (true) {
            String messageStr = "Message_" + messageNo;
            long startTime = System.currentTimeMillis();
            if (isAsync) { // send asynchronously
                producer.send(new ProducerRecord<>(topic, messageNo, messageStr),
                        new DemoForCallBack(startTime, messageNo, messageStr));
            } else { // send synchronously
                try {
                    producer.send(new ProducerRecord<>(topic, messageNo, messageStr)).get();
                    System.out.println("Sent message: (" + messageNo + ", " + messageStr + ")");
                } catch (InterruptedException | ExecutionException e) {
                    e.printStackTrace();
                }
            }
            ++messageNo;
        }
    }
}
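The DemoForCallBack class referenced above is not shown in the original post; a sketch consistent with how it is described (start time, message number, message string, an overridden onCompletion()) might look like this:

package demo;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.RecordMetadata;

// Sketch of the callback used by the async branch above: it records when the send
// started and prints the partition/offset (or the error) once the broker replies.
public class DemoForCallBack implements Callback {
    private final long startTime;
    private final int key;
    private final String message;

    public DemoForCallBack(long startTime, int key, String message) {
        this.startTime = startTime;
        this.key = key;
        this.message = message;
    }

    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        long elapsedTime = System.currentTimeMillis() - startTime;
        if (metadata != null) {
            System.out.println("message(" + key + ", " + message + ") sent to partition("
                    + metadata.partition() + "), offset(" + metadata.offset() + ") in " + elapsedTime + " ms");
        } else {
            exception.printStackTrace();
        }
    }
}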


As soon as KafkaProducer is instantiated, the ProducerConfig settings below are fixed and logged, even though send() has not yet produced anything; client.id=DemoForProducer identifies this producer to the local cluster running in IDEA. The first record built by the demo, and the effective config, look like this:
ProducerRecord(topic=yezonggang, partition=null, headers=RecordHeaders(headers = [], isReadOnly = false), key=1, value=Message_1, timestamp=null)
INFO ProducerConfig values:
    acks = 1
    batch.size = 16384
    bootstrap.servers = [127.0.0.1:9092]
    buffer.memory = 33554432
    client.id = DemoForProducer
    compression.type = none
    connections.max.idle.ms = 540000
    enable.idempotence = false
    interceptor.classes = null
    key.serializer = class org.apache.kafka.common.serialization.IntegerSerializer
    linger.ms = 0
    max.block.ms = 60000
    max.in.flight.requests.per.connection = 5
    max.request.size = 1048576
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
    receive.buffer.bytes = 32768
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 30000
    retries = 0
    retry.backoff.ms = 100
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    send.buffer.bytes = 131072
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.endpoint.identification.algorithm = null
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLS
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    transaction.timeout.ms = 60000
    transactional.id = null
    value.serializer = class org.apache.kafka.common.serialization.StringSerializer
 (org.apache.kafka.clients.producer.ProducerConfig)
