查看原文
其他

RabbitMQ 死信机制真的可以作为延时任务这个场景的解决方案吗?

MarkerHub 2022-11-21

小Hub领读:

娓娓道来的一篇好文章,贴合实际的运用场景,关于死信和延时队列的分析,我觉得挺好的,不知不觉我都看完了!


作者:编程妙妙屋

来源:https://www.skypyb.com/2020/01/jishu/1206/

关于延时任务,在业务场景中实在是太常见了。比如订单,下单 xx 分钟未支付就要将订单关闭。比如红包, XX 分钟未抢,则红包失效。

那么说起延时任务的实现方案的话,可能有很多人第一时间会想到轮询,即设置定时任务,而稍有经验的开发者就知道。轮询这机制会给数据库带来很大压力,小业务当然无所谓。如果是大量数据要处理的业务用轮询肯定是不行的。而且你如果要保证高可用,就又得牵扯出分布式定时任务。怎么搞都很麻烦。

很多小机灵鬼知道可以用消息队列来实现。确实,MQ 的异步性和解耦性在延时任务的这种场景下可以爆发出很强的战斗力。而 RabbitMQ 因其被广泛使用,关于如何实现延时任务自然也有其解决方案。

下面本文基于 SpringBoot 环境演示一下使用 RabbitMQ 实现延时任务的方案

用文字和 UML 活动图来讲一讲所谓 RabbitMQ 的 “死信” 机制如何实现延时消息的需求及其功能上的 不足

1、死信是什么

说起死信,balabala 的什么死信队列、死信交换机这种名词就出来了。

这个词语有点抽象,但也不是那么难以理解。死信死信,就当他死了~

比如你生产者发送一条消息到 MQ Broker ,这条消息因为各种原因没被消费掉,消息最终挂掉了 / 死了。就可以认为他是死信

那么死信队列呢?死信交换机呢?其实这两个东西 和普通的队列、交换机是一样的,并没有本质区别

不过可以通过对 RabbitMQ 的配置,将其设置为 “死信” 的处理者。就是一条消息因为种种原因没被消费掉,最终死了,那么就把这个消息转发给死信交换机、由他来对这个死亡的消息进行处理

这种设置、处理 在 RabbitMQ 中是点对点的,即一个普通队列 可以绑定一个死信交换机。

指定队列的死信交换机需要设置队列的属性参数 (arguments)

具体参数名:

绑定死信交换机 : x-dead-letter-exchange

路由死信时使用的 routingkey : x-dead-letter-routing-key

2、什么情况会产生死信

在 RabbitMQ 中,产生死信有这么几种情况

1、队列长度满了

2、消费者拒绝消费消息 (丢弃)

3、消息 TTL 过期

这里说到了 TTL ,那么就需要解释一下这是个什么东西。

TTL 是 time to live 的缩写,即生存时间。

RabbitMQ 中可以在队列上、单条消息上设置 TTL。如果是设置在队列上,则可以认为该条队列中所有消息的 TTL 为设定值。

队列 TTL 属性参数: x-message-ttl

单条消息 TTL 参数: expiration

如果设置了 TTL 值,消息待在队列中的时间超过 TTL 值后还未被消费的话,消息队列则会将消息丢弃,产生” 死信”。

产生死信后,若队列配置了死信交换机,则会将消息流转到绑定的死信交换机中,然后再由死信交换机路由到死信队列

死信队列再推送给这个队列的消费者

3、基于死信机制的延时任务实现方案

那么,根据上述 1、2 知识点,对应的延时任务实现方案自然就出来了。

具体方案:

1、创建一个没有消费者的队列,设置 TTL 值,并绑定死信交换机

2、所有需要延时的消息全部向这条队列发送。

3、死信交换机绑定对应的死信队列,其消费者即为处理延时消息的服务

根据以上方案逻辑,在发消息到队列后,必定会等待到消息过期后——即指定的延时时间后,才会有消费者对消息进行处理。

可以实现延时任务的需求。

活动图如下所示:

3、Spring 中 RabbitMQ 死信实现方式

既然知道了原理和机制,那么就先真实上手撸一个出来。

依赖的配置以及具体 application.yml 文件的书写就不在此进行说明了,想了解详情可以看我以前文章。

最重要最核心的是 RabbitMQ 的队列、交换机配置。

据上述知识点可以得出,只要配置好了 TTL、死信交换机,即可实现功能。

那么这里我就直接将我写的配置类贴出:

  1. @Configuration


  2. public class RabbitBindConfig {


  3.     public final static String SKYPYB_ORDINARY_EXCHANGE = "skypyb-ordinary-exchange";


  4.     public final static String SKYPYB_DEAD_EXCHANGE = "skypyb-dead-exchange";


  5.     public final static String SKYPYB_ORDINARY_QUEUE_1 = "skypyb-ordinary-queue";


  6.     public final static String SKYPYB_DEAD_QUEUE = "skypyb-dead-queue";


  7.     public final static String SKYPYB_ORDINARY_KEY = "skypyb.key.ordinary.one";


  8.     public final static String SKYPYB_DEAD_KEY = "skypyb.key.dead";


  9.     @Bean


  10.     public DirectExchange ordinaryExchange() {


  11.         return new DirectExchange(SKYPYB_ORDINARY_EXCHANGE, false, true);


  12.     }


  13.     @Bean


  14.     public DirectExchange deadExchange() {


  15.         return new DirectExchange(SKYPYB_DEAD_EXCHANGE, false, true);


  16.     }


  17.     @Bean


  18.     public Queue ordinaryQueue() {


  19.         Map arguments = new HashMap<>();


  20.         //TTL 5s


  21.         arguments.put("x-message-ttl", 1000 * 5);


  22.         // 绑定死信队列和死信交换机


  23.         arguments.put("x-dead-letter-exchange", SKYPYB_DEAD_EXCHANGE);


  24.         arguments.put("x-dead-letter-routing-key", SKYPYB_DEAD_KEY);


  25.         return new Queue(SKYPYB_ORDINARY_QUEUE_1, false, false, true, arguments);


  26.     }


  27.     @Bean


  28.     public Queue deadQueue() {


  29.         return new Queue(SKYPYB_DEAD_QUEUE, false, false, true);


  30.     }


  31.     @Bean


  32.     public Binding bindingOrdinaryExchangeAndQueue() {


  33.         return BindingBuilder.bind(ordinaryQueue()).to(ordinaryExchange()).with(SKYPYB_ORDINARY_KEY);


  34.     }


  35.     @Bean


  36.     public Binding bindingDeadExchangeAndQueue() {


  37.         return BindingBuilder.bind(deadQueue()).to(deadExchange()).with(SKYPYB_DEAD_KEY);


  38.     }


  39. }

可以看到我定义了关于 普通队列相关 以及 死信队列相关 的几个常量。

并且基于这些常量实例化出了对应的交换机、队列,并设置了绑定关系。

在实例化普通队列时对其进行了特殊处理; 给普通队列绑定上了死信交换机,并指定好死信 routing key。指定好了其 TTL 值 (5s 过期) 后才进行实例化。

那么现在以这么一个配置,就已经实现了延时消息需要的所有条件了。

写个消费者、发送者来测试一下。

消费者:

  1. @RabbitListener(queues = {RabbitBindConfig.SKYPYB_DEAD_QUEUE})


  2. @Component


  3. public class DeadReceiver {


  4.     private Logger logger = LoggerFactory.getLogger(DeadReceiver.class);


  5.     @RabbitHandler


  6.     public void onDeadMessage(@Payload String message,


  7.                                @Headers Map headers,


  8.                                Channel channel) throws IOException {


  9.         logger.info("死信队列消费者接收消息: {}", message);


  10.         //delivery tag 可以从 headers 中 get 出来


  11.         Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);


  12.         try {


  13.             channel.basicAck(deliveryTag, false);


  14.         } catch (Exception e) {


  15.             System.err.println(e.getMessage());


  16.             boolean redelivered = (boolean) headers.get(AmqpHeaders.REDELIVERED);


  17.             channel.basicNack(deliveryTag, false, !redelivered);


  18.         }


  19.     }


  20. }

发送者:

  1. @RunWith(SpringRunner.class)


  2. @SpringBootTest(classes = Application.class)


  3. public class RabbitmqTest {


  4.     @Autowired


  5.     private RabbitTemplate rabbitTemplate;


  6.     private Logger logger = LoggerFactory.getLogger(RabbitmqTest.class);


  7.     @Test


  8.     public void testDead() {


  9.         rabbitTemplate.convertAndSend(RabbitBindConfig.SKYPYB_ORDINARY_EXCHANGE,


  10.                 RabbitBindConfig.SKYPYB_ORDINARY_KEY, "消息体");


  11.         rabbitTemplate.convertAndSend(RabbitBindConfig.SKYPYB_ORDINARY_EXCHANGE,


  12.                 RabbitBindConfig.SKYPYB_ORDINARY_KEY, "消息体");


  13.         logger.info("----- 消息发送完毕 -----");


  14.     }


  15. }

最终控制台结果, 确实实现了延时队列的功能:

2020-01-12 11:14:17.582 INFO 12032 — [main] com.skypyb.test.RabbitmqTest : —–消息发送完毕—–

2020-01-12 11:14:22.599 INFO 10576 — [cTaskExecutor-2] c.s.rabbitmq.controller.DeadReceiver : 死信队列消费者接收消息: 消息体

2020-01-12 11:14:22.599 INFO 10576 — [cTaskExecutor-1] c.s.rabbitmq.controller.DeadReceiver : 死信队列消费者接收消息: 消息体

除了队列 TTL 以外,粒度为消息级别的 TTL 也是可以设置的。

SpringAMQP 对单条消息的 TTL 设置,需要在 MessageProperties 类中进行,每个消息都会内置一个此类。

为了方便,SpringAMQP 在消息发送流程中提供了一个钩子可以让我们设置 Message 的属性,那就是 MessagePostProcessor

  1. @FunctionalInterface


  2. public interface MessagePostProcessor {


  3. Message postProcessMessage(Message message) throws AmqpException;


  4. default Message postProcessMessage(Message message, Correlation correlation) {


  5. return postProcessMessage(message);


  6. }


  7. }

既然他用了 @FunctionalInterface 注解,那为了方便我就用 lambda 表达式写一个,设置单个消息的 TTL 为 3 秒:

  1. @RunWith(SpringRunner.class)


  2. @SpringBootTest(classes = Application.class)


  3. public class RabbitmqTest {


  4.     @Autowired


  5.     private RabbitTemplate rabbitTemplate;


  6.     private Logger logger = LoggerFactory.getLogger(RabbitmqTest.class);


  7.     @Test


  8.     public void testDead() {


  9.         rabbitTemplate.convertAndSend(


  10.                 RabbitBindConfig.SKYPYB_ORDINARY_EXCHANGE,


  11.                 RabbitBindConfig.SKYPYB_ORDINARY_KEY,


  12.                 "消息体",


  13.                 (msg) -> {


  14.                     msg.getMessageProperties().setExpiration("3000");


  15.                     return msg;


  16.                 });


  17.         rabbitTemplate.convertAndSend(RabbitBindConfig.SKYPYB_ORDINARY_EXCHANGE,


  18.                 RabbitBindConfig.SKYPYB_ORDINARY_KEY, "消息体");


  19.         logger.info("----- 消息发送完毕 -----");


  20.     }


  21. }

将代码修改后再次发送,控制台输出:

2020-01-12 11:51:22.788 INFO 26232 — [main] com.skypyb.test.RabbitmqTest : —–消息发送完毕—–

2020-01-12 11:51:25.787 INFO 10576 — [cTaskExecutor-4] c.s.rabbitmq.controller.DeadReceiver : 死信队列消费者接收消息: 消息体

2020-01-12 11:51:27.784 INFO 10576 — [cTaskExecutor-5] c.s.rabbitmq.controller.DeadReceiver : 死信队列消费者接收消息: 消息体

可以看到,嘿 果不其然,消息接收的有时间差别了,正好符合设置的消息 TTL 3s 和队列 TTL 5s 。

但是,这个功能是有缺陷的

这是使用 RabbitMQ 死信机制来作为延时任务必定会出现的不足之处

下面解释一下

4、RabbitMQ 死信实现方式缺陷

将上边的发送消息代码,顺序调转一下,如下所示:

  1. @RunWith(SpringRunner.class)


  2. @SpringBootTest(classes = Application.class)


  3. public class RabbitmqTest {


  4.     @Autowired


  5.     private RabbitTemplate rabbitTemplate;


  6.     private Logger logger = LoggerFactory.getLogger(RabbitmqTest.class);


  7.     @Test


  8.     public void testDead() {


  9.         rabbitTemplate.convertAndSend(RabbitBindConfig.SKYPYB_ORDINARY_EXCHANGE,


  10.                 RabbitBindConfig.SKYPYB_ORDINARY_KEY, "消息体");


  11.         rabbitTemplate.convertAndSend(


  12.                 RabbitBindConfig.SKYPYB_ORDINARY_EXCHANGE,


  13.                 RabbitBindConfig.SKYPYB_ORDINARY_KEY,


  14.                 "消息体",


  15.                 (msg) -> {


  16.                     msg.getMessageProperties().setExpiration("3000");


  17.                     return msg;


  18.                 });


  19.         logger.info("----- 消息发送完毕 -----");


  20.     }


  21. }

运行代码,结果,执行偏离了想象… 控制台打印:

2020-01-12 15:00:19.371 INFO 9680 — [main] com.skypyb.test.RabbitmqTest : —–消息发送完毕—–

2020-01-12 15:00:24.380 INFO 10576 — [cTaskExecutor-1] c.s.rabbitmq.controller.DeadReceiver : 死信队列消费者接收消息: 消息体

2020-01-12 15:00:24.380 INFO 10576 — [cTaskExecutor-3] c.s.rabbitmq.controller.DeadReceiver : 死信队列消费者接收消息: 消息体

可以看到,消费者消费消息时,都等了整整 5s  !

◾ 这是为什么?

这是因为 RabbitMQ 的特性导致的。

RabbitMQ 的队列是一个 FIFO 的有序队列,投入的消息都顺序的压进 MQ 中。

而 RabbitMQ 也只会对队尾的消息进行超时判定,所以就出现了上述的情况。

即哪怕第二条在第 3 秒时就过期了,但由于第一条消息 5 秒过期,RabbitMQ 会等待到第一条被丢弃后,才对第二条进行判断。最终出现了第一条过期后第二条才跟着过期的结果。

结语

其实就平时可能遇见的场景而言,使用 RabbitMQ 的死信机制就已经足够了。

毕竟大部分延时任务都是固定时间的,比如下单后半小时未支付则关闭订单这种场景。

只要场景是有着固定时间的延时任务的话, RabbitMQ 无疑可以很好的承担起这个需求。

针对标题的疑问作出回答的话,可以说出:

RabbitMQ 死信机制能作为延时任务这个场景的解决方案

但是,由于 RabbitMQ 消息死亡并非异步化,而是阻塞的。所以无法作为复杂延时场景——需要每条消息的死亡相互独立这种场景 下的解决方案。


(完)

MarkerHub文章索引:(点击阅读原文直达)

https://github.com/MarkerHub/JavaIndex


【推荐阅读】

SpringBoot 全局日期格式化(基于注解)

理解这9大内置过滤器,才算是精通Shiro

花30分钟,用Jenkins部署码云上的SpringBoot项目

只需要6个步骤,springboot集成shiro,并完成登录

eblog项目讲解视频上线啦,长达17个小时!!



好文!必须点赞

您可能也对以下帖子感兴趣

文章有问题?点此查看未经处理的缓存