图文加多个测试带你彻底搞懂Netty ChannelPipeline的执行顺序(附源码)
netty version
4.1.65.Final
ChannelPipeline 是什么
Pipeline,管道、流水线,类似于责任链模式。基本上我们使用Netty开发程序需要编写的就是ChannelPipeline
中的各个ChannelHandler
ChannelPipeline
就是用来组合所有的ChannelHandler
我们今天重点讨论的对象是ChannelPipeline
添加多个ChannelHandler
他的执行顺序是什么
测试demo
这里我们直接用测试代码来看看
NettyServer
public class NettyServer {
public static void main(String[] args) throws InterruptedException {
// 启动服务器
startServer();
}
private static void startServer() throws InterruptedException {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new CustomInboundHandler1());
pipeline.addLast(new CustomInboundHandler2());
pipeline.addLast(new CustomOutboundHandler1());
pipeline.addLast(new CustomOutboundHandler2());
pipeline.addLast(new SimpleChannelInboundHandler<ByteBuf>() {
@Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) {
//buf.readableBytes()获取缓冲区可读的字节数
byte[] req = new byte[msg.readableBytes()];
// 将缓冲区的字节数组复制到新的byte数组中
msg.readBytes(req);
String body = new String(req, StandardCharsets.UTF_8);
System.out.println("Server received: " + body);
ByteBuf firstMessage;
byte[] req1 = "你好客户端".getBytes();
firstMessage = Unpooled.buffer(req1.length);
firstMessage.writeBytes(req1);
System.out.println("开始给客户端发送消息");
ctx.writeAndFlush(firstMessage);
}
@Override
public void channelRegistered(ChannelHandlerContext ctx) {
System.out.println("连接上来了");
ctx.fireChannelRegistered();
}
});
}
});
ChannelFuture future = b.bind(8888).sync();
future.channel().closeFuture().sync();
}
}
NettyClient
public class NettyClient {
public static void main(String[] args) throws Exception {
// 启动客户端
startClient();
}
public static void startClient() throws InterruptedException {
Bootstrap clientBootstrap = new Bootstrap();
NioEventLoopGroup group = new NioEventLoopGroup();
clientBootstrap.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new CustomOutboundHandler2());
pipeline.addLast(new CustomOutboundHandler1());
pipeline.addLast(new CustomInboundHandler2());
pipeline.addLast(new CustomInboundHandler1());
pipeline.addLast(new SimpleChannelInboundHandler<ByteBuf>() {
@Override
public void channelActive(ChannelHandlerContext ctx) {
ByteBuf firstMessage;
byte[] req = "你好服务器".getBytes();
firstMessage = Unpooled.buffer(req.length);
firstMessage.writeBytes(req);
System.out.println("------------ 开始发送消息 ------------");
ctx.writeAndFlush(firstMessage);
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) {
//buf.readableBytes()获取缓冲区可读的字节数
byte[] req = new byte[msg.readableBytes()];
// 将缓冲区的字节数组复制到新的byte数组中
msg.readBytes(req);
String body = new String(req, StandardCharsets.UTF_8);
System.out.println("Client received: " + body);
}
});
}
});
ChannelFuture future = clientBootstrap.connect("localhost", 8888).sync();
future.channel().closeFuture().sync();
}
}
测试
我们这里先启动NettyServer
,然后启动 NettyClient
控制台输出信息如下
NettyClient
NettyServer
首先我们可以看到我们在 Netty
添加Handler
顺序如下
pipeline.addLast(new CustomOutboundHandler2());
pipeline.addLast(new CustomOutboundHandler1());
pipeline.addLast(new CustomInboundHandler2());
pipeline.addLast(new CustomInboundHandler1());
client Handler 执行顺序
所以对于client
执行的顺序是CustomOutboundHandler1
、CustomOutboundHandler2
、CustomInboundHandler2
、 CustomInboundHandler1
通俗的讲就是我们client
发送消息的时候就是如下一个顺序
只会执行OutboundHandler
在client
接受到server
返回的消息后的handler
执行顺序如下
只会执行InboundHandler
server Handler 执行顺序
服务端的顺序也是类似,由于client
发送消息到server
,所以对于服务端来说是接受消息
所以服务端首先执行的顺序就是接受消息
所以打印了
CustomInboundHandler1 - channelRead
CustomInboundHandler2 - channelRead
Server received: 你好服务器
server
在处理完数据后会返回消息给client
,这里server
就是发送数据
所以就是这个顺序
CustomOutboundHandler2 - write
CustomOutboundHandler1 - write
图解handler顺序
基于上面的demo演示,我们就总结了一个handler
的执行顺序
横看就是这个顺序,当然我们也可以通过上面的箭头看。
如果是发送消息 就是 从下到上
如果是接受消息就是从上到下
再谈ctx.channel().writeAndFlush
和ctx.writeAndFlush
现在我们所有的handerl
一般调用的方法都是ctx.channel().writeAndFlush
那么ctx.channel().writeAndFlush
和ctx.writeAndFlush
方法有什么区别呢
我们知道ChannelPipeline
是一个双向链表结构。
比如我们server
添加handler的顺序如下
ch.pipeline().addLast(new InboundHandler1());
ch.pipeline().addLast(new InboundHandler2());
ch.pipeline().addLast(new OutboundHandler1());
ch.pipeline().addLast(new OutboundHandler2());
那么在链表中的顺序就是如下
head->in1->in2->out1->out2->tail
如果我们在InboundHandler2
中执行ctx.channel().writeAndFlush
那么输出的顺序就是
InboundHandler1
InboundHandler2
OutboundHandler2
OutboundHandler1
如果在InboundHandler2
中执行ctx.writeAndFlush
,执行顺序是
InboundHandler1
InboundHandler2
可以看到执行ctx.writeAndFlush
他不会从tail
节点向前找OutboundHandler
而ctx.channel().writeAndFlush
则是从tail
节点开始向前找OutboundHandler
如果我们将handler顺序修改下改成如下
ch.pipeline().addLast(new OutboundHandler1());
ch.pipeline().addLast(new OutboundHandler2());
ch.pipeline().addLast(new InboundHandler1());
ch.pipeline().addLast(new InboundHandler2());
然后在InboundHandler2
执行ctx.writeAndFlush
,由于InboundHandler2
在链表tail
节点的前一个节点,所以输出的结果和执行ctx.channel().writeAndFlush
一样
都是
InboundHandler1
InboundHandler2
OutboundHandler2
OutboundHandler1
总结
从上面大量的用例我们可以看出如下规律
ChannelPipeline
是双向链表结构,包含ChannelInboundHandler
和ChannelOutboundHandler
两种处理器.也有处理器既是ChannelInboundHandler
又是ChannelOutboundHandler
,但是也属于这两种ctx.writeAndFlush
只会从当前的handler位置开始,往前找outbound执行ctx.pipeline().writeAndFlush
与ctx.channel().writeAndFlush
会从tail
的位置开始,往前找outbound
执行InboundHandler
的执行顺序一般是从上到下Head -> Tail
OutboundHandler
的执行顺序一般是从下到上,Tail -> Head
源码
源码: https://github.com/weihubeats/weihubeats_demos/tree/master/java-demos/netty-demo/src/test/java/com/weihubeats/netty/demo/channelPipeline