基于消息中间件RabbitMQ实现简单的RPC服务
小Hub领读:
基于RabbitMq,怎么弄一个RPC框架,来看看代码,应该能看懂哈!
作者:zhaoyh
原文地址 https://blog.csdn.net/ZYH920521/article/details/88974607
RPC(Remote Procedure Call, 远程过程调用),是一种计算机通信协议。对于两台机器而言,就是 A 服务器上的应用程序调用 B 服务器上的函数或者方法,由于不在同一个内存空间或机器上运行,因此需要借助于网络通信。
1. RPC 框架
我们首先通过一张图理解 RPC 的工作流程:
因此,实现一个最简单的 RPC 服务,只需要 Client、Server 和 Network,本文就是利用消息中间件 RabbitMQ 作为 Network 载体传输信息,实现简单的 RPC 服务。简单原理可如下图所示:
即:当 Client 发送 RPC 请求时,Client 端是消息生产者,Server 端是消息消费者;当 Server 返回结果时,Server 端是消息生产者,Client 是消息消费者;发送和返回使用不同的队列。
接下来我们通过代码,详细展示一个计算斐波那契数列的 RPC 服务。
2. RPCServer 实现
2.1 Server 初始化
/**
* 队列名、交换机名、路由键
*/
private static final String EXCHANGE_NAME = "rpc_exchange";
private static final String QUEUE_NAME = "request_rpc_queue";
private static final String ROUTING_KEY = "rpc_routing_key";
private Connection connection = null;
private Channel channel = null;
private QueueingConsumer consumer = null;
/**
* Server的构造函数
*/
private RPCServer() {
try {
//创建链接
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(Config.HOST);
factory.setPort(Config.PORT);
factory.setUsername(Config.USER);
factory.setPassword(Config.PASSWORD);
connection = factory.newConnection();
//创建信道
channel = connection.createChannel();
//设置AMQP的通信结构
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);
//设置消费者
consumer = new QueueingConsumer(channel);
channel.basicConsume(QUEUE_NAME, false, QUEUE_NAME, consumer);
} catch (Exception e) {
LOG.error("build connection failed!", e);
}
}
初始化就是声明 RabbitMQ 的链接工厂、链接、信道、队列、交换机等等,并做了绑定,由此构成了 AMQP 的通信结构。
2.2 监听队列并反馈
/**
* 开启server
*/
private void startServer() {
try {
LOG.info("Waiting for RPC calls.....");
while (true) {
//获得文本消息
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
BasicProperties props = delivery.getProperties();
//返回消息的属性
BasicProperties replyProps = new BasicProperties.Builder()
.correlationId(props.getCorrelationId())
.build();
long receiveTime = System.currentTimeMillis();
JSONObject json = new JSONObject();
try {
String message = new String(delivery.getBody(), "UTF-8");
int n = Integer.parseInt(message);
LOG.info("Got a request: fib(" + message + ")");
json.put("status", "success");
json.put("result", fib(n));
} catch (Exception e) {
json.put("status", "fail");
json.put("reason", "Not a Number!");
LOG.error("receive message failed!", e);
} finally {
long responseTime = System.currentTimeMillis();
json.put("calculateTime", (responseTime - receiveTime));
channel.basicPublish("", props.getReplyTo(), replyProps, json.toString().getBytes("UTF-8"));
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
}
} catch (Exception e) {
LOG.error("server failed!", e);
} finally {
if (connection != null) {
try {
connection.close();
} catch (Exception e) {
LOG.error("close failed!", e);
}
}
}
}
在该方法中使用了一个无限循环,每次处理一条消息。通过调用消费者对象的 nextDelivery 方法来获得 RabbitMQ 队列的最新一条消息。同时通过 getProperties 获取到消息中的反馈信息属性,用于标记客户端 Client 的属性。然后计算斐波那契数列的结果。
最后通过 basicAck 使用消息信封向 RabbitMQ 确认了该消息。
到这里就实现了计算斐波那契数列 RPC 服务的 Server 端。
3. RPCClient 实现
3.1 初始化 CLient
/**
* 消息请求的队列名、交换机名、路由键
*/
private static final String EXCHANGE_NAME = "rpc_exchange";
private static final String QUEUE_NAME = "request_rpc_queue";
private static final String ROUTING_KEY = "rpc_routing_key";
/**
* 消息返回的队列名、交换机名、路由键
*/
private static final String RESPONSE_QUEUE = "response_rpc_queue";
private static final String RESPONSE_ROUTING_KEY = "response_rpc_routing_key";
/**
* RabbitMQ的实体
*/
private Connection connection = null;
private Channel channel = null;
private QueueingConsumer consumer = null;
/**
* 构造客户端
* @throws Exception
*/
private RPCClient() throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(Config.HOST);
factory.setPort(Config.PORT);
factory.setUsername(Config.USER);
factory.setPassword(Config.PASSWORD);
connection = factory.newConnection();
channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);
channel.queueDeclare(RESPONSE_QUEUE, false, false, false, null);
channel.queueBind(RESPONSE_QUEUE, EXCHANGE_NAME, RESPONSE_ROUTING_KEY);
consumer = new QueueingConsumer(channel);
channel.basicConsume(RESPONSE_QUEUE, true, consumer);
}
这里声明 AMQP 结构体的方式和 Server 端类似,只不过 Client 端需要多声明一个队列,用于 RPC 的 response。
3.2 发送 / 接收消息
/**
* 请求server
* @param message
* @return
* @throws Exception
*/
private String requestMessage(String message) throws Exception {
String response = null;
String corrId = UUID.randomUUID().toString();
BasicProperties props = new BasicProperties.Builder().correlationId(corrId).replyTo(RESPONSE_QUEUE).build();
channel.basicPublish("", QUEUE_NAME, props, message.getBytes("UTF-8"));
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
if (delivery.getProperties().getCorrelationId().equals(corrId)) {
response = new String(delivery.getBody(),"UTF-8");
break;
}
}
return response;
}
BasicProperties 用于存储你请求消息的属性,这里我设置了 correlationId 和 replyTo 属性,用于 Server 端的返回识别。
4. 运行测试
Client 端发送:
Server 端接收并处理:
Client 收到计算结果:
由于我运行 RabbitMQ 的服务器是租用的阿里云的,差不多传输时延在 60ms 左右,如果把 RPC 服务和消息中间件同机房部署的话延时基本上就在 ms 级别。
5. FAQ
5.1 说明
需要体验完整的过程,你需要如下环境:
JDK1.6以上 + Maven+ RabbitMQ
5.2 源代码
完整代码代码请戳:github:https://github.com/chadwick521/rabbitmqdemo
其中 Server 的代码在:
rpc.RPCServer
Client 端的代码位置:
rpc.RPCClient
以上内容就是关于基于消息中间件 RabbitMQ 实现简单的 RPC 服务的全部内容了,谢谢你阅读到了这里!
(完)
MarkerHub文章索引:(点击阅读原文直达)
https://github.com/MarkerHub/JavaIndex
【推荐阅读】
【项目实践】SpringBoot三招组合拳,手把手教你打出优雅的后端接口
使用spring boot+WebSocket 实现定时消息推送(基于注解)