查看原文
其他

几年前,我撸了一套RabbitMQ的客户端

ImportNew 2022-09-23

The following article is from 四猿外 Author 四猿外

(给ImportNew加星标,提高Java技能)

转自:四猿外

我在上篇文章说过,如果使用 RabbitMQ,尽可能使用框架,而不要去使用 RabbitMQ 提供的 Java 版客户端。

细说起来,其实还是因为 RabbitMQ 客户端的使用有很多的注意事项,稍微不注意,就容易翻车。

我是 2013 年就开始用起了 RabbitMQ,一路使用,一路和它一起成长。当时,由于用的早,市面上也没有特别成熟的 RabbitMQ 客户端框架。所以,不得已之下,只好自己做了一套客户端。

在这其中,正好也有了许多独特的经验也和大家分享一下,以免后来者陷入“后人哀之而不鉴之,亦使后人而复哀后人也”的套娃中。

一、那么,就先从网络连接开始吧

1. 应该长久生存的连接

在 RabbitMQ 中,由于需要客户端和服务器端进行握手,所以导致客户端和服务器端的连接如果要成功创建,需要很高的成本。

每一个连接的创建至少需要 7 个 TCP 包,这还只是普通连接。如果需要 TLS 的参与,则 TCP 包会更多。

而且,RabbitMQ 中主要是以 Channel 方式通信,所以,每次创建完 Connection 网络连接,还得创建 Channel,这又需要 2 个 TCP 包。

如果,每次用完,再把连接关闭,首先还要关闭已经创建的 Channel,这也需要 2 个 TCP 包。

然后,再关闭已经建立好的 Connection 连接,又需要 2 个 TCP 包。

咱们算算,如果一个连接从创建到关闭,一共需要多少个 TCP 包?

7 + 2 + 2 + 2 = 13

一共需要 13 个包。这个成本是很昂贵的。

所以,在 RabbitMQ 中,连接最好缓存起来,重复使用更好。

2. Channel 还是独占好

在 RabbitMQ 自己的客户端中,Channel 出于性能原因,并不是线程安全的。

而如果咱们为了线程共用,给 Channel 人为的在外部加上锁,本身就和 RabbitMQ 的 Channel 设计意图是冲突的。

所以,最好的办法就是一个线程一个 Channel。

3. Channel 最好也别关

就像连接应该缓存起来那样,Channel 的打开和关闭也需要时间成本,而且没有必要去重新创建 Channel,所以,Channel 也应该缓存起来重用。

4. 别把消费和发送的连接搞在一起

把消费和发送的连接搞在一起,这是个很容易犯的错误!

我们用 RabbitMQ 的时候,我们自己的系统本身大部分都是既要发消息也要收消息的。对于这种情况,有很多程序员走了极端:

他们觉得 RabbitMQ 连接成本高,所以省着用。于是就把发消息和收消息的连接混在一起,使用同一个 TCP 连接。

这很可能会埋一个大雷。

因为,当我们发消息很频繁的时候,我们收消息也是走的同一个 TCP 通道,收完了消息,客户端还要给 RabbitMQ 服务器端一个 ACK。

RabbitMQ 服务器端,对于每个 TCP 连接都会分配专门的进程,如果遇到这个进程繁忙,这个 ACK 很可能被丢弃,又或者等待处理的时间过长。而这种情况又会导致 RabbitMQ 中的未确认消息会被堆积的越来越多,影响到整套系统。

所以,消费和发送的连接必须分开,各干各的事情。

5. 别搞太多连接和 Channel,RabbitMQ 的 Web 受不了

RabbitMQ 的 Web 插件会收集很多连接,和其对应 Channel 的相关数据。

如果连接和 Channel 堆积太多了,整个 Web 打开会非常慢,几乎无法对 RabbitMQ 进行管理。所以,要注意限制连接和 Channel 的数量。

二、消息很宝贵,千万别乱抛弃哦

用来通信的消息是很宝贵的。

因为每条消息都可能携带了关键的数据和信息。所以,保证消息不丢失,需要根据消息的重要性,采取很多的措施。

1. 小心,Queue 存在再发消息

一条消息,在 RabbitMQ 中会先发到 Exchange,再由 Exchange 交给对应的 Queue。

而当 Queue 不存在,或者没匹配到合适的 Queue 的时候,默认就会把消息发到系统中的 /dev/null 中。

而且还不会报错。

这个坑当年把我坑惨了!我猜这个坑无数人踩过吧。

所以,在发送消息的时候,最好通过 declare passive 这种方法去探测下队列是否存在,保证消息发送不会丢的莫名其妙。

2. 收到消息请告诉我

在使用 RabbitMQ 客户端的时候,发送消息,一定要考虑使用 confirm 机制。

这个机制就是当消息收到了,RabbitMQ 会往客户端发送一个通知,客户端收到这个通知后,如果存在一个 confirm 处理器,那么就会回调这个处理器处理。这时候,我们就能确保消息是被中间件收到了。

所以,一定要考虑使用 confirm 处理器去确保消息被 RabbitMQ 服务器收到。

3. 有时候消息出了问题我也需要知道

在某些业务里,可能需要知道消息发送失败的场景,以便执行失败的处理逻辑。这时候,就要考虑 RabbitMQ 客户端的 return 机制。

这个机制就是当消息在服务器端路由的时候出现了错误,比如没有 Exchange、或者 RoutingKey 不存在,则 RabbitMQ 会返回一个响应给客户端。客户端收到后会回调 return 的处理器。这时候,客户端所在系统就能感知到这种错误了,从而进行对应的处理。

4. 为了一定不丢消息我也是拼了

还有的时候,消息需要处理强一致性这种事务性质的业务。这时候,就必须开启 RabbitMQ 的事务模式。但是,这个模式会导致整体 RabbitMQ 的性能下降 250 倍。

一般没有必要,不建议开启。

5. 把消息写到磁盘上

一般来说,为了防止消息丢失,需要在 RabbitMQ 服务器收到消息的时候,先持久化消息到磁盘上,防止服务器状态出现问题,消息丢失。

但是,持久化消息,必须先持久化队列,持久化队列完还不行,还必须把消息的 delivery mode 设置为 2,这样才能把消息存到磁盘。但是,这种行为会让整个 RabbitMQ 的性能下降 60%。

这种可以根据实际情况进行抉择。

三、对于收消息这件事,别由着性子来

1. 能一次拿多个干嘛要一次只拿一个

很多时候,一些 RabbitMQ 的新手,觉得如果在一个 mainloop 类似的无限循环里,去主动获取消息,会更加及时的获取到消息,也会拥有更加出色的性能。所以,他们会使用 get 这种行为去取代 consume 这种行为。

这时候,他们其实已经踩进了大坑。

为了能主动 get 服务器消息,很多新手会去写一个无限循环,然后不断尝试去 RabbitMQ 服务器端获取消息。但是,get 方法,其实是只去获取了队列中的第一条消息。

而采用 consume 方式呢,它的默认方式是只要有消息,就会批量的拿,直到拿光所有还没消费过的消息。

一个是一条条拿,一个是批量拿,哪个效率更高一目了然。

所以,尽量采用 consume 方式获取消息。

2. 拿消息也要讲方法论的

消费消息的时候,其实最难掌握的就是:

一次我们到底要取多少条消息?

对于 RabbitMQ 来讲,如果我们不对消费行为做限制,他会有多少消息就获取多少消息。这就造成了一个问题:

如果消息过多,我们一次性把消息读取到内存,很可能就会把应用的内存挤崩掉。

所以,我们要对这种情况做一些限制。

这时候,需要限制一次获取消息的数量,一般来讲,当我们的业务是异步发送,异步消费,不需要实时给回响应的时候,经验数据是一次获取 1000 条。

当然,系统和系统不一样,硬件条件也不一样,大家可以根据实际的情况来设置一次性获取的消息数量。

重点要说说同步。

在很多时候,我们需要通过 RabbitMQ 传送消息,并能通过临时队列等技巧去实时返回处理结果。这时候,就没办法一次抓多条数据进行处理了,因为,有发送端在等处理结果,依次处理,再依次返回,黄花菜都凉了。

而且大部分时候,这种同步等待响应的业务是有顺序要求的。所以,也不能并行同时抓出多条信息处理。那么,彼时,设置每次只消费一条消息就是理所应当的了。

最后

从上面的内容中,你也看到了,RabbitMQ 客户端如果要使用,对新手是多可恶的一件事情,各种坑,各种复杂性。

所以,如果你觉得 Spring 之类的 AMQP 客户端框架合你心意,那么你就使用它。

但是,Spring 的东西有个毛病,如果你要用它,你的应用必须也都要用 Spring。有些时候,也没有这种必要。这时候,你就可以根据我说的这些注意事项和经验,自己开发一套 RabbitMQ 的封装框架,去降低 RabbitMQ 的使用门槛。


推荐阅读  点击标题可跳转

别人家的团队怎么用RabbitMQ:我总结的5点规范

在微服务中使用RabbitMQ也需要规范化

一个基于 RabbitMQ 的可复用的分布式事务消息架构方案!


看完本文有收获?请转发分享给更多人

关注「ImportNew」,提升Java技能

好文章,我在看❤️


您可能也对以下帖子感兴趣

文章有问题?点此查看未经处理的缓存