浅谈ZooKeeper中Kafka相关信息的存储
val AdminPath = "/admin"
val BrokersPath = "/brokers"
val ClusterPath = "/cluster"
val ConfigPath = "/config"
val ControllerPath = "/controller"
val ControllerEpochPath = "/controller_epoch"
val IsrChangeNotificationPath = "/isr_change_notification"
val LogDirEventNotificationPath = "/log_dir_event_notification"
val KafkaAclPath = "/kafka-acl"
val KafkaAclChangesPath = "/kafka-acl-changes"
val ConsumersPath = "/consumers"
val ClusterIdPath = s"$ClusterPath/id"
val BrokerIdsPath = s"$BrokersPath/ids"
val BrokerTopicsPath = s"$BrokersPath/topics"
val ReassignPartitionsPath = s"$AdminPath/reassign_partitions"
val DeleteTopicsPath = s"$AdminPath/delete_topics"
val PreferredReplicaLeaderElectionPath = s"$AdminPath/preferred_replica_election"
val BrokerSequenceIdPath = s"$BrokersPath/seqid"
val ConfigChangesPath = s"$ConfigPath/changes"
val ConfigUsersPath = s"$ConfigPath/users"
val ProducerIdBlockPath = "/latest_producer_id_block"
broker注册信息
/brokers/ids/[broker_id]
,其中存储的数据示例如下。{
"listener_security_protocol_map": {
"PLAINTEXT": "PLAINTEXT"
},
"endpoints": ["PLAINTEXT://hadoop7:9092"],
"jmx_port": 9393,
"host": "hadoop7",
"timestamp": "1554349917296",
"port": 9092,
"version": 4
}
jmx_port:JMX端口号。
host:所在主机名或IP地址。
timestamp:启动时的时间戳。
port:开放的TCP端口号。
version:版本号。以下所有version值均是代表版本号,不再赘述。
当Kafka集群中有节点上下线时,这个路径下的数据就会更新。
topic注册信息
路径为/brokers/topics/[topic_name]
, 其中存储的数据示例如下。
{
"version": 1,
"partitions": {
"8": [103],
"4": [109],
"9": [104],
"5": [110],
"6": [111],
"1": [106],
"0": [105],
"2": [107],
"7": [102],
"3": [108]
}
}
partitions:topic中各个partition的ID,以及其对应的ISR中各个broker的ID的列表。
当有topic被创建或删除,以及partition发生变更时,这个路径下的数据就会更新。通过对topic以及上节所述节点变更注册监听,就能实现producer的负载均衡。
另外,在/admin/delete_topics
下还保存有已经标记为删除的topic名称(只有名称,没有其他数据)。在/config/topics/[topic_name]
下保存有各个topic的自定义配置。
partition状态信息
路径为/brokers/topics/[topic_name]/partitions/[partition_id]/state
,其中存储的数据示例如下。
{
"controller_epoch": 17,
"leader": 105,
"version": 1,
"leader_epoch": 2,
"isr": [105]
}
controller_epoch:controller的纪元(代数),即集群重新选举controller的次数。
leader:当前partition leader的broker ID。
leader_epoch:partition leader的纪元(代数),即当前partition重新选举leader的次数。
isr:该partition对应的ISR中各个broker ID的列表。
controller注册信息
当前controller信息的路径就是/controller
,其中存储的数据示例如下。
{
"version": 1,
"brokerid": 104,
"timestamp": "1554349916898"
}
brokerid:现在集群中controller的节点ID。
timestamp:最近一次controller变化的时间戳。
如果controller信息节点被删除的话,就会触发集群重新选举controller。ZK对选主操作可以说是有天然的支持。
另外,在/controller_epoch
路径下还保存有controller的纪元值,与partition状态信息中的值相同。每重新选举一次,该值就会加1。
consumer订阅信息
consumer本身的信息路径为/consumers/[group_id]/ids/[consumer_id]
,其中存储的数据示例如下。
{
"version": 1,
"subscription": {
"bl_mall_orders": 1
},
"pattern": "white_list",
"timestamp": "1558617131642"
}
subscription:订阅的topic名称,及该topic对应消息流个数的映射。
pattern:订阅方式,可取值为静态(static)、白名单(white_list)、黑名单(black_list)。
timestamp:consumer创建时的时间戳。
/consumers/[group_id]/offsets/[topic_name]/[partition_id]
下存储有consumer group对应各个topic及partition的消费偏移量,在/consumers/[group_id]/owners/[topic_name]/[partition_id]
下存储有consumer group对应各个topic及partition的消费者线程。最优replica选举信息
当由于节点宕机等原因使得partition leader变得不再均匀分布时,我们可以使用Kafka提供的kafka-preferred-replica-election工具重新将partition创建时的最优replica(前提是在ISR内)选举为leader,也可以开启leader自动平衡的功能(auto.leader.rebalance.enable)。当正在选举最优replica时,ZK中就会创建/admin/preferred_replica_election
节点,其中存储着需要调整最优replica的partition信息,示例数据如下。{
"version": 1,
"partitions": [
{
"topic": "bl_mall_orders",
"partition": 1
},
{
"topic": "bl_mall_products",
"partition": 0
}
]
}
partition重分配信息
与上面的kafka-preferred-replica-election工具类似,Kafka还提供了kafka-reassign-partitions工具,但它的功能更为强大。它可以重新分配partition的所有leader和follower的位置,甚至更改replica数量。当集群扩容或follower分布也不均匀时,就可以利用它。该工具会生成JSON格式的重分配计划,并存入ZK中/admin/reassign_partitions
节点,示例数据如下。{
"version": 1,
"partitions": [
{
"topic": "bl_mall_wish",
"partition": 1,
"replicas": [0, 1, 3]
}
]
}
ISR变更通知信息
各个partition的ISR集合并不是一成不变的。当ISR发生变化(比如有replica超时)时,controller会将发生变化的那个partition存入/isr_change_notification/[isr_change_x]
中。目前暂时不容易找到它的数据格式,因此留空。
欢迎点赞+收藏+转发朋友圈素质三连
文章不错?点个【在看】吧! 👇