查看原文
其他

Spring Boot中使用Redis的发布订阅功能

翟永超 SpringForAll社区 2023-07-15


通过前面一篇集中式缓存的使用教程,我们已经了解了Redis的核心功能:作为K、V存储的高性能缓存。

接下来我们会分几篇来继续讲讲Redis的一些其他强大用法!如果你对此感兴趣,一定要关注收藏我哦!

社区福利进行中,传送门:http://spring4all.com/fuli-huodong

发布订阅模式

如果你看过之前我写的关于MQ的相关文章,那么对于发布订阅功能应该不会陌生。如果没看过,那也不要紧,这里先做一个简单介绍,已经了解的可以跳过直接看下一节内容。

什么是发布订阅模式?

在发布订阅模式中有个重要的角色,一个是发布者Publisher,另一个订阅者Subscriber。本质上来说,发布订阅模式就是一种生产者消费者模式,Publisher负责生产消息,而Subscriber则负责消费它所订阅的消息。这种模式被广泛的应用于软硬件的系统设计中。比如:配置中心的一个配置修改之后,就是通过发布订阅的方式传递给订阅这个配置的订阅者来实现自动刷新的。


不就是观察者模式吗?

看到这里,学过设计模式的同学可能很容易将它与设计模式中的观察者模式联系起来,你会觉得发布订阅模式中的两个概念与观察者模式中的两个概念似乎干的是一样的事情?所以:Publisher就是观察者模式中的Subject?Subscriber就是观察者模式中的Observer?

重要区别在哪里?

从这两种模式的角色任务来说确实是非常相似的,但从实现架构上来说有一个核心不同点!

我们通过下面的图示来理解,就很清晰了:

观察者模式

发布订阅模式

可以看到这里有一个非常大的区别就是:发布订阅模式在两个角色中间是一个中间角色来过渡的,发布者并不直接与订阅者产生交互

回想一下生产者消费者模式,这个中间过渡区域对应的就是是缓冲区。因为这个缓冲区的存在,发布者与订阅者的工作就可以实现更大程度的解耦。发布者不会因为订阅者处理速度慢,而影响自己的发布任务,它只需要快速生产即可。而订阅者也不用太担心一时来不及处理,因为有缓冲区在,可以一点点排队来完成(也就是我们常说的“削峰填谷”效果)。

而我们所熟知的RabbitMQ、Kafka、RocketMQ这些中间件的本质其实就是实现发布订阅模式中的这个中间缓冲区。而Redis也提供了简单的发布订阅实现,当我们有一些简单需求的时候,也是可以一用的!如果你已经理解了这个概念,那么就进入下一节,一起来做个例子吧!


动手试一试

下面的动手任务,我们将在Spring Boot应用中,通过接口的方式实现一个消息发布者的角色,然后再写一个Service来实现消息的订阅(把接口传过来的消息内容打印处理)。

第一步:创建一个基础的Spring Boot应用,如果还不会点这里

第二步pom.xml中加入必须的几个依赖:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>

<dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-pool2</artifactId>
</dependency>

第三步:创建一个接口,用来发送消息。

@SpringBootApplication
public class Chapter55Application {

    private static String CHANNEL = "didispace";

    public static void main(String[] args) {
        SpringApplication.run(Chapter55Application.class, args);
    }

    @RestController
    static class RedisController {

        private RedisTemplate<String, String> redisTemplate;

        public RedisController(RedisTemplate<String, String> redisTemplate) {
            this.redisTemplate = redisTemplate;
        }

        @GetMapping("/publish")
        public void publish(@RequestParam String message) {
            // 发送消息
            redisTemplate.convertAndSend(CHANNEL, message);
        }

    }

}

这里为了简单实现,公用CHANNEL名称字段,我都写在了应用主类里。

第四步:继续应用主类里实现消息订阅,打接收到的消息打印处理

@Slf4j
@Service
static class MessageSubscriber {

    public MessageSubscriber(RedisTemplate redisTemplate) {
        RedisConnection redisConnection = redisTemplate.getConnectionFactory().getConnection();
        redisConnection.subscribe(new MessageListener() {
            @Override
            public void onMessage(Message message, byte[] bytes) {
                // 收到消息的处理逻辑
                log.info("Receive message : " + message);
            }
        }, CHANNEL.getBytes(StandardCharsets.UTF_8));

    }

}

第六步:验证结果

  1. 启动应用Spring Boot主类
  2. 通过curl或其他工具调用接口curl localhost:8080/publish?message=hello
  3. 观察控制台,可以看到打印了收到的message参数
2021-06-19 16:22:30.935  INFO 34351 --- [ioEventLoop-4-2] .c.Chapter55Application$MessageSubscribe : Receive message : hello

好了,今天的内容到这里结束了。本系列教程《Spring Boot 2.x基础教程》:http://blog.didispace.com/spring-boot-learning-2x/

代码示例

本文的完整工程可以查看下面仓库中的chapter5-5目录:

  • Github:https://github.com/dyc87112/SpringBoot-Learning/
  • Gitee:https://gitee.com/didispace/SpringBoot-Learning/

往期推荐

Arthas 居然还可以条件过滤进行 watch

Spring Boot 关于日期时间格式化处理方式总结

成为最差开发者的10条建议


如果你喜欢本文,欢迎关注我们

专注分享关于Spring的一切

关注我,加入Spring技术交流群

您可能也对以下帖子感兴趣

文章有问题?点此查看未经处理的缓存