Spring Cloud Stream整合Rabbit之重复投递
(给ImportNew加星标,提高Java技能)
SpringCloudStream 整合Rabbit 时,消费端在处理失败时,如果需要进行重试,可以有如下几种重试机制:
方法1(默认):
当消费端在处理消息时抛出异常,那么默认会在当前线程的3次的Retry。该方法是默认的,可以通过修改配置文件,指定channel下的参数,例如:
| 1 2 3 4 5 6 7 8 9 10 | spring: cloud: stream: bindings: input-test-event: destination: test-event group: test-group binder: rabbit consumer: max-attempts: 1 |
其中:
max-attempts如果等于1,就是不重试;max-attempts如果大于1,其值就是重试次数。
当消息重试超过最大次数,如果未配置启用DLQ ,消息将会被丢弃。该方法默认是无法设置重试的时间间隔的。
方法2:
方法1是在当前线程进行重试,相当于阻塞了后面的消息,有时我们不想阻塞,则可以利用死信队列(Dead Letter Queue, 缩写DLQ ),进行异步重试。
先看一下DLQ 的逻辑。
| 1 2 3 4 5 6 7 8 9 10 11 12 13 | spring: cloud: stream: bindings: input-test-event: destination: test-event group: test-group binder: rabbit rabbit: bindings: input-test-event: consumer: autoBindDlq: true |
设置spring.cloud.stream.rabbit.bindings.<channelName>.consumer.autoBindDlq 参数为true,将自动创建对应channel的DLQ ,绑定死信交换机(Dead Letter Exchange, 缩写DLX )。默认该queue的名字就是其对应destination.group 后追加.dlq ,同时,该进入该queue 的消息的routingKey 即为原destination 。
按上面的配置,消息进入DLQ 以后,因为没有任何的消费者,消息会一直存储于DLQ 中,可以添加dlqTtl 参数设置消息在DLQ中生存的时间,在无消费者的情况下,默认到期后会删除该消息。
如果想指定DLQ的名称,可以用deadLetterQueueName 参数指定。
重试的逻辑其实就是利用DLQ ,给其设置一个默认的exchange ,在TTL 时间到期后,消息会再度转到指定的exchange 对应的queue 中。
为了实现该逻辑,需要配置三个参数:
autoBindDql设置为true,启用DLQdlqTtl设置一个死信消息超时时间,变相实现了重试的间隔时间dlqDeadLetterExchange增加该参数后,留空即为设置默认值。在默认值情况下,DLQ中的消息将会按照其routingKey的值(也可由deadLetterRoutingKey参数指定),将消息投递到给名称对应该值的quque,实现消息的重新消费。
例如:
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 | spring: cloud: stream: bindings: input-test-event: destination: test-event group: test-group binder: rabbit rabbit: bindings: input-test-event: consumer: autoBindDlq: true dlqTtl: 5000 dlqDeadLetterExchange: |
注意:
因为在配置中,设置了
group这个参数,当该参数使用时,默认rabbit的durable参数是启用的,即该channel的exhange和queue是持久化的,应用退出后,不会自动清除,并且保存创建时的参数。所以当改变了channel的参数后,需要将queue删除,让其自动重建,否则新改的不会生效,则无法实现自动重试。
按照上面配置,删除旧queue 后重新启动应用,创建queue 信息如下:
消息重新投递后,在其header 里,会增加一些重试的信息,如下图所示:
deliveryAttempt值代表在当前线程的重试次数,即方法一的重试逻辑x-death头记录了重试循环的一些详细信息,尤其是值count记录了经由DLQ异步重试的次数。
但有时,我们想知道上一次错误的具体异常,此时可以增加republishToDlq 参数,当设置为true时,会在消息头里增加详细的异常和异常堆栈信息。
注:
当
republishToDlq设置为不同值时,routingKey的取值逻辑不同。当为false时,取的是x-death头中第一个的routing-keys值;当为true时,取得是X_ORIGINAL_ROUTING_KEY_HEADER这个Header的值。
此时,该消息将不断重复queue -> DLQ -> queue的循环(假设消费端一直拒绝或抛异常)。如果我们想设置重试次数大于3就不再重试,可以抛出ImmediateAcknowledgeAmqpException 这个异常,则该消息被丢弃,不再进入DLQ。
关于消息的拒绝
前面对于消息的拒绝,都是采用抛异常,但是这个异常不能乱抛。不同的异常,框架处理的方式不同:
普通的异常,等同于
AmqpRejectAndDontRequeueException,会导致消息重试ImmediateAcknowledgeAmqpException这个异常,会导致消息被丢弃不触发重试
有时候,我们不期望在生产的日志中出现重试的ERROR,可以考虑用下面的方案:
将消费端的
acknowledgeMode从默认的自动改为手动,即acknowledgeMode: MANUAL将channel注入到消费端,手动处理,例如:
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 | @Component @EnableBinding(TestSink.class) public class TestConsumer { private static final Logger logger = LoggerFactory.getLogger(TestConsumer.class); private static final Long MAX_RETRY = Long.valueOf(3L); @StreamListener(TestSink.INPUT) public void consume(Message message, @Header(name = AmqpHeaders.CHANNEL, required = false) Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) Long deliveryTag, @Header(name = "x-death", required = false) Map<?,?> death) throws IOException { logger.info("收到消息:{}", message); if(death!=null && death.get("count")!=null && Long.valueOf(death.get("count").toString()).compareTo(MAX_RETRY)>=0){ logger.error("放弃该消息"); channel.basicAck(deliveryTag, false); return; } //c channel.basicReject(deliveryTag, false); } } |
转自:Edison Xu
链接:http://edisonxu.com/2022/01/28/spring-cloud-stream-rabbit.html
- EOF -
看完本文有收获?请转发分享给更多人
关注「ImportNew」,提升Java技能
点赞和在看就是最大的支持❤️