Kafka作为消息系统的系统补充
名称 | 解释 |
broker | Kafka集群中的实例进程,负责数据存储。在Kafka集群中每个broker都有一个唯一的brokerId。通过broker来接受producer和consumer的请求,并把消息持久化到磁盘。每个Kafka集群中会选举出一个broker来担任Controller,负责处理分区的leader选举,协调分区迁移等工作 |
topic | Kafka根据topic对消息进行归类(逻辑划分),发布到Kafka集群的每条消息都需要指定一个topic。落到磁盘上对应的是partition目录,partition目录中有多个segement组合(后缀为index、log的文件)。一个topic对应一个或多个partition,一个partition对应多个segment组合 |
producer | 向broker发送消息的生产者。负责数据生产和数据分发。生产者代码可以集成到任务系统中。 数据分发策略默认为defaultPartition Utils.abs(key.hashCode)%numPartitions |
consumer | 从broker读取消息的消费者【实际上consumer是通过与zookeeper通信获取broker地址进行消息消费】 |
ConsumerGroup | 数据消费者组,ConsumerGroup(以下简称CG)可以有多个。可以把多个consumer线程划分为一个组,组里面所有成员共同消费一个topic的数据(一个topic的多个partition),组员之间不能重复消费 |
partition | 分区,一个topic可以分为多个partition(分布在多个broker上,实现扩展性),每个partition可以设置多个副本,会从多个副本中选取出一个leader负责读写操作。每个partition是一个有序的队列(partition中的每条消息都会被分配一个有序的id(offset)),Kafka只保证按每个partition内部有序并且被顺序消费,不保证一个topic的整体(多个partition间)的顺序 |
offset | 每条消息在文件中的偏移量。Kafka的存储文件都是按照offset.index来命名,方便查找 |
zookeeper | 保存meta信息,管理集群配置,以及在CG发生变化时进行rebalance |
Replication | Kafka支持以partition为单位对消息进行冗余备份,每个partition都可以配置至少1个Replication(副本数包括本身) |
leader partition | 每个Replication集合中的partition都会选出一个唯一的leader,所有的读写请求都由leader处理。其他Replicas从leader处把数据更新同步到本地,过程类似MySQL中的Binlog同步。 |
ISR(In-Sync Replica) | Replicas的一个子集,表示目前"活着的"且与leader能够保持"联系"的Replicas集合。由于读写都是首先落到leader上,所以一般来说通过同步机制从leader上拉取数据的Replica都会和leader有一些延迟(包括了延迟时间和延迟条数两个维度),任意一个超过阈值都会把该Replica踢出ISR。每个partition都有它自己独立的ISR。 |
Kafka中的广播和单播
每个consumer属于一个特定的CG,一条消息可以发送到多个不同的CG,但是一个CG中只能有一个consumer能够消费该消息。目的:实现一个topic消息的广播(发给所有的consumer)和单播(发给任意一个consumer)。一个topic可以有多个CG。topic的消息会复制(逻辑概念)到所有的CG,但每个partition只会把消息发给该CG中的一个consumer。
【实现单播】所有的consumer在同一个CG
1)连接broker-list中任意一台broker服务器
2)发送数据时,需要知道topic对应的partition个数及leader partition所在节点。这些信息由broker提供,每一个broker都能提供一份元数据信息(如哪些broker是存活的,哪个topic有多少分区,哪个分区是leader)
3)数据生产,数据发送到哪个partition的leader由producer代码决定
-1: 当所有的follower都同步消息成功后发送ack。最好的持久性,只要有一个replica存活,数据就不会丢失。但相对延迟高
默认分发策略:def partition(key: T, numPartitions: Int): Int = {
其他策略:轮询、随机等。
当一个消费者组中,有consumer加入或者离开时,会触发partition消费的rebalance。均衡的最终目的为了提升topic的并发消费能力,步骤如下:
比如一个topic有4个分区:P0、P1、P2、P3,一个CG中有C1、C2两个consumer。
首先根据partition索引号对partition排序:P0、P1、P2、P3,再根据consumer的id排序:C0、C1
计算倍数: M = [P0,P1,P2,P3].size / [C0,C1].size,本例值M=2(向上取整)
segment的意义:当Kafka producer不断发送消息,必然会引起partition文件的无限扩张,这样对于消息文件的维护以及已经被消费的消息的清理带来严重影响。通过参数设置segment可以指定保留多长时间的数据,及时清理已经被消费的消息,提高磁盘利用率,目前默认保存7天数据。
2)segment 文件命名规则
关键字 | 解释说明 |
8 byte offset | 在partition内每条消息都有一个有序的id号:offset,它可以唯一确定每条消息在partition内的位置。即offset表示partition的第多少个message |
4 byte message size | message大小 |
4 byte CRC32 | 用crc32校验message |
1 byte "magic" | 表示本次发布Kafka服务程序协议版本号 |
1 byte "attributes" | 表示为独立版本、或标识压缩类型、或编码类型 |
4 byte key length | 表示key的长度,当key为-1时,K byte key字段不填 |
value bytes payload | 表示实际消息数据 |
00000000000000000000.index表示最开始的文件,起始偏移量为0
00000000000000000099.index的消息量起始偏移量为100=99+1
00000000000000000999.index的起始偏移量为1000=999+1
近期文章:
Spark闭包 | driver & executor程序代码执行