查看原文
其他

Java人应该知道的SpringBoot For Kafka (上)

超级小豆丁 SpringForAll社区 2021-05-26
点击上方☝SpringForAll社区 轻松关注!
及时获取有趣有料的技术文章

本文来源:http://www.mydlq.club/article/34/

. 一、概念知识

. 什么是消息中间件

. 什么是 Kafka

. Kafka 特性

. 使用场景

. 基本概念

. 生产者 ACKS 机制

. 消费者更新 Offset 偏移量两种方式

. 二、SpringBoot 操作 Kafka 示例

. 1、Maven 引入 Kafka 相关组件

. 2、Topic 配置

. 3、Producer 配置

. 4、Consumer 配置


参考信息:

  • kafka 官方网址

  • spring-kafka 2.2.7 版本文档

  • 示例 Github 地址:https://github.com/my-dlq/blog-example/tree/master/springboot/springboot-kafka

环境说明:

  • Kafka 版本:2.3.0

  • Zookeeper 版本:3.4.14

  • SpringBoot 版本:2.1.7.RELEASE

  • Spring For Apache Kafka 版本:2.2.8

一、概念知识

什么是消息中间件

消息中间件利用高效可靠的消息传递机制进行平台无关的数据交流,并基于数据通信来进行分布式系统的集成。通过提供消息传递和消息排队模型,它可以在分布式环境下扩展进程间的通信。

什么是 Kafka

Apache Kafka 是一个分布式高吞吐量的流消息系统,Kafka 建立在 ZooKeeper 同步服务之上。它与 Apache Storm 和 Spark 完美集成,用于实时流数据分析,与其他消息传递系统相比,Kafka具有更好的吞吐量,内置分区,数据副本和高度容错功能,因此非常适合大型消息处理应用场景。

Kafka 特性

  • 高并发: 支持数千个客户端同时读写。

  • 可扩展性: kafka集群支持热扩展。

  • 容错性: 允许集群中节点失败(若副本数量为n,则允许n-1个节点失败)。

  • 持久性、可靠性: 消息被持久化到本地磁盘,并且支持数据备份防止数据丢失。

  • 高吞吐量、低延迟: Kafka每秒可以处理几十万消息,延迟最低只有几毫秒,每个消息主题topic可以分多个区,消费者组(consumer group)对消息分区(partition)进行消费。

使用场景

  • 日志收集: 可以用 kafka 收集各种服务的日志,通过kafka以统一接口服务的方式开放给各种消费者,如 hadoop,Hbase,Solr 等。

  • 消息系统: 解耦生产者和消费者、缓存消息等。

  • 用户活动跟踪: Kafka 经常被用来记录web用户或者app用户的各种活动,如浏览网页,搜索,点击等活动,这些活动信息被各个服务器发布到 kafka 的 topic 中,然后订阅者通过订阅这些 topic 来做实时的监控分析,或者装载到 hadoop、数据仓库中做离线分析和挖掘。

  • 运营指标: Kafka也经常用来记录运营监控数据,包括收集各种分布式应用的数据,比如报警和报告等。

  • 流式处理: 比如 spark streaming 和 storm。

基本概念

  • Broker: 消息中间件处理节点,一个 Kafka 节点就是一个 Broker,一个或者多个 Broker 可以组成一个 Kafka 集群。

  • Topic: Kafka 的消息通过 Topic 主题来分类,Topic类似于关系型数据库中的表,每个 Topic 包含一个或多(Partition)分区。

  • Partition: 多个分区会分布在Kafka集群的不同服务节点上,消息以追加的方式写入一个或多个分区中。

  • LogSegment: 每个分区又被划分为多个日志分段 LogSegment 组成,日志段是 Kafka 日志对象分片的最小单位。LogSegment 算是一个逻辑概念,对应一个具体的日志文件(”.log” 的数据文件)和两个索引文件(”.index” 和 “.timeindex”,分别表示偏移量索引文件和消息时间戳索引文件)组成。

  • Offset: 每个分区中都由一系列有序的、不可变的消息组成,这些消息被顺序地追加到 Partition 中,每个消息都有一个连续的序列号称之为 Offset 偏移量,用于在 Partition 内唯一标识消息。

  • Message: 消息是 Kafka 中存储的最小最基本的单位,即为一个 commit log,由一个固定长度的消息头和一个可变长度的消息体组成。

  • Producer: 消息的生产者,负责发布消息到 Kafka Broker,生产者在默认情况下把消息均衡地分布到主题的所有分区上,用户也可以自定义分区器来实现消息的分区路由。

  • Consumer: 消息的消费者,从 Kafka Broker 读取消息的客户端,消费者把每个分区最后读取的消息的 Offset 偏移量保存在 Zookeeper 或 Kafka 上,如果消费者关闭或重启,它的读取状态不会丢失。

  • Consumer Group: 每个 Consumer 属于一个特定的 Consumer Group(若不指定 Group Name则属于默认的 group),一个或多个 Consumer 组成的群组可以共同消费一个 Topic 中的消息,但每个分区只能被群组中的一个消费者操作。

生产者 ACKS 机制

ACKS 参数指定了必须要有多少个分区副本接收到消息,生产者才会认为消息写入是发送消息成功的,这个参数对消息丢失的可能性会产生重要影响,主参数有如下选项:

  • acks=0: 把消息发送到kafka就认为发送成功。

  • acks=1: 把消息发送到kafka leader分区,并且写入磁盘就认为发送成功。

  • acks=all: 把消息发送到 Kafka Leader 分区,并且 Leader 分区的副本 Follower 对消息进行了同步就认为发送成功。

消费者更新 Offset 偏移量两种方式

详情可以查看参考的一篇文章:https://www.jianshu.com/p/d5cd34e429a2

消费者把每个分区最后读取的悄息偏移量提交保存在 Zookeeper 或 Kafka 上,如果消费者关闭或重启,它的读取状态不会丢失,KafkaConsumer API 提供了很多种方式来提交偏移量,但是不同的提交方式会产生不同的数据影响。

  • 自动提交:

如果 enable.auto.commit 被设置为 true,那么消费者会自动提交当前处理到的偏移量存入 Zookeeper,自动提交的时间间隔为5s,通过 atuo.commit.interval.ms 属性设置,自动提交是非常方便,但是自动提交会出现消息被重复消费的风险,可以通过修改提交时间间隔来更频繁地提交偏移量,减小可能出现重复悄息的时间窗,不过这种情况是无也完全避免的。

  • 手动提交:

鉴于 Kafka 自动提交 Offset 的不灵活性和不精确性(只能是按指定频率的提交),Kafka提供了手动提交 Offset 策略,将 auto.commit.offset 自动提交参数设置为 false 来关闭自动提交开启手动模式,手动提交能对偏移量更加灵活精准地控制,以保证消息不被重复消费以及消息不被丢失。

二、SpringBoot 操作 Kafka 示例

1、Maven 引入 Kafka 相关组件

1<?xml version="1.0" encoding="UTF-8"?>
2<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
3         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

4    <modelVersion>4.0.0</modelVersion>
5    <parent>
6        <groupId>org.springframework.boot</groupId>
7        <artifactId>spring-boot-starter-parent</artifactId>
8        <version>2.1.7.RELEASE</version>
9    </parent>
10
11    <groupId>club.mydlq</groupId>
12    <artifactId>springboot-kafka-demo</artifactId>
13    <version>0.0.1-SNAPSHOT</version>
14    <name>springboot-kafka-demo</name>
15
16    <properties>
17        <java.version>1.8</java.version>
18    </properties>
19
20    <dependencies>
21        <dependency>
22            <groupId>org.springframework.boot</groupId>
23            <artifactId>spring-boot-starter-web</artifactId>
24        </dependency>
25        <dependency>
26            <groupId>org.springframework.kafka</groupId>
27            <artifactId>spring-kafka</artifactId>
28        </dependency>
29    </dependencies>
30
31    <build>
32        <plugins>
33            <plugin>
34                <groupId>org.springframework.boot</groupId>
35                <artifactId>spring-boot-maven-plugin</artifactId>
36            </plugin>
37        </plugins>
38    </build>
39
40</project>

2、Topic 配置

配置 Topic,每次程序启动时检测 Kafka 中是否存在已经配置的 Topic,如果不存在就创建。

1import org.apache.kafka.clients.admin.AdminClientConfig;
2import org.apache.kafka.clients.admin.NewTopic;
3import org.springframework.context.annotation.Bean;
4import org.springframework.context.annotation.Configuration;
5import org.springframework.kafka.core.KafkaAdmin;
6import java.util.HashMap;
7import java.util.Map;
8
9@Configuration
10public class KafkaTopicConfig {
11
12    /**
13     * 定义一个KafkaAdmin的bean,可以自动检测集群中是否存在topic,不存在则创建
14     */

15    @Bean
16    public KafkaAdmin kafkaAdmin() {
17        Map<String, Object> configs = new HashMap<>();
18        // 指定多个kafka集群多个地址,例如:192.168.2.11,9092,192.168.2.12:9092,192.168.2.13:9092
19        configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,"127.0.0.1:9092");
20        return new KafkaAdmin(configs);
21    }
22
23    /**
24     * 创建 Topic
25     */

26    @Bean
27    public NewTopic topicinfo() {
28        // 创建topic,需要指定创建的topic的"名称"、"分区数"、"副本数量(副本数数目设置要小于Broker数量)"
29        return new NewTopic("test"3, (short0);
30    }
31
32}

3、Producer 配置

(1)、创建 Producer 配置类

创建 Producer 配置类,对 Kafka 生产者进行配置,在配置中需要设置三个 Bean 分别为:

  • kafkaTemplate:kafka template 实例,用于 Spring 中的其它对象引入该 Bean,通过其向 Kafka 发送消息。

  • producerFactory:producer 工厂,用于对 kafka producer 进行配置。

  • producerConfigs:对 kafka producer 参数进行配置。

1import org.apache.kafka.clients.producer.ProducerConfig;
2import org.apache.kafka.common.serialization.StringSerializer;
3import org.springframework.context.annotation.Bean;
4import org.springframework.context.annotation.Configuration;
5import org.springframework.kafka.annotation.EnableKafka;
6import org.springframework.kafka.core.DefaultKafkaProducerFactory;
7import org.springframework.kafka.core.KafkaTemplate;
8import org.springframework.kafka.core.ProducerFactory;
9import java.util.HashMap;
10import java.util.Map;
11
12// 设置@Configuration、@EnableKafka两个注解,声明Config并且打开KafkaTemplate能力。
13@Configuration
14@EnableKafka
15public class KafkaProducerConfig {
16
17    /**
18     * Producer Template 配置
19     */

20    @Bean(name="kafkaTemplate")
21    public KafkaTemplate<String, String> kafkaTemplate() {
22        return new KafkaTemplate<>(producerFactory());
23    }
24
25    /**
26     * Producer 工厂配置
27     */

28    public ProducerFactory<String, String> producerFactory() {
29        return new DefaultKafkaProducerFactory<>(producerConfigs());
30    }
31
32    /**
33     * Producer 参数配置
34     */

35    public Map<String, Object> producerConfigs() {
36        Map<String, Object> props = new HashMap<>();
37        // 指定多个kafka集群多个地址
38        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094");
39        // 重试次数,0为不启用重试机制
40        props.put(ProducerConfig.RETRIES_CONFIG, 0);
41        // acks=0 把消息发送到kafka就认为发送成功
42        // acks=1 把消息发送到kafka leader分区,并且写入磁盘就认为发送成功
43        // acks=all 把消息发送到kafka leader分区,并且leader分区的副本follower对消息进行了同步就任务发送成功
44        props.put(ProducerConfig.ACKS_CONFIG,"1");
45        // 生产者空间不足时,send()被阻塞的时间,默认60s
46        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 6000);
47        // 控制批处理大小,单位为字节
48        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 4096);
49        // 批量发送,延迟为1毫秒,启用该功能能有效减少生产者发送消息次数,从而提高并发量
50        props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
51        // 生产者可以使用的总内存字节来缓冲等待发送到服务器的记录
52        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 40960);
53        // 消息的最大大小限制,也就是说send的消息大小不能超过这个限制, 默认1048576(1MB)
54        props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG,1048576);
55        // 键的序列化方式
56        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
57        // 值的序列化方式
58        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
59        // 压缩消息,支持四种类型,分别为:none、lz4、gzip、snappy,默认为none。
60        // 消费者默认支持解压,所以压缩设置在生产者,消费者无需设置。
61        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,"none");
62        return props;
63    }
64
65}

(2)、创建 Producer Service 向 kafka 发送数据

创建 Producer Service 引入 KafkaTemplate 对象,再创建 sendMessageSyncsendMessageAsync 两个方法,分别利用“同步/异步”两种方法向 kafka 发送消息。

1import org.springframework.beans.factory.annotation.Autowired;
2import org.springframework.kafka.core.KafkaTemplate;
3import org.springframework.kafka.support.SendResult;
4import org.springframework.stereotype.Service;
5import org.springframework.util.concurrent.ListenableFuture;
6import org.springframework.util.concurrent.ListenableFutureCallback;
7import java.util.concurrent.ExecutionException;
8import java.util.concurrent.TimeUnit;
9import java.util.concurrent.TimeoutException;
10
11@Service
12public class KafkaProducerService {
13
14    @Autowired
15    private KafkaTemplate kafkaTemplate;
16
17    /**
18     * producer 同步方式发送数据
19     * @param topic    topic名称
20     * @param message  producer发送的数据
21     */

22    public void sendMessageSync(String topic, String message) throws InterruptedException, ExecutionException, TimeoutException {
23        kafkaTemplate.send(topic, message).get(10, TimeUnit.SECONDS);
24    }
25
26    /**
27     * producer 异步方式发送数据
28     * @param topic    topic名称
29     * @param message  producer发送的数据
30     */

31    public void sendMessageAsync(String topic, String message) {
32        ListenableFuture<SendResult<Integer, String>> future = kafkaTemplate.send(topic, message);
33        future.addCallback(new ListenableFutureCallback<SendResult<Integer, String>>() {
34            @Override
35            public void onSuccess(SendResult<Integer, String> result) {
36                System.out.println("success");
37            }
38
39            @Override
40            public void onFailure(Throwable ex) {
41                System.out.println("failure");
42            }
43        });
44    }
45
46}

(3)、创建 Producer Controller 调用 Producer Service 产生数据

Spring Controller 类,用于调用 Producer Service 中的方法向 kafka 发送消息。

1import club.mydlq.springbootkafkademo.service.ProducerService;
2import org.springframework.beans.factory.annotation.Autowired;
3import org.springframework.web.bind.annotation.GetMapping;
4import org.springframework.web.bind.annotation.RestController;
5import java.util.concurrent.ExecutionException;
6import java.util.concurrent.TimeoutException;
7
8@RestController
9public class KafkaProducerController {
10
11    @Autowired
12    private KafkaProducerService producerService;
13
14    @GetMapping("/sync")
15    public void sendMessageSync() throws InterruptedException, ExecutionException, TimeoutException {
16        producerService.sendMessageSync("test","同步发送消息测试");
17    }
18
19    @GetMapping("/async")
20    public void sendMessageAsync(){
21        producerService.sendMessageAsync("test","异步发送消息测试");
22    }
23
24}

4、Consumer 配置

(1)、创建 Consumer 配置类

创建 Consumer 配置类,对 Kafka 消费者进行配置,在配置中需要设置三个 Bean 分别为:

  • kafkaListenerContainerFactory:kafka container 工厂,负责创 建container,当使用@KafkaListener时需要提供。

  • consumerFactory:consumer 工厂,用于对 kafka consumer 进行配置。

  • consumerConfigs:对 kafka consumer 参数进行配置。

1import org.apache.kafka.clients.consumer.ConsumerConfig;
2import org.apache.kafka.common.serialization.StringDeserializer;
3import org.springframework.context.annotation.Bean;
4import org.springframework.context.annotation.Configuration;
5import org.springframework.kafka.annotation.EnableKafka;
6import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
7import org.springframework.kafka.config.KafkaListenerContainerFactory;
8import org.springframework.kafka.core.ConsumerFactory;
9import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
10import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
11import java.util.HashMap;
12import java.util.Map;
13
14@Configuration
15@EnableKafka
16public class KafkaConsumerConfig {
17
18    @Bean
19    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaListenerContainerFactory() {
20        ConcurrentKafkaListenerContainerFactory<Integer, String>
21                factory = new ConcurrentKafkaListenerContainerFactory<>();
22        // 设置消费者工厂
23        factory.setConsumerFactory(consumerFactory());
24        // 消费者组中线程数量
25        factory.setConcurrency(3);
26        // 拉取超时时间
27        factory.getContainerProperties().setPollTimeout(3000);
28        return factory;
29    }
30
31    @Bean
32    public ConsumerFactory<Integer, String> consumerFactory() {
33        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
34    }
35
36    @Bean
37    public Map<String, Object> consumerConfigs() {
38        Map<String, Object> propsMap = new HashMap<>();
39        // Kafka地址
40        propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094");
41        // 是否自动提交offset偏移量(默认true)
42        propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
43        // 自动提交的频率(ms)
44        propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
45        // Session超时设置
46        propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
47        // 键的反序列化方式
48        propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
49        // 值的反序列化方式
50        propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
51        // offset偏移量规则设置:
52        // (1)、earliest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
53        // (2)、latest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据
54        // (3)、none:topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常
55        propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
56        return propsMap;
57    }
58
59}

(2)、创建 Consumer Service 监听 Kafka 数据

1import org.springframework.kafka.annotation.KafkaListener;
2import org.springframework.stereotype.Service;
3
4@Service
5public class KafkaConsumerService {
6
7    @KafkaListener(topics = {"test"},groupId = "group1", containerFactory="kafkaListenerContainerFactory")
8    public void kafkaListener(String message){
9        System.out.println(message);
10    }
11
12}





● SpringBoot 多种读取配置文件中参数的方式

● SpringBoot 操作 ElasticSearch 详解

● SpringBoot 使用 Caffeine 本地缓存

● Github推出了GitHub CLI

● (很全面)SpringBoot 集成 Apollo 配置中心

● 你知道如何成为一名靠谱的架构师不?

● Tomcat 在 SpringBoot 中是如何启动的?

● SpringBoot 深度调优,让你的项目飞起来!

● 8种经常被忽视的SQL错误用法,你有没有踩过坑?

● Java面试应该知道之深入理解Java的接口和抽象类



    您可能也对以下帖子感兴趣

    文章有问题?点此查看未经处理的缓存