查看原文
其他

图文加多个测试带你彻底搞懂Netty ChannelPipeline的执行顺序(附源码)

weihubeats 小奏技术
2024-09-06

netty version

  • 4.1.65.Final

ChannelPipeline 是什么

Pipeline,管道、流水线,类似于责任链模式。基本上我们使用Netty开发程序需要编写的就是ChannelPipeline中的各个ChannelHandler

ChannelPipeline就是用来组合所有的ChannelHandler

我们今天重点讨论的对象是ChannelPipeline添加多个ChannelHandler他的执行顺序是什么

测试demo

这里我们直接用测试代码来看看

NettyServer

public class NettyServer {
 public static void main(String[] args) throws InterruptedException {
  // 启动服务器
  startServer();
  
 }

 private static void startServer() throws InterruptedException {
  EventLoopGroup bossGroup = new NioEventLoopGroup();
  EventLoopGroup workerGroup = new NioEventLoopGroup();
  ServerBootstrap b = new ServerBootstrap();

  b.group(bossGroup, workerGroup)
    .channel(NioServerSocketChannel.class)
    .option(ChannelOption.SO_BACKLOG, 1024)
    .childHandler(new ChannelInitializer<SocketChannel>() 
{
     @Override
     protected void initChannel(SocketChannel ch) {
      ChannelPipeline pipeline = ch.pipeline();
      pipeline.addLast(new CustomInboundHandler1());
      pipeline.addLast(new CustomInboundHandler2());
      pipeline.addLast(new CustomOutboundHandler1());
      pipeline.addLast(new CustomOutboundHandler2());
      pipeline.addLast(new SimpleChannelInboundHandler<ByteBuf>() {
       @Override
       protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) {
        //buf.readableBytes()获取缓冲区可读的字节数
        byte[] req = new byte[msg.readableBytes()];
        // 将缓冲区的字节数组复制到新的byte数组中
        msg.readBytes(req);
        String body = new String(req, StandardCharsets.UTF_8);

        System.out.println("Server received: " + body);
        ByteBuf firstMessage;
        byte[] req1 = "你好客户端".getBytes();
        firstMessage = Unpooled.buffer(req1.length);
        firstMessage.writeBytes(req1);
        System.out.println("开始给客户端发送消息");
        ctx.writeAndFlush(firstMessage);
       }
       @Override
       public void channelRegistered(ChannelHandlerContext ctx) {
        System.out.println("连接上来了");
        ctx.fireChannelRegistered();
       }
      });
     }
    });

  ChannelFuture future = b.bind(8888).sync();
  future.channel().closeFuture().sync();
 }
}

NettyClient

public class NettyClient {

 public static void main(String[] args) throws Exception {
  // 启动客户端
  startClient();
 }

 public static void startClient() throws InterruptedException {
  Bootstrap clientBootstrap = new Bootstrap();
  NioEventLoopGroup group = new NioEventLoopGroup();

  clientBootstrap.group(group)
    .channel(NioSocketChannel.class)
    .handler(new ChannelInitializer<SocketChannel>() 
{
     @Override
     protected void initChannel(SocketChannel ch) {
      ChannelPipeline pipeline = ch.pipeline();
      pipeline.addLast(new CustomOutboundHandler2());
      pipeline.addLast(new CustomOutboundHandler1());
      pipeline.addLast(new CustomInboundHandler2());
      pipeline.addLast(new CustomInboundHandler1());
      pipeline.addLast(new SimpleChannelInboundHandler<ByteBuf>() {
       @Override
       public void channelActive(ChannelHandlerContext ctx) {
        ByteBuf firstMessage;
        byte[] req = "你好服务器".getBytes();
        firstMessage = Unpooled.buffer(req.length);
        firstMessage.writeBytes(req);
        System.out.println("------------ 开始发送消息 ------------");
        ctx.writeAndFlush(firstMessage);
       }

       @Override
       protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) {
        //buf.readableBytes()获取缓冲区可读的字节数
        byte[] req = new byte[msg.readableBytes()];
        // 将缓冲区的字节数组复制到新的byte数组中
        msg.readBytes(req);
        String body = new String(req, StandardCharsets.UTF_8);

        System.out.println("Client received: " + body);
       }

      });
     }
    });

  ChannelFuture future = clientBootstrap.connect("localhost"8888).sync();
  future.channel().closeFuture().sync();
 }
}

测试

我们这里先启动NettyServer,然后启动 NettyClient

控制台输出信息如下

  • NettyClient

  • NettyServer

首先我们可以看到我们在 Netty 添加Handler顺序如下

pipeline.addLast(new CustomOutboundHandler2());
      pipeline.addLast(new CustomOutboundHandler1());
      pipeline.addLast(new CustomInboundHandler2());
      pipeline.addLast(new CustomInboundHandler1());

client Handler 执行顺序

所以对于client执行的顺序是CustomOutboundHandler1CustomOutboundHandler2CustomInboundHandler2CustomInboundHandler1

通俗的讲就是我们client发送消息的时候就是如下一个顺序

只会执行OutboundHandler

client接受到server返回的消息后的handler执行顺序如下

只会执行InboundHandler

server Handler 执行顺序

服务端的顺序也是类似,由于client发送消息到server,所以对于服务端来说是接受消息

所以服务端首先执行的顺序就是接受消息

所以打印了

CustomInboundHandler1 - channelRead

CustomInboundHandler2 - channelRead

Server received: 你好服务器

server在处理完数据后会返回消息给client,这里server就是发送数据

所以就是这个顺序

CustomOutboundHandler2 - write

CustomOutboundHandler1 - write

图解handler顺序

基于上面的demo演示,我们就总结了一个handler的执行顺序

横看就是这个顺序,当然我们也可以通过上面的箭头看。

如果是发送消息 就是 从下到上

如果是接受消息就是从上到下

再谈ctx.channel().writeAndFlushctx.writeAndFlush

现在我们所有的handerl一般调用的方法都是ctx.channel().writeAndFlush

那么ctx.channel().writeAndFlushctx.writeAndFlush方法有什么区别呢

我们知道ChannelPipeline是一个双向链表结构。 比如我们server添加handler的顺序如下

ch.pipeline().addLast(new InboundHandler1());  
ch.pipeline().addLast(new InboundHandler2()); 
ch.pipeline().addLast(new OutboundHandler1());  
ch.pipeline().addLast(new OutboundHandler2());

那么在链表中的顺序就是如下

head->in1->in2->out1->out2->tail

如果我们在InboundHandler2中执行ctx.channel().writeAndFlush

那么输出的顺序就是

InboundHandler1 
InboundHandler2 
OutboundHandler2
OutboundHandler1

如果在InboundHandler2中执行ctx.writeAndFlush,执行顺序是

InboundHandler1 
InboundHandler2 

可以看到执行ctx.writeAndFlush他不会从tail节点向前找OutboundHandler

ctx.channel().writeAndFlush则是从tail节点开始向前找OutboundHandler

如果我们将handler顺序修改下改成如下

ch.pipeline().addLast(new OutboundHandler1());  
ch.pipeline().addLast(new OutboundHandler2());  
ch.pipeline().addLast(new InboundHandler1());  
ch.pipeline().addLast(new InboundHandler2());

然后在InboundHandler2执行ctx.writeAndFlush,由于InboundHandler2在链表tail节点的前一个节点,所以输出的结果和执行ctx.channel().writeAndFlush一样 都是

InboundHandler1
InboundHandler2
OutboundHandler2 
OutboundHandler1

总结

从上面大量的用例我们可以看出如下规律

  1. ChannelPipeline是双向链表结构,包含ChannelInboundHandlerChannelOutboundHandler两种处理器.也有处理器既是ChannelInboundHandler又是ChannelOutboundHandler,但是也属于这两种
  2. ctx.writeAndFlush只会从当前的handler位置开始,往前找outbound执行
  3. ctx.pipeline().writeAndFlushctx.channel().writeAndFlush会从tail的位置开始,往前找outbound执行
  4. InboundHandler的执行顺序一般是从上到下Head -> Tail
  5. OutboundHandler的执行顺序一般是从下到上,Tail -> Head

源码

  • 源码: https://github.com/weihubeats/weihubeats_demos/tree/master/java-demos/netty-demo/src/test/java/com/weihubeats/netty/demo/channelPipeline


继续滑动看下一个
小奏技术
向上滑动看下一个

您可能也对以下帖子感兴趣

文章有问题?点此查看未经处理的缓存