查看原文
其他

Kafka 独立消费者

张乘辉 后端进阶 2022-07-05

以前我们讨论的消费组,都是 group 的形式,group 可以自动地帮助消费者分配分区,且在发生异常时,还能自定地进行重平衡(Rebalance)。正常来说,group 帮助用户实现自动监听分区消费,但是在用户需要指定分区进行精确消费的场景下,由于 group 的重平衡机制,会打破这种消费方式,这不前段时间某项目就有个需求是这样的:

消息源端有若干个,每个消息源都会产生不同的消息,目标端也有若干个,每个目标端需要消费指定的消息源类型。

在以往,由于消费组的重平衡机制会打乱这种消费方式,只能申请多个主题对消息进行隔离,每个消息源将消息发送到指定主题,目标端监听指定的主题。这么做肯定没有指定分区消费这么优雅了,每增加一种消息源,都需要新增一个 topic,且消费粒度不能灵活组合。

针对以上问题,Kafka 的提供了独立消费者模式,可以消费者可以指定分区进行消费,如果只用一个 topic,每个消息源启动一个生产者,分别发往不同的分区,消费者指定消费相关的分区即可,用如下图所示:

但是 Kafka 独立消费者也有它的限定场景:

1、 Kafka 独立消费者模式下,Kafka 集群并不会维护消费者的消费偏移量,需要每个消费者维护监听分区的消费偏移量,因此,独立消费者模式与 group 模式切勿混合使用!

2、group 模式的重平衡机制在消费者异常时可将其监听的分区重分配给其它正常的消费者,使得这些分区不会停止被监听消费,但是独立消费者由于是手动进行监听指定分区,因此独立消费者发生异常时,并不会将其监听的分区进行重分配,这就会造成某些分区消息堆积。因此,在该模式下,独立消费者需要实现高可用,例如独立消费者使用 K8s Deployment 进行部署。

下面将演示如何使用 Kafka#assgin 方法手动订阅指定分区进行消费:

public static void main(String[] args) {
  Properties kafkaProperties = new Properties();
  kafkaProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
  kafkaProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
  kafkaProperties.put("bootstrap.servers""localhost:9092");
  KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(kafkaProperties);
  List<TopicPartition> partitions = new ArrayList<>();
  partitions.add(new TopicPartition("test_topic"0));
  partitions.add(new TopicPartition("test_topic"1));
  consumer.assign(partitions);
  while (true) {
    ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofMillis(3000));
    for (ConsumerRecord<String, byte[]> record : records) {
      System.out.printf("topic:%s, partition:%s%n", record.topic(), record.partition());
    }
  }
}


近期热文

Seata RPC 模块的重构之路

我参与 Seata 开源项目的一些感悟

Spring 异步实现原理与实战分享

Kafka Producer 异步发送消息居然也会阻塞?

图解 DataX 核心设计原理

使用 K8s 进行作业调度实战分享

图解 K8s 核心概念和术语

kill -9 导致 Kakfa 重启失败的惨痛经历

图解:Kafka 水印备份机制

记一次 Kafka 集群线上扩容

Kafka重平衡机制

分布式事务中间件 Seata 的设计原理

我对支付平台架构设计的一些思考

聊聊 Tomcat 的架构设计

关于 Kafka 的一些面试题目

基于Jenkins Pipeline自动化部署

RocketMQ消息发送的高可用设计

深度解析RocketMQ Topic的创建机制

从源码的角度解析线程池运行原理

关于线程池你不得不知道的一些设置

你都理解创建线程池的参数吗?

Java并发之AQS源码分析(二)

Java并发之AQS源码分析(一)


扫一扫

订阅「后端进阶」文章推送


原创不易,如果对你有帮助,麻烦各位读者帮忙点个「在看」或者「转发」一下文章!

再次感谢你的阅读!

您可能也对以下帖子感兴趣

文章有问题?点此查看未经处理的缓存