RabbitMQ 的这些骚操作你知道吗?
The following article is from 阿飞的博客 Author 阿飞
RabbitMQ的Java客户端统一使用com.rabbitmq.client作为顶级包名。其中,最核心的类主要有:ConnectionFactory、Connection、Channel、Consumer、DefaultConsumer、BasicProperties。需要说明的是,本文不只是教你RabbitMQ客户端的基本玩法,还有一些你可能不知道的一些骚操作。
连接RabbitMQ
使用RabbitMQ第一步当然是连接RabbitMQ,这里就不说怎么搭建RabbitMQ环境了,本文假设你已经有RabbitMQ环境,连接RabbitMQ的代码如下:
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername("root");
factory.setPassword("root123");
factory.setVirtualHost("/");
factory.setHost("127.0.0.1");
factory.setPort(5672);
Connection conn = factory.newConnection();
需要说明的是,如果你用的是默认vhost,即/。那么factory.setVirtualHost("/")这行代码可以省掉。那么,这里有一个有趣的问题:创建RabbitMQ连接最短的代码是怎样的?答案是只需要两行代码即可。这是为什么呢?因为创建连接的这几个字段都有默认值,用户名密码默认值默认为guest/guest,host和端口默认为localhost和5672(ConnectionFactory.java源码中有DEFAULT_开头命名的常量,就是默认值)。不过需要注意的是默认账户guest只能连接本地RabbitMQ环境:
ConnectionFactory factory = new ConnectionFactory();
Connection conn = factory.newConnection();
创建RabbitMQ连接还有另一种通过URI的方式,代码如下:
ConnectionFactory factory = new ConnectionFactory();
factory.setUri("amqp://username:password@hostName:port/virtualHost");
Connection conn = factory.newConnection();
每成功创建连接,在RabbitMQ服务端都有相应的日志:
2020-05-10 17:51:58.380 [info] <0.7312.0> accepting AMQP connection <0.7312.0> ([::1]:61390 -> [::1]:5672)
2020-05-10 17:51:58.509 [info] <0.7312.0> connection <0.7312.0> ([::1]:61390 -> [::1]:5672): user 'guest' authenticated and granted access to vhost '/'
被动申明
如下这段代码所示,被动申明一个队列。它只检查队列是否存在,如果存在,那么不会有任何操作,并且返回和主动且成功创建队列一样的响应信息。如果队列不存在,那么就会抛出Channel级别的异常。所以,被动申明一般使用在一次性临时性Channel申明的地方:
Queue.DeclareOk response = channel.queueDeclarePassive("queue-name");
// returns the number of messages in Ready state in the queue
response.getMessageCount();
// returns the number of consumers the queue has
response.getConsumerCount();
如果队列不存在时,抛出的异常信息如下:
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no queue 'queue-none' in vhost '/', class-id=50, method-id=10)
线程安全
强制要求一个线程一个Channel,不要多个线程共用一个Channel实例,否则会出现一些莫名其妙的错误。
消费者(Push模式)
消费者消费消息一般通过channel.basicConsume方法,这个方法有很多重载参数,不过我们常用的方法是下面这两个。官方更加推荐第一个带有consumerTag的方法,并且每个不同的消费者实例要有不同的consumerTag。强烈不建议一个连接上有相同的consumerTag,否则可能会导致automatic connection recovery
的问题,参考(https://www.rabbitmq.com/api-guide.html#connection-recovery):
String basicConsume(String queue, boolean autoAck, String consumerTag, Consumer callback)
String basicConsume(String queue, boolean autoAck, Consumer callback) throws IOException;
consumerTag是消费者唯一标识符。如果是使用了不带consumerTag参数的方法,那么RabbitMQ会自动生成一个唯一Tag,这样的Tag没有业务参考意义。如下图所示:
如果我们自定义了consumerTag的值,那么,一看某个队列的消费者信息,就知道这些消费者来自哪里、是干嘛的,非常让人容易理解。如下图所示:
需要说明的是,这种Push模式,如果生产者产生的消息量超过消费者能承受的量,就会撑爆消费者。不过,RabbitMQ考虑到了这一点,可以通过方法channel.basicQos(1000)进行限流。basic.qos是针对Channel进行设置的,也就是说只有在channel建立之后才能发送basic.qos命令。在rabbitmq的实现中,每个channel都对应会有一个rabbit_limiter进程,当收到basic.qos命令后,在rabbit_limiter进程中记录信令中prefetch_count的值,同时记录的还有该channel未ack的消息个数,从而保证未ack的消息数量不超过prefetch_count的值(如果prefetch_count设置为0,表示没有任何限制)。
消费者(Pull模式)
Push模式对应的消费端方法是basicConsumer(),而Pull模式对应的消费端方法是basicGet()。每次获取一条消息,不能批量。这种方法效率非常低下,因为不知道队列中是否有消息,所以必须反复询问,即使大部分请求没有结果的情况下,这种方法非常不推荐使用。代码如下:
boolean autoAck = false;
GetResponse response = channel.basicGet(queueName, autoAck);
if (response == null) {
// No message retrieved.
} else {
AMQP.BasicProperties props = response.getProps();
byte[] body = response.getBody();
long deliveryTag = response.getEnvelope().getDeliveryTag();
// do something
channel.basicAck(method.deliveryTag, false);
}
高级连接方式
消费者线程默认被一个ExecutorService自动分配,当连接被关闭的时候,默认的ExecutorService会调用shutdown()。但是,如果创建连接的时候使用了用户自定义ExecutorService,必须手动调用shutdown()方法,否则,线程池中的线程可能会阻止JVM终止,除非kill -9。使用自定义线程池代码如下所示:
ExecutorService es = Executors.newFixedThreadPool(20);
Connection conn = factory.newConnection(es);
需要说明的是,这个特性只有当你明确知道在消费者callback碰到处理瓶颈的情况下才考虑使用,如果没有消费者callback,或者非常少量,那么默认的线程池完全足矣。
使用地址集合
在通过factory构造Connection的时候,允许配置多个地址集合,代码如下所示。它会首先尝试连接host1:post1,如果连接失败,会再尝试连接host2:post2,而且整个过程对用户无感知,只要有一个地址是可用的,就不会抛出任何异常:
Address[] addr = new Address[]{ new Address(host1, port1),
new Address(host2, port2)};
Connection conn = factory.newConnection(addr);
支持NIO
RabbitMQ的Java客户端从4.0开始支持Java NIO。NIO的目的不是为了比BIO更快,它只是为了方便用户更轻易的控制资源,比如线程等。默认的BIO模式下,每一个Connection连接都会用一个线程从网络Socket中读取数据。而在NIO模式下,我们是可以控制与网络Socket交互的线程数。
如果你的Java进程中使用了几十甚至上百个Connection,那么可以尝试使用NIO模式,因为它相比默认的BIO模式,可以节省很多的线程资源。并且在线程数设置合理的情况下,性能不会有任何衰减。开启NIO模式非常简单,如下所示:
ConnectionFactory factory = new ConnectionFactory();
// 默认nio模式线程数为1
factory.useNio();
另一种使用NIO的方式:
factory.setNioParams(new NioParams().setNbIoThreads(4));
网络故障自动恢复
在RabbitMQ服务器和Java客户端之间的网络故障是很常见的现象,RabbitMQ的Java客户端是支持自动恢复的,并且4.0以后该特性是默认开启的,证据在ConnectionFactory的源码中:
private boolean automaticRecovery = true;
private boolean topologyRecovery = true;
❝topology是什么意思?中文是拓扑的意思。在这里是指交换机、队列、绑定关系、消费者等。
我们也可以在new出来ConnectionFactory的时候,显示设置开启or关闭。如果恢复失败,RabbitMQ会固定时间间隔以后进行重试,默认为5秒钟(DEFAULT_NETWORK_RECOVERY_INTERVAL)。可以通过方法setNetworkRecoveryInterval()指定间隔时间。如果构造Connection时用的是地址集合,那么地址会被随机打乱,然后一个接一个进行重试:
ConnectionFactory factory = new ConnectionFactory();
factory.setAutomaticRecoveryEnabled(true);
factory.setNetworkRecoveryInterval(10000);
那么故障恢复在什么时候触发呢?主要是如下这些情况,只要任意一个条件发生都会触发:
Connection上抛出IO异常、或者其他一些其他非预期的异常; scoket读取超时; 失去心跳;
如果是应用启动过程中初始化连接碰到RabbitMQ节点故障,这种情况下自动连接恢复是不会介入的。因为这种情况下,很可能RabbitMQ有一些故障或者问题,开发人员有责任排查问题原因。另外,如果显示调用connection.close()方法后,恢复机制也不会介入。
心跳机制
创建ConnectionFactory时,设置一个大于0的值就是开启心跳机制。如果设置等于0的值,就是关闭心跳机制:
ConnectionFactory cf = new ConnectionFactory();
// set the heartbeat timeout to 60 seconds
cf.setRequestedHeartbeat(60);
需要说明的是,如果设置心跳超时值太低的话,可能会由于一些原因比如瞬时网络故障等导致误报。这里给出一些经验数据:值低于5秒的话,很可能造成误报。值低于1秒的话,基本上都是误报。值在5~20秒之间对大部分环境来说,都是一个比较理想的值。如果是一个很大的值,例如1800秒,这时候心跳信息传送的少了,几乎没有实际的影响,就相当于关闭了心跳机制。
往期推荐
欢迎加入我的知识星球,聊技术、说职场、侃社会。
加入方式:长按下方二维码噢
我的星球是否适合你?
点击阅读原文看看我们都在聊啥