其他
Tars Java 客户端源码分析
作者:vivo 互联网服务器团队-Ke Shengkai
一、基本RPC框架简介
一般RPC架构,有至少三种结构,分别为注册中心,服务提供者和服务消费者。如图1.1所示,注册中心提供注册服务和注册信息变更的通知服务,服务提供者运行在服务器来提供服务,服务消费者使用服务提供者的服务。
服务提供者(RPC Server),运行在服务端,提供服务接口定义与服务实现类,并对外暴露服务接口。
注册中心(Registry),运行在服务端,负责记录服务提供者的服务对象,并提供远程服务信息的查询服务和变更通知服务。
服务消费者(RPC Client),运行在客户端,通过远程代理对象调用远程服务。
(RPC框架基本结构)
(RPC调用流程)
1)客户端调用客户端桩模块。该调用是本地过程调用,其中参数以正常方式推入堆栈。
2)客户端桩模块将参数打包到消息中,并进行系统调用以发送消息。打包参数称为编组。
3)客户端的本地操作系统将消息从客户端计算机发送到服务器计算机。
4)服务器计算机上的本地操作系统将传入的数据包传递到服务器桩模块。
5)服务器桩模块从消息中解包出参数。解包参数称为解组。
6)最后,服务器桩模块执行服务器程序流程。回复是沿相反的方向执行相同的步骤。
二、Tars Java客户端设计介绍
(Tars Java初始化过程)
1)先出创建一个CommunicatorConfig配置项,命名为communicatorConfig,其中按需设置locator, moduleName, connections等参数。
2)通过上述的CommunicatorConfig配置项,命名为config,那么调用CommunicatorFactory.getInstance().getCommunicator(config),创建一个Communicator对象,命名为communicator。
3)假设objectName="MESSAGE.ControlCenter.Dispatcher",需要生成的代理接口为Dispatcher.class,调用communicator.stringToProxy(objectName, Dispatcher.class)方法来生成代理对象的实现类。
4)在stringToProxy()方法里,首先通过初始化QueryHelper代理对象,调用getServerNodes()方法获取远程服务对象列表,并设置该返回值到communicatorConfig的objectName字段里。具体的代理对象的代码分析,见下文中的“2.3 代理生成”章节。
5)判断在之前调用stringToProxy是否有设置LoadBalance参数,如果没有的话,就生成默认的采用RR轮训算法的DefaultLoadBalance对象。
6)创建TarsProtocolInvoker协议调用对象,其中过程有通过解析communicatorConfig中的objectName和simpleObjectName来获取URL列表,其中一个URL对应一个远程服务对象,TarsProtocolInvoker初始化各个URL对应的ServantClient对象,其中一个URL根据communicatorConfig的connections配置项确认生成多少个ServantClient对象。然后使用ServantClients等参数初始化TarsInvoker对象,并将这些TarsInvoker对象集合设置到TarsProtocolInvoker的allInvokers成员变量中,其中每个URL对应一个TarsInvoker对象。上述分析表明,一个远程服务节点对应一个TarsInvoker对象,一个TarsInvoker对象包含connections个ServantClient对象,对于TCP协议,那么就是一个ServantClient对象对应一个TCP连接。
7)使用api, objName, servantProxyConfig,loadBalance,protocolInvoker, this.communicator参数生成一个实现JDK代理接口InvocationHandler的ObjectProxy对象。
8)生成ObjectProxy对象的同时进行初始化操作,首先会执行loadBalancer.refresh()方法刷新远程服务节点到负载均衡器中便于后续tars远程调用进行路由。
9)然后注册统计信息上报器,其中是上报方法采用JDK的ScheduledThreadPoolExecutor进行定时轮训上报。
10)注册服务列表刷新器,采用的技术方法和上述统计信息上报器基本一致。
Tars Java代码使用范例
// 先初始化基本Tars配置
CommunicatorConfig cfg = new CommunicatorConfig();
// 通过上述的CommunicatorConfig配置生成一个Communicator对象。
Communicator communicator = CommunicatorFactory.getInstance().getCommunicator(cfg);
// 指定Tars远程服务的服务对象名、IP和端口生成一个远程服务代理对象。
HelloPrx proxy = communicator.stringToProxy(HelloPrx.class, "TestApp.HelloServer.HelloObj@tcp -h 127.0.0.1 -p 18601 -t 60000");
//同步调用,阻塞直到远程服务对象的方法返回结果
String ret = proxy.hello(3000, "Hello World");
System.out.println(ret);
//异步调用,不关注异步调用最终的情况
proxy.async_hello(null, 3000, "Hello World");
//异步调用,注册一个实现TarsAbstractCallback接口的回执处理对象,该实现类分别处理调用成功,调用超时和调用异常的情况。
proxy.async_hello(new HelloPrxCallback() {
@Override
public void callback_expired() { //超时事件处理
}
@Override
public void callback_exception(Throwable ex) { //异常事件处理
}
@Override
public void callback_hello(String ret) { //调用成功事件处理
Main.logger.info("invoke async method successfully {}", ret);
}
}, 1000, "Hello World");
在上述例子中,演示了常见的两种调用方式,分别为同步调用和异步调用。其中异步调用,如果调用方想捕捉异步调用的最终结果,可以注册一个实现TarsAbstractCallback接口的实现类,对tars调用的异常,超时和成功事件进行处理。
public final class ObjectProxy<T> implements ServantProxy, InvocationHandler {
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
String methodName = method.getName();
Class<?>[] parameterTypes = method.getParameterTypes();
InvokeContext context = this.protocolInvoker.createContext(proxy, method, args);
try {
if ("toString".equals(methodName) && parameterTypes.length == 0) {
return this.toString();
} else if
//***** 省略代码 *****
} else {
// 在负载均衡器选取一个远程调用类,进行应用层协议的封装,最后调用TCP传输层进行发送。
Invoker invoker = this.loadBalancer.select(context);
return invoker.invoke(context);
}
} catch (Throwable var8) {
// ***** 省略代码 *****
}
}
}
具体相关逻辑如源码所示,ObjectProxyFactory是生成ObjectProxy的辅助工厂类,和ServantProxyFactory不同,其本身不缓存生成的代理对象。
class ServantProxyFactory {
private final ConcurrentHashMap<String, Object> cache = new ConcurrentHashMap();
// ***** 省略代码 *****
public <T> Object getServantProxy(Class<T> clazz, String objName, ServantProxyConfig servantProxyConfig, LoadBalance loadBalance, ProtocolInvoker<T> protocolInvoker) {
Object proxy = this.cache.get(objName);
if (proxy == null) {
this.lock.lock(); // 加锁,保证只生成一个远程服务代理对象。
try {
proxy = this.cache.get(objName);
if (proxy == null) {
// 创建实现JDK的java.lang.reflect.InvocationHandler接口的对象
ObjectProxy<T> objectProxy = this.communicator.getObjectProxyFactory().getObjectProxy(clazz, objName, servantProxyConfig, loadBalance, protocolInvoker);
// 使用JDK的java.lang.reflect.Proxy来生成实际的代理对象
this.cache.putIfAbsent(objName, this.createProxy(clazz, objectProxy));
proxy = this.cache.get(objName);
}
} finally {
this.lock.unlock();
}
}
return proxy;
}
/** 使用JDK自带的Proxy.newProxyInstance生成代理对象 */
private <T> Object createProxy(Class<T> clazz, ObjectProxy<T> objectProxy) {
return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), new Class[]{clazz, ServantProxy.class}, objectProxy);
}
// ***** 省略代码 *****
}
如图下图所示,ClientA某个时刻的一次调用使用了Service3节点进行远程服务调用,而ClientB某个时刻的一次调用采用Service2节点。Tars Java提供多种负载均衡算法实现类,其中有采用RR轮训算法的RoundRobinLoadBalance,一致性哈希算法的ConsistentHashLoadBalance和普通哈希算法的HashLoadBalance。
(客户端按特定路由规则调用远程服务)
如下述源码所示,如果要自定义负载均衡器来定义远程调用的路由规则,那么需要实现com.qq.tars.rpc.common.LoadBalance接口,其中LoadBalance.select()方法负责按照路由规则,选取对应的Invoker对象,然后进行远程调用,具体逻辑见源码代理实现。由于远程服务节点可能发生变更,比如上下线远程服务节点,需要刷新本地负载均衡器的路由信息,那么此信息更新的逻辑在LoadBalance.refresh()方法里实现。
负载均衡接口
public interface LoadBalance<T> {
/** 根据负载均衡策略,挑选invoker */
Invoker<T> select(InvokeContext invokeContext) throws NoInvokerException;
/** 通知invoker列表的更新 */
void refresh(Collection<Invoker<T>> invokers);
}
假如此时出现一个网络事件,那么此时线程将会被唤醒,执行后续代码,其中一个代码是dispatcheEvent(key),也就是将进行事件的分发。
其中将根据对应条件,
调用acceptor.handleConnectEvent(key)方法来处理客户端连接成功事件,
或acceptor.handleAcceptEvent(key)方法来处理服务器接受连接成功事件,
或调用acceptor.handleReadEvent(key)方法从Socket里读取数据,
或acceptor.handleWriteEvent(key)方法来写数据到Socket 。
Reactor事件处理
public final class Reactor extends Thread {
protected volatile Selector selector = null;
private Acceptor acceptor = null;
//***** 省略代码 *****
public void run() {
try {
while (!Thread.interrupted()) {
// 阻塞直到有网络事件发生。
selector.select();
//***** 省略代码 *****
while (iter.hasNext()) {
SelectionKey key = iter.next();
iter.remove();
if (!key.isValid()) continue;
try {
//***** 省略代码 *****
// 分发传输层协议TCP或UDP网络事件
dispatchEvent(key);
//***** 省略代码 *****
}
}
//***** 省略代码 *****
}
//***** 省略代码 *****
private void dispatchEvent(final SelectionKey key) throws IOException {
if (key.isConnectable()) {
acceptor.handleConnectEvent(key);
} else if (key.isAcceptable()) {
acceptor.handleAcceptEvent(key);
} else if (key.isReadable()) {
acceptor.handleReadEvent(key);
} else if (key.isValid() && key.isWritable()) {
acceptor.handleWriteEvent(key);
}
}
}
(Tars-Java的网络事件处理模型)
上图中的处理读IO事件(Read Event)实现和写IO事件(Write Event)的线程池是在Communicator初始化的时候配置的。具体逻辑如源码所示,其中线程池参数配置由CommunicatorConfig的corePoolSize, maxPoolSize, keepAliveTime等参数决定。
private void initCommunicator(CommunicatorConfig config) throws CommunicatorConfigException {
//***** 省略代码 *****
this.threadPoolExecutor = ClientPoolManager.getClientThreadPoolExecutor(config);
//***** 省略代码 *****
}
public class ClientPoolManager {
public static ThreadPoolExecutor getClientThreadPoolExecutor(CommunicatorConfig communicatorConfig) {
//***** 省略代码 *****
clientThreadPoolMap.put(communicatorConfig, createThreadPool(communicatorConfig));
//***** 省略代码 *****
return clientPoolExecutor;
}
private static ThreadPoolExecutor createThreadPool(CommunicatorConfig communicatorConfig) {
int corePoolSize = communicatorConfig.getCorePoolSize();
int maxPoolSize = communicatorConfig.getMaxPoolSize();
int keepAliveTime = communicatorConfig.getKeepAliveTime();
int queueSize = communicatorConfig.getQueueSize();
TaskQueue taskqueue = new TaskQueue(queueSize);
String namePrefix = "tars-client-executor-";
TaskThreadPoolExecutor executor = new TaskThreadPoolExecutor(corePoolSize, maxPoolSize, keepAliveTime, TimeUnit.SECONDS, taskqueue, new TaskThreadFactory(namePrefix));
taskqueue.setParent(executor);
return executor;
}
}
下图描述了远程服务调用的流程情况。这里着重讲几个点,一个是如何写数据到网络IO。第二个是Tars Java通过什么方式进行同步或者异步调用,底层采用了什么技术。
(远程调用流程)
如图(底层代码写IO过程)所示,ServantClient将调用底层网络写操作,在invokeWithSync方法中,取得ServantClient自身成员变量TCPSession,调用TCPSession.write()方法,如图(底层代码写IO过程)和以下源码( 读写事件线程池初始化)所示,先获取Encode进行请求内容编码成IoBuffer对象,最后将IoBuffer的java.nio.ByteBuffer内容放入TCPSession的queue成员变量中,然后调用key.selector().wakeup(),唤醒Reactor中run()方法中的Selector.select(),执行后续的写操作。
(底层代码写IO过程)
具体Reactor逻辑见上文2.5 网络模型内容,如果Reactor检查条件发现可以写IO的话也就是key.isWritable()为true,那么最终会循环从TCPSession.queue中取出ByteBuffer对象,调用SocketChannel.write(byteBuffer)执行实际的写网络Socket操作,代码逻辑见源码中的doWrite()方法。
public class TCPSession extends Session {
public void write(Request request) throws IOException {
try {
IoBuffer buffer = selectorManager.getProtocolFactory().getEncoder().encodeRequest(request, this);
write(buffer);
//***** 省略代码 *****
}
protected void write(IoBuffer buffer) throws IOException {
//***** 省略代码 *****
if (!this.queue.offer(buffer.buf())) {
throw new IOException("The session queue is full. [ queue size:" + queue.size() + " ]");
}
if (key != null) {
key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
key.selector().wakeup();
}
}
protected synchronized int doWrite() throws IOException {
int writeBytes = 0;
while (true) {
ByteBuffer wBuf = queue.peek();
//***** 省略代码 *****
int bytesWritten = ((SocketChannel) channel).write(wBuf);
//***** 省略代码 *****
return writeBytes;
}
}
ServantClient的同步调用
public class ServantClient {
public <T extends ServantResponse> T invokeWithSync(ServantRequest request) throws IOException {
//***** 省略代码 *****
ticket = TicketManager.createTicket(request, session, this.syncTimeout);
Session current = session;
current.write(request);
if (!ticket.await(this.syncTimeout, TimeUnit.MILLISECONDS)) {
//***** 省略代码 *****
response = ticket.response();
//***** 省略代码 *****
return response;
//***** 省略代码 *****
return response;
}
}
如代码所示,在执行完session.write()操作后,紧接着执行ticket.await()方法,该方法线程等待直到远程服务回复返回结果到客户端,ticket.await()被唤醒后,将执行后续操作,最终invokeWithSync方法返回response对象。其中Ticket的等待唤醒功能内部采用java.util.concurrent.CountDownLatch来实现。
public final class WorkThread implements Runnable {
public void run() {
try {
//***** 省略代码 *****
Ticket<Response> ticket = TicketManager.getTicket(resp.getTicketNumber());
//***** 省略代码 *****
ticket.notifyResponse(resp);
ticket.countDown();
TicketManager.removeTicket(ticket.getTicketNumber());
}
//***** 省略代码 *****
}
}
public class TicketManager {
//***** 省略代码 *****
static {
executor.scheduleAtFixedRate(new Runnable() {
long currentTime = -1;
public void run() {
Collection<Ticket<?>> values = tickets.values();
currentTime = System.currentTimeMillis();
for (Ticket<?> t : values) {
if ((currentTime - t.startTime) > t.timeout) {
removeTicket(t.getTicketNumber());
t.expired();
}
}
}
}, 500, 500, TimeUnit.MILLISECONDS);
}
}
三、总结
Tars与其他RPC框架,并没有什么本质区别,通过类比其他框架的设计理念,可以更加深入理解Tars Java设计理念。
四、参考文献
END
猜你喜欢
vivo互联网技术
vivo移动互联网是基于vivo 智能手机所建立的完整移动互联网生态圈,围绕vivo大数据运营,打造包括应用、游戏、资讯、品牌、电商、内容、金融、搜索的全方位服务生态,满足海量用户的多样化需求。
点一下,代码无 Bug