查看原文
其他

Spring Boot RabbitMQ 优先级队列

Anoyi SpringForAll社区 2020-10-17

Anoyi

读完需要

10分钟

速读仅需4分钟



   

Docker With RabbitMQ

官方 Docker 镜像仓库地址

  • https://hub.docker.com/_/rabbitmq

本地运行 RabbitMQ

  1. docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3-management

访问可视化面板

  • 地址:http://127.0.0.1:15672/

  • 默认账号:guest

  • 默认密码:guest



   

Spring Boot With RabbitMQ

Spring Boot 集成 RabbitMQ

  1. <dependency>

  2. <groupId>org.springframework.boot</groupId>

  3. <artifactId>spring-boot-starter-amqp</artifactId>

  4. </dependency>


基本参数配置

  1. # host & port

  2. spring.rabbitmq.host=127.0.0.1

  3. spring.rabbitmq.port=5672

Queue / Exchange / Routing 配置

  1. /**

  2. * RabbitMQ 配置

  3. */

  4. @Configuration

  5. public class RabbitMQConfig {


  6. private static final String EXCHANGE = "priority-exchange";


  7. public static final String QUEUE = "priority-queue";


  8. private static final String ROUTING_KEY = "priority.queue.#";


  9. /**

  10. * 定义优先级队列

  11. */

  12. @Bean

  13. Queue queue() {

  14. Map<String, Object> args= new HashMap<>();

  15. args.put("x-max-priority", 100);

  16. return new Queue(QUEUE, false, false, false, args);

  17. }


  18. /**

  19. * 定义交换器

  20. */

  21. @Bean

  22. TopicExchange exchange() {

  23. return new TopicExchange(EXCHANGE);

  24. }


  25. @Bean

  26. Binding binding(Queue queue, TopicExchange exchange) {

  27. return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY);

  28. }


  29. }


priority queue 定义参考官方文档:https://www.rabbitmq.com/priority.html


Spring Boot 应用启动后,会自动创建 Queue 和 Exchange ,并相互绑定,优先级队列会有如图所示标识。




   

RabbitMQ Publisher

Spring Boot 相关配置

  1. # 是否开启消息发送到交换器(Exchange)后触发回调

  2. spring.rabbitmq.publisher-confirms=false

  3. # 是否开启消息发送到队列(Queue)后触发回调

  4. spring.rabbitmq.publisher-returns=false

  5. # 消息发送失败重试相关配置

  6. spring.rabbitmq.template.retry.enabled=true

  7. spring.rabbitmq.template.retry.initial-interval=3000ms

  8. spring.rabbitmq.template.retry.max-attempts=3

  9. spring.rabbitmq.template.retry.max-interval=10000ms

  10. spring.rabbitmq.template.retry.multiplier=1

发送消息

  1. @Component

  2. @AllArgsConstructor

  3. public class FileMessageSender {


  4. private static final String EXCHANGE = "priority-exchange";


  5. private static final String ROUTING_KEY_PREFIX = "priority.queue.";


  6. private final RabbitTemplate rabbitTemplate;


  7. /**

  8. * 发送设置有优先级的消息

  9. *

  10. * @param priority 优先级

  11. */

  12. public void sendPriorityMessage(String content, Integer priority) {

  13. rabbitTemplate.convertAndSend(EXCHANGE, ROUTING_KEY_PREFIX + "test", content,

  14. message -> {

  15. message.getMessageProperties().setPriority(priority);

  16. return message;

  17. });

  18. }


  19. }



   

RabbitMQ Consumer

Spring Boot 相关配置

  1. # 消息接收确认,可选模式:NONE(不确认)、AUTO(自动确认)、MANUAL(手动确认)

  2. spring.rabbitmq.listener.simple.acknowledge-mode=AUTO

  3. # 最小线程数量

  4. spring.rabbitmq.listener.simple.concurrency=10

  5. # 最大线程数量

  6. spring.rabbitmq.listener.simple.max-concurrency=10

  7. # 每个消费者可能未完成的最大未确认消息数量

  8. spring.rabbitmq.listener.simple.prefetch=1


消费者执行耗时较长的话,建议 spring.rabbitmq.listener.simple.prefetch 设置为较小数值,让优先级越高的消息更快加入到消费者线程。


监听消息

  1. @Slf4j

  2. @Component

  3. public class MessageListener {


  4. /**

  5. * 处理消息

  6. */

  7. @RabbitListener(queues = "priority-queue")

  8. public void listen(String message) {

  9. log.info(message);

  10. }


  11. }


   

番外补充

1、自定义消息发送确认的回调

  • 配置如下:

  1. # 开启消息发送到交换器(Exchange)后触发回调

  2. spring.rabbitmq.publisher-confirms=true

  3. # 开启消息发送到队列(Queue)后触发回调

  4. spring.rabbitmq.publisher-returns=true

  • 自定义 RabbitTemplate.ConfirmCallback 实现类


  1. @Slf4j

  2. public class RabbitConfirmCallBack implements RabbitTemplate.ConfirmCallback{

  3. @Override

  4. public void confirm(CorrelationData correlationData, boolean ack, String cause) {

  5. log.info("消息唯一标识: {}", correlationData);

  6. log.info("确认状态: {}", ack);

  7. log.info("造成原因: {}", cause);

  8. }

  9. }

  • 自定义 RabbitTemplate.ConfirmCallback 实现类


  1. @Slf4j

  2. public class RabbitReturnCallback implements RabbitTemplate.ReturnCallback {

  3. @Override

  4. public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {

  5. log.info("消息主体: {}", message);

  6. log.info("回复编码: {}", replyCode);

  7. log.info("回复内容: {}", replyText);

  8. log.info("交换器: {}", exchange);

  9. log.info("路由键: {}", routingKey);

  10. }

  11. }

  • 配置 rabbitTemplate

  1. @Component

  2. @AllArgsConstructor

  3. public class RabbitTemplateInitializingBean implements InitializingBean {

  4. private final RabbitTemplate rabbitTemplate;

  5. @Override

  6. public void afterPropertiesSet() {

  7. rabbitTemplate.setConfirmCallback(new RabbitConfirmCallBack());

  8. rabbitTemplate.setReturnCallback(new RabbitReturnCallback());

  9. }

  10. }

2、RabbitMQ Exchange 类型

  • 中文:RabbitMQ Exchange类型详解

  • English: RabbitMQ Tutorials


推荐: Spring Cloud Alibaba基础教程:使用Nacos实现服务注册与发现

上一篇:在Spring Boot中格式化JSON日期



 关注公众号

点击原文阅读更多


    您可能也对以下帖子感兴趣

    文章有问题?点此查看未经处理的缓存