【064期】面试官问:RabbitMQ 本身不支持延迟队列,那么如何实现?
>>号外:关注“Java精选”公众号,回复“面试资料”,免费领取资料!“”小程序,3000+ 道面试题在线刷,最新、最全 Java 面试题!
RabbitMQ本身没有延迟队列的支持,但是基于其本身的一些特性,可以做到类似延迟队列的效果:基于死信交换器+TTL。
以下介绍下相关概念及方法
Dead Letter Exchanges
消息在队列满足达到一定的条件,会被认为是死信消息(dead-lettered),这时候,RabbitMQ会重新把这类消息发到另外一个的exchange,这个exchange称为Dead Letter Exchanges.
以下任一条件满足,即可认为是死信:
消息被拒绝消费(basic.reject or basic.nack)并且设置了requeue=fasle 消息的TTL到了(消息过期) 达到了队列的长度限制
需要注意的是,Dead letter exchanges (DLXs) 其实就是普通的exchange,可以和正常的exchange一样的声明或者使用。
死信消息路由
队列中可以设置两个属性:
x-dead-letter-exchange x-dead-letter-routing-key
当这个队列里面的消息成为死信之后,就会投递到x-dead-letter-exchange指定的exchange中,其中带着的routing key就是中指定的值x-dead-letter-routing-key。,内涵3000+道高级面试题,点击即可进入(微信搜索:Java精选面试题,也可以),支持在线随时刷题。
而如果使用默认的exchange(routing key就是希望指定的队列),则只需要把x-dead-letter-exchange设置为空(不能不设置),类似下面
死信消息的路由则会根据x-dead-letter-routing-key所指定的进行路由,如果这个值没有指定,则会按照消息一开始发送的时候指定的routing key进行路由
例如,如果一开始你对exchange X发送消息,带着routing key “foo”,进入了队列 Q然后消息变死信后,他会被重新发送到 dead letter exchange ,其中发给dead letter exchange带着的routing key 还是foo。但如果这个队列Q本身是设置了x-dead-letter-routing-key bar, 那么他发送到 dead letter exchange的时候,带着的routing key 就是bar。,内涵3000+道高级面试题,点击即可进入(微信搜索:Java精选面试题,也可以),支持在线随时刷题。
需要注意的是,当死信消息重新路由到新的队列的时候,在死信目标队列确认收到这条死信消息之前,原来队列的消息是不会删除的,也就是说在某些异常场景下例如broker突然shutdown,是有机会存在说一个消息既存在于原队列,又存在于死信目标队列。具体可参考官方说明:
Time-To-Live(TTL)
开头我们说过,实现延迟队列除了用死信消息外,还需要利用消息过期的TTL机制,因为只要消息过期了,就会触发死信。
RabbitMQ有两种方法让设置消息的TTL:
直接在消息上设置
byte[] messageBodyBytes = "Hello, world!".getBytes();
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
.expiration("60000")
.build();
channel.basicPublish("my-exchange", "routing-key", properties, messageBodyBytes);
为队列设置消息过期TTL
注意,队列还有一个队列TTL,x-expires,这个的意思是队列空置经过一段时间(没有消费者,没有被重新声明,没有人在上面获取消息(basic.get))后,整个队列便会过期删除,不要混淆
如果同时设置了消息的过期和队列消息过期属性,则取两个较小值。
设计延迟队列:
例如,我们需要触发一个推送新闻,30分钟后统计这个新闻的下发情况,我们就需要一个延迟队列,新闻推送后,往延迟队列发送一个消息,这个队列的消息在30分钟后被消费,这时候触发即可统计30分钟的下发情况。我们可以这样设计:
定义一个正常的队列:ARRIVAL_STAT,统计程序监听此队列,进行消费。
定义一个“延迟队列”(RabbitMQ没有这样的队列,这里只是人为的制造一个这样的队列):DELAY_ARRIVAL_STAT,其中设置好对应的x-dead-letter-exchange,x-dead-letter-routing-key。为了简单说明,我使用默认的exchange,那么配置如下:
x-dead-letter-exchange=“”
x-dead-letter-routing-key=“ARRIVAL_STAT”
意思是,消息当这个队列DELAY_ARRIVAL_STAT的消息变死信之后,就会带着routing key “ARRIVAL_STAT”发送默认的空exchange,即队列ARRIVAL_STAT。
并且这个队列不能有消费者消费消息。
这样我们就实现了消息的死信转发。下一步,只需要让消息在这个DELAY_ARRIVAL_STAT在30分钟后过期变死信即可。按照上文所说,有两种方法,我们可以为队列的消息设置30分钟TTL,或者发送消息的时候指定消息的TTL为30分钟即可。,内涵3000+道高级面试题,点击即可进入(微信搜索:Java精选面试题,也可以),支持在线随时刷题。
示例如下:
“延迟队列”的堵塞缺陷
由于设置了x-dead-letter-exchange的队列本身也是普通队列,其过期的顺序是按照队列头部顺序的过期的。也就是说,如果你队列头的消息A过期时间是5分钟,后面对这个队列发送消息B的带着过期时间1分钟,那么后面的队列B要等队列A过期了才会触发过期:
所以,对于此类多延迟时间的,可以考虑设置多级延迟队列。例如1分钟,5分钟,10分钟,20分钟这样多级的延迟队列,使得延迟相近的尽量放到同一个队列中减少拥堵的最坏情况。
作者:薛定谔的风口猪
jaskey.github.io/blog/2018/08/15/rabbitmq-delay-queue/
【054期】面试官问:为什么 Kafka 比其他 MQ 消息队列效率高?
【055期】面试官问:分布式集群环境中如何解决定时任务多次执行的问题?
【056期】谈谈关于 IO 同步、异步、阻塞、非阻塞有什么区别?
【058期】面试官问:Java 线程中如何保证通信,都有哪些方式?
【059期】面试官问:序列化是什么,为什么要序列化,如何实现?
【060期】面试官问:如何快速实现不同 Object 对象中相同属性赋值?
【061期】面试中经常被问到 Java 引用类型原理,深入源码剖析