其他
Dubbo 编解码那些事
作者:vivo 互联网服务器团队-Sun wen
一、背景
怀揣着好奇,对于Dubbo的编解码做了几次的Debug学习,在此分享一些学习经验。
比如引入facade包后出现jar包冲突、服务无法启动,更新facade包后某个类找不到等等问题。引入jar包,导致消费方和提供方在某种程度上有了一定耦合。
正是这种耦合,在提供者修改了Facade包类的路径后,习惯性认为会引发报错,而实际上并没有。最初认为很奇怪,仔细思考后才认为理应这样,调用方在按照约定的格式和协议基础上,即可与提供方完成通信。并不应该关注提供方本身上下文信息。(认为类的路径属于上下文信息)接下来揭秘Dubbo的编码解码过程。
二、Dubbo编解码
Dubbo默认用的netty作为通信框架,所有分析都是以netty作为前提。涉及的源码均为Dubbo - 2.7.x版本。在实际过程中,一个服务很有可能既是消费者,也是提供者。为了简化梳理流程,假定都是纯粹的消费者、提供者。
编解码 = dubbo内部编解码链路 + 序列化层
本文旨在梳理从Java对象到二进制流,以及二进制流到Java对象两种数据格式之间的相互转换。在此目的上,为了便于理解,附加通信层内容,以encode,decode为入口,梳理dubbo处理链路。又因Dubbo内部定义为Encoder,Decoder,故在此定义为"编解码"。
无论是序列化层,还是通信层,都是Dubbo高效、稳定运行的基石,了解底层实现逻辑,能够帮助我们更好的学习和使用Dubbo框架。
同理,提供者在NettyServer#doOpen方法提供服务,初始化ServerBootstrap时,会添加编解码器。(adapter.getDecoder()- 解码器,adapater.getEncoder() - 编码器)。
NettyClient
/**
* Init bootstrap
*
* @throws Throwable
*/
@Override
protected void doOpen() throws Throwable {
bootstrap = new Bootstrap();
// ...
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
// ...
ch.pipeline()
.addLast("decoder", adapter.getDecoder())
.addLast("encoder", adapter.getEncoder())
.addLast("client-idle-handler", new IdleStateHandler(heartbeatInterval, 0, 0, MILLISECONDS))
.addLast("handler", nettyClientHandler);
// ...
}
});
}
/**
* Init and start netty server
*
* @throws Throwable
*/
@Override
protected void doOpen() throws Throwable {
bootstrap = new ServerBootstrap();
// ...
bootstrap.group(bossGroup, workerGroup)
.channel(NettyEventLoopFactory.serverSocketChannelClass())
.option(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
.childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
// ...
ch.pipeline()
.addLast("decoder", adapter.getDecoder())
.addLast("encoder", adapter.getEncoder())
.addLast("server-idle-handler", new IdleStateHandler(0, 0, idleTimeout, MILLISECONDS))
.addLast("handler", nettyServerHandler);
}
});
// ...
}
发送消息
ChannelInboundHandler
...
NettyCodecAdapter#getEncoder()
->NettyCodecAdapter$InternalEncoder#encode
->DubboCountCodec#encode
->DubboCodec#encode
->ExchangeCodec#encode
->ExchangeCodec#encodeRequest
DubboCountCodec类实际引用的是DubboCodec,因DubboCodec继承于ExchangeCodec,并未重写encode方法,所以实际代码跳转会直接进入ExchangeCodec#encode方法
接收响应
NettyCodecAdapter#getDecoder()
->NettyCodecAdapter$InternalDecoder#decode
->DubboCountCodec#decode
->DubboCodec#decode
->ExchangeCodec#decode
->DubboCodec#decodeBody
...
MultiMessageHandler#received
->HeartbeatHadnler#received
->AllChannelHandler#received
...
ChannelEventRunnable#run
->DecodeHandler#received
->DecodeHandler#decode
->DecodeableRpcResult#decode
解码链路相对复杂,过程中做了两次解码,在一次DubboCodec#decodeBody内,并未实际解码channel的数据,而是构建成DecodeableRpcResult对象,然后在业务处理的Handler里通过异步线程进行实际解码。
2.4 提供端链路
提供者在接收消息时解码,回复响应时编码。
接收消息
NettyCodecAdapter#getDecoder()
->NettyCodecAdapter$InternalDecoder#decode
->DubboCountCodec#decode
->DubboCodec#decode
->ExchangeCodec#decode
->DubboCodec#decodeBody
...
MultiMessageHandler#received
->HeartbeatHadnler#received
->AllChannelHandler#received
...
ChannelEventRunnable#run
->DecodeHandler#received
->DecodeHandler#decode
->DecodeableRpcInvocation#decode
提供端解码链路与消费端的类似,区别在于实际解码对象不一样,DecodeableRpcResult 替换成 DecodeableRpcInvocation。
体现了Dubbo代码里的良好设计,抽象处理链路,屏蔽处理细节,流程清晰可复用。
回复响应
NettyCodecAdapter#getEncoder()
->NettyCodecAdapter$InternalEncoder#encode
->DubboCountCodec#encode
->DubboCodec#encode
->ExchangeCodec#encode
->ExchangeCodec#encodeResponse
与消费方发送消息链路一致,区别在于最后一步区分Request和Response,进行不同内容编码
2.5 Dubbo协议头
dubbo采用定长消息头 + 不定长消息体进行数据传输。以下是消息头的格式定义
2byte:magic,类似java字节码文件里的魔数,用来标识是否是dubbo协议的数据包。
1byte:消息标志位,5位序列化id,1位心跳还是正常请求,1位双向还是单向,1位请求还是响应;
1byte:响应状态,具体类型见com.alibaba.dubbo.remoting.exchange.Response;
8byte:消息ID,每一个请求的唯一识别id;
4byte:消息体body长度。
protected void encodeRequest(Channel channel, ChannelBuffer buffer, Request req) throws IOException {
Serialization serialization = getSerialization(channel);
// header.
byte[] header = new byte[HEADER_LENGTH];
// set magic number.
Bytes.short2bytes(MAGIC, header);
// set request and serialization flag.
header[2] = (byte) (FLAG_REQUEST | serialization.getContentTypeId());
if (req.isTwoWay()) {
header[2] |= FLAG_TWOWAY;
}
if (req.isEvent()) {
header[2] |= FLAG_EVENT;
}
// set request id.
Bytes.long2bytes(req.getId(), header, 4);
// encode request data.
int savedWriteIndex = buffer.writerIndex();
buffer.writerIndex(savedWriteIndex + HEADER_LENGTH);
ChannelBufferOutputStream bos = new ChannelBufferOutputStream(buffer);
ObjectOutput out = serialization.serialize(channel.getUrl(), bos);
if (req.isEvent()) {
encodeEventData(channel, out, req.getData());
} else {
encodeRequestData(channel, out, req.getData(), req.getVersion());
}
out.flushBuffer();
if (out instanceof Cleanable) {
((Cleanable) out).cleanup();
}
bos.flush();
bos.close();
int len = bos.writtenBytes();
checkPayload(channel, len);
// body length
Bytes.int2bytes(len, header, 12);
// write
buffer.writerIndex(savedWriteIndex);
buffer.writeBytes(header); // write header.
buffer.writerIndex(savedWriteIndex + HEADER_LENGTH + len);
}
三、Hessian2
在此以消费端发送消息序列化对象,接收响应反序列化为案例,看看hessian2的处理细节,同时解答前言问题。
@Overrideprotected void encodeRequestData(Channel channel, ObjectOutput out, Object data, String version) throws IOException { RpcInvocation inv = (RpcInvocation) data; out.writeUTF(version); // https://github.com/apache/dubbo/issues/6138 String serviceName = inv.getAttachment(INTERFACE_KEY); if (serviceName == null) { serviceName = inv.getAttachment(PATH_KEY); } out.writeUTF(serviceName); out.writeUTF(inv.getAttachment(VERSION_KEY)); out.writeUTF(inv.getMethodName()); out.writeUTF(inv.getParameterTypesDesc()); Object[] args = inv.getArguments(); if (args != null) { for (int i = 0; i < args.length; i++) { out.writeObject(encodeInvocationArgument(channel, inv, i)); } } out.writeAttachments(inv.getObjectAttachments());}
从头查看编码的调用链路,ExchangeCodec#encodeRequest内有如下代码:
ExchangeCodec
protected void encodeRequest(Channel channel, ChannelBuffer buffer, Request req) throws IOException {
Serialization serialization = getSerialization(channel);
// ...
ObjectOutput out = serialization.serialize(channel.getUrl(), bos);
if (req.isEvent()) {
encodeEventData(channel, out, req.getData());
} else {
encodeRequestData(channel, out, req.getData(), req.getVersion());
}
// ...
}
out对象来自于serialization对象,顺着往下看。在CodecSupport类有如下代码:
CodecSupport
public static Serialization getSerialization(URL url) {
return ExtensionLoader.getExtensionLoader(Serialization.class).getExtension(
url.getParameter(Constants.SERIALIZATION_KEY, Constants.DEFAULT_REMOTING_SERIALIZATION));
}
可以看到,这里通过URL信息,基于Dubbo的SPI选择Serialization对象,默认为hessian2。再看看serialization.serialize(channel.getUrl(),bos)方法:
Hessian2Serialization
@Override
public ObjectOutput serialize(URL url, OutputStream out) throws IOException {
return new Hessian2ObjectOutput(out);
}
DubboCodec
@Override
protected Object decodeBody(Channel channel, InputStream is, byte[] header) throws IOException {
byte flag = header[2], proto = (byte) (flag & SERIALIZATION_MASK);
// get request id.
long id = Bytes.bytes2long(header, 4);
if ((flag & FLAG_REQUEST) == 0) {
// decode response...
try {
DecodeableRpcResult result;
if (channel.getUrl().getParameter(DECODE_IN_IO_THREAD_KEY, DEFAULT_DECODE_IN_IO_THREAD)) {
result = new DecodeableRpcResult(channel, res, is,
(Invocation) getRequestData(id), proto);
result.decode();
} else {
result = new DecodeableRpcResult(channel, res,
new UnsafeByteArrayInputStream(readMessageData(is)),
(Invocation) getRequestData(id), proto);
}
data = result;
} catch (Throwable t) {
// ...
}
return res;
} else {
// decode request...
return req;
}
}
关键点1)对于解码请求还是解码响应做了区分,对于消费端而言,就是解码响应。对于提供端而言,即是解码请求。
2)为什么会出现两次解码?具体见这行:
if (channel.getUrl().getParameter(DECODE_IN_IO_THREAD_KEY, DEFAULT_DECODE_IN_IO_THREAD)) {
inv = new DecodeableRpcInvocation(channel, req, is, proto);
inv.decode();
} else {
inv = new DecodeableRpcInvocation(channel, req,
new UnsafeByteArrayInputStream(readMessageData(is)), proto);
}
那看看解码业务对象的代码,还记得在哪儿吗?DecodeableRpcResult#decode
@Override
public Object decode(Channel channel, InputStream input) throws IOException {
ObjectInput in = CodecSupport.getSerialization(channel.getUrl(), serializationType)
.deserialize(channel.getUrl(), input);
byte flag = in.readByte();
switch (flag) {
case DubboCodec.RESPONSE_NULL_VALUE:
// ...
case DubboCodec.RESPONSE_VALUE_WITH_ATTACHMENTS:
handleValue(in);
handleAttachment(in);
break;
case DubboCodec.RESPONSE_WITH_EXCEPTION_WITH_ATTACHMENTS:
// ...
default:
throw new IOException("Unknown result flag, expect '0' '1' '2' '3' '4' '5', but received: " + flag);
}
// ...
return this;
}
private void handleValue(ObjectInput in) throws IOException {
try {
Type[] returnTypes;
if (invocation instanceof RpcInvocation) {
returnTypes = ((RpcInvocation) invocation).getReturnTypes();
} else {
returnTypes = RpcUtils.getReturnTypes(invocation);
}
Object value = null;
if (ArrayUtils.isEmpty(returnTypes)) {
// This almost never happens?
value = in.readObject();
} else if (returnTypes.length == 1) {
value = in.readObject((Class<?>) returnTypes[0]);
} else {
value = in.readObject((Class<?>) returnTypes[0], returnTypes[1]);
}
setValue(value);
} catch (ClassNotFoundException e) {
rethrow(e);
}
}
这里出现了ObjectInput,那底层的序列化框架选择逻辑是怎么样的呢?如何保持与消费端的序列化框架一致?
请求时,序列化框架是根据Url信息进行选择,默认是hessian2
传输时,会将序列化框架标识写入协议头,具体见 ExchangeCodec#encodeRequest#218
提供收到消费端的请求时,会根据这个id使用对应的序列化框架。
四、常见问题
在dubbo - 2.7.x版本中,该问题已解决。解决方案也比较简单,在编码侧传输时,通过 Collections.reverse(fields)反转字段顺序。
public JavaSerializer(Class cl, ClassLoader loader) {
introspectWriteReplace(cl, loader);
// ...
List fields = new ArrayList();
fields.addAll(primitiveFields);
fields.addAll(compoundFields);
Collections.reverse(fields);
// ...
}
五、写在最后
END
猜你喜欢
vivo互联网技术
vivo移动互联网是基于vivo 智能手机所建立的完整移动互联网生态圈,围绕vivo大数据运营,打造包括应用、游戏、资讯、品牌、电商、内容、金融、搜索的全方位服务生态,满足海量用户的多样化需求。
点一下,代码无 Bug