查看原文
其他

设计消息中间件时我关心什么?(解密电商数据一致性与完整性实现,含PPT)

2016-07-04 余昭辉 高可用架构

导读:应对高可用及极端峰值,每个技术团队都有自己的优秀经验,但是这些方法远没有得到体系化的讨论。高可用架构在 6 月 25 日举办了『高压下的架构演进』专题活动,进行了闭门私董会研讨及对外开放的四个专题的演讲,期望能促进业界对应对峰值的方法及工具的讨论,本文是去哪儿网余昭辉介绍设计电商消息中间件的设计经验。


余昭辉去哪儿网 基础架构部架构师,2011 年加入去哪儿网,经历过去哪儿网从小到壮大,服务拆分的过程。现负责去哪儿网基础架构部,参与设计和开发去哪儿大大小小各种基础组件。本人对互联网电商中间件,并发和异步编程尤为感兴趣,是一个自我标榜 Clean Coder。热衷于与同行技术交流。


我来自去哪儿的基础架构部,我们部门负责公司的公共组件和基础服务,包括敏感信息存储、发号器、身份证认证、监控中心、任务调度、Redis 等。




今天主要给大家分享一下消息队列基础组件的设计。


我们是 2012 年初开始自研消息队列和消息中间件的,当时也是契合公司背景,原来公司有一些庞大的单模块系统,如机票交易系统和酒店交易系统等。为了对系统进行拆分,面临着系统拆分之后事务处理的问题,于是自研了消息中间件。


(小编:上文滴滴passport设计之道:帐号体系高可用的7条经验也多次提到了大系统做小的经验与实践,感兴趣的读者可以参阅)


现在去哪儿网基本所有交易环节都通过消息的方式流转,使用了一种消息驱动的架构,像订单的流转、支付等,消息中间件已经成为核心基础设施,对交易系统非常关键。最初设计消息中间件是为了满足交易场景,后来大家觉得 API 使用非常方便,现在其他业务包括部分搜索等等功能也切到这个上面来。


截止目前,除了部分搜索场景是 AMQ,公司其他业务的都使用了自研的消息中间件,一些基本数据如下:


  • 承载公司 1 万多个 subject

  • 平均接收消息量 QPS 12 万+

  • 峰值 QPS 50 万

  • 最多的一个消息 subject 有 180 个消费组来消费


消息中间件模型说简单也很简单,最小单元就是一条消息,所以伸缩、扩展非常容易,只要根据消息进行 hash。当中间件承受不住压力的候,扩展是非常简单的。另外一方面说它复杂也很复杂,消息中间件作为一个公司的基础组件,如果它出问题就是一个很严重的事情。消息中间件出一次故障,就是 6 ~ 7 个部门报 P1 故障。


这便是它的复杂所在,如何保证它的正常运行,那么在介绍内容之前,先说明一下上下文:今天讲的仅仅是适用交易环节的消息中间件,跟通常所说的社交领域的消息中间件有很大的不同。


在交易环节,需要考虑 3 个方面 :


  1. 不能丢消息。丢消息意味着掉单,意味着支付成功但是没给人家出票,这是不能接受的。

  2. 稳定。消息中间件一旦出问题,交易不能进行,也是严重的故障。

  3. 性能


在电商的场景前面两条要高于性能要求,也是今天要重点讨论的部分。


典型的消息中间件包含 3 部分 :producer(发布者)、broker(消息中间件)、consumer(消费者),是一个比较简单的模型,下图展示了把消息发送给 consumer 的全过程。 




Producer 发布端设计


Producer 消息发布端主要关注一致性、容灾、性能。


分布式事务一致性的难题



上图是一个订票的订单服务,生成的新订单。如果订单持久化成功(上面的红框 ),消息发送失败( 下面的红框 )。那么用户看到下单成功,假设代理商服务订阅这个消息是否给用户出票,那么现在的情况就是票没出来,这会引起用户投诉。但是如果先发消息,然后再持久化订单,那可能就是订单出票了,但其实这个订单还没下呢,这就会造成公司的损失。


这种一致性问题怎么解决?


大家可能会想到分布式事务,比如 2PC(Two phase commit)。业内已经有很多文章介绍了分布式事务的利弊,它的成本还是比较高,在此不做讨论,下面主要介绍电商系统中应用比较多的另外一种方法。


数据库的一致性


先来看一下数据库中的事务




在一个 DB 实例中,比如 3306 这个,使用同一个连接,对多个库的操作是可以放在同一个事务里的。这样是可以保证数据的一致性,这是由数据库决定。比如图中的业务 DB1,业务 DB2 和一个消息 DB 都在同一个实例里,是可以放在同一个事务里的。


业务数据的一致性保证


有了数据库这层保证,就可以用这种方式来实现业务操作和消息发送了。


先将订单持久化在同一个事务里,共享订单操作的数据库连接,在这个连接里将消息持久化到同一个实例里的消息库里,然后在事务提交之后将消息发送到消息的 server。如果事务回滚了,消息就不会发送出去了。




详细看一下流程图 :




  1. 开启事务。

  2. 业务操作,比如说订单进行持久化等等动作。

  3. 生成消息,并存储,这和业务操作是在同一个事务里。

  4. 事务提交。

  5. 消息真正发到出去。消息如果发送成功了,会将消息表里的消息删除,而此时如果消息发送失败了,后台有一个任务会把消息表里面发送失败的消息重新进行发送,这样最终达到一致性,保证业务操作成功了,消息一定能发出去。


(小编:更多了解分布式事务的实现,可参看文末推荐阅读的文章)


现在看看这种模型的优缺点。这种模型 API 非常简单,业务开发只需要使用 sendMessage 这个简单的 API,不需要关心事务等。同时运维也非常简单,我们的做法是公司的 DBA 给所有 DB 实例上预初始化一个消息库,业务根本不用关心,对业务完全是透明,API 把这些封装在底下,使用起来还是非常简单。 但是这样有另外一个问题,就是存储成本。本来只有业务操作访问 DB,然后还要持久化消息。原来承受一个 QPS 现在可能只能承受一半了,所以对数据库操作还是略重一些


另外,有的场景中,可能不仅做数据库操作,还调用了 RPC。这样的动作是不能放在一个数据库事务里的,所以对于这种场景就不能满足了。现在遇到这种情况只有把 RPC 这种操作拆出去了。所以这种模型的优点就是使用方便,但是有些限制。


还有另外一种实现一致性的方法。



  1. 发新的消息,直接发给 broker,这个消息发给 broker 并不立即将消息投递出去。

  2. 做本地的操作

  3. 再调 broker 的接口,这一步真正把消息发送出去。如果这个时候,即使第二步操作成功,第三步发送失败了,第一步发送给 broker 的消息就是一个未决状态。

  4. broker 反过来询问 producer,那条消息是发还是不发出去呢?这种模型就不需要一个将一个消息库放在业务库同实例了,比较灵活,成本也更低些。但是业务使用的复杂度可能要高一些,需要提供一个接口供 broker 反查。 


容错


讲完了一致性,再来看看容错。broker 会有不同的集群,producer 发消息有一个优先级,默认消息优先发到本机房集群,本机房宕掉或者其他什么原因不可用再向别的机房进行发送。本机房出故障自动不向本机房发送,自动熔断故障机房。后台系统里面可以按照 subject 指定路由到特定的 broker 集群。


Broker 消息中间件设计


消息中间件要支持非常多的 subject,全公司都在使用消息中间件,各业务开发水平也参差不齐,如果有的系统弄了一个死循环,疯狂的发消息就会给系统带来不可控的压力,所以中间层需要做好隔离。其次,业务使用消息中间件可能会遇到各种各样的问题,需要辅助工具进行诊断。最后还需要全面的监控能力。 


隔离


隔离包括配额和调度。Producer 给 broker 发消息的时候,每一个 subject 需要给它多少配额,QPS 一旦高于这个配额,做什么处理?


这个地方我们也踩了一个小坑。假设给每个 subject 3000 QPS 限制,最初 producer 端的设计没有考虑配额这种情况,配额生效之后,达到 3000 QPS broker 开始拒绝消息,也就是返回异常。 Producer 一般的设计遇到这种异常时候就不断地重试,这种一拒绝 producer 就不断地重试,雪上加霜,带宽都要打满了。


公司业务部门比较多,因为不能要求所有 producer 都立即配合升级,于是我们做了改进,producer 达到 QPS 不是立即拒绝发送过来的消息,而是拖一会儿,这样来避免将中间层拖垮。当然最好的方式是 producer 进行配合,当 broker 超配额,producer 降低发送速率。


还有一种情况,我们有很多个 subject,有 180 个 consumer group 进行订阅,如果 QPS 达到 100,就是 1.8 万,如果 QPS 达到 1000,就是 18 万,180 倍的增长,所以怎么与其它 subject 进行隔离很重要。 


我们第一版做的很简单,用线程池隔离,每个 subject 分配一个线程池,这种做法隔离效果是很好,但是 subject 不断地增长,资源就不够用了。这个问题抽象来看就像操作系统的 Scheduler 线程调度器,每一个队列可以想象成 OS 里的线程,然后系统用一些来发送这些队列,这些线程就对应 CPU 的 core。我们就模仿 Linux 的 Scheduler 实现了个调度,可能实现水平关系,但是最后测试发现效果很差,量一大起来,队列就堵住了,完全发不出去消息。


最后我们看 actor 这种模型,系统里可以跑成千上万的 actor,它肯定也有一个调度器,最后就模仿 akka 的调度器,它叫 dispatcher,实现了调度的策略。


可治理


像刚才配额都是可以动态调整,不能消息量突然上来了,重启消息系统来调整。还有消息可靠级别,当消息上线时,我们要关心消息 QPS 能达到,能容忍多少丢失?如果这个时候消息中间件出问题了,我们就可以根据上线时可靠级别给有的消息降级。


降级也有很多种策略,比如仅仅给投递一次,如果中间件出问题,这种消息就投递一次算了,不管你是否消费成功,都不给你重发。还有重发次数,比如有的消费者那边有问题,消费不成功,比如消费格式变了,重发一天也消费不了,就可以实时的去调整消息的重发次数。还有可以按多少比例给它发消息,比如说 50%,那么 50% 的消息就给抛弃。


还有日志,为了好查问题,每条消息都有轨迹日志,出问题的时候就可以有选择的是否保存这些日志了。一个公司不可能所有消息都要求 100% 可靠。比如订单支付的消息级别是最高的,但是搜索,比如说现在有报价,代理商帮旗下所有的酒店价格变了一下,关心它的系统就要受到价格的更新,这是通过消息广播出去的。这个消息,它的变动是比较频繁,QPS 也很高,丢了一两条消息,可能又被后面的消息覆盖了,它的可靠级别就比订单的级别要低,这个时候遇到问题,为了保护订单消息,肯定首先对它进行降级。


辅助工具


消息的发送投递轨迹可视化,消息回溯、消息补发等等。发一条消息过来,这条消息什么时候接收到?什么时候进行投递,投递到哪些消费者是要有可视化,这样便于用户查找问题。


消息回溯。比如消费者把消息的内容理解错了,几号到几号之间所有的消息都要进行重新发送,这样就要回溯这段时间的消息。


消息补发。漏发的消息,把消息重新补发一下,用户可以上传一个文件,将这些文件里的内容解析成消息然后发送。


显示消息的发送和消费的关系。系统发出了哪些 topic 的消息,系统订阅了哪些 topic 的消息,有哪些消费有哪些订阅了,这些都非常重要。


监控


监控分为两块。一个是指标监控,比如像 QPS 监控,耗时等。可以细化到 subject、consumer 等粒度。第二个是链路监控、全链路跟踪,这是另外一个产品 QTracer 做的,可以根据消息 id 来查这个消息所关联的链路,来看看这个消息的情况。


Consumer 消费端设计


上下线控制


上下线的策略,如果消费端应用还没有启动成功的时候,消息就已经过来,这是不能接受的。我们使用了一个和 nginx 差不多的方法,利用一个 healthcheck.html,如果有这个文件,就把消费者上线。发布系统将应用发布之后,会检查应用是否 ready,ready 之后就会 touch 一下这个文件,然后 consumer 就上线了。另外就是可以手动屏蔽消费端,比如一个消费组有多台机器,可以屏蔽其中几台。


幂等


幂等分两种实现。有的业务是可以处理幂等的,借助消息里的业务字段然后根据业务场景。但有的业务可能不太好实现幂等,我们的客户单默认提供了幂等的措施,比如基于 Redis、MySQL 等。


关于顺序


消息中间件是不严格保证顺序的,只是尽量保证有序,一般情况下先发送的消息先到,但并不做出这种承诺。要保证顺序对实现方式和成本都是不小的挑战。


使用方一般怎么来保证顺序呢?


一种是状态机。涉及交易的系统一般都有状态机。比如订单流转,假设现在订单状态是待支付,业务收到支付成功的消息,订单就流转成支付成功,这个时候收到了订单完成或者出票成功这样的消息,这个消息不是对应的当前状态,都会进行拒绝,拒绝后消息的 server 稍后会重发。 


另外一种方法是 producer 在消息里面携带一个版本号。Consumer 收到以后会和自己当前的版本号进行比较,接到消息的版本号如果小于数据库的版本号,这个消息就不消费了,直接吞掉,这里要注意,这样的消息就不是拒绝了。


Q & A


Q:消息入库本地库,后台应用扫描数据库重发消息,会不会导致消息重发?

余昭辉:有几层保证。首先,消息一旦发送成功,就把消息给删除了。而后台应用扫描也是扫描指定时间之前的消息,但这可能还是不能完全杜绝重发,比如在删除之前,被扫描到了,就会导致这个问题。我们在 server 端会根据消息的 id 进行一个去重,不过去重也是有个限制的,也就是只保证多长时间内的消息不重复,而不是永久。如果有的业务觉得这还不够,就要自己去实现幂等了。

    

Q:看你们实现了跨机房,如果中间件在两个机房,其中一个机房出问题,会不会有影响,机房做容灾?

余昭辉:这个是没做的,如果一个机房出现不可恢复的故障,需要人工进行恢复。消息收到之后,首先落本地库,还会保存一份到 HBase。你刚才说机房宕掉的,那些存储把消息恢复回来,没有做自动容灾。

    

Q:幂等需要业务上做本地支持?

余昭辉:对。如果业务完全不能接受重复消息,就必须实现幂等。


Q:消息队列是基于 Kafka 吗?如果自研底层是怎么样实现的

余昭辉:这块是自己研发实现的,消息存储直接用 MySQL,开发语言用 Java,去哪儿网主流的语言就是 Java,公司里其他语言很少。关于存储的分区,因为模型非常简单,分区就是用消息的 id,hash 进行分区。

    

Q:broker 做服务中心,如果 broker 重启,消息持久化的情况怎么处理? 

余昭辉:brocker 进行重启,先参看最开始的那个模型。消息发到 broker,broker 如果回成功了,说明消息一定落地了,只有落地成功了,broker 才会回成功。返回成功,producer 就可以把本地消息删除掉。如果你发一条消息正好碰到服务重启,存储没落地,broker 肯定不会回消息,消息就在本地库里面,稍后,后台应用又会把消息扫出来重发。

    

Q:从第一版到现在有没有对架构方面的考量或者重新调整设计? 

余昭辉:架构上面变动不多,主要是里面细节在不断调整。比如说最初的时候,网络这一块直接用的是一个 RPC 框架,没有自己实现网络传输。在 2013 年,碰到一些问题,因此又把网络这块完全重写了。比如上文提到队列隔离调度的地方,也是重新设计了。


本期沙龙相关文章

(点击标题直接阅读)



本文相关 PPT 链接如下,也可点击阅读原文直接下载

http://pan.baidu.com/s/1eRMW4zK


本文编辑邓启明,王杰。想更多了解本期『高压下的架构演进』沙龙内容,请关注「ArchNotes」微信公众号以阅读后续文章。申请城市圈可以更及时了解后续活动信息。转载请注明来自高可用架构及包含以下二维码。


高可用架构

改变互联网的构建方式


长按二维码 关注「高可用架构」公众号



* 回复『城市圈』进一步了解

您可能也对以下帖子感兴趣

文章有问题?点此查看未经处理的缓存