Spring Cloud Stream整合Rabbit之重复投递
(给ImportNew加星标,提高Java技能)
SpringCloudStream
整合Rabbit
时,消费端在处理失败时,如果需要进行重试,可以有如下几种重试机制:
方法1(默认):
当消费端在处理消息时抛出异常,那么默认会在当前线程的3次的Retry。该方法是默认的,可以通过修改配置文件,指定channel下的参数,例如:
1 2 3 4 5 6 7 8 9 10 | spring: cloud: stream: bindings: input-test-event: destination: test-event group: test-group binder: rabbit consumer: max-attempts: 1 |
其中:
max-attempts
如果等于1,就是不重试;max-attempts
如果大于1,其值就是重试次数。
当消息重试超过最大次数,如果未配置启用DLQ
,消息将会被丢弃。该方法默认是无法设置重试的时间间隔的。
方法2:
方法1是在当前线程进行重试,相当于阻塞了后面的消息,有时我们不想阻塞,则可以利用死信队列(Dead Letter Queue, 缩写DLQ
),进行异步重试。
先看一下DLQ
的逻辑。
1 2 3 4 5 6 7 8 9 10 11 12 13 | spring: cloud: stream: bindings: input-test-event: destination: test-event group: test-group binder: rabbit rabbit: bindings: input-test-event: consumer: autoBindDlq: true |
设置spring.cloud.stream.rabbit.bindings.<channelName>.consumer.autoBindDlq
参数为true,将自动创建对应channel的DLQ
,绑定死信交换机(Dead Letter Exchange, 缩写DLX
)。默认该queue
的名字就是其对应destination
.group
后追加.dlq
,同时,该进入该queue
的消息的routingKey
即为原destination
。
按上面的配置,消息进入DLQ
以后,因为没有任何的消费者,消息会一直存储于DLQ
中,可以添加dlqTtl
参数设置消息在DLQ
中生存的时间,在无消费者的情况下,默认到期后会删除该消息。
如果想指定DLQ
的名称,可以用deadLetterQueueName
参数指定。
重试的逻辑其实就是利用DLQ
,给其设置一个默认的exchange
,在TTL
时间到期后,消息会再度转到指定的exchange
对应的queue
中。
为了实现该逻辑,需要配置三个参数:
autoBindDql
设置为true,启用DLQ
dlqTtl
设置一个死信消息超时时间,变相实现了重试的间隔时间dlqDeadLetterExchange
增加该参数后,留空即为设置默认值。在默认值情况下,DLQ
中的消息将会按照其routingKey
的值(也可由deadLetterRoutingKey
参数指定),将消息投递到给名称对应该值的quque
,实现消息的重新消费。
例如:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 | spring: cloud: stream: bindings: input-test-event: destination: test-event group: test-group binder: rabbit rabbit: bindings: input-test-event: consumer: autoBindDlq: true dlqTtl: 5000 dlqDeadLetterExchange: |
注意:
因为在配置中,设置了
group
这个参数,当该参数使用时,默认rabbit的durable
参数是启用的,即该channel的exhange
和queue
是持久化的,应用退出后,不会自动清除,并且保存创建时的参数。所以当改变了channel的参数后,需要将queue删除,让其自动重建,否则新改的不会生效,则无法实现自动重试。
按照上面配置,删除旧queue
后重新启动应用,创建queue
信息如下:
消息重新投递后,在其header
里,会增加一些重试的信息,如下图所示:
deliveryAttempt
值代表在当前线程的重试次数,即方法一的重试逻辑x-death
头记录了重试循环的一些详细信息,尤其是值count
记录了经由DLQ
异步重试的次数。
但有时,我们想知道上一次错误的具体异常,此时可以增加republishToDlq
参数,当设置为true时,会在消息头里增加详细的异常和异常堆栈信息。
注:
当
republishToDlq
设置为不同值时,routingKey
的取值逻辑不同。当为false时,取的是x-death
头中第一个的routing-keys
值;当为true时,取得是X_ORIGINAL_ROUTING_KEY_HEADER
这个Header的值。
此时,该消息将不断重复queue -> DLQ -> queue的循环(假设消费端一直拒绝或抛异常)。如果我们想设置重试次数大于3就不再重试,可以抛出ImmediateAcknowledgeAmqpException
这个异常,则该消息被丢弃,不再进入DLQ
。
关于消息的拒绝
前面对于消息的拒绝,都是采用抛异常,但是这个异常不能乱抛。不同的异常,框架处理的方式不同:
普通的异常,等同于
AmqpRejectAndDontRequeueException
,会导致消息重试ImmediateAcknowledgeAmqpException
这个异常,会导致消息被丢弃不触发重试
有时候,我们不期望在生产的日志中出现重试的ERROR,可以考虑用下面的方案:
将消费端的
acknowledgeMode
从默认的自动改为手动,即acknowledgeMode: MANUAL
将channel注入到消费端,手动处理,例如:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 | @Component @EnableBinding(TestSink.class) public class TestConsumer { private static final Logger logger = LoggerFactory.getLogger(TestConsumer.class); private static final Long MAX_RETRY = Long.valueOf(3L); @StreamListener(TestSink.INPUT) public void consume(Message message, @Header(name = AmqpHeaders.CHANNEL, required = false) Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) Long deliveryTag, @Header(name = "x-death", required = false) Map<?,?> death) throws IOException { logger.info("收到消息:{}", message); if(death!=null && death.get("count")!=null && Long.valueOf(death.get("count").toString()).compareTo(MAX_RETRY)>=0){ logger.error("放弃该消息"); channel.basicAck(deliveryTag, false); return; } //c channel.basicReject(deliveryTag, false); } } |
转自:Edison Xu
链接:http://edisonxu.com/2022/01/28/spring-cloud-stream-rabbit.html
- EOF -
看完本文有收获?请转发分享给更多人
关注「ImportNew」,提升Java技能
点赞和在看就是最大的支持❤️