消息可靠性、重复消息、消息积压、利用消息实现分布式事务
一、如何确保消息不丢失?
1、检测消息丢失的方法
2、确保消息可靠传递
2.1、生产阶段
try {
producer.send(record).get();
System.out.println("消息发送成功");
} catch (Exception e) {
System.out.println("消息发送失败");
System.out.println(e);
}
producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (metadata != null) {
System.out.println("消息发送成功");
} else {
System.out.println("消息发送失败");
System.out.println(exception);
}
}
});
producer.send(record, (metadata, exception) -> {
if (metadata != null) {
System.out.println("消息发送成功");
} else {
System.out.println("消息发送失败");
System.out.println(exception);
}
});
2.2、存储阶段
2.3、消费阶段
@RabbitListener(bindings = @QueueBinding(value = @Queue(value = "${spring.rabbitmq.listener.order.queue.name}", durable = "${spring.rabbitmq.listener.order.queue.durable}"), exchange = @Exchange(value = "${spring.rabbitmq.listener.order.exchange.name}", durable = "${spring.rabbitmq.listener.order.exchange.durable}", type = "${spring.rabbitmq.listener.order.exchange.type}", ignoreDeclarationExceptions = "${spring.rabbitmq.listener.order.exchange.ignoreDeclarationExceptions}"), key = "${spring.rabbitmq.listener.order.key}"))
@ RabbitHandler
//Order需要实现序列化接口
public void onMessage(@Payload Order order, @Headers Map < String, Object > headers, Channel channel) throws Exception {
//处理业务逻辑
System.out.println("消费端:" + order);
//手工ACK
Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
channel.basicAck(deliveryTag, false);
}
3、小结
二、如何处理消费过程中的重复消息?
1、消息重复的情况必然存在
2、用幂等性解决重复消息问题
3、几种常用的设计幂等操作的方法
三、消息积压了该如何处理?
1、优化性能来避免消息积压
2、消息积压了该如何处理?
四、如何利用事务消息实现分布式事务?
1、什么是分布式事务?
2、消息队列是如何实现分布式事务的?
3、RocketMQ中的分布式事务实现
文章不错?点个【在看】吧! 👇