集群管理工具KafkaAdminClient——改造
在上一篇文章《集群管理工具KafkaAdminClient——原理与示例》中讲述了KafkaAdminClient的功能以及相应的原理,但是同时也提出了目前的KafkaAdminClient并没有非常的完善,还有许多功能还需要去丰富,这些功能可以自定义实现,在《如何获取Kafka的消费者详情》一文中介绍了如何获取Kafka的消费详情,其原理是通过Java调用Kafka的Scala代码实现的,如果要使用纯Java的方式实现就需要用到了KafkaAdminClient,另外Scala版的AdminClient也被标注为:“This client is deprecated, and will be replaced by KafkaAdminClient.”,说明官方也推荐使用KafkaAdminClient。不过现在的版本(目前最新1.1.0)并没有提供类似describeConsumerGroup和listGroupOffsets的方法实现,这一点在前文《集群管理工具KafkaAdminClient——原理与示例》也有提及,所以如果要实现获取类似消费者详情的功能,那么就需要自己动手进行改造。
参考Scala版的AdminClient,要实现获取Kafka的消费者详情的功能首先需要实现describeConsumerGroup和listGroupOffsets的方法,其中describeConsumerGroup方法内部还需要一个findCoordinator的方法用来提供消费者对应的coodinator节点,以便提供详细的消费者详情。describeConsumerGroup、listGroupOffsets和findCoordinator这三个方法都将在KafkaAdminClient类里提供自定义实现。KafkaAdminClient和XXXOptions、XXXResult的类都位于org.apache.kafka.clients.admin包下,笔者也将扩展的类也置于其同一包下,不过也进行了一些区分,如下图所示,新加入的XXXOptions、XXXResult类放入extend下,新加入的JavaBean放入model下,然后与具体应用功能对应的放在app下:
首先建立对应的XXXOptions、XXXResult类,就那简单的ListGroupOffsets来说,其ListGroupOffsetsOptions只是继承了AbstractOptions的空实现,而ListGroupOffsetsResult也很简单,提供了一个KafkaFuture的调用,代码参考如下:
public class ListGroupOffsetsResult {
private final KafkaFutureImpl<Map<TopicPartition, Long>> future;
public ListGroupOffsetsResult(KafkaFutureImpl<Map<TopicPartition, Long>> future) {
this.future = future;
}
public KafkaFutureImpl<Map<TopicPartition, Long>> values(){
return this.future;
}
}
model目录下的ConsumerGroupSummary是所要实现的describeConsumerGroup方法中所要获取的值类型,封装在DescribeConsumerGroupResult 中;ConsumerSummary在describeConsumerGroup方法内部使用,用来封装消费状态,包括consumerId、clientId、host(消费者主机)以及TopicPartition列表,最终被封装进ConsumerGroupSummary中。PartitionAssignmentState是服务于KafkaConsumerGroupService的,用来最后显示消费者详情列表。
KafkaAdminClient的父类是AdminClient(kafka-client中的抽象类),describeConsumerGroup、listGroupOffsets和findCoordinator这三个方法也需要在AdminClient类中做申明,详细参考如下:
public abstract DescribeConsumerGroupResult describeConsumerGroup(final String group,
final DescribeConsumerGroupOptions options);
public DescribeConsumerGroupResult describeConsumerGroup(final String group) {
return describeConsumerGroup(group, new DescribeConsumerGroupOptions());
}
public abstract FindCoordinatorResult findCoordinator(final String group,
final FindCoordinatorOptions options);
public FindCoordinatorResult findCoordinator(final String group) {
return findCoordinator(group, new FindCoordinatorOptions());
}
public abstract ListGroupOffsetsResult listGroupOffsets(final String group,
final ListGroupOffsetsOptions options);
public ListGroupOffsetsResult listGroupOffsets(final String group){
return listGroupOffsets(group, new ListGroupOffsetsOptions());
}
在前面2篇文章《集群管理工具KafkaAdminClient——原理与示例》和《如何获取Kafka的消费者详情》中都详细解释了describeConsumerGroup、listGroupOffsets方法,所以这里不在赘述,具体实现也很简单,可以参考笔者的实现:https://github.com/hiddenzzh/kafka/blob/master/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java。
最后来讲述一下org.apache.kafka.clients.admin.app包下的KafkaConsumerGroupService,具体代码地址在这里:https://github.com/hiddenzzh/kafka/blob/master/src/main/java/org/apache/kafka/clients/admin/app/KafkaConsumerGroupService.java,其内部通过上面改造的KafkaAdminClient和KafkaConsumer来实现,其内部逻辑和《如何获取Kafka的消费者详情》一文中的KafkaConsumerGroupCustomService一样,这里就不在赘述了。
本篇以及《Kafka的Lag计算误区及正确实现》、《如何获取Kafka的消费者详情》这三篇文章都是围绕如何获取消费者详情来做具体的陈述,回到问题的初衷:kafka.admin.ConsumerGroupCommand.PartitionAssignmentState无法被外部访问,那么真的需要这么复杂的转变过程么,详细请参考下下篇《Scala与Java语言的互操作》。
END
《Kafka解析之topic创建(3)——合法性验证》
《集群管理工具KafkaAdminClient——原理与示例》