【Netty】从0到1(十):入门-Pipeline 与 ChannelHandler
前言
本篇博文是《从0到1学习 Netty》系列的第十篇博文,主要内容是介绍 Netty 中 Pipeline 与 ChannelHandler 的概念和作用,通过源码分析和应用案例进行详细讲解,往期系列文章请访问博主的 Netty 专栏,博文中的所有代码全部收集在博主的 GitHub 仓库中;
Pipeline
在 Netty 中,pipeline
是一种机制,它由一系列的 ChannelHandler
组成。pipeline
负责处理进入或离开 Channel
的数据,并且将事件(比如连接建立、数据读取等)转发给正确的 handler
进行处理。
handler
是 pipeline
的节点,每个 handler
会接收来自前一个 handler
的处理结果,并进行自己的处理。然后,它将处理结果传递给下一个 handler
,直到最终达到 pipeline
的尾部。pipeline
的头部和尾部都是特殊的 handler
,头部负责处理 Inbound
操作,尾部则负责处理 Outbound
操作。
接下来进行具体操作:
1、通过 channel
获取 pipeline
:
ChannelPipeline pipeline = ch.pipeline();
2、添加处理器:
pipeline.addLast(){
...
};
3、服务端代码如下,完整代码见 Github:
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast("h1", new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.debug("Inbound1");
super.channelRead(ctx, msg);
}
});
// h2
// h3
...
pipeline.addLast("h4", new ChannelOutboundHandlerAdapter() {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
log.debug("Outbound4");
super.write(ctx, msg, promise);
}
});
// h5
// h6
...
}
})
4、客户端完整代码在博文 从0到1(八):入门-ChannelFuture 与 CloseFuture 中有详细讲解:
@Slf4j
public class CloseFutureClient {
public static void main(String[] args) throws InterruptedException {
NioEventLoopGroup group = new NioEventLoopGroup();
ChannelFuture channelFuture = new Bootstrap()
.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
ch.pipeline().addLast(new StringEncoder());
}
})
.connect(new InetSocketAddress(7999));
Channel channel = channelFuture.sync().channel();
log.debug(channel.toString());
new Thread(() -> {
Scanner scanner = new Scanner(System.in);
while (true) {
String line = scanner.nextLine();
if ("quit".equals(line)) {
channel.close();
break;
}
channel.writeAndFlush(line);
}
}, "input").start();
ChannelFuture closeFuture = channel.closeFuture();
System.out.println("Waiting Close...");
closeFuture.addListener((ChannelFutureListener) future -> {
log.debug("处理 channel 关闭之后的操作");
group.shutdownGracefully();
});
}
}
5、服务端运行结果:
15:46:19 [DEBUG] [nioEventLoopGroup-2-2] c.s.n.c.TestPipeline - Inbound1
15:46:19 [DEBUG] [nioEventLoopGroup-2-2] c.s.n.c.TestPipeline - Inbound2
15:46:19 [DEBUG] [nioEventLoopGroup-2-2] c.s.n.c.TestPipeline - Inbound3
15:46:19 [DEBUG] [nioEventLoopGroup-2-2] c.s.n.c.TestPipeline - Outbound6
15:46:19 [DEBUG] [nioEventLoopGroup-2-2] c.s.n.c.TestPipeline - Outbound5
15:46:19 [DEBUG] [nioEventLoopGroup-2-2] c.s.n.c.TestPipeline - Outbound4
6、客户端运行结果:
15:46:03 [DEBUG] [nioEventLoopGroup-2-1] i.n.h.l.LoggingHandler - [id: 0x3ff0fb96] REGISTERED
15:46:03 [DEBUG] [nioEventLoopGroup-2-1] i.n.h.l.LoggingHandler - [id: 0x3ff0fb96] CONNECT: 0.0.0.0/0.0.0.0:7999
15:46:03 [DEBUG] [nioEventLoopGroup-2-1] i.n.h.l.LoggingHandler - [id: 0x3ff0fb96, L:/169.254.80.84:57747 - R:IDIOT/169.254.80.84:7999] ACTIVE
15:46:03 [DEBUG] [main] c.s.n.c.CloseFutureClient - [id: 0x3ff0fb96, L:/169.254.80.84:57747 - R:IDIOT/169.254.80.84:7999]
Waiting Close...
sidiot
15:46:19 [DEBUG] [nioEventLoopGroup-2-1] i.n.h.l.LoggingHandler - [id: 0x3ff0fb96, L:/169.254.80.84:57747 - R:IDIOT/169.254.80.84:7999] FLUSH
15:46:19 [DEBUG] [nioEventLoopGroup-2-1] i.n.h.l.LoggingHandler - [id: 0x3ff0fb96, L:/169.254.80.84:57747 - R:IDIOT/169.254.80.84:7999] WRITE: 6B
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 73 69 64 69 6f 74 |sidiot |
+--------+-------------------------------------------------+----------------+
15:46:19 [DEBUG] [nioEventLoopGroup-2-1] i.n.h.l.LoggingHandler - [id: 0x3ff0fb96, L:/169.254.80.84:57747 - R:IDIOT/169.254.80.84:7999] FLUSH
15:46:19 [DEBUG] [nioEventLoopGroup-2-1] i.n.h.l.LoggingHandler - [id: 0x3ff0fb96, L:/169.254.80.84:57747 - R:IDIOT/169.254.80.84:7999] READ: 9B
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 73 65 72 76 65 72 2e 2e 2e |server... |
+--------+-------------------------------------------------+----------------+
15:46:19 [DEBUG] [nioEventLoopGroup-2-1] i.n.h.l.LoggingHandler - [id: 0x3ff0fb96, L:/169.254.80.84:57747 - R:IDIOT/169.254.80.84:7999] READ COMPLETE
看到服务端的运行结果,可能有小伙伴会感到疑惑,按照 handler 的添加顺序,运行结果不应该是 In1 -> In2 -> In3 -> Out4 -> Out5 -> Out6
吗?
这是因为,当 Inbound
操作发生时,Pipeline 会从头部开始向后调用 Handler,直到找到能够处理该操作的 Handler。一旦找到,该 Handler 将处理数据并将其传递给下一个 Handler,直到达到尾部为止。
同样地,当 Outbound
操作发生时,Pipeline 会从尾部开始向前调用 Handler,直到找到能够处理该操作的 Handler。一旦找到,该 Handler 将处理数据并将其传递给上一个 Handler,直到达到头部为止。
ChannelHandler
在 Netty 中,ChannelHandler
是处理 IO 事件的最基本组件之一。ChannelHandler
位于 Netty 的核心位置,并负责接收入站事件 Inbound 和转发出站事件 Outbound。
具体而言,ChannelHandler
主要有两个作用:
- 处理各种类型的 IO 事件,包括连接建立、连接关闭、数据读写等。
- 实现业务逻辑,对网络数据进行处理,例如编解码、协议解析、消息过滤、消息转发等。
Inbound
Inbound 是一种 ChannelHandler 的类型,它主要用于处理从网络接收到的数据。具体来说,当数据到达 Netty 应用程序的网络层时,Inbound 处理程序会被触发并开始处理这些数据。
Inbound 处理程序通常会执行以下操作:
- 解码:将二进制数据转换为 Java 对象。
- 验证:确保数据格式正确以及发送方有权进行操作。
- 处理:执行实际的业务逻辑,可能包括修改状态、创建响应等。
- 转发:将处理后的数据传递给下一个处理程序或写回到网络中。
在处理完所有 Inbound 处理程序之后,Netty 应用程序通常会将处理结果传递给 Outbound 处理程序,让其对数据进行编码、加密等操作,并发送回网络。
举个例子,接下来将使用三个 Inbound,第一个 handler 用于接收 name
属性,第二个 handler 用于生成 Person
类,第三个 handler 返回该结果,代码如下:
pipeline.addLast("h1", new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.debug("Inbound1, value: {}, class: {}", msg, msg.getClass());
ByteBuf buf = (ByteBuf) msg;
String name = buf.toString(Charset.defaultCharset());
super.channelRead(ctx, name);
}
});
pipeline.addLast("h2", new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object name) throws Exception {
log.debug("Inbound2, value: {}, class: {}", name, name.getClass());
Person person = new Person(name.toString());
super.channelRead(ctx, person);
}
});
pipeline.addLast("h3", new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.debug("Inbound3, value: {}, class: {}", msg, msg.getClass());
ch.writeAndFlush(ctx.alloc().buffer().writeBytes("server...".getBytes()));
}
});
运行结果:
17:48:00 [DEBUG] [nioEventLoopGroup-2-2] c.s.n.c.TestPipeline1 - Inbound1, value: PooledUnsafeDirectByteBuf(ridx: 0, widx: 6, cap: 1024), class: class io.netty.buffer.PooledUnsafeDirectByteBuf
17:48:00 [DEBUG] [nioEventLoopGroup-2-2] c.s.n.c.TestPipeline1 - Inbound2, value: sidiot, class: class java.lang.String
17:48:00 [DEBUG] [nioEventLoopGroup-2-2] c.s.n.c.TestPipeline1 - Inbound3, value: TestPipeline1.Person(name=sidiot), class: class com.sidiot.netty.c3.TestPipeline1$Person
在上述代码中,super.channelRead(ctx, msg)
方法用于将接收到的消息传递给下一个 ChannelInboundHandler 处理器进行处理,实现了消息在处理链条中的流转。
super.channelRead(ctx, msg)
源码如下:
@Skip
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ctx.fireChannelRead(msg);
}
@Override
public ChannelHandlerContext fireChannelRead(final Object msg) {
invokeChannelRead(findContextInbound(MASK_CHANNEL_READ), msg);
return this;
}
通过 super.channelRead(ctx, msg)
的源码可以获知,在该方法中,通过调用 ctx.fireChannelRead(msg)
将数据传递给下一个 ChannelInboundHandler,从而实现事件的传播。
而 fireChannelRead
是在 ChannelHandlerContext 接口中定义的,默认实现是在当前 ChannelHandlerContext 中查找与 MASK_CHANNEL_READ
相应类型的 ChannelInboundHandler,并将数据传递给它的 channelRead
方法。这个方法返回当前的 ChannelHandlerContext 对象,可以链式调用其他方法。其中,invokeChannelRead()
方法在博文 从0到1(七):入门-EventLoop 进行过详细讲解。
Outbound
Outbound 是一种 ChannelHandler 的类型,它主要用于处理将数据发送到网络的操作。具体来说,当应用程序需要向网络发送数据时,会触发 Outbound 处理程序,并让其对数据进行编码、加密等处理后再发送出去。
Outbound 处理程序通常会执行以下操作:
- 编码:将 Java 对象转换为二进制数据。
- 加密:对数据进行加密以保证安全性。
- 写入:将处理后的数据写入网络中发送出去。
在处理完所有 Outbound 处理程序之后,Netty 应用程序通常会将数据传递给底层的传输层(如 TCP)并发送到远程端点。
这里需要注意的是,socketChannel.writeAndFlush()
和 ctx.writeAndFlush()
这两个方法,两个方法的作用都是写入数据并刷新缓冲区,但还是有所不同的,接下来通过实例进行讲解,为了效果更加明显,将原先代码中的 h3 与 h4 互换:
pipeline.addLast("h3", new ChannelOutboundHandlerAdapter() {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
log.debug("Outbound4");
super.write(ctx, msg, promise);
}
});
pipeline.addLast("h4", new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.debug("Inbound3, value: {}, class: {}", msg, msg.getClass());
ch.writeAndFlush(ctx.alloc().buffer().writeBytes("server...".getBytes()));
}
});
使用 socketChannel.writeAndFlush()
的运行结果:
23:22:17 [DEBUG] [nioEventLoopGroup-2-2] c.s.n.c.TestPipeline1 - Inbound1, value: PooledUnsafeDirectByteBuf(ridx: 0, widx: 6, cap: 1024), class: class io.netty.buffer.PooledUnsafeDirectByteBuf
23:22:17 [DEBUG] [nioEventLoopGroup-2-2] c.s.n.c.TestPipeline1 - Inbound2, value: sidiot, class: class java.lang.String
23:22:17 [DEBUG] [nioEventLoopGroup-2-2] c.s.n.c.TestPipeline1 - Inbound3, value: TestPipeline1.Person(name=sidiot), class: class com.sidiot.netty.c3.TestPipeline1$Person
23:22:17 [DEBUG] [nioEventLoopGroup-2-2] c.s.n.c.TestPipeline1 - Outbound6
23:22:17 [DEBUG] [nioEventLoopGroup-2-2] c.s.n.c.TestPipeline1 - Outbound5
23:22:17 [DEBUG] [nioEventLoopGroup-2-2] c.s.n.c.TestPipeline1 - Outbound4
使用 ctx.writeAndFlush()
的运行结果:
23:42:34 [DEBUG] [nioEventLoopGroup-2-2] c.s.n.c.TestPipeline1 - Inbound1, value: PooledUnsafeDirectByteBuf(ridx: 0, widx: 6, cap: 1024), class: class io.netty.buffer.PooledUnsafeDirectByteBuf
23:42:34 [DEBUG] [nioEventLoopGroup-2-2] c.s.n.c.TestPipeline1 - Inbound2, value: sidiot, class: class java.lang.String
23:42:34 [DEBUG] [nioEventLoopGroup-2-2] c.s.n.c.TestPipeline1 - Inbound3, value: TestPipeline1.Person(name=sidiot), class: class com.sidiot.netty.c3.TestPipeline1$Person
23:42:34 [DEBUG] [nioEventLoopGroup-2-2] c.s.n.c.TestPipeline1 - Outbound4
可以发现,socketChannel.writeAndFlush()
的运行结果包含了三个 Outbound,但是 ctx.writeAndFlush()
的运行结果只有一个 Outbound4,这是为什么呢?接下来我们通过源码进一步分析。
socketChannel.writeAndFlush()
的源码如下:
@Override
public ChannelFuture writeAndFlush(Object msg) {
return pipeline.writeAndFlush(msg);
}
@Override
public final ChannelFuture writeAndFlush(Object msg) {
return tail.writeAndFlush(msg);
}
ctx.writeAndFlush()
的源码如下:
@Override
public ChannelFuture writeAndFlush(Object msg) {
return writeAndFlush(msg, newPromise());
}
从源码中可以看出,socketChannel.writeAndFlush()
是从尾部开始向前寻找 Outbound,而 ctx.writeAndFlush()
则是从当前位置开始向前寻找 Outbound。
因此,socketChannel.writeAndFlush()
的运行结果包含了三个 Outbound,而 ctx.writeAndFlush()
的运行结果只有一个 Outbound4。
EmbeddedChannel
EmbeddedChannel
是 Netty 提供的工具类,用于在单元测试中模拟 Netty Channel 的行为。它可以被用于测试 ChannelHandler、ChannelPipeline 等模块。
通常来说,在使用 Netty 进行网络编程的时候,我们需要连接远程服务器或者监听本地端口以接收请求。这个过程需要真实的网络环境,即需要实际建立连接和发送数据,这样会增加测试的复杂性和不稳定性。而使用 EmbeddedChannel
,我们可以通过将 ChannelHandler 添加到 EmbeddedChannel
对象上,来模拟整个请求/响应的流程,从而达到测试的目的,避免了对网络环境的依赖。
另外,使用 EmbeddedChannel
还可以轻松测试多个 ChannelHandler 之间的协作情况。例如,在进行 WebSocket 消息处理的时候,我们可能需要多个 ChannelHandler 协同工作才能完成消息的解析和转发,此时使用 EmbeddedChannel
就非常方便。
完整代码:
public class TestEmbeddedChannel {
public static void main(String[] args) {
ChannelInboundHandlerAdapter h1 = new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("1");
super.channelRead(ctx, msg);
}
};
ChannelInboundHandlerAdapter h2 = new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("2");
super.channelRead(ctx, msg);
}
};
ChannelOutboundHandlerAdapter h3 = new ChannelOutboundHandlerAdapter() {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
System.out.println("3");
super.write(ctx, msg, promise);
}
};
ChannelOutboundHandlerAdapter h4 = new ChannelOutboundHandlerAdapter() {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
System.out.println("4");
super.write(ctx, msg, promise);
}
};
// 用于测试Handler的Channel
EmbeddedChannel channel = new EmbeddedChannel(h1, h2, h3, h4);
// 执行Inbound操作
channel.writeInbound(ByteBufAllocator.DEFAULT.buffer().writeBytes("hello".getBytes(StandardCharsets.UTF_8)));
// 执行Outbound操作
channel.writeOutbound(ByteBufAllocator.DEFAULT.buffer().writeBytes("hello".getBytes(StandardCharsets.UTF_8)));
}
}
运行结果:
1
2
4
3
后记
以上就是 从0到1(十):入门-Pipeline 与 ChannelHandler 的所有内容了,希望本篇博文对大家有所帮助!
参考:
📝 上篇精讲:从0到1(九):入门-Future 与 Promise
💖 我是 𝓼𝓲𝓭𝓲𝓸𝓽,期待你的关注;
👍 创作不易,请多多支持;
🔥 系列专栏:Netty
转载自:https://juejin.cn/post/7240116604692824120