查看原文
其他

消息中间件你该了解的秘密

回复 1024 有特别礼包


作者:黑白搬运工

来源:juejin.cn/post/6913807875806593038

上一篇:SpringBoot接口幂等性实现的4种方案!

1.前言

你知道的系统开发过程中会使用消息中间件进行消息的异步处理系统之间解耦系统流量削峰。在使用消息中间件的过程中我们需要了解以下场景:

  • 如何与我们的开发框架SpringBoot进行集成
  • 如何发送消息
  • 如何发送复杂消息
  • 如何保证发送消息的可靠性
  • 如何消费消息
  • 如何保证消费消息的可靠性
  • 如何保证消费者的可扩展性
  • 如何使用消费者进行流量削峰

以这些场景为基础开启本文的写作,本文是消息中间件RabbitMQ为例

2. 与SpringBoot集成

2.1 添加依赖

想要通过SpringBoot框架集成RabbitMQ是一件相对比较容易的事情,只需要在pom文件中添加对应的starter即可

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2.2 添加MQ服务配置

spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
    virtual-host: boot-example

2.3 注入消息模板

@Autowired
private RabbitTemplate rabbitTemplate;

2.4 发送消息

public void sendMessage() {
    rabbitTemplate.convertAndSend("test""test""mq produce send a message");
}

2.5 消费消息

@Component
@Slf4j
public class MqConsumer {
    
    @RabbitListener(id = "consumerMessage1", queues = "test")
    public void consumeMessage1(Message message, Channel channel, String content) {
        log.info("receive message1 :{}", content);     
    }

3. 如何发送复杂的消息

系统开发过程中,与其它系统之间往往会以json的形式进行复杂对象交互,这就要求我们设置消息的序列化和反序列化转换器

在公众号顶级架构师后台回复“架构整洁”,获取一份惊喜礼包。

3.1 生产者设置消息转换器

@Bean
public RabbitTemplate rabbitTemplate(RabbitTemplateConfigurer configurer, ConnectionFactory connectionFactory) {
    RabbitTemplate rabbitTemplate = new RabbitTemplate();
    configurer.configure(rabbitTemplate, connectionFactory);
    rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
    return rabbitTemplate;
}

3.2 消费者设置消息转换器

@Bean
public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    factory.setConnectionFactory(connectionFactory);
    factory.setMessageConverter(new Jackson2JsonMessageConverter());
    return factory;
}

3.3 消费者指定监听容器工厂

@RabbitListener(queues = "test3", containerFactory = "rabbitListenerContainerFactory")
public void consumeComplexMessage(Order order) {
    log.info("receive complex message:{}", order);
}

4.发送消息可靠性

4.1 为什么要保证发送消息的可靠性

关于发送消息可靠性问题,先来看看官方文档的说明:

A RabbitMQ node can lose persistent messages if it fails before said messages are written to disk. For instance, consider this scenario:

  1. a client publishes a persistent message to a durable queue
  2. a client consumes the message from the queue (noting that the message is persistent and the queue durable), but confirms are not active,
  3. the broker node fails and is restarted, and
  4. the client reconnects and starts consuming messages

At this point, the client could reasonably assume that the message will be delivered again. This is not the case: the restart has caused the broker to lose the message. In order to guarantee persistence, a client should use confirms. If the publisher's channel had been in confirm mode, the publisher would not have received an ack for the lost message (since the message hadn't been written to disk yet).

大致的意思就是RabbitMQ消息服务器在将消息写到磁盘上之前可能由于宕机问题导致持久化消息丢失。如果生产者channcel设置确认模式,可以保证消息不丢失,因为只有写到磁盘上的消息,生产者才会收到ack通知。

4.2 如何保证发送消息可靠性

4.2.1 添加配置

rabbitmq:
    publisher-confirm-type: correlated
    publisher-returns: true

4.2.2 指定回调函数

实现RabbitTemplate.ConfirmCallbackRabbitTemplate.ReturnCallback接口

@Configuration
@Slf4j
public class MqConfig implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {

    /**
     * 消息服务器返回的basic.ack
     *
     * @param correlationData 关联数据对象
     * @param ack ack
     * @param cause 异常信息
     */

    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        log.info("receive ack confirm:{} from broker server", ack);
    }

    /**
     * 消息服务器返回的basic.return
     *
     * @param message 消息对象
     * @param replyCode 响应code
     * @param replyText 响应文本
     * @param exchange 交换机
     * @param routingKey 路由key
     */

    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        log.error("receive return message:{} from broker server,reply code:{},reply text:{}," +
                "exchange:{},routing key:{}", message.toString(), replyCode, replyText, exchange, routingKey);
    }

    @Bean
    public RabbitTemplate rabbitTemplate(RabbitTemplateConfigurer configurer, ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate();
        configurer.configure(rabbitTemplate, connectionFactory);
        rabbitTemplate.setReturnCallback(this);
        rabbitTemplate.setConfirmCallback(this);
        rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
        return rabbitTemplate;
    }
}

4.2.3 confirm和returnedMessage

乍一看comfirm()回调函数就可以实现发送消息的可靠性,returnedMessage()回调函数看上去似乎有点多余,这个观点在一定条件下是可以成立的,来看下官方文档的说明:

For unroutable messages, the broker will issue a confirm once the exchange verifies a message won't route to any queue (returns an empty list of queues). If the message is also published as mandatory, the basic.return is sent to the client before basic.ack

消息首先会发送到exchange,exchange再根据相关规则将消息路由到相应队列,针对exchange无法路由的消息,消息服务器会先返回basic.return,接着在返回basic.ack,也就是会先回调returnedMessage()函数,再回调confirm()函数。正常情况下消息服务器将消息持久化之后会返回basic.ack并且comfirm()回调函数的ack参数为true。但是针对exchange无法路由消息的情况,confirm()回调函数参数中的ack返回的也是true,该情况下仅仅依靠comfirm()回调函数是无法保证消息的可靠性,需要结合returnedMessage()回调函数;如果能确保exchange与队列之间一定可以路由成功,confirm()回调函数就可以确保发送消息的可靠性。

5. 消息消息的可靠性

5.1 为什么要保证消费消息的可靠性

关于消费消息的可靠性问题,先来看看官方文档的说明:

在公众号编程技术圈后台回复“Java”,获取一份惊喜礼包。

When a node delivers a message to a consumer, it has to decide whether the message should be considered handled (or at least received) by the consumer. Since multiple things (client connections, consumer apps, and so on) can fail, this decision is a data safety concern. Messaging protocols usually provide a confirmation mechanism that allows consumers to acknowledge deliveries to the node they are connected to. Whether the mechanism is used is decided at the time consumer subscribes.

Depending on the acknowledgement mode used, RabbitMQ can consider a message to be successfully delivered either immediately after it is sent out (written to a TCP socket) or when an explicit ("manual") client acknowledgement is received. Manually sent acknowledgements can be positive or negative and use one of the following protocol methods:

默认情况下消息服务器在将消息派发给消费者后会立马删除消息,此模式下可能由于connection断开、channel断开、消费者异常这些特殊场景从而导致消息丢失,无法保证消息的可靠性。想要保证消费消息的可靠性,需要引入一种确认机制,也就是消息服务器在派发消息后不立马删除消息,只有收到消费者的ack之后才删除消息,如果消费者与消息服务器断开连接,消息服务器需要将消息重新派发给其它消费者进行消费。

5.2 如何保证消费消息的可靠性

官方文档示例:

boolean autoAck = false;
// 将autoAck设置为false
channel.basicConsume(queueName, autoAck, "a-consumer-tag",
     new DefaultConsumer(channel) {
         @Override
         public void handleDelivery(String consumerTag,
                                    Envelope envelope,
                                    AMQP.BasicProperties properties,
                                    byte[] body)
             throws IOException
         
{
             long deliveryTag = envelope.getDeliveryTag();
             // negatively acknowledge, the message will
             // be discarded
             channel.basicReject(deliveryTag, false);
         }
     });

关键部分就是channel.basicConsume()的第二个参数需要设置为false

SpringBoot集成:

默认值选择的是自动模式,可以满足消息的可靠性,关于ack模式枚举值解释

如果使用SpringBoot就不需要担心消费消息可靠性的问题了

6.如何保证消费者的可扩展性

一个队列存在多个消费者时消息服务器会以轮询的方式将消息推送给消费者,基于此当消息服务器积压消息时,可以通过增加机器提升消费能力,消费者天然具备横向扩展的能力。

7. 如何使用消费者进行流量削峰

正常情况下消息服务器有消息就会将消息派发给消费者,如果派发的过程是无限制的,那么就会增加消费者系统的负载,严重情况下会导致消费者系统无法对外提供服务,消费者如何做到流量削峰呢?答案就是设置QoS

Because messages are sent (pushed) to clients asynchronously, there is usually more than one message "in flight" on a channel at any given moment. In addition, manual acknowledgements from clients are also inherently asynchronous in nature. So there's a sliding window of delivery tags that are unacknowledged. Developers would often prefer to cap the size of this window to avoid the unbounded buffer problem on the consumer end. This is done by setting a "prefetch count" value using the basic.qos method. The value defines the max number of unacknowledged deliveries that are permitted on a channel. Once the number reaches the configured count, RabbitMQ will stop delivering more messages on the channel unless at least one of the outstanding ones is acknowledged.

大致意思就是通过basic.qos来设置阀值,当消费者持有的未确认消息数超过阀值,消息服务器不会再将消息派发给该消费者,从而实现流量削峰

8.参考文献

  • Consumer Acknowledgements and Publisher Confirms




公众号后台回复 架构 或者 架构整洁 有惊喜礼包!顶级架构师交流群

 「顶级架构师」建立了读者架构师交流群,大家可以添加小编微信进行加群。欢迎有想法、乐于分享的朋友们一起交流学习。

扫描添加好友邀你进架构师群,加我时注明姓名+公司+职位】


版权申明:内容来源网络,版权归原作者所有。如有侵权烦请告知,我们会立即删除并表示歉意。谢谢。

猜你还想看

1.3 万亿条数据查询,如何做到毫秒级响应?
Nginx架构及基本数据结构分析
绝了!这款工具让SpringBoot不再需要Controller、Service、DAO、Mapper!
后端服务不得不了解之限流

点个在看少个 bug👇

: . Video Mini Program Like ,轻点两下取消赞 Wow ,轻点两下取消在看

您可能也对以下帖子感兴趣

文章有问题?点此查看未经处理的缓存