查看原文
其他

开发者如何玩转 RocketMQ?附最全源码解读

胡宗棠 CSDN 2018-08-13

借用一句话“消息队列的本质在于消息的发送、存储和接收”。那么,对于一款消息队列来说,如何做到消息的高效发送与接收是重点和关键。

前排友情提示:这是一篇很硬的干货。


RocketMQ中Remoting通信模块概览


RocketMQ消息队列的整体部署架构如下图所示: 

先来说下RocketMQ消息队列集群中的几个角色:

  • NameServer:在MQ集群中做的是做命名服务,更新和路由发现 broker服务;

  • Broker-Master:broker 消息主机服务器;

  • Broker-Slave:broker 消息从机服务器;

  • Producer:消息生产者;

  • Consumer:消息消费者。

其中,RocketMQ集群的一部分通信如下:

  • Broker启动后需要完成一次将自己注册至NameServer的操作;随后每隔30s时间定期向NameServer上报Topic路由信息;

  • 消息生产者Producer作为客户端发送消息时候,需要根据Msg的Topic从本地缓存的TopicPublishInfoTable获取路由信息。如果没有则更新路由信息会从NameServer上重新拉取;

  • 消息生产者Producer根据所获取的路由信息选择一个队列(MessageQueue)进行消息发送;Broker作为消息的接收者接收消息并落盘存储。

从上面可以看出在消息生产者,在Broker和NameServer间都会发生通信(这里只说了MQ的部分通信),因此如何设计一个良好的网络通信模块在MQ中至关重要,它将决定RocketMQ集群整体的消息传输能力与最终性能。

rocketmq-remoting 模块是 RocketMQ消息队列中负责网络通信的模块,它几乎被其他所有需要网络通信的模块(诸如rocketmq-client、rocketmq-server、rocketmq-namesrv)所依赖和引用。

为了实现客户端与服务器之间高效的数据请求与接收,RocketMQ消息队列自定义了通信协议并在Netty的基础之上扩展了通信模块。

鉴于RocketMQ的通信模块是建立在Netty基础之上的,因此在阅读RocketMQ的源码之前,读者最好先对Netty的多线程模型、JAVA NIO模型均有一定的了解,这样子理解RocketMQ源码会较为快一些。

本文使用的RocketMQ版本是4.2.0, 依赖的netty版本是4.0.42.Final. RocketMQ的代码结构图如下:

 

源码部分主要可以分为rocketmq-broker,rocketmq-client,rocketmq-common,rocketmq-filterSrv,rocketmq-namesrv和rocketmq-remoting等模块,通信框架就封装在rocketmq-remoting模块中。

本文主要从RocketMQ的协议格式,消息编解码,通信方式(同步/异步/单向)和具体的发送/接收消息的通信流程来进行阐述等。



RocketMQ中Remoting通信模块的具体实现


1、Remoting通信模块的类结构图

从类层次结构来看:

  • RemotingService:为最上层的接口,提供了三个方法: 

1void start();
2void shutdown();
3void registerRPCHook(RPCHook rpcHook);
  • RemotingClient/RemotingSever:两个接口继承了最上层接口—RemotingService,分别各自为Client和Server提供所必需的方法,下面所列的是RemotingServer的方法:

1/**
2     * 同RemotingClient端一样
3     *
4     * @param requestCode
5     * @param processor
6     * @param executor
7     */

8    void registerProcessor(final int requestCode, final NettyRequestProcessor processor,
9        final ExecutorService executor)
;
10
11    /**
12     * 注册默认的处理器
13     *
14     * @param processor
15     * @param executor
16     */

17    void registerDefaultProcessor(final NettyRequestProcessor processor, final ExecutorService executor);
18
19    int localListenPort();
20
21    /**
22     * 根据请求code来获取不同的处理Pair
23     *
24     * @param requestCode
25     * @return
26     */

27    Pair<NettyRequestProcessor, ExecutorService> getProcessorPair(final int requestCode);
28
29    /**
30     * 同RemotingClient端一样,同步通信,有返回RemotingCommand
31     * @param channel
32     * @param request
33     * @param timeoutMillis
34     * @return
35     * @throws InterruptedException
36     * @throws RemotingSendRequestException
37     * @throws RemotingTimeoutException
38     */

39    RemotingCommand invokeSync(final Channel channel, final RemotingCommand request,
40        final long timeoutMillis) throws InterruptedException, RemotingSendRequestException,
41        RemotingTimeoutException
;
42
43    /**
44     * 同RemotingClient端一样,异步通信,无返回RemotingCommand
45     *
46     * @param channel
47     * @param request
48     * @param timeoutMillis
49     * @param invokeCallback
50     * @throws InterruptedException
51     * @throws RemotingTooMuchRequestException
52     * @throws RemotingTimeoutException
53     * @throws RemotingSendRequestException
54     */

55    void invokeAsync(final Channel channel, final RemotingCommand request, final long timeoutMillis,
56        final InvokeCallback invokeCallback) throws InterruptedException,
57        RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException
;
58
59    /**
60     * 同RemotingClient端一样,单向通信,诸如心跳包
61     *
62     * @param channel
63     * @param request
64     * @param timeoutMillis
65     * @throws InterruptedException
66     * @throws RemotingTooMuchRequestException
67     * @throws RemotingTimeoutException
68     * @throws RemotingSendRequestException
69     */

70    void invokeOneway(final Channel channel, final RemotingCommand request, final long timeoutMillis)
71        throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException,
72        RemotingSendRequestException
;
  • NettyRemotingAbstract:Netty通信处理的抽象类,定义并封装了Netty处理的公共处理方法; 

  • NettyRemotingClient/NettyRemotingServer:分别实现了RemotingClient和RemotingServer,都继承了NettyRemotingAbstract抽象类。RocketMQ中其他的组件(如client、nameServer、broker在进行消息的发送和接收时均使用这两个组件)。

2、消息的协议设计与编码解码

在Client和Server之间完成一次消息发送时,需要对发送的消息进行一个协议约定,因此就有必要自定义RocketMQ的消息协议。同时,为了高效地在网络中传输消息和对收到的消息读取,就需要对消息进行编解码。在RocketMQ中,RemotingCommand这个类在消息传输过程中对所有数据内容的封装,不但包含了所有的数据结构,还包含了编码解码操作。 

RemotingCommand类的部分成员变量如下:

Header字段类型Request说明Response说明
codeint请求操作码,应答方根据不同的请求码进行不同的业务处理应答响应码。0表示成功,非0则表示各种错误
languageLanguageCode请求方实现的语言应答方实现的语言
versionint请求方程序的版本应答方程序的版本
opaqueint相当于reqeustId,在同一个连接上的不同请求标识码,与响应消息中的相对应应答不做修改直接返回
flagint区分是普通RPC还是onewayRPC得标志区分是普通RPC还是onewayRPC得标志
remarkString传输自定义文本信息传输自定义文本信息
extFieldsHashMap,>请求自定义扩展信息响应自定义扩展信息

这里展示下Broker向NameServer发送一次心跳注册的报文:

1[
2code=103,//这里的103对应的code就是broker向nameserver注册自己的消息
3language=JAVA,
4version=137,
5opaque=58,//这个就是requestId
6flag(B)=0,
7remark=null,
8extFields={
9    brokerId=0,
10    clusterName=DefaultCluster,
11    brokerAddr=ip1: 10911,
12    haServerAddr=ip1: 10912,
13    brokerName=LAPTOP-SMF2CKDN
14},
15serializeTypeCurrentRPC=JSON

下面来看下RocketMQ通信协议的格式: 

可见传输内容主要可以分为以下4部分: 

  • 消息长度:总长度,四个字节存储,占用一个int类型; 

  • 序列化类型&消息头长度:同样占用一个int类型,第一个字节表示序列化类型,后面三个字节表示消息头长度; 

  • 消息头数据:经过序列化后的消息头数据;

  • 消息主体数据:消息主体的二进制字节数据内容。

消息的编码和解码分别在RemotingCommand类的encode和decode方法中完成,消息解码decode方法是编码的逆向过程。

3、消息的通信方式和通信流程

在RocketMQ消息队列中支持通信的方式主要有同步(sync)、异步(async)和单向(oneway)这三种。

其中“同步”通信模式相对简单,一般用在发送心跳包场景下,无需关注其Response。本文将主要介绍RocketMQ的异步通信流程(限于篇幅,读者可以按照同样的模式进行分析同步通信流程)。 

下面先给出了RocketMQ异步通信的整体流程图: 

 

下面两小节内容主要介绍了Client端发送请求消息、Server端接收消息的具体实现并简要分析的Client端的回调。

3.1 Client发送请求消息的具体实现

当客户端调用异步通信接口—invokeAsync时候,先由RemotingClient的实现类—NettyRemotingClient根据addr获取相应的channel(如果本地缓存中没有则创建),随后调用invokeAsyncImpl方法,将数据流转给抽象类NettyRemotingAbstract处理(真正做完发送请求动作的是在NettyRemotingAbstract抽象类的invokeAsyncImpl方法里面)。

具体发送请求消息的源代码如下所示:

1    /**
2     * invokeAsync(异步调用)
3     * 
4     */

5    public void invokeAsyncImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis,
6        final InvokeCallback invokeCallback)
7        throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException 
{
8        //相当于request ID, RemotingCommand会为每一个request产生一个request ID, 从0开始, 每次加1
9
10        final int opaque = request.getOpaque();
11        boolean acquired = this.semaphoreAsync.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
12        if (acquired) {
13            final SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreAsync);
14            //根据request ID构建ResponseFuture
15            final ResponseFuture responseFuture = new ResponseFuture(opaque, timeoutMillis, invokeCallback, once);
16            //将ResponseFuture放入responseTable
17            this.responseTable.put(opaque, responseFuture);
18            try {
19                //使用Netty的channel发送请求数据
20                channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
21                    //消息发送后执行
22                    @Override
23                    public void operationComplete(ChannelFuture f) throws Exception {
24                        if (f.isSuccess()) {
25                            //如果发送消息成功给Server,那么这里直接Set后return
26                            responseFuture.setSendRequestOK(true);
27                            return;
28                        } else {
29                            responseFuture.setSendRequestOK(false);
30                        }
31
32                        responseFuture.putResponse(null);
33                        responseTable.remove(opaque);
34                        try {
35                            //执行回调
36                            executeInvokeCallback(responseFuture);
37                        } catch (Throwable e) {
38                            log.warn("excute callback in writeAndFlush addListener, and callback throw", e);
39                        } finally {
40                            //释放信号量
41                            responseFuture.release();
42                        }
43
44                        log.warn("send a request command to channel <{}> failed.", RemotingHelper.parseChannelRemoteAddr(channel));
45                    }
46                });
47            } catch (Exception e) {
48                //异常处理
49                responseFuture.release();
50                log.warn("send a request command to channel <" + RemotingHelper.parseChannelRemoteAddr(channel) + "> Exception", e);
51                throw new RemotingSendRequestException(RemotingHelper.parseChannelRemoteAddr(channel), e);
52            }
53        } else {
54            if (timeoutMillis <= 0) {
55                throw new RemotingTooMuchRequestException("invokeAsyncImpl invoke too fast");
56            } else {
57                String info =
58                    String.format("invokeAsyncImpl tryAcquire semaphore timeout, %dms, waiting thread nums: %d semaphoreAsyncValue: %d",
59                        timeoutMillis,
60                        this.semaphoreAsync.getQueueLength(),
61                        this.semaphoreAsync.availablePermits()
62                    );
63                log.warn(info);
64                throw new RemotingTimeoutException(info);
65            }
66        }
67    }

在Client端发送请求消息时有个比较重要的数据结构需要注意下:

  • responseTable—保存请求码与响应关联映射

1protected final ConcurrentHashMap<Integer /* opaque */, ResponseFuture> responseTable 

opaque表示请求发起方在同个连接上不同的请求标识代码,每次发送一个消息的时候,可以选择同步阻塞/异步非阻塞的方式。无论是哪种通信方式,都会保存请求操作码至ResponseFuture的Map映射—responseTable中。

  • ResponseFuture—保存返回响应(包括回调执行方法和信号量)

1public ResponseFuture(int opaque, long timeoutMillis, InvokeCallback invokeCallback,
2        SemaphoreReleaseOnlyOnce once) {
3        this.opaque = opaque;
4        this.timeoutMillis = timeoutMillis;
5        this.invokeCallback = invokeCallback;
6        this.once = once;
7    }

对于同步通信来说,第三、四个参数为null;而对于异步通信来说,invokeCallback是在收到消息响应的时候能够根据responseTable找到请求码对应的回调执行方法,semaphore参数用作流控,当多个线程同时往一个连接写数据时可以通过信号量控制permit同时写许可的数量。

  • 异常发送流程处理—定时扫描responseTable本地缓存

在发送消息时候,如果遇到异常情况(比如服务端没有response返回给客户端或者response因网络而丢失),上面所述的responseTable的本地缓存Map将会出现堆积情况。这个时候需要一个定时任务来专门做responseTable的清理回收。在RocketMQ的客户端/服务端启动时候会产生一个频率为1s调用一次来的定时任务检查所有的responseTable缓存中的responseFuture变量,判断是否已经得到返回, 并进行相应的处理。

1public void scanResponseTable() {
2        final List<ResponseFuture> rfList = new LinkedList<ResponseFuture>();
3        Iterator<Entry<Integer, ResponseFuture>> it = this.responseTable.entrySet().iterator();
4        while (it.hasNext()) {
5            Entry<Integer, ResponseFuture> next = it.next();
6            ResponseFuture rep = next.getValue();
7
8            if ((rep.getBeginTimestamp() + rep.getTimeoutMillis() + 1000) <= System.currentTimeMillis()) {
9                rep.release();
10                it.remove();
11                rfList.add(rep);
12                log.warn("remove timeout request, " + rep);
13            }
14        }
15
16        for (ResponseFuture rf : rfList) {
17            try {
18                executeInvokeCallback(rf);
19            } catch (Throwable e) {
20                log.warn("scanResponseTable, operationComplete Exception", e);
21            }
22        }
23    }

3.2 Server端接收消息并进行处理的具体实现

Server端接收消息的处理入口在NettyServerHandler类的channelRead0方法中,其中调用了processMessageReceived方法(这里省略了Netty服务端消息流转的大部分流程和逻辑)。

其中服务端最为重要的处理请求方法实现如下:

1public void processRequestCommand(final ChannelHandlerContext ctx, final RemotingCommand cmd) {
2    //根据RemotingCommand中的code获取processor和ExecutorService
3    final Pair<NettyRequestProcessor, ExecutorService> matched = this.processorTable.get(cmd.getCode());
4    final Pair<NettyRequestProcessor, ExecutorService> pair = null == matched ? this.defaultRequestProcessor : matched;
5    final int opaque = cmd.getOpaque();
6
7    if (pair != null) {
8        Runnable run = new Runnable() {
9            @Override
10            public void run() {
11                try {
12                    //rpc hook
13                    RPCHook rpcHook = NettyRemotingAbstract.this.getRPCHook();
14                    if (rpcHook != null) {
15                        rpcHook.doBeforeRequest(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd);
16                    }
17                    //processor处理请求
18                    final RemotingCommand response = pair.getObject1().processRequest(ctx, cmd);
19                    //rpc hook
20                    if (rpcHook != null) {
21                        rpcHook.doAfterResponse(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd, response);
22                    }
23
24                    if (!cmd.isOnewayRPC()) {
25                        if (response != null) {
26                            response.setOpaque(opaque);
27                            response.markResponseType();
28                            try {
29                                ctx.writeAndFlush(response);
30                            } catch (Throwable e) {
31                                PLOG.error("process request over, but response failed", e);
32                                PLOG.error(cmd.toString());
33                                PLOG.error(response.toString());
34                            }
35                        } else {
36
37                        }
38                    }
39                } catch (Throwable e) {
40                    if (!"com.aliyun.openservices.ons.api.impl.authority.exception.AuthenticationException"
41                        .equals(e.getClass().getCanonicalName())) {
42                        PLOG.error("process request exception", e);
43                        PLOG.error(cmd.toString());
44                    }
45
46                    if (!cmd.isOnewayRPC()) {
47                        final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_ERROR, //
48                            RemotingHelper.exceptionSimpleDesc(e));
49                        response.setOpaque(opaque);
50                        ctx.writeAndFlush(response);
51                    }
52                }
53            }
54        };
55
56        if (pair.getObject1().rejectRequest()) {
57            final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,
58                "[REJECTREQUEST]system busy, start flow control for a while");
59            response.setOpaque(opaque);
60            ctx.writeAndFlush(response);
61            return;
62        }
63
64        try {
65            //封装requestTask
66            final RequestTask requestTask = new RequestTask(run, ctx.channel(), cmd);
67            //想线程池提交requestTask
68            pair.getObject2().submit(requestTask);
69        } catch (RejectedExecutionException e) {
70            if ((System.currentTimeMillis() % 10000) == 0) {
71                PLOG.warn(RemotingHelper.parseChannelRemoteAddr(ctx.channel()) //
72                    + ", too many requests and system thread pool busy, RejectedExecutionException " //
73                    + pair.getObject2().toString() //
74                    + " request code: " + cmd.getCode());
75            }
76
77            if (!cmd.isOnewayRPC()) {
78                final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,
79                    "[OVERLOAD]system busy, start flow control for a while");
80                response.setOpaque(opaque);
81                ctx.writeAndFlush(response);
82            }
83        }
84    } else {
85        String error = " request type " + cmd.getCode() + " not supported";
86        //构建response
87        final RemotingCommand response =
88            RemotingCommand.createResponseCommand(RemotingSysResponseCode.REQUEST_CODE_NOT_SUPPORTED, error);
89        response.setOpaque(opaque);
90        ctx.writeAndFlush(response);
91        PLOG.error(RemotingHelper.parseChannelRemoteAddr(ctx.channel()) + error);
92    }
93}

上面的请求处理方法中根据RemotingCommand的请求业务码来匹配到相应的业务处理器;然后生成一个新的线程提交至对应的业务线程池进行异步处理。

  • processorTable—请求业务码与业务处理、业务线程池的映射变量 

1    protected final HashMap<Integer/* request code */, Pair<NettyRequestProcessor, ExecutorService>> processorTable =
2        new HashMap<Integer, Pair<NettyRequestProcessor, ExecutorService>>(64);

我想RocketMQ这种做法是为了给不同类型的请求业务码指定不同的处理器Processor处理,同时消息实际的处理并不是在当前线程,而是被封装成task放到业务处理器Processor对应的线程池中完成异步执行。

在RocketMQ中能看到很多地方都是这样的处理,这样的设计能够最大程度的保证异步,保证每个线程都专注处理自己负责的东西。

3.3 Client端异步回调执行的实现分析

看到这里可能有一些同学会疑问Client端的异步回调究竟在哪里执行的?从上面“RocketMQ异步通信的整体时序图”来看,回调执行处理的流程的确是放在了Client端来完成,而rocketmq-remoting通信模块中只是给异步回调处理提供了接口。

这里需要结合3.1节的内容和NettyRemotingAbstract抽象类的processResponseCommand方法,便可以明白Client端实现异步回调的大致流程了。在Client端发送异步消息时候(rocketmq-client模块最终调用sendMessageAsync方法时),会将InvokeCallback的接口注入,而在Server端的异步线程由上面所讲的业务线程池真正执行后,返回response给Client端时候才会去触发执行。NettyRemotingAbstract抽象类的processResponseCommand方法的具体代码如下:

1public void processResponseCommand(ChannelHandlerContext ctx, RemotingCommand cmd) {
2        //从RemotingCommand中获取opaque值
3        final int opaque = cmd.getOpaque();‘
4        //从本地缓存的responseTable这个Map中取出本次异步通信连接对应的ResponseFuture变量
5        final ResponseFuture responseFuture = responseTable.get(opaque);
6        if (responseFuture != null) {
7            responseFuture.setResponseCommand(cmd);
8
9            responseTable.remove(opaque);
10
11            if (responseFuture.getInvokeCallback() != null) {
12                //在这里真正去执行Client注入进来的异步回调方法
13                executeInvokeCallback(responseFuture);
14            } else {
15                //否则释放responseFuture变量
16                responseFuture.putResponse(cmd);
17                responseFuture.release();
18            }
19        } else {
20            log.warn("receive response, but not matched any request, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
21            log.warn(cmd.toString());
22        }
23    }

以上主要介绍了RocketMQ的协议格式,消息编解码,通信方式(同步/异步/单向)、消息发送/接收以及异步回调的主要通信流程。而下面将主要对RocketMQ消息队列RPC通信部分的Netty多线程模型进行重点介绍。


为何要使用Netty作为高性能的通信库?


在看RocketMQ的RPC通信部分时候,可能有不少同学有这样子的疑问,RocketMQ为何要选择Netty而不直接使用JDK的NIO进行网络编程呢?这里有必要先来简要介绍下Netty。

Netty是一个封装了JDK的NIO库的高性能网络通信开源框架。它提供异步的、事件驱动的网络应用程序框架和工具,用以快速开发高性能、高可靠性的网络服务器和客户端程序。

下面主要列举了下一般系统的RPC通信模块会选择Netty作为底层通信库的理由(作者认为RocketMQ的RPC同样也是基于此选择了Netty):

  • Netty的编程API使用简单,开发门槛低,无需编程者去关注和了解太多的NIO编程模型和概念;

  • 对于编程者来说,可根据业务的要求进行定制化地开发,通过Netty的ChannelHandler对通信框架进行灵活的定制化扩展;

  • Netty框架本身支持拆包/解包,异常检测等机制,让编程者可以从JAVA NIO的繁琐细节中解脱,而只需要关注业务处理逻辑;

  • Netty解决了(准确地说应该是采用了另一种方式完美规避了)JDK NIO的Bug(Epoll bug,会导致Selector空轮询,最终导致CPU 100%);

  • Netty框架内部对线程,selector做了一些细节的优化,精心设计的reactor多线程模型,可以实现非常高效地并发处理;

  • Netty已经在多个开源项目(Hadoop的RPC框架avro使用Netty作为通信框架)中都得到了充分验证,健壮性/可靠性比较好。



RocketMQ中RPC通信的Netty多线程模型


RocketMQ的RPC通信部分采用了"1+N+M1+M2"的Reactor多线程模式,对网络通信部分进行了一定的扩展与优化,这一节主要让我们来看下这一部分的具体设计与实现内容。

4.1 Netty的Reactor多线程模型设计概念与简述

这里有必要先来简要介绍下Netty的Reactor多线程模型。Reactor多线程模型的设计思想是分而治之+事件驱动。

  • 分而治之

一般来说,一个网络请求连接的完整处理过程可以分为接受(accept)、数据读取(read)、解码/编码(decode/encode)、业务处理(process)、发送响应(send)这几步骤。Reactor模型将每个步骤都映射成为一个任务,服务端线程执行的最小逻辑单元不再是一次完整的网络请求,而是这个任务,且采用以非阻塞方式执行。

  • 事件驱动

每个任务对应特定网络事件。当任务准备就绪时,Reactor收到对应的网络事件通知,并将任务分发给绑定了对应网络事件的Handler执行。

4.2 RocketMQ中RPC通信的1+N+M1+M2的Reactor多线程设计与实现

  • RocketMQ中RPC通信的Reactor多线程设计与流程

RocketMQ的RPC通信采用Netty组件作为底层通信库,同样也遵循了Reactor多线程模型,同时又在这之上做了一些扩展和优化。下面先给出一张RocketMQ的RPC通信层的Netty多线程模型框架图,让大家对RocketMQ的RPC通信中的多线程分离设计有一个大致的了解。

从上面的框图中可以大致了解RocketMQ中NettyRemotingServer的Reactor 多线程模型。一个 Reactor 主线程(eventLoopGroupBoss,即为上面的1)负责监听 TCP网络连接请求,建立好连接后丢给Reactor 线程池(eventLoopGroupSelector,即为上面的“N”,源码中默认设置为3),它负责将建立好连接的socket 注册到 selector上去(RocketMQ的源码中会自动根据OS的类型选择NIO和Epoll,也可以通过参数配置),然后监听真正的网络数据。拿到网络数据后,再丢给Worker线程池(defaultEventExecutorGroup,即上面的“M1”,源码中默认设置为8)。 

为了更为高效地处理RPC的网络请求,这里的Worker线程池是专门用于处理Netty网络通信相关的(包括编码/解码、空闲链接管理、网络连接管理以及网络请求处理)。

而处理业务操作放在业务线程池中执行,根据 RomotingCommand 的业务请求码code去processorTable这个本地缓存变量中找到对应的 processor,然后封装成task任务后,提交给对应的业务processor处理线程池来执行(sendMessageExecutor,以发送消息为例,即为上面的 “M2”)。

下面以表格的方式列举了下上面所述的“1+N+M1+M2”Reactor多线程模型:

线程数线程名线程具体说明
1NettyBoss_%dReactor 主线程
NNettyServerEPOLLSelector_%d_%dReactor 线程池
M1NettyServerCodecThread_%dWorker线程池
M2RemotingExecutorThread_%d业务processor处理线程池
  • RocketMQ中RPC通信的Reactor多线程的代码具体实现

说完了Reactor多线程整体的设计与流程,大家应该就对RocketMQ的RPC通信的Netty部分有了一个比较全面的理解了,那接下来就从源码上来看下一些细节部分(在看该部分代码时候需要读者对JAVA NIO和Netty的相关概念与技术点有所了解)。

在NettyRemotingServer的实例初始化时,会初始化各个相关的变量包括serverBootstrap、nettyServerConfig参数、channelEventListener监听器并同时初始化eventLoopGroupBoss和eventLoopGroupSelector两个Netty的EventLoopGroup线程池(这里需要注意的是,如果是Linux平台,并且开启了native epoll,就用EpollEventLoopGroup,这个也就是用JNI,调的c写的epoll;否则就用Java NIO的NioEventLoopGroup)。代码如下:

1public NettyRemotingServer(final NettyServerConfig nettyServerConfig,
2        final ChannelEventListener channelEventListener) 
{
3        super(nettyServerConfig.getServerOnewaySemaphoreValue(), nettyServerConfig.getServerAsyncSemaphoreValue());
4        this.serverBootstrap = new ServerBootstrap();
5        this.nettyServerConfig = nettyServerConfig;
6        this.channelEventListener = channelEventListener;
7      //省略部分代码
8      //初始化时候nThreads设置为1,说明RemotingServer端的Disptacher链接管理和分发请求的线程为1,用于接收客户端的TCP连接
9        this.eventLoopGroupBoss = new NioEventLoopGroup(1new ThreadFactory() {
10            private AtomicInteger threadIndex = new AtomicInteger(0);
11
12            @Override
13            public Thread newThread(Runnable r) {
14                return new Thread(r, String.format("NettyBoss_%d"this.threadIndex.incrementAndGet()));
15            }
16        });
17
18        /**
19         * 根据配置设置NIO还是Epoll来作为Selector线程池
20         * 如果是Linux平台,并且开启了native epoll,就用EpollEventLoopGroup,这个也就是用JNI,调的c写的epoll;否则,就用Java NIO的NioEventLoopGroup。
21         * 
22         */

23        if (useEpoll()) {
24            this.eventLoopGroupSelector = new EpollEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() {
25                private AtomicInteger threadIndex = new AtomicInteger(0);
26                private int threadTotal = nettyServerConfig.getServerSelectorThreads();
27
28                @Override
29                public Thread newThread(Runnable r) {
30                    return new Thread(r, String.format("NettyServerEPOLLSelector_%d_%d", threadTotal, this.threadIndex.incrementAndGet()));
31                }
32            });
33        } else {
34            this.eventLoopGroupSelector = new NioEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() {
35                private AtomicInteger threadIndex = new AtomicInteger(0);
36                private int threadTotal = nettyServerConfig.getServerSelectorThreads();
37
38                @Override
39                public Thread newThread(Runnable r) {
40                    return new Thread(r, String.format("NettyServerNIOSelector_%d_%d", threadTotal, this.threadIndex.incrementAndGet()));
41                }
42            });
43        }
44        //省略部分代码 

在NettyRemotingServer实例初始化完成后,就会将其启动。Server端在启动阶段会将之前实例化好的1个acceptor线程(eventLoopGroupBoss),N个IO线程(eventLoopGroupSelector),M1个worker 线程(defaultEventExecutorGroup)绑定上去。

这里需要说明的是,Worker线程拿到网络数据后,就交给Netty的ChannelPipeline(其采用责任链设计模式),从Head到Tail的一个个Handler执行下去,这些 Handler是在创建NettyRemotingServer实例时候指定的。NettyEncoder和NettyDecoder 负责网络传输数据和 RemotingCommand 之间的编解码。NettyServerHandler 拿到解码得到的 RemotingCommand 后,根据 RemotingCommand.type 来判断是 request 还是 response来进行相应处理,根据业务请求码封装成不同的task任务后,提交给对应的业务processor处理线程池处理。

从上面的描述中可以概括得出RocketMQ的RPC通信部分的Reactor线程池模型框图。

整体可以看出RocketMQ的RPC通信借助Netty的多线程模型,其服务端监听线程和IO线程分离,同时将RPC通信层的业务逻辑与处理具体业务的线程进一步相分离。时间可控的简单业务都直接放在RPC通信部分来完成,复杂和时间不可控的业务提交至后端业务线程池中处理,这样提高了通信效率和MQ整体的性能。

其中抽象出NioEventLoop来表示一个不断循环执行处理任务的线程,每个NioEventLoop有一个selector,用于监听绑定在其上的socket链路。



总结


刚开始看RocketMQ源码—RPC通信模块可能觉得略微有点复杂,但是只要能够抓住Client端发送请求消息、Server端接收消息并处理的流程以及回调过程来分析和梳理,那么整体来说并不复杂。

RPC通信部分也是RocketMQ源码中重要的部分之一,想要对其中的全过程和细节有更为深刻的理解,还需要多在本地环境Debug和分析对应的日志。

限于笔者的才疏学浅,对本文内容可能还有理解不到位的地方,如有阐述不合理之处还望留言一起探讨。

作者:胡宗棠,中移(苏州)软件技术有限公司,云计算软件高级研发工程师,从事公有云产品平台研发、架构设计;目前专注于大型分布式系统的高并发、高可用设计。曾就职于蚂蚁金服支付宝,甲骨文中国研发中心,个人公众号:匠心独运的博客。

声明:本文为作者个人投稿,版权归作者所有。


    您可能也对以下帖子感兴趣

    文章有问题?点此查看未经处理的缓存