2 万字 | Kafka 知识体系保姆级教程,附详细解析
The following article is from 大数据老哥 Author 大数据老哥
一、什么是消息队列
消息队列,英文名:Message Queue,经常缩写为MQ。从字面上来理解,消息队列是一种用来存储消息的队列 。来看一下下面的代码
// 1.创建一个保存字符串的队列
Queue<String> queue = new LinkedList<>();
// 2. 往消息队列中放入消息
queue.offer("hello");
// 3. 从消息队列中取出消息把那个打印
System.out.println(queue.poll());
上述代码,创建了一个队列,先往队列中添加了一个消息,然后又从队列中取出了一个消息。这说明了队列是可以用来存取消息的
「小结」: 「消息队列指的就是将数据放置到一个队列中, 从队列一端进入, 然后从另一端流出的过程」
二、消息队列的应用场景
消息队列在实际应用中包括如下四个场景:
「1、应用耦合:」
多应用间通过消息队列对同一消息进行处理,避免调用接口失败导致整个过程失败;
「2、异步处理:」
多应用对消息队列中同一消息进行处理,应用间并发处理消息,相比串行处理,减少处理时间;
「3、 限流削峰:」
广泛应用于秒杀或抢购活动中,避免流量过大导致应用系统挂掉的情况;
「4、消息驱动的系统:」
系统分为消息队列、消息生产者、消息消费者,生产者负责产生消息,消费者(可能有多个)负责对消息进行处理下面详细介绍上述四个场景以及消息队列如何在上述四个场景中使用
异步处理
「具体场景」:用户为了使用某个应用,进行注册,系统需要发送注册邮件并验证短信。对这两个操作的处理方式有两种:串行及并行。1 ) 串行方式: 新注册信息生成后 , 先发送注册邮件, 再发送验证短信注意 : 在这种方式下,需要最终发送验证短信后再返回给客户端
假设以上三个子系统处理的时间均为 50ms ,且不考虑网络延迟,则总的处理时间:串行:50+50+50=150ms 并行:50+50 = 100ms如果引入消息队列 , 在来看整体的执行效率 :
应用耦合
「具体场景:」 用户使用 QQ 相册上传一张图片,人脸识别系统会对该图片进行人脸识别,一般的做法是,服务器接收到图片后,图片上传系统立即调用人脸识别系统,调用完成后再返回成功,如下图所示: 如果引入消息队列 , 在来看整体的执行效率
人脸识别系统被调失败,导致图片上传失败; 延迟高,需要人脸识别系统处理完成后,再返回给客户端,即使用户并不需要立即知道结果; 图片上传系统与人脸识别系统之间互相调用,需要做耦合;若使用消息队列:
此时图片上传系统并不需要关心人脸识别系统是否对这些图片信息的处理、以及何时对这些图片信息进行处理。事实上,由于用户并不需要立即知道人脸识别结果,人脸识别系统可以选择不同的调度策略,按照闲时、忙时、正常时 间,对队列中的图片信息进行处理。
限流削峰
「具体场景:」 购物网站开展秒杀活动,一般由于瞬时访问量过大,服务器接收过大,会导致流量暴增,相关系统无法处理请求甚至崩溃。而加入消息队列后,系统可以从消息队列中取数据,相当于消息队列做了一次缓冲。
消息驱动系统
「具体场景:」 用户新上传了一批照片, 人脸识别系统需要对这个用户的所有照片进行聚类,聚类完成后由对账系统重新生成用 户的人脸索引( 加快查询 ) 。这三个子系统间由消息队列连接起来,前一个阶段的处理结果放入队列中,后一个阶段从队列中获取消息继续处理。
三、消息队列的两种方式
点对点模式
点对点模式下包括三个角色
消息队列 发送者 (生产者) 接收者(消费者) 消息发送者生产消息发送到 queue 中,然后消息接收者从 queue 中取出并且消费消息。消息被消费以后, queue 中不再有存储,所以消息接收者不可能消费到已经被消费的消息。
「点对点模式特点:」
每个消息只有一个接收者(Consumer)(即一旦被消费,消息就不再在消息队列中); 发送者和接收者间没有依赖性,发送者发送消息之后,不管有没有接收者在运行,都不会影响到发送者下次发送消息; 接收者在成功接收消息之后需向队列应答成功,以便消息队列删除当前接收的消息;
发布/订阅模式
发布 / 订阅模式下包括三个角色:
角色主题(Topic) 发布者(Publisher) 订阅者(Subscriber)
每个消息可以有多个订阅者; 发布者和订阅者之间有时间上的依赖性。针对某个主题(Topic)的订阅者,它必须创建一个订阅者之后,才能消费发布者的消息。 为了消费消息,订阅者需要提前订阅该角色主题,并保持在线运行;
四、常见的消息队列的产品
RabbitMQ RabbitMQ 2007 年发布,是一个在 AMQP ( 高级消息队列协议 ) 基础上完成的,可复用的企业消息系统,是当前最主 流的消息中间件之一。 ActiveMQ: ActiveMQ 是由 Apache 出品, ActiveMQ 是一个完全支持 JMS1.1 和 J2EE 1.4 规范的 JMS Provider 实现。它非常快速 ,支持多种语言的客户端和协议,而且可以非常容易的嵌入到企业的应用环境中,并有许多高级功能, 目前市场的活跃 度比较低, 在 java 领域正在被 RabbitMQ 替代 RocketMQ RocketMQ 出自 阿里公司的开源产品,用 Java 语言实现,在设计时参考了 Kafka ,并做出了自己的一些改进,消息可靠性上比 Kafka 更好。RocketMQ 在阿里集团被广泛应用在订单,交易,充值,流计算,消息推送,日志流式处理 等 kafka Apache Kafka 是一个分布式消息发布订阅系统。它最初由 LinkedIn 公司基于独特的设计实现为一个分布式的提交日志系统( a distributed commit log) ,之后成为 Apache 项目的一部分。Kafka 系统快速、可扩展并且可持久化。它的分区特性,可复制和可容错都是其不错的特性。
各种消息队列产品的对比图:
五、Kafka的基本介绍
官网:「http://kafka.apache.org/」 kafka 是最初由 linkedin 公司开发的,使用 scala 语言编写, kafka 是一个分布式,分区的,多副本的,多订阅者的日 志系统(分布式MQ 系统),可以用于搜索日志,监控日志,访问日志等Kafka is a distributed,partitioned,replicated commit logservice 。它提供了类似于 JMS 的特性,但是在设计实现上完全不同,此外它并不是JMS 规范的完整实现。kafka 对消息保存时根据 Topic 进行归类,发送消息者成为 Producer, 消息 接受者成为Consumer, 此外 kafka 集群有多个 kafka 实例组成,每个实例 (server) 成为 broker 。无论是 kafka 集群,还是producer和 consumer 都依赖于 zookeeper 来保证系统可用性集群保存一些 meta 信息
「kakfa的特点:」
可靠性: 分布式, 分区 , 复制 和容错等 可扩展性: kakfa消息传递系统轻松缩放, 无需停机 耐用性: kafka使用分布式提交日志, 这个意味着消息会尽可能快速的保存在磁盘上, 因此它是持久的 性能: kafka对于发布和订阅消息都具有高吞吐量, 即使存储了许多TB的消息, 他也爆出稳定的性能-kafka非常快: 保证零停机和零数据丢失
apache kafka 是一个分布式发布 - 订阅消息系统和一个强大的队列,可以处理大量的数据,并使能够将消息从一个 端点传递到另一个端点,kafka 适合离线和在线消息消费。kafka 消息保留在磁盘上,并在集群内复制以防止数据丢失。kafka构建在 zookeeper 同步服务之上。它与 apache 和 spark 非常好的集成,应用于实时流式数据分析。
「kafka的主要应用场景:」
指标分析 : kafka 通常用于操作监控数据 , 这设计聚合来自分布式应用程序和统计信息 , 以产生操作的数据集中反馈 日志聚合解决方法 : kafka 可用于跨组织从多个服务器收集日志 , 并使他们一标准的合适提供给多个服务器 流式处理 : 流式的处理框架 (spark, storm , flink) 从主题中读取数据 , 对其进行处理 , 并将处理后的结果数据写入新的主题, 供用户和应用程序使用 , kafka 的强耐久性在流处理的上下文中也非常的有用
「版本说明:」Kafka版本为2.4.1,是2020年3月12日发布的版本。可以注意到Kafka的版本号为:kafka_2.12-2.4.1,因为kafka主要是使用scala语言开发的,2.12为scala的版本号。
六、Kafka特点总结
kafka是大数据中一款消息队列的中间件产品, 最早是有领英开发的, 后期将其贡献给了apache 成为apache的顶级项目 kafka是采用Scala语言编写 kafka并不是对JMS规范完整实现 仅实现一部分 , kafka集群依赖于zookeeper kafka可以对接离线业务或者实时业务, 可以很好的和apache其他的软件进行集成, 可以做流式数据分析(实时分
「小结:」
高可靠性 : 数据不容易丢失, 数据分布式存储, 集群某个节点宕机也不会影响 高可扩展性 : 动态的进行添加或者减少集群的节点 高耐用性 : 数据持久化的磁盘上 高性能 : 数据具有高吞吐量 非常快: 零停机和零数据丢失 (存在重复消费问题)
七、Kafka架构
「kafka cluster」: kafka的集群 「broker」: kafka集群中各个节点 「producer」: 生产者 「consumer」: 消费者 「topic」: 主题 话题 类似于大的容器 「replicas」: 副本 对每个分片构建多个副本, 保证数据不丢失副本数量是否会受限于集群节点的数据呢? 是 最多和节点是一致的
八、搭建Kafka集群
1、 将Kafka的安装包上传到虚拟机,并解压
cd /export/software/
tar -xvzf kafka_2.12-2.4.1.tgz -C ../server/
cd /export/server/kafka_2.12-2.4.1/
2、修改 server.properties
cd /export/server/kafka_2.12-2.4.1/config
vim server.properties
# 指定broker的id
broker.id=0
# 指定 kafka的绑定监听的地址
listeners=PLAINTEXT://node1:9092
# 指定Kafka数据的位置
log.dirs=/export/server/kafka_2.12-2.4.1/data
# 配置zk的三个节点
zookeeper.connect=node1:2181,node2:2181,node3:2181
3、将安装好的kafka复制到另外两台服务器
cd /export/server
scp -r kafka_2.12-2.4.1/ node2:$PWD
scp -r kafka_2.12-2.4.1/ node3:$PWD
修改另外两个节点的broker.id分别为1和2
---------node2--------------
cd /export/server/kafka_2.12-2.4.1/config
vim server.properties
broker.id=1
listeners=PLAINTEXT://node2:9092
--------node3--------------
cd /export/server/kafka_2.12-2.4.1/config
vim server.properties
broker.id=2
listeners=PLAINTEXT://node3:9092
5、启动服务器
# 启动ZooKeeper
nohup bin/zookeeper-server-start.sh config/zookeeper.properties &
# 启动Kafka
cd /export/server/kafka_2.12-2.4.1
nohup bin/kafka-server-start.sh config/server.properties 2>&1 &
# 测试Kafka集群是否启动成功 :
使用 jps 查看各个节点 是否出现有kafka
九、目录结构分析
十、Kafka一键启动/关闭脚本
为了方便将来进行一键启动、关闭Kafka,我们可以编写一个shell脚本来操作。将来只要执行一次该脚本就可以快速启动/关闭Kafka。
1、在节点1中创建 /export/onekey 目录
cd /export/onekey
2、准备slave配置文件,用于保存要启动哪几个节点上的kafka
node1
node2
node3
3、编写start-kafka.sh脚本
vim start-kafka.sh
cat /export/onekey/slave | while read line
do
{
echo $line
ssh $line "source /etc/profile;export JMX_PORT=9988;nohup ${KAFKA_HOME}/bin/kafka-server-start.sh ${KAFKA_HOME}/config/server.properties >/dev/nul* 2>&1 & "
}&
wait
done
4、编写stop-kafka.sh脚本
vim stop-kafka.sh
cat /export/onekey/slave | while read line
do
{
echo $line
ssh $line "source /etc/profile;jps |grep Kafka |cut -d' ' -f1 |xargs kill -s 9"
}&
wait
done
5、给start-kafka.sh、stop-kafka.sh配置执行权限
chmod u+x start-kafka.sh
chmod u+x stop-kafka.sh
6、执行一键启动、一键关闭
./start-kafka.sh
./stop-kafka.sh
十一、Kafka的shell命令使用
1、 创建topic
创建一个topic(主题)。Kafka中所有的消息都是保存在主题中,要生产消息到Kafka,首先必须要有一个确定的主题。
# 创建名为test的主题
bin/kafka-topics.sh --create --bootstrap-server node1:9092 --topic test
# 查看目前Kafka中的主题
bin/kafka-topics.sh --list --bootstrap-server node1:9092
2、生产消息到kafka
使用Kafka内置的测试程序,生产一些消息到Kafka的test主题中。
bin/kafka-console-producer.sh --broker-list node1:9092 --topic test
3、从kafka中消费消息
使用下面的命令来消费 test 主题中的消息。
bin/kafka-console-consumer.sh --bootstrap-server node1:9092 --topic test
4、查看主题的命令
查看 kafka 当中存在的主题
bin/kafka-topics.sh --list --zookeeper node1:2181,node2:2181,node3:2181
5、运行describe的命令
运行 describe 查看 topic 的相关详细信息
[root@node01 bin]# ./kafka-topics.sh --describe --zookeeper node01:2181 --topic demo
Topic:demo PartitionCount:3 ReplicationFactor:1 Configs:
Topic: demo Partition: 0 Leader: 0 Replicas: 0 Isr: 0
Topic: demo Partition: 1 Leader: 1 Replicas: 1 Isr: 1
Topic: demo Partition: 2 Leader: 2 Replicas: 2 Isr: 2
6、 增加topic分区数
任意 kafka 服务器执行以下命令可以增加 topic 分区数
bin/kafka-topics.sh --zookeeper zkhost:port --alter --topic topicName --partitions 8
7、删除topic
目前删除 topic 在默认情况下知识打上一个删除的标记,在重新启动 kafka 后才删除。如果需要立即删除,则需要 在server.properties 中配置:
delete.topic.enable=true
然后执行以下命令进行删除 topic
bin/kafka-topics.sh --zookeeper zkhost:port --delete --topic topicName
十二、Kafka的java API编写
依赖的jar包
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>2.1.0</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.1.0</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>2.1.0</version>
</dependency>
1、生产者代码
需求:
编写Java程序,将1-100的数字消息写入到Kafka中
代码开发
生产者代码1: 使用默认异步发生数据方式, 不含回调函数
package com.producer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
// kafka的生产者的代码:
public class KafkaProducerTest {
public static void main(String[] args) {
//1.1: 构建生产者的配置信息:
Properties props = new Properties();
props.put("bootstrap.servers", "node01:9092,node02:9092,node03:9092");
props.put("acks", "all"); // 消息确认机制: all表示 必须等待kafka端所有的副本全部接受到数据 确保数据不丢失
// 说明: 在数据发送的时候, 可以发送键值对的, 此处是用来定义k v的序列化的类型
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
//1. 创建 kafka的生产者对象: KafkaProducer
Producer<String, String> producer = new KafkaProducer<String, String>(props);
//2. 执行数据的发送
for (int i = 0; i < 100; i++) {
// producerRecord对象: 生产者的数据承载对象
ProducerRecord<String, String> producerRecord =
new ProducerRecord<String, String>("dsjlg", Integer.toString(i));
producer.send(producerRecord);
}
//3. 释放资源
producer.close();
}
}
「生产者的代码2: 同步发送操作」
package com.producer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
// kafka的生产者的代码:
public class KafkaProducerTest2 {
@SuppressWarnings("all")
public static void main(String[] args) {
//1.1: 构建生产者的配置信息:
Properties props = new Properties();
props.put("bootstrap.servers", "node1:9092,node2:9092,node3:9092");
props.put("acks", "all"); // 消息确认机制: all表示 必须等待kafka端所有的副本全部接受到数据 确保数据不丢失
// 说明: 在数据发送的时候, 可以发送键值对的, 此处是用来定义k v的序列化的类型
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
//1. 创建 kafka的生产者对象: KafkaProducer
Producer<String, String> producer = new KafkaProducer<String, String>(props);
//2. 执行数据的发送
for (int i = 0; i < 100; i++) {
// producerRecord对象: 生产者的数据承载对象
ProducerRecord<String, String> producerRecord =
new ProducerRecord<String, String>("dsjlg", Integer.toString(i));
try {
producer.send(producerRecord).get(); // get方法, 表示是同步发送数据的方式
} catch (Exception e) {
// 如果发生操作, 出现了异常, 认为, 数据发生失败了 ....
e.printStackTrace();
}
}
//3. 释放资源
producer.close();
}
}
「生产者代码3: 异步发生数据, 带有回调函数操作」
package com.producer;
import org.apache.kafka.clients.producer.*;
import java.util.Properties;
// kafka的生产者的代码:
public class KafkaProducerTest {
public static void main(String[] args) {
//1.1: 构建生产者的配置信息:
Properties props = new Properties();
props.put("bootstrap.servers", "node1:9092,node2:9092,node3:9092");
props.put("acks", "all"); // 消息确认机制: all表示 必须等待kafka端所有的副本全部接受到数据 确保数据不丢失
// 说明: 在数据发送的时候, 可以发送键值对的, 此处是用来定义k v的序列化的类型
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
//1. 创建 kafka的生产者对象: KafkaProducer
Producer<String, String> producer = new KafkaProducer<String, String>(props);
//2. 执行数据的发送
for (int i = 0; i < 100; i++) {
// producerRecord对象: 生产者的数据承载对象
ProducerRecord<String, String> producerRecord =
new ProducerRecord<String, String>("dsjlg", Integer.toString(i));
producer.send(producerRecord, new Callback() {
public void onCompletion(RecordMetadata metadata, Exception exception) {
// 在参数2中, 表示发生的状态异常, 如果 异常为null 表示数据以及发送成功, 如果不为null, 表示数据没有发送成功
if(exception != null){
// 执行数据发生失败的后措施...
}
}
}); // 异步发送方式
}
//3. 释放资源
producer.close();
}
}
2、消费者代码
「消费者代码1: 自动提交偏移量数据」
package com.it.consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
// kafka的消费者的代码
public class KafkaConsumerTest {
public static void main(String[] args) {
//1.1: 指定消费者的配置信息
Properties props = new Properties();
props.setProperty("bootstrap.servers", "node1:9092,node2:9092,node3:9092");
props.setProperty("group.id", "test"); // 消费者组的名称
props.setProperty("enable.auto.commit", "true"); // 消费者自定提交消费偏移量信息给kafka
props.setProperty("auto.commit.interval.ms", "1000"); // 每次自动提交偏移量时间间隔 1s一次
props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
//1. 创建kafka的消费者核心类对象: KafkaConsumer
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
//2. 让当前这个消费, 去监听那个topic?
consumer.subscribe(Arrays.asList("dsjlg")); // 一个消费者 可以同时监听多个topic的操作
while (true) { // 一致监听
//3. 从topic中 获取数据操作: 参数表示意思, 如果队列中没有数据, 最长等待多长时间
// 如果超时后, topic中依然没有数据, 此时返回空的 records(空对象)
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
//4. 遍历ConsumerRecords, 从中获取消息数据
for (ConsumerRecord<String, String> record : records) {
String value = record.value();
System.out.println("接收到消息为:"+value);
}
}
}
}
「消费者代码2: 手动提交偏移量数据」
package com.it.consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
// kafka的消费者的代码
public class KafkaConsumerTest2 {
public static void main(String[] args) {
//1.1 定义消费者的配置信息
Properties props = new Properties();
props.setProperty("bootstrap.servers", "node1:9092,node2:9092,node3:9092");
props.setProperty("group.id", "test01"); // 消费者组的名称
props.setProperty("enable.auto.commit", "false"); // 消费者自定提交消费偏移量信息给kafka
//props.setProperty("auto.commit.interval.ms", "1000"); // 每次自动提交偏移量时间间隔 1s一次
props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
//1. 创建消费者的核心类对象
KafkaConsumer<String,String> consumer = new KafkaConsumer<String,String>(props);
//2. 指定要监听的topic
consumer.subscribe(Arrays.asList("dsjlg"));
//3. 获取数据
while(true) {
ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
String value = consumerRecord.value();
// 执行消费数据操作
System.out.println("数据为:"+ value);
// 当执行完成后, 认为消息已经消费完成
consumer.commitAsync(); // 手动提交偏移量信息
}
}
}
}
十三、Kafka的分片和副本机制
1、分片机制
主要解决了单台服务器存储容量有限的问题当数据量非常大的时候,一个服务器存放不了,就将数据分成两个或者多个部分,存放在多台服务器上。每个服 务器上的数据,叫做一个分片
2、副本机制
副本备份机制解决了 「数据存储的高可用」 问题当数据只保存一份的时候,有丢失的风险。为了更好的容错和容灾,将数据拷贝几份,保存到不同的机器上。
3、小结
分片: 解决单台节点存储容量有限的问题, 通过分片进行分布式存储方案副本: 保证数据不丢失, 提升数据可用性
十四、kafka是如何保证数据不丢失
1、如何保证生产者数据不丢失
a) 0:生产者只负责发送数据 b) 1:某个partition的leader收到数据给出响应 c) -1:某个partition的所有副本都收到数据后给出响应
3) 在同步模式下
a) 生产者等待10S,如果broker没有给出ack响应,就认为失败。 b) 生产者重试3次,如果还没有响应,就报错。
4) 在异步模式下
a) 先将数据保存在生产者端的Buffer中。Buffer大小是2万条。32M b) 满足数据阈值或者时间阈值其中的一个条件就可以发送数据。 c) 发送一批数据的大小是500条。16Kb
如果broker迟迟不给ack,而Buffer又满了。开发者可以设置是否直接清空Buffer中的数据。
2、如何保证broker端数据不丢失
「broker端:」
broker端的消息不丢失,其实就是用partition副本机制来保证。 Producer ack -1(all). 能够保证所有的副本都同步好了数据。其中一台机器挂了,并不影响数据的完整性。
3、如何保证消费端数据不丢失
「消费端:」 通过offset commit 来保证数据的不丢失,kafka自己记录了每次消费的offset数值,下次继续消费的时候,会接着上次的offset进行消费。 而offset的信息在kafka0.8版本之前保存在zookeeper中,在0.8版本之后保存到topic中,即使消费者在运行过程中挂掉了,再次启动的时候会找到offset的值,找到之前消费消息的位置,接着消费,由于offset的信息写入的时候并不是每条消息消费完成后都写入的,所以这种情况有可能会造成重复消费,但是不会丢失消息。
4、小结
「生产者端」
broker端主要是通过数据的副本和 ack为-1 来保证数据不丢失操作
「消费端」
十五、kafka消息存储机制和原理
1、消息的保存路径
消息发送端发送消息到 broker 上以后,消息是如何持久化的?
「数据分片」 kafka 使用日志文件的方式来保存生产者和发送者的消息,每条消息都有一个 offset 值来表示它在分区中的偏移量。Kafka 中存储的一般都是海量的消息数据,为了避免日志文件过大,一个分片 并不是直接对应在一个磁盘上的日志文件,而是对应磁盘上的一个目录,这个目录的命名规则是<topic_name>_<partition_id>
。 比如创建一个名为firstTopic的topic,其中有3个partition,那么在 kafka 的数据目录(/tmp/kafka-log)中就有 3 个目录,firstTopic-0~3多个分区在集群中多个broker上的分配方法
将所有 N Broker 和待分配的 i 个 Partition 排序 将第 i 个 Partition 分配到第(i mod n)个 Broker 上
每个分片目录中,kafka 通过分段的方式将 数据 分为多个 LogSegment,一个 LogSegment 对应磁盘上的一个日志文件(00000000000000000000.log)和一个索引文件(如上:00000000000000000000.index),其中日志文件是用来记录消息的。索引文件是用来保存消息的索引。每个LogSegment 的大小可以在server.properties 中log.segment.bytes=107370 (设置分段大小,默认是1gb)选项进行设置。
2、日志和索引文件内容分析
可以通过kafka自带这条命令可以看到 kafka 消息日志的内容
sh kafka-run-class.sh kafka.tools.DumpLogSegments --files /tmp/kafka-logs/test-0/00000000000000000000.log --print-data-log
输出结果为:
offset: 5376 position: 102124 CreateTime: 1531477349287isvalid: true keysize: -1 valuesize: 12 magic: 2compresscodec: NONE producerId: -1 producerEpoch: -1 sequence: -1 isTransactional: false headerKeys: []payload: message_5376
可以看到一条消息,会包含很多的字段,如下:
offset: 5371 position: 102124 CreateTime: 1531477349286isvalid: true keysize: -1 valuesize: 12 magic: 2compresscodec: NONE producerId: -1 producerEpoch: -1 sequence: -1 isTransactional: false headerKeys: []payload: message_5371
各字段的意义:
offset:记录号 ;
position:偏移量;
createTime 创建时间、
keysize 和 valuesize 表示 key 和 value 的大小
compresscodec 表示压缩编码
payload:表示消息的具体内容
为了提高查找消息的性能,kafka为每一个日志文件添加 了2 个索引文件:OffsetIndex
和 TimeIndex
,分别对应*.index以及*.timeindex, *.TimeIndex 是映射时间戳和相对 offset的文件
「查 看 索 引 内 容 命令:」
sh kafka-run-class.shkafka.tools.DumpLogSegments --files /tmp/kafka-logs/test-0/00000000000000000000.index --print-data-log
「索引文件和日志文件内容关系如下」
3、在 partition 中通过 offset 查找 message过程
根据 offset 的值,查找 segment 段中的 index 索引文件。由于索引文件命名是以上一个文件的最后一个offset 进行命名的,所以,使用二分查找算法能够根据offset 快速定位到指定的索引文件 找到索引文件后,根据 offset 进行定位,找到索引文件中的匹配范围的偏移量position。(kafka 采用稀疏索引的方式来提高查找性能) 得到 position 以后,再到对应的 log 文件中,从 position处开始查找 offset 对应的消息,将每条消息的 offset 与目标 offset 进行比较,直到找到消息
比如说,我们要查找 offset=2490 这条消息,那么先找到00000000000000000000.index, 然后找到[2487,49111]这个索引,再到 log 文件中,根据 49111 这个 position 开始查找,比较每条消息的 offset 是否大于等于 2490。最后查找到对应的消息以后返回。
4、日志的清除策略以及压缩策略
日志的清理策略有两个
根据消息的保留时间,当消息在 kafka 中保存的时间超过了指定的时间,就会触发清理过程 根据 topic 存储的数据大小,当 topic 所占的日志文件大小大于一定的阀值,则可以开始删除最旧的消息。通过 log.retention.bytes 和 log.retention.hours 这两个参数来设置,当其中任意一个达到要求,都会执行删除。默认的保留时间是:7 天kafka会启动一个后台线程,定期检查是否存在可以删除的消息。
「日志压缩策略」 Kafka 还提供了“日志压缩(Log Compaction)”功能,通过这个功能可以有效的减少日志文件的大小,缓解磁盘紧张的情况,在很多实际场景中,消息的 key 和 value 的值之间的对应关系是不断变化的,就像数据库中的数据会不断被修改一样,消费者只关心 key 对应的最新的 value。因此,我们可以开启 kafka 的日志压缩功能,服务端会在后台启动Cleaner线程池,定期将相同的key进行合并,只保留最新的 value 值。日志的压缩原理如下图:
5、消息写入的性能
「顺序写」 我们现在大部分企业仍然用的是机械结构的磁盘,如果把消息以随机的方式写入到磁盘,那么磁盘首先要做的就是寻址,也就是定位到数据所在的物理地址,在磁盘上就要找到对应的柱面、磁头以及对应的扇区;这个过程相对内存来说会消耗大量时间,为了规避随机读写带来的时间消耗,kafka 采用顺序写的方式存储数据。
「零拷贝」 即使采用顺序写,但是频繁的 I/O 操作仍然会造成磁盘的性能瓶颈,所以 kafka还有一个性能策略:零拷贝 消息从发送到落地保存,broker 维护的消息日志本身就是文件目录,每个文件都是二进制保存,生产者和消费者使用相同的格式来处理。在消费者获取消息时,服务器先从硬盘读取数据到内存,然后把内存中的数据原封不动的通过 socket 发送给消费者。虽然这个操作描述起来很简单,但实际上经历了很多步骤。如下:
操作系统将数据从磁盘读入到内核空间的页缓存 应用程序将数据从内核空间读入到用户空间缓存中 应用程序将数据写回到内核空间到 socket 缓存中 操作系统将数据从 socket 缓冲区复制到网卡缓冲区,以便将数据经网络发出
这个过程涉及到 4 次上下文切换以及 4 次数据复制,并且有两次复制操作是由 CPU 完成。但是这个过程中,数据完全没有进行变化,仅仅是从磁盘复制到网卡缓冲区。通过“零拷贝”技术,可以去掉这些没必要的数据复制操作,同时也会减少上下文切换次数。现代的 unix 操作系统提供一个优化的代码路径,用于将数据从页缓存传输到 socket;在 Linux 中,是通过 sendfile 系统调用来完成的。Java 提供了访问这个系统调用的方法:FileChannel.transferTo API。使用 sendfile,只需要一次拷贝就行,允许操作系统将数据直接从页缓存发送到网络上。所以在这个优化的路径中,只有最后一步将数据拷贝到网卡缓存中是需要的
十六、Kafka生产者数据分发策略
Kafka生产者在分发数据时(多分区),一般是怎么发送数据呢?要想得到答案,我们不妨通过源码找到,Kafka默认使用DefaultPartitioner.class的分发策略,下面为源码的注释,让我们一起来解读一下:
/**
The default partitioning strategy:
<ul>
<li>If a partition is specified in the record, use it
<li>If no partition is specified but a key is present choose a partition based on a hash of the key
<li>If no partition or key is present choose a partition in a round-robin fashion
*/
其大致意思就是,如果消息指定了分区号,就按指定的分区号;如果没有指定分区号,就取key的哈希值对应的分区;如果既没有指定分区号,也没有key值,就采用轮询的方式,例如有两个个分区,上一条数据分发到了第一个分区,这条数据就会分发到第二个分区,下一条又分发给第一个分区…就这样重复轮询。
代码实现
Kafka为我们提供了以下策略:
//可根据主题和内容发送
public ProducerRecord(String topic, V value)
//根据主题,key、内容发送
public ProducerRecord(String topic, K key, V value)
//根据主题、分区、key、内容发送
public ProducerRecord(String topic, Integer partition, K key, V value)
//根据主题、分区、时间戳、key,内容发送
public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value)
除上述以外,我们还可以自定义分区,只要实现Partitioner接口的方法即可,使用自定义分区策略之前,需要设置partition.class属性为自定义分区策略的全路径类名。
public class MyPartition implements Partitioner {
@Override
public int partition(String s, Object o, byte[] bytes, Object o1, byte[] bytes1, Cluster cluster) {
return 0;
}
@Override
public void close() {
}
@Override
public void configure(Map<String, ?> map) {
}
}
十七、Kafka造成数据积压如何解决
Kafka消费者消费数据的速度是非常快的,但如果由于处理Kafka消息时,由于有一些外部IO、或者是产生网络拥堵,就会造成Kafka中的数据积压(或称为数据堆积)。如果数据一直积压,会导致数据出来的实时性受到较大影响。
「出现积压的原因:」
因为数据写入目的容器失败,从而导致消费失败 因为网络延迟消息消费失败 消费逻辑过于复杂, 导致消费过慢,出现积压问题
「解决方案:」
对于第一种, 我们常规解决方案, 处理目的容器,保证目的容器是一直可用状态 对于第二种, 如果之前一直没问题, 只是某一天出现, 可以调整消费的超时时间 对于第三种, 一般解决方案,调整消费代码, 消费更快即可, 利于消费者的负载均衡策略,提升消费者数量
- EOF -
看完本文有收获?请转发分享给更多人
关注「大数据与机器学习文摘」,成为Top 1%
点赞和在看就是最大的支持❤️