其他
面试官问:Kafka 会不会丢消息?怎么处理的?
周一,中午好,我是磊哥。
今天,磊哥给大家科普一下,面试官经常问的刁钻问题,Kafka 会不会丢消息?怎么处理的?
这个Kafka确实存在丢消息的问题,消息丢失会发生在Broker,Producer和Consumer三种。
Broker
主动调用sync或fsync函数 可用内存低于阀值 dirty data时间达到阀值。dirty是pagecache的一个标识位,当有数据写入到pageCache时,pagecache被标注为dirty,数据刷盘以后,dirty标志清除。
Broker配置刷盘机制,是通过调用fsync函数接管了刷盘动作。从单个Broker来看,pageCache的数据会丢失。
GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
service.putRequest(request);
boolean flushOK = request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout()); // 刷盘
以上的引用是kafka官方对于参数acks的解释(在老版本中,该参数是request.required.acks)。
acks=0,producer不等待broker的响应,效率最高,但是消息很可能会丢。 acks=1,leader broker收到消息后,不等待其他follower的响应,即返回ack。也可以理解为ack数为1。此时,如果follower还没有收到leader同步的消息leader就挂了,那么消息会丢失。按照上图中的例子,如果leader收到消息,成功写入PageCache后,会返回ack,此时producer认为消息发送成功。但此时,按照上图,数据还没有被同步到follower。如果此时leader断电,数据会丢失。 acks=-1,leader broker收到消息后,挂起,等待所有ISR列表中的follower返回结果后,再返回ack。-1等效与all。这种配置下,只有leader写入数据到pagecache是不会返回ack的,还需要所有的ISR返回“成功”才会触发ack。如果此时断电,producer可以知道消息没有被发送成功,将会重新发送。如果在follower收到数据以后,成功返回ack,leader断电,数据将存在于原来的follower中。在重新选举以后,新的leader会持有该部分数据。数据从leader同步到follower,需要2步:
如上图中:
acks=0,总耗时f(t) = f(1)。 acks=1,总耗时f(t) = f(1) + f(2)。 acks=-1,总耗时f(t) = f(1) + max( f(A) , f(B) ) + f(2)。
性能依次递减,可靠性依次升高。
Producer
异步发送消息生产速度过快的示意图
异步发送消息改为同步发送消。或者service产生消息时,使用阻塞的线程池,并且线程数有一定上限。整体思路是控制消息产生速度。 扩大Buffer的容量配置。这种方式可以缓解该情况的出现,但不能杜绝。 service不直接将消息发送到buffer(内存),而是将消息写到本地的磁盘中(数据库或者文件),由另一个(或少量)生产线程进行消息发送。相当于是在buffer和service之间又加了一层空间更加富裕的缓冲层。
Consumer
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
// 自动提交开关
props.put("enable.auto.commit", "true");
// 自动提交的时间间隔,此处是1s
props.put("auto.commit.interval.ms", "1000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("foo", "bar"));
while (true) {
// 调用poll后,1000ms后,消息状态会被改为 committed
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
insertIntoDB(record); // 将消息入库,时间可能会超过1000ms
上面的示例是自动提交的例子。如果此时,insertIntoDB(record)
发生异常,消息将会出现丢失。接下来是手动提交的例子:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
// 关闭自动提交,改为手动提交
props.put("enable.auto.commit", "false");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("foo", "bar"));
final int minBatchSize = 200;
List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
while (true) {
// 调用poll后,不会进行auto commit
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
buffer.add(record);
}
if (buffer.size() >= minBatchSize) {
insertIntoDb(buffer);
// 所有消息消费完毕以后,才进行commit操作
consumer.commitSync();
buffer.clear();
}
将提交类型改为手动以后,可以保证消息“至少被消费一次”(at least once)。但此时可能出现重复消费的情况,重复消费不属于本篇讨论范围。
try {
while(running) {
ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
for (TopicPartition partition : records.partitions()) {
List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
for (ConsumerRecord<String, String> record : partitionRecords) {
System.out.println(record.offset() + ": " + record.value());
}
long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
// 精确控制offset
consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
}
}
} finally {
consumer.close();
}
近期技术热文
1、面试官欺负人:new Object()占用几个字节?
2、不要封装工具类了,这款神仙框架,真好用!
3、退税 = 磊哥发财了?
4、磊哥,做副业=9034.75元?
5、Java 8 中 Map 骚操作之 merge() 的用法
这个整理了1000多本 常用 技术书籍PDF,绝大部分核心的高清技术书籍都可以在这里找到!
推荐,GitHub 地址,电脑打开体验更好
阅读原文:一键直达,GitHub 地址