架构师带你学习延时消息常见实现方案
上一篇:RabbitMQ、RocketMQ、Kafka 三元归一
大家好,我是顶级架构师。
前言
实现方案
1. 基于外部存储实现的方案
这里讨论的外部存储指的是在 MQ 本身自带的存储以外又引入的其他的存储系统。
基于 数据库(如MySQL)
CREATE TABLE `delay_msg` (
`id` bigint unsigned NOT NULL AUTO_INCREMENT,
`delivery_time` DATETIME NOT NULL COMMENT '投递时间',
`payloads` blob COMMENT '消息内容',
PRIMARY KEY (`id`),
KEY `time_index` (`delivery_time`)
)
实现简单;
B+Tree索引不适合消息场景的大量写入;
基于 RocksDB
RocksDB LSM 树很适合消息场景的大量写入;
实现方案较重,如果你采用这个方案,需要自己实现 RocksDB 的数据容灾逻辑;
基于 Redis
本方案来源于:基于Redis实现延时队列服务
Messages Pool 所有的延时消息存放,结构为KV结构,key为消息ID,value为一个具体的message(这里选择Redis Hash结构主要是因为hash结构能存储较大的数据量,数据较多时候会进行渐进式rehash扩容,并且对于HSET和HGET命令来说时间复杂度都是O(1)) Delayed Queue是16个有序队列(队列支持水平扩展),结构为ZSET,value 为 messages pool中消息ID,score为过期时间**(分为多个队列是为了提高扫描的速度)** Worker 代表处理线程,通过定时任务扫描 Delayed Queue 中到期的消息
Redis ZSET 很适合实现延时队列 性能问题,虽然 ZSET 插入是一个 O(logn) 的操作,但是Redis 基于内存操作,并且内部做了很多性能方面的优化。
定时线程检查的缺陷与改进
2. 开源 MQ 中的实现方案
RocketMQ
RocketMQ 开源版本支持延时消息,但是只支持 18 个 Level 的延时,并不支持任意时间。只不过这个 Level 在 RocketMQ 中可以自定义的,所幸来说对普通业务算是够用的。默认值为“1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h”,18个level。
通俗的讲,设定了延时 Level 的消息会被暂存在名为SCHEDULE_TOPIC_XXXX
的topic中,并根据 level 存入特定的queue,queueId = delayTimeLevel – 1,**即一个queue只存相同延时的消息,保证具有相同发送延时的消息能够顺序消费。**broker会调度地消费SCHEDULE_TOPIC_XXXX,将消息写入真实的topic。
下面是整个实现方案的示意图,红色代表投递延时消息,紫色代表定时调度到期的延时消息:
Level 数固定,每个 Level 有自己的定时器,开销不大 将 Level 相同的消息放入到同一个 Queue 中,保证了同一 Level 消息的顺序性;不同 Level 放到不同的 Queue 中,保证了投递的时间准确性; 通过只支持固定的Level,将不同延时消息的排序变成了固定Level Topic 的追加写操作
Level 配置的修改代价太大,固定 Level 不灵活 CommitLog 会因为延时消息的存在变得很大
Pulsar
内存开销: 维护延时消息索引的队列是放在堆外内存中的,并且这个队列是以订阅组(Kafka中的消费组)为维度的,比如你这个 Topic 有 N 个订阅组,那么如果你这个 Topic 使用了延时消息,就会创建 N 个 队列;并且随着延时消息的增多,时间跨度的增加,每个队列的内存占用也会上升。(是的,在这个方案下,支持任意的延时消息反而有可能让这个缺陷更严重) 故障转移之后延时消息索引队列的重建时间开销: 对于跨度时间长的大规模延时消息,重建时间可能会到小时级别。(摘自 Pulsar 官方公众号文章) 存储开销:延时消息的时间跨度会影响到 Pulsar 中已经消费的消息数据的空间回收。打个比方,你的 Topic 如果业务上要求支持一个月跨度的延时消息,然后你发了一个延时一个月的消息,那么你这个 Topic 中底层的存储就会保留整整一个月的消息数据,即使这一个月中99%的正常消息都已经消费了。
QMQ
如果对时间轮不熟悉的可以阅读笔者的这篇文章 从 Kafka 看时间轮算法设计
时间轮算法适合延时/定时消息的场景,省去延时消息的排序,插入删除操作都是 O(1) 的时间复杂度; 通过多级时间轮设计,支持了超大时间跨度的延时消息; 通过延时加载,内存中只会有最近要消费的消息,更久的延时消息会被存储在磁盘中,对内存友好; 延时消息单独存储(schedule log),不会影响到正常消息的空间回收;
总结
参考
blog.itpub.net/31555607/viewspace-2672190
www.cnblogs.com/hzmark/p/mq-delay-msg.html
mp.weixin.qq.com/s/_wnwBgZgQhjLP14APlQTkA
github.com/qunarcorp/qmq/blob/master/docs/cn/arch.md
github.com/apache/rocketmq
欢迎大家进行观点的探讨和碰撞,各抒己见。如果你有疑问,也可以找我沟通和交流。扩展:接私活儿
最后给读者整理了一份BAT大厂面试真题,需要的可扫码回复“面试题”即可获取。
「顶级架构师」建立了读者架构师交流群,大家可以添加小编微信进行加群。欢迎有想法、乐于分享的朋友们一起交流学习。
扫描添加好友邀你进架构师群,加我时注明【姓名+公司+职位】
版权申明:内容来源网络,版权归原作者所有。如有侵权烦请告知,我们会立即删除并表示歉意。谢谢。
猜你还想看
Spring Boot 配置 HTTPS 详细流程,还有谁不会?