高并发:RocketMQ 削峰实战!
你关注的就是我关心的!
来源:https://juejin.im/post/5ea159e4f265da47f0794da5
Producer:生产发送消息 Broker:存储Producer发送过来的消息 Consumer:从Broker拉取消息并进行消费 NameServer:为Producer或Consumer路由到Broker
RocketMQ的Consumer获取消息是通过向Broker发送拉取请求获取的,而不是由Broker发送Consumer接收的方式。 Consumer每次拉取消息时消息都会被均匀分发到消息队列再进行传输,所以RocketMQ中的很多参数都是针对队列而不是Topic的(这个是重点,顺便吐槽下源码的文档讲的真不清晰,很多都需要自己试错,但Dashboard做得很好),其中每个Broker消息队列(ConsumeQueue)的数量都可以通过RocketMQ DashBoard实时更改调整。
rocketmq-spring-boot-starter用法简介
rocketmq-spring-boot-starter
相关类:RocketMQListener
接口:消费者都需实现该接口的消费方法onMessage(msg)
。RocketMQPushConsumerLifecycleListener
接口:当@RocketMQMessageListener
中的配置不足以满足我们的需求时,可以实现该接口直接更改消费者类DefaultMQPushConsumer
配置@RocketMQMessageListener
:被该注解标注并实现了接口RocketMQListener
的bean为一个消费者并监听指定topic队列中的消息,该注解中包含消费者的一些常用配置(大部分按默认即可),一般只需更改consumerGroup(消费组)与topic。RocketMQMessageListener
中的属性配置是可以使用Placeholder(占位符)从配置文件或配置中心获取的,如下图:
业务案例
环境配置
文章例子环境:1NameServer + 2Broker + 1Consumer
添加maven依赖
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
</dependency>
复制代码
application.yml配置
rocketmq:
name-server: 127.0.0.1:9876
producer:
group: praise-group
server:
port: 10000
spring:
datasource:
driver-class-name: com.mysql.cj.jdbc.Driver
username: root
password: tiger
url: jdbc:mysql://localhost:3306/wilson
swagger:
docket:
base-package: io.rocket.consumer.controller
复制代码
点赞接口
PraiseRecord(点赞记录):
@Data
public class PraiseRecord implements Serializable {
private Long id;
private Long uid;
private Long liveId;
private LocalDateTime createTime;
}
复制代码
MessageController(简单的测试接口):
RestController
@RequestMapping("/message")
public class MessageController {
@Resource
private RocketMQTemplate rocketMQTemplate;
@PostMapping("/praise")
public ServerResponse praise(@RequestBody PraiseRecordVO vo) {
rocketMQTemplate.sendOneWay(RocketConstant.Topic.PRAISE_TOPIC, MessageBuilder.withPayload(vo).build());
return ServerResponse.success();
}
// ......
}
复制代码
sendOneyWay()
进行消息发送。RocketMQ的消息发送方式主要含syncSend()同步发送、asyncSend()异步发送、sendOneWay()三种方式,sendOneWay()也是异步发送,区别在于不需等待Broker返回确认,所以可能会存在信息丢失的状况,但吞吐量更高,具体需根据业务情况选用。 性能:sendOneWay > asyncSend > syncSend RocketMQTemplate的send()方法默认是同步(syncSend)的,更多可看源码实现。
PraiseListener:点赞消息消费者
@Service
@RocketMQMessageListener(topic = RocketConstant.Topic.PRAISE_TOPIC, consumerGroup = RocketConstant.ConsumerGroup.PRAISE_CONSUMER)
@Slf4j
public class PraiseListener implements RocketMQListener<PraiseRecordVO>, RocketMQPushConsumerLifecycleListener {
@Resource
private PraiseRecordService praiseRecordService;
@Override
public void onMessage(PraiseRecordVO vo) {
praiseRecordService.insert(vo.copyProperties(PraiseRecord::new));
}
@Override
public void prepareStart(DefaultMQPushConsumer consumer) {
// 每次拉取的间隔,单位为毫秒
consumer.setPullInterval(2000);
// 设置每次从队列中拉取的消息数为16
consumer.setPullBatchSize(16);
}
}
MessageStoreConfig.maxTransferCountOnMessageInMemory
(默认为32)值限制,即若想要消费者从队列拉取的消息数大于32有效(pullBatchSize>32)则需更改Broker的启动参数maxTransferCountOnMessageInMemory
值。在MQ削峰的配置参数里,以下几个DefaultMQPushConsumer
的参数是需要注意一下的:pullInterval:每次从Broker拉取消息的间隔,单位为毫秒 pullBatchSize:每次从Broker队列拉取到的消息数,该参数很容易让人误解,一开始我以为是每次拉取的消息总数,但测试过几次后确认了实质上是从每个队列的拉取数(源码上的注释文档真的很差,跟没有一样),即Consume每次拉取的消息总数如下: EachPullTotal=所有Broker上的写队列数和(writeQueueNums=readQueueNums) * pullBatchSize
consumeMessageBatchMaxSize:每次消费(即将多条消息合并为List消费)的最大消息数目,默认值为1,rocketmq-spring-boot-starter 目前不支持批量消费(2.1.0版本)
上线了但消费效率预估失误如何动态更改消费效率 ?
如何使用RocketMQ批量消费 ?
DefaultMQPushConsumer
并配置其consumeMessageBatchMaxSize
属性。consumeMessageBatchMaxSize
属性默认值为1,即每次只消费一条消息,需要注意的是该属性也会受pullBatchSize
影响,如果consumeMessageBatchMaxSize
为32但pullBatchSize
只为12,那么每次批量消费的最大消息数也就只有12。如下为个人测试批量消费Consumer的测试bean:@Bean
public DefaultMQPushConsumer userMQPushConsumer() throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(RocketConstant.ConsumerGroup.SPRING_BOOT_USER_CONSUMER);
consumer.setNamesrvAddr(nameServer);
consumer.subscribe(RocketConstant.Topic.SPRING_BOOT_USER_TOPIC, "*");
// 设置每次消息拉取的时间间隔,单位毫秒
consumer.setPullInterval(1000);
// 设置每个队列每次拉取的最大消息数
consumer.setPullBatchSize(24);
// 设置消费者单次批量消费的消息数目上限
consumer.setConsumeMessageBatchMaxSize(12);
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context)
-> {
List<UserInfo> userInfos = new ArrayList<>(msgs.size());
Map<Integer, Integer> queueMsgMap = new HashMap<>(8);
msgs.forEach(msg -> {
userInfos.add(JSONObject.parseObject(msg.getBody(), UserInfo.class));
queueMsgMap.compute(msg.getQueueId(), (key, val) -> val == null ? 1 : ++val);
});
log.info("userInfo size: {}, content: {}", userInfos.size(), userInfos);
/*
处理批量消息,如批量插入:userInfoMapper.insertBatch(userInfos);
*/
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
consumer.start();
return consumer;
}
consumeMessageBatchMaxSize
与pullBatchSize
,且pullBatchSize
较小,所以每次消费的消息数最大值为12,如下图:附本文相关信息
确保mqnamesrv与mqbroker已启动成功,如该文章环境的启动: mqnamesrv -n 127.0.0.1:9876
mqbroker -c E:\RocketMQ\rocketmq-all-4.5.2-bin-release\bin\2m-noslave\broker-a.properties
mqbroker -c E:\RocketMQ\rocketmq-all-4.5.2-bin-release\bin\2m-noslave\broker-b.propertiesRocketMQ DashBoard启动流程可参考官方github文档或到我的资源里下载jar包运行 源码地址(https://github.com/Wilson-He/spring-boot-series/tree/master/spring-rocketmq),2m-noslave目录是该文章中例子中的2master broker配置与启动脚本,spring-boot-consumer-peak目录为包含该文章相关代码的实际例子