查看原文
其他

从1到亿,如何玩好异步消息?CQRS架构下的异步事件治理实践

王旭 哔哩哔哩技术 2024-03-20

本期作者



王旭

哔哩哔哩资深开发工程师


引言


哔哩哔哩已有接近一亿的日均活跃用户,用户互动非常频繁,这也为后端系统带来巨大挑战,为了实现更好的架构扩展性,我们采用了微服务+ CQRS的架构,在这种架构下,又会带来哪些问题,我们又是如何解决的呢?本文介绍异步事件处理railgun平台,其已经帮助近800个业务应用构建高性能、高稳定性的异步系统,接下来,我们将从一个简单的业务例子开始,一步步介绍异步事件的演进与治理之路。


从「1」开始


我们就拿一个业务系统普遍存在的点赞功能作为例子,有一天老板说:“我们要做一个视频点赞的功能,这个功能看着很简单嘛,让校招生小明做一下好了。”

而你是校招生小明,心想,不就是个点赞按钮嘛,我们先来分析一下产品功能:



点赞的功能核心有两点:

  1. 统计点赞计数

  2. 查询点赞状态

于是你设计了如下的数据库表结构:


CREATE TABLE `counts` (  `id` bigint(20) UNSIGNED NOT NULL AUTO_INCREMENT COMMENT '主键ID',  `video_id` bigint(20) UNSIGNED NOT NULL default 0 COMMENT '视频id',  `count` bigint(20) NOT NULL default 0 COMMENT '点赞计数',  `mtime` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '最后修改时间',  `ctime` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',  PRIMARY KEY (`id`)) COMMENT='计数表'; CREATE TABLE `actions` (  `id` bigint(20) UNSIGNED NOT NULL AUTO_INCREMENT COMMENT '主键ID',  `video_id` bigint(20) UNSIGNED NOT NULL default 0 COMMENT '视频id',  `user_id` bigint(20) UNSIGNED NOT NULL default 0 COMMENT '用户id',  `state` tinyint(4) UNSIGNED default 0 NOT NULL COMMENT '点赞状态',  `mtime` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '最后修改时间',  `ctime` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',  PRIMARY KEY (`id`)) COMMENT='行为表';


点赞系统流程:



这里增加了一个点赞的服务,用户请求到达点赞服务后,点赞服务则请求后台的数据库集群,将点赞记录修改并返回,点赞服务本身无状态,方便水平扩展,状态记录维护在数据库中。

在用户流量非常小的时候这种架构简单、直接、有效。


CPU资源问题


随着用户请求量逐步增加,扩展性的问题逐步暴露出来:

首先是点赞服务CPU、内存资源到达瓶颈,这个问题很好解决,因为点赞服务本身是无状态的,只要增加点赞服务处理节点就行,我们可以借助K8S的能力将将点赞服务快速水平扩容。

接着随之而来的是数据库的CPU吃满,由于所有用户请求都会透传到DB,大量消耗数据库的CPU,数据库的CPU可是比无状态的服务节点"昂贵"的多,而且我们很难像无状态服务一样按需随意扩容,所以最好尽量减少到数据库层面的压力,由应用层承担计算逻辑,在这个业务场景是典型的读多写少的场景,我们引入缓存,将架构调整一下,如图:



用户点赞写入的时候,我们采用适合读多写少的经典模式Cache Aside方式,先写数据库,写入成功后再更新缓存。

查询的时候优先从缓存查询,只有数据不在缓存时才需要从数据库中查询,查询到之后再回塞缓存,避免下次重复查询数据库。

由于我们增加了一层缓存,大量的读请求被缓存承担,数据库的压力大大降低了。

然而好景不长,有一天晚上DBA给你打电话说,说数据库告警,“连接过多,连接池快满了”。


连接数问题


DBA告诉你:数据库能承载的客户端连接是有限的,连接数过多,数据库服务器负载会增加,性能下降,所以在会在数据库服务器设置一个相对安全的连接池大小,一但达到数据库配置的连接池上限,新请求无法获取到连接,就会引起连接超时或者连接拒绝的问题。


你思考了一下,由于每个点赞服务的实例都要与后端MySQL实例建立连接,所以点赞服务的节点一多,那连接数自然就多了,而我们有了一层缓存,回到数据库的请求没有那么多了,没必要都每个节点都连数据库,那么你调整一下架构,加一个数据库访问代理服务:



数据库锁竞争问题


某一天晚上,一个百万粉UP主发了一个非常火爆的视频,大家纷纷对视频点赞,你在后台看着监控同时点赞量达到了一个新的峰值,还没来得及高兴,这时候你手机响了,电话报警说数据库大量超时不可用,粉丝们看着手机弹窗的报错"点赞失败,服务器内部错误",纷纷在微博留言发帖B站服务器又炸了,你赶紧起床复盘现在的服务架构。

这次事故是因为架构存在两个问题:

  1. 数据库锁竞争

  2. 用户点赞路径对数据库是强依赖

当大家同时给一个视频点赞的时候,在数据库层面修改的是一条记录,引起了数据库的行锁竞争,导致数据库连接等待、性能下降,最终导致接口请求超时失败,其次是强依赖的问题,在查询点赞状态的路径下,大量请求被缓存抗住,回到数据库的请求比较少,用户感知不明显,而在点赞的写路径下,所有的请求直接到数据库,数据库发生的任何故障都会被用户明显感知。

这时候你请教了公司里面的架构师,架构师给你说了一个词CQRS,你通过了解知道了这是一种将命令(写操作)和查询(读操作)分离的一种模式,使用这种模式能规避锁竞争以及解决写流程直接依赖DB的问题,你赶紧改造了一下系统架构:



在写入流程中,你加入了业界普遍选用的消息队列Kafka,用户点赞时,点赞服务将消息投递给Kafka后就直接返回,又新增了一个Job服务负责消费异步事件,从Kafka中获取到消息后处理和写入至后续的MySQL和缓存中,在这种架构下,就算DB再次故障用户点赞也不会直接报错了,用户操作时,投递到消息队列就算成功,至于数据,可以等数据库恢复后,在Job里面重新消费。

至于数据库锁竞争的问题,只要避免对同一条记录同时写入就可以规避 ,设置Kafka的生产者分区策略为Key-Ordering,然后将视频id当做消息键,这样同一个视频的消息只会被一个单个job消费串行处理,竞争问题解决了。



重复消费问题


新的架构上线没几天,你在巡检服务的时候发现日志中有一些kafka is rebalancing的日志,原来从kafka消费数据的时候不能保证只消费一次,消息在一些场景下可能会重复投递,最常见的原因是job消费节点重启时触发rebalance,这时消息会重复投递,你赶紧确认了一下在这种情况下是否会重复计算点赞数,发现还好,用户点赞的时候会先从行为表中检查是否已经点赞过了,幸好只是虚惊一场,如果没有行为表,就要设计一下该如何检查了,你想了想,这种情况可以引入一个外部存储,比如缓存,当做临时行为表的角色,记录消息是否已经处理过了,处理过的事件记录一下,当下次再次拿到消息时检查一下消费状态即可。


消费能力不足


新需求来了,老板让你为用户评论也加入点赞功能,上线后,点赞服务流量大了好几倍,新的问题出现了,用户反馈,点赞了之后很久之后计数才变更,你检查了一下后端服务,发现由于接入评论后,点赞应用处理不完这么多的消息,导致数据堆积,而新的数据还不断涌入,按照现在的处理速度永远也消费不完,那怎么解决呢?


增加消费节点

你临时想了一个解决方案,既然是消费能力不足那么扩容嘛,去增加job消费节点,在容器管理平台操作增加了一倍的job应用数量,然而发现消费速度并没有什么变化,怎么回事呢,调查发现原来已经达到了Kafka的分区数量限制,新的job节点无法被分配到分区,拿不到消息,自然无法处理了。你紧急跟运维同学打电话,增加一倍的Kafka分区数量,这时新增的job应用终于获取到了数据,消费速度上来了,这次问题临时缓解了。

提升单节点消费能力

你回顾了一下问题,这次是通过增加分区数量来解决的,那么分区数量是可以无限增加的吗?你问了一下运维同学,得到答复,不可以,原因是:

增加kafka集群复杂度和机器资源消耗

影响kafka集群可用性,增加集群恢复时间

并且告诉你现在已经达到最大限制,不能增加分区了,你想,那么有没有一种在分区不变的情况下提升消费能力的方式?你反思了一下现在的消费模式:



Job收到事件后排队挨个进行处理,假设处理一个事件10毫秒的话,那么单个应用一秒钟最多只能处理100个事件,这时候你想到可以使用多线程来做,提升并发能力,那么我们改造一下处理方式:



为了避免线程数过多导致OOM,你增加一个线程池,为了增加事件下发速度并且维持事件前后顺序,配合增加了一些内存队列,当收到事件后分发到不同的内存队列中,每个内存队列绑定独立的处理线程,这下,只要修改线程数大小就可以提升消费能力了,对于事件前后顺序,只要根据消息key选择合适的内存队列即可,然而事情往往不会这么简单,你测试了一下发现一个严重的问题:

应用重启的时候会丢失消息

原因是分布在不同线程的数据处理完后都会对消息标记ACK,而对kafka来说,只会记录当前分区最大位置,重启前如果先处理了新消息,老的消息还在其他的线程中没有被处理时,重启后,未被处理的消息也不会被获取到了,那要怎么解决呢?你想了一晚上,设计了如下的方案:



job应用在获取到消息后,将事件用链表的方式有序串联起来,并在每个事件上增加处理标记,当消息被处理后,将消息标记成为已处理,定期检查链表状态,某个消息前面所有的事件都被处理后,才将消息设置ACK状态,通过这种方式,解决了事件丢失的问题。

数据聚合

除了上面的方法,还能怎么样提升消费能力呢?你分析了一下点赞的处理流程:

先将用户点赞事件写入Likes表

再修改count表计数+1

其实第二步还有优化空间,只要将对同一个视频的N个点赞聚合起来,一次性加N,就能大大减少数据库的修改次数,说干就干,你设计了以下的结构:



job获取到消息后,先将消息放入内存队列中,被独立的线程处理,这时候进行幂等处理和写入Likes表的操作,然后接下来将事件发往下一个内存队列,在这个队列中有单独的线程负责事件聚合,在聚合到一定的数量或者时间后,将事件批量打包,一次性更新数据库,在程序关闭的时候做好优雅关闭将聚合事件处理完再退出,并做好panic保护,避免进程崩溃的可能,采用这种聚合模式,大大减少了写入DB的请求次数,性能又提高了。

减少ACK

通过测试发现还有一个问题,程序每次处理完事件之后都要对事件进行ACK,告知Kafka已经处理结束,消息越多,ACK通信越多,而对于Kafka来说,并没有维护每个消息的ACK状态,只会标记单个分区的最大确认位置,那么我们可以做几个优化:

  1. 将同步ACK改异步ACK

  2. 当消息ACK时,只修改内存对应分区的位置信息,将网络IO变为内存操作

  3. 减少ACK次数,改为定时同步ACK状态

通过上面这5种方式的优化,经过压测发现,job的处理速度提升了10倍以上。


流量控制


新架构上线没多久,又遇到了新的问题,在一次流量高峰,后端数据库崩了,由于之前的架构消费能力太强,导致数据库扛不住那么大量的写入速度,虽然提升数据库处理能力是长久之计,但是当务之急是限制消费速度,于是你调研了一下限流方案:

  1. 使用redis实现全局限流

  2. 使用令牌桶、漏桶等限流算法单机限流

你思考了一下,如果使用redis限流的话消费事件都会增加网络调用的成本,一旦redis超时、抖动会影响消费速率,在这个场景不太适合这种方案,如果是第二种方案,虽然避免了网络问题,但是一旦每个节点的流量差异比较大的话,会有提前限流的问题,于是你结合了两者的特点,利用本地令牌桶的方式来限制消费速度,如果消费速度超过限制大小,就等待新的令牌生成,再配合一个中央控制节点管理每个节点的令牌生成速率,实现了高可用的全局限流,加上这层保护后,稳定性更高了。


热点事件问题


自认为的完美架构,往往在现实面前不堪一击,没错,系统又出问题了,还是消费延迟,用户点赞后计数迟迟不更新,跟上次不一样,这次的原因是大量用户聚集在同一个视频下面点赞,产生了严重的数据倾斜,而在上述的架构中同个视频的点赞只会被单个线程处理,影响到了同一节点下的其他视频,扩大了影响范围,于是你思考了一下解决方式,首先是提升单个视频下的处理性能、优化处理逻辑,另一方面还可以对热点问题做一下事件隔离,避免影响到其他的视频,流程如下:



当发现队列阻塞,根据处理耗时和出现频率自动分析和统计出最热的key,将这些key转发到独立的热点队列中去,然后启动处理线程从新的热点队列中消费数据,隔离热点,避免其他视频受到影响。

经过以上优化,再也不用担心热点拖垮其他视频了。


错误重试


在每周巡检的时候你发现事件处理成功率是99.99%,有万分之一的事件处理失败,这往往是依赖的数据库、接口偶发的超时导致的,虽然百分比不高,但累积起来一天的量也不少了,那如何优雅的重试呢,有两种策略:

  1. 就地重试

  2. 消息重投

在就地重试中,为了减少重试抖动,你使用指数退避的算法来缓解重试压力,在消息重投你使用了跟热点转移类似的方式将失败消息重投到新的队列,并设置了重试队列的消费延迟时间,在这次错误重试策略调整后,已经基本没有失败的情况了。


MQ故障


系统稳定运行了半年后,新的黑天鹅来了,MQ系统故障,所有用户点赞失效,这次事故是一个MQ系统隐藏很深的bug,在晚上被外部因素偶然触发,导致MQ完全不可用,虽然搭建了灾备集群,由于灾备系统也是一样的代码逻辑,运维操作切换集群也无法恢复,看着运维同学忙忙碌碌查找了很久才将问题恢复,你想业务系统在这种场景下能怎么做呢,有什么办法快速止损、去除对异步系统的强依赖,尽量减少对用户影响呢?

现在的架构,MQ系统虽然是多副本高可用的,但是总有例外会导致失效,从整体架构上来说,MQ系统确实也是一个逻辑单点,同构的降级方案总是不太靠谱,虽好是完全异构的方式,那么是否可以引入一个新的MQ,作为降级手段呢?你想了一下这种方案问题太多了,比如维护、机器成本翻倍、怎么确保故障时可用、故障时敢去切换呢?使用这种方式成本很高,于是你设想了一种新的降级方式:



正常情况下生产方发送消息到kafka,被job节点消费,当kafka故障时,发送方自动切换到降级模式,将消息直接推向消费方,进入消费方的内存队列中,利用job现成的多级内存队列作为缓冲区,通过这种方式保障在主通道故障时消息链路的可用性。

这种方式有几个条件:

  1. 发送方需要能自动化的感知下游变更,包括增减下游处理方,以及下游处理节点变化

  2. 需要尽可能的保障消息的前后顺序关系,可以利用一致性Hash的方式选择下游节点,或者配合原Kafka的分区分配情况作为选择节点的辅助信息

  3. 消费方日常要留有一定容量冗余,避免容量不足导致的降级失效

上线这个降级方案后,成功规避多次kafka不可用导致的问题,大大降低了消息队列故障引发的系统风险。


处理流程全景图


处理流程如下,小明的故事讲完了,这也是我们实际曾经发生和处理过的问题总结。



平台化


为了帮助其他业务更好的聚焦于业务逻辑,避免以上问题反复出现,我们将上述能力进行了抽象和增强,定义了一套统一的消息编程模型,辅以控制面保证配置的实时下发与生效,成为了一个统一的异步事件管控平台。

统一消息编程模型

我们将Kafka、Pulsar、内部Databus等消息组件抽象为消息发送与处理两部分,使用时无须关注具体组件的代码差异,并提供了极简的接入方式:

消息发送示例:


producer, err := railgun.NewProducer("id") // 发送Bytes格式的消息   err = producer.SendBytes(ctx, "test", []byte("test"))   // 发送string格式的消息   err = producer.SendString(ctx, "test", "test")   // json格式编码   err = producer.SendJSON(ctx, "test", struct{ Demo string }{Demo: "test"})   // 批量发送消息   err = producer.SendBatch(ctx, []*message.Message{{Key: "test", Value: []byte("test")}})


事件处理示例:

提供了并发消费、聚合消费等多样的消费SDK套件,使用时只需要提供消息解析与处理的方法,前文提到的所有消费模式都能在控制面按需控制。


// 按需创建多样的处理器processor := single.New(unpack, do) // or batch.New(unpack, preDo, batchDo) or something...// 执行consumer, err := railgun.NewConsumer("id", processor)


控制面

控制面支持了以下功能:

1. 自定义调整消息处理模式、以及提供多种的数据聚合处理策略

2. 基于外部存储的幂等判断

3. Kafka、Pulsar、Databus等多种数据源的切换能力

4. 分布式全局消费速度管理

5. 告警管理

6. 全链路压测的功能控制

7. 可观测性方面

  • 可视化的消息查看与发送的调试能力

  • 消息生命周期血缘图

  • 在线节点信息展示

  • 落后差值与速率预估

  • 运行状态可视化

8. 数十种常见问题的自动化诊断与干预

  • 消息分区不均强制打散

  • 无限重试的逻辑的干预

  • 消费瓶颈检测

  • 降级问题排查

  • 分区情况排查

我们的平台也在不断的完善中,未来计划在以下方面进行增强:

  1. 更加精细化的热key识别与干预

  2. 功能模板与运行时托管能力


总结


我们通过定义统一的消息编程模型,实现统一控制面并提供适配SDK来解决了异步事件处理的常见问题,希望本文能对大家设计高性能、高可靠的异步系统有所帮助。


以上是今天的分享内容,如果你有什么想法或疑问,欢迎大家在留言区与我们互动,如果喜欢本期内容的话,欢迎点个“在看”吧!


往期精彩指路

继续滑动看下一个
向上滑动看下一个

您可能也对以下帖子感兴趣

文章有问题?点此查看未经处理的缓存