查看原文
其他

阿里资深数据专家:RocketMQ底层通信机制(赠书)

杨开元 中生代技术 2021-09-05


程序员突破成长的好伙伴连接技术 接力价值

本文为杨开元先生投稿至中生代技术(ID:freshmanTechnology)


责编 | 姜新城

 第  714 篇技术好文:4999字 |11分钟阅读


作者介绍

杨开元,阿里巴巴数据专家,毕业于北京大学,有10年IT行业研发经验。对RocketMQ有深入的研究,是RocketMQ源码贡献者。曾就职于甲骨文和猎豹移动,专注于大数据和实时计算。在大量的工作实践中,对MySQL、J2EE、JVM、Spring、Hadoop、Kafka、Storm、Flink都有深入研究。喜欢剖析源码,分析原理,为开源项目贡献代码。


分布式系统各个角色间的通信效率很关键,通信效率的高低直接影响系统性能,基于Socket实现一个高效的Tcp通信协议是个很有挑战的事情,本节说明RocketMQ是如何解决这个问题的

01

Remoting模块

_____

RocketMQ的通信相关代码在Remoting模块里,先来看看主要类结构。

图1-1 Remoting模块的类继承关系

RemotingService为最上层接口,定义了三个方法:

void start();

void shutdown();

void registerRPCHook(RPCHookrpcHook);

RemotingClient,RemotingServer继承RemotingService接口, 并增加了自己特有的方法。

代码清单1-1  RemotingClient主要函数定义

1void registerProcessor(final int requestCode, finalNettyRequestProcessor processor,final ExecutorService executor);
2RemotingCommand invokeSync(final String addr, final RemotingCommandrequest, final long timeoutMillis);
3void invokeAsync(final String addr, final RemotingCommand request,final long timeoutMillis,final InvokeCallback invokeCallback);
4void invokeOneway(final String addr, final RemotingCommand request,final long timeoutMillis);
5void updateNameServerAddressList(final List<String> addrs); 

然后看看具体的实现类,NettyRemotingClient和NettyRemotingServer分别实现了RemotingClient和RemotingServer, 而且都继承了NettyRemotingAbstract类.

通过上面的封装,RocketMQ各个模块间的通信,可以通过发送统一格式的自定义消息(RemotingCommand)来完成的,各个模块间的通信实现简洁明了。

比如NameServer模块中,NameServerController有个remotingServer变量,NameServer在启动时初始化好各个变量,然后启动remotingServer即可,剩下NameServer要做的是专心实现好处理RemotingCommand的逻辑。

代码清单1-2  NameServer处理主流程代码

1@Override
2public RemotingCommand processRequest(ChannelHandlerContext ctx,RemotingCommand request) throws RemotingCommandException {
3    if (log.isDebugEnabled()){
4       log.debug("receive request, {} {} {}",
5            request.getCode(),
6           RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
7            request);
8    }
9    switch (request.getCode()){
10        caseRequestCode.PUT_KV_CONFIG:
11            returnthis.putKVConfig(ctx, request);
12        caseRequestCode.GET_KV_CONFIG:
13            returnthis.getKVConfig(ctx, request);
14        caseRequestCode.DELETE_KV_CONFIG:
15            returnthis.deleteKVConfig(ctx, request);
16        caseRequestCode.REGISTER_BROKER:
17            VersionbrokerVersion = MQVersion.value2Version(request.getVersion());
18            if (brokerVersion.ordinal()>= MQVersion.Version.V3_0_11.ordinal()) {
19                returnthis.registerBrokerWithFilterServer(ctx, request);
20            } else {
21                returnthis.registerBroker(ctx, request);
22            }
23        caseRequestCode.GET_HAS_UNIT_SUB_UNUNIT_TOPIC_LIST:
24            returnthis.getHasUnitSubUnUnitTopicList(ctx, request);
25        caseRequestCode.UPDATE_NAMESRV_CONFIG:
26            returnthis.updateConfig(ctx, request);
27        caseRequestCode.GET_NAMESRV_CONFIG:
28            returnthis.getConfig(ctx, request);
29        default:
30            break;
31    }
32    return null;
33}

在Consumer的源码中,获取消息的底层的通信部分也是发送一个RemotingComand 请求,返回的response也是个RemotingCommand类型。

代码清单1-3  Consumer请求消息底层实现代码

1private PullResult pullMessageSync(//
2    final String addr, // 1
3    final RemotingCommandrequest, // 2
4    final long timeoutMillis//3
5) throws RemotingException, InterruptedException, MQBrokerException{
6    RemotingCommand response =this.remotingClient.invokeSync(addr, request, timeoutMillis);
7    assert response != null;
8    returnthis.processPullResponse(response);
9}

从源码中可以看出,RocketMQ中复杂的通信过程,被RemotingCommand统一起来,大部分的逻辑都是通过发送Command,接受并处理Command完成。

02

协议设计和编解码

_____

RocketMQ自己定义了一个通信协议,使得模块间传输的二进制消息和有意义的内容之间互相转换。协议格式如图1-2所示。

 图1-2 RocketMQ的通信协议

(1)第一部分是大端4个字节整数,值等于第二,三,四部分长度总和

(2)第二部分是大端4个字节整数,值等于第三部分的长度

(3)第三部分是通过json 序列化的数据

(4)第四部分是通过应用自定义二进制序列化的数据

消息的解码过程在RomotingCommand的decode函数里。

代码清单1-4  消息解码函数

1public static RemotingCommand decode(final ByteBuffer byteBuffer) {
2    int length =byteBuffer.limit();
3    int oriHeaderLen =byteBuffer.getInt();
4    int headerLength =getHeaderLength(oriHeaderLen);
5    byte[] headerData = newbyte[headerLength];
6   byteBuffer.get(headerData);
7    RemotingCommand cmd =headerDecode(headerData, getProtocolType(oriHeaderLen));
8    int bodyLength = length - 4 - headerLength;
9    byte[] bodyData = null;
10    if (bodyLength > 0) {
11        bodyData = newbyte[bodyLength];
12       byteBuffer.get(bodyData);
13    }
14    cmd.body = bodyData;
15    return cmd;
16}

对应的消息编码过程在RemotingCommand的encode函数中。

代码清单1-5  消息编码函数

1public ByteBuffer encode() {
2    // 1> header lengthsize
3    int length = 4;
4    // 2> header datalength
5    byte[] headerData =this.headerEncode();
6    length +=headerData.length;
7    // 3> body data length
8    if (this.body != null) {
9        length += body.length;
10    }
11    ByteBuffer result =ByteBuffer.allocate(4 + length);
12    // length
13    result.putInt(length);
14    // header length
15   result.put(markProtocolType(headerData.length,serializeTypeCurrentRPC));
16    // header data
17    result.put(headerData);
18    // body data;
19    if (this.body != null) {
20        result.put(this.body);
21    }
22    result.flip();
23    return result;
24}


03

Netty库

_____

RocketMQ是基于Netty库来完成RemotingServer和RemotingClient具体的通信实现的,Netty是个事件驱动的网络编程框架,它屏蔽了Java Socket,Nio等复杂细节,用户只需用好Netty,就可以实现一个网络编程专家+并发编程专家水平的Server、Client网络程序。应用Netty有一定的门槛,需要了解它的EventLoopGroup,Channel,Handler模型以及各种具体的配置。RocketMQ利用Netty实现的通信类是NettyRemotingServer和NettyRemotingClient,用户也可以参考这两个类的实现来学习使用Netty。


本文摘选自以下图书:


RocketMQ实战与原理解析

作者:杨开元

定价:59.00

·RocketMQ由阿里开源,Apache开源项目,经受多年流量峰值考验,在多个性能指标上远超同类产品

·作者是阿里资深数据专家,对RocketMQ有深入的研究,并有大量的实践经验。在写这本书之前,作者不仅系统、深入地阅读了RocketMQ的源代码,而且还向RocketMQ的官方开发团队深入了解了它的诸多设计细节。作者结合自己多年使用RocketMQ的经验,从开发和运维两个维度,给出了大部分场景下的优秀实践,能帮助读者在学会使用和用好RocketMQ的同时,尽量少“踩坑”。同时,本书也结合源码分析了分布式消息队列的原理,使读者可以在复杂业务场景下定制有特殊功能的消息队列。

全书共13章,在逻辑上分为两大部分:

第一部分(第1~8章):RocketMQ实战

第1~2章详细讲解了RocketMQ如何快速入门,以及在生产环境下的配置和使用;

第3~4章具体讲解了不同类型生产者和消费者的特点,以及分布式消息队列的协调者NameServer;

第5章从消息的存储、发送、复制和高可用等多个维度讲解了RocketMQ的内部机制;

第6章讨论了消息的可靠性,如何让消息队列在满足业务逻辑需求的同时稳定、可靠地长期运行;

第7章讨论了在大流量场景下,吞吐量优先时RocketMQ的使用方法;

第8章介绍RocketMQ与SpringBoot、Spark、Flink以及自定义的运维工具等其它系统的对接方法;

第二部分(第9~13章):RocketMQ原理

首先对RocketMQ的源码结构进行了整体介绍,然后深入地分析了NameServer、各种常用消费类、主从同步机制,以及基于Netty的通信的源码实现。掌握这些源代码以后,读者可以快速定制属于自己的具有特殊功能的消息中间件。


·云栖社区官方出品,得到RocketMQ官方研发团队以及业界的多位专家的肯定和推荐


京东购买链接:

(感谢华章科技,为本文读者提供了3本图书赠书,文末留言对RocketMQ的理解或建议,有意义的精选留言,点赞数前三名即可获得图书赠送)


 

目前60000+人已关注加入我们

       

       

推荐阅读
盒子科技刘恒:聚合支付系统演讲
手把手教你搭建一个基于Java的分布式爬虫系统
知识付费时代,程序员,你的知识在哪里?
深入浅出分布式缓存的通用方法
蚂蚁金服开源 | 在 Spring Boot 中集成 SOFABoot 类隔离能力

中生代技术

每天早上7点,推送有营养的干货文章;

总覆盖会员60000+人;资深架构、总监等职位以上3000+人。

定期在线分享超过100期,线下技术沙龙超过70次、覆盖20多个等城市!

关注技术架构、研发管理、互联网金融、电商、大数据、区块链、人工智能等方向!


加入中生代技术群聊,请添加白明微信:zsdwyq,注明姓名、职称和技术方向,通过后加入中生代技术群,和群友们共同学习成长!


↙↙↙点击“阅读原文”查跳转京东链接
: . Video Mini Program Like ,轻点两下取消赞 Wow ,轻点两下取消在看

您可能也对以下帖子感兴趣

文章有问题?点此查看未经处理的缓存