likes
comments
collection
share

【Netty】从0到1(十):入门-Pipeline 与 ChannelHandler

作者站长头像
站长
· 阅读数 60

前言

本篇博文是《从0到1学习 Netty》系列的第十篇博文,主要内容是介绍 Netty 中 Pipeline 与 ChannelHandler 的概念和作用,通过源码分析和应用案例进行详细讲解,往期系列文章请访问博主的 Netty 专栏,博文中的所有代码全部收集在博主的 GitHub 仓库中;

Pipeline

在 Netty 中,pipeline 是一种机制,它由一系列的 ChannelHandler 组成。pipeline 负责处理进入或离开 Channel 的数据,并且将事件(比如连接建立、数据读取等)转发给正确的 handler 进行处理。

handlerpipeline 的节点,每个 handler 会接收来自前一个 handler 的处理结果,并进行自己的处理。然后,它将处理结果传递给下一个 handler,直到最终达到 pipeline 的尾部。pipeline 的头部和尾部都是特殊的 handler,头部负责处理 Inbound 操作,尾部则负责处理 Outbound 操作。

【Netty】从0到1(十):入门-Pipeline 与 ChannelHandler


接下来进行具体操作:

1、通过 channel 获取 pipeline

ChannelPipeline pipeline = ch.pipeline();

2、添加处理器:

pipeline.addLast(){
    ...
};

3、服务端代码如下,完整代码见 Github

.childHandler(new ChannelInitializer<NioSocketChannel>() {
    @Override
    protected void initChannel(NioSocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        pipeline.addLast("h1", new ChannelInboundHandlerAdapter() {
            @Override
            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                log.debug("Inbound1");
                super.channelRead(ctx, msg);
            }
        });
        // h2
        // h3
        ...
        
        pipeline.addLast("h4", new ChannelOutboundHandlerAdapter() {
            @Override
            public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
                log.debug("Outbound4");
                super.write(ctx, msg, promise);
            }
        });
        // h5
        // h6
        ...
    }
})

4、客户端完整代码在博文 从0到1(八):入门-ChannelFuture 与 CloseFuture 中有详细讲解:

@Slf4j
public class CloseFutureClient {
    public static void main(String[] args) throws InterruptedException {
        NioEventLoopGroup group = new NioEventLoopGroup();
        ChannelFuture channelFuture = new Bootstrap()
                .group(group)
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<NioSocketChannel>() {
                    @Override
                    protected void initChannel(NioSocketChannel ch) throws Exception {
                        ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
                        ch.pipeline().addLast(new StringEncoder());
                    }
                })
                .connect(new InetSocketAddress(7999));

        Channel channel = channelFuture.sync().channel();
        log.debug(channel.toString());

        new Thread(() -> {
            Scanner scanner = new Scanner(System.in);
            while (true) {
                String line = scanner.nextLine();
                if ("quit".equals(line)) {
                    channel.close();
                    break;
                }
                channel.writeAndFlush(line);
            }
        }, "input").start();

        ChannelFuture closeFuture = channel.closeFuture();
        System.out.println("Waiting Close...");
        closeFuture.addListener((ChannelFutureListener) future -> {
            log.debug("处理 channel 关闭之后的操作");
            group.shutdownGracefully();
        });

    }
}

5、服务端运行结果:

15:46:19 [DEBUG] [nioEventLoopGroup-2-2] c.s.n.c.TestPipeline - Inbound1
15:46:19 [DEBUG] [nioEventLoopGroup-2-2] c.s.n.c.TestPipeline - Inbound2
15:46:19 [DEBUG] [nioEventLoopGroup-2-2] c.s.n.c.TestPipeline - Inbound3
15:46:19 [DEBUG] [nioEventLoopGroup-2-2] c.s.n.c.TestPipeline - Outbound6
15:46:19 [DEBUG] [nioEventLoopGroup-2-2] c.s.n.c.TestPipeline - Outbound5
15:46:19 [DEBUG] [nioEventLoopGroup-2-2] c.s.n.c.TestPipeline - Outbound4

6、客户端运行结果:

15:46:03 [DEBUG] [nioEventLoopGroup-2-1] i.n.h.l.LoggingHandler - [id: 0x3ff0fb96] REGISTERED
15:46:03 [DEBUG] [nioEventLoopGroup-2-1] i.n.h.l.LoggingHandler - [id: 0x3ff0fb96] CONNECT: 0.0.0.0/0.0.0.0:7999
15:46:03 [DEBUG] [nioEventLoopGroup-2-1] i.n.h.l.LoggingHandler - [id: 0x3ff0fb96, L:/169.254.80.84:57747 - R:IDIOT/169.254.80.84:7999] ACTIVE
15:46:03 [DEBUG] [main] c.s.n.c.CloseFutureClient - [id: 0x3ff0fb96, L:/169.254.80.84:57747 - R:IDIOT/169.254.80.84:7999]
Waiting Close...
sidiot
15:46:19 [DEBUG] [nioEventLoopGroup-2-1] i.n.h.l.LoggingHandler - [id: 0x3ff0fb96, L:/169.254.80.84:57747 - R:IDIOT/169.254.80.84:7999] FLUSH
15:46:19 [DEBUG] [nioEventLoopGroup-2-1] i.n.h.l.LoggingHandler - [id: 0x3ff0fb96, L:/169.254.80.84:57747 - R:IDIOT/169.254.80.84:7999] WRITE: 6B
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 73 69 64 69 6f 74                               |sidiot          |
+--------+-------------------------------------------------+----------------+
15:46:19 [DEBUG] [nioEventLoopGroup-2-1] i.n.h.l.LoggingHandler - [id: 0x3ff0fb96, L:/169.254.80.84:57747 - R:IDIOT/169.254.80.84:7999] FLUSH
15:46:19 [DEBUG] [nioEventLoopGroup-2-1] i.n.h.l.LoggingHandler - [id: 0x3ff0fb96, L:/169.254.80.84:57747 - R:IDIOT/169.254.80.84:7999] READ: 9B
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 73 65 72 76 65 72 2e 2e 2e                      |server...       |
+--------+-------------------------------------------------+----------------+
15:46:19 [DEBUG] [nioEventLoopGroup-2-1] i.n.h.l.LoggingHandler - [id: 0x3ff0fb96, L:/169.254.80.84:57747 - R:IDIOT/169.254.80.84:7999] READ COMPLETE

看到服务端的运行结果,可能有小伙伴会感到疑惑,按照 handler 的添加顺序,运行结果不应该是 In1 -> In2 -> In3 -> Out4 -> Out5 -> Out6 吗?

这是因为,当 Inbound 操作发生时,Pipeline 会从头部开始向后调用 Handler,直到找到能够处理该操作的 Handler。一旦找到,该 Handler 将处理数据并将其传递给下一个 Handler,直到达到尾部为止。

【Netty】从0到1(十):入门-Pipeline 与 ChannelHandler

同样地,当 Outbound 操作发生时,Pipeline 会从尾部开始向前调用 Handler,直到找到能够处理该操作的 Handler。一旦找到,该 Handler 将处理数据并将其传递给上一个 Handler,直到达到头部为止。

【Netty】从0到1(十):入门-Pipeline 与 ChannelHandler

ChannelHandler

在 Netty 中,ChannelHandler 是处理 IO 事件的最基本组件之一。ChannelHandler 位于 Netty 的核心位置,并负责接收入站事件 Inbound 和转发出站事件 Outbound

具体而言,ChannelHandler 主要有两个作用:

  1. 处理各种类型的 IO 事件,包括连接建立、连接关闭、数据读写等。
  2. 实现业务逻辑,对网络数据进行处理,例如编解码、协议解析、消息过滤、消息转发等。

Inbound

Inbound 是一种 ChannelHandler 的类型,它主要用于处理从网络接收到的数据。具体来说,当数据到达 Netty 应用程序的网络层时,Inbound 处理程序会被触发并开始处理这些数据。

Inbound 处理程序通常会执行以下操作:

  1. 解码:将二进制数据转换为 Java 对象。
  2. 验证:确保数据格式正确以及发送方有权进行操作。
  3. 处理:执行实际的业务逻辑,可能包括修改状态、创建响应等。
  4. 转发:将处理后的数据传递给下一个处理程序或写回到网络中。

在处理完所有 Inbound 处理程序之后,Netty 应用程序通常会将处理结果传递给 Outbound 处理程序,让其对数据进行编码、加密等操作,并发送回网络。

举个例子,接下来将使用三个 Inbound,第一个 handler 用于接收 name 属性,第二个 handler 用于生成 Person 类,第三个 handler 返回该结果,代码如下:

pipeline.addLast("h1", new ChannelInboundHandlerAdapter() {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        log.debug("Inbound1, value: {}, class: {}", msg, msg.getClass());
        ByteBuf buf = (ByteBuf) msg;
        String name = buf.toString(Charset.defaultCharset());
        super.channelRead(ctx, name);
    }
});
pipeline.addLast("h2", new ChannelInboundHandlerAdapter() {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object name) throws Exception {
        log.debug("Inbound2, value: {}, class: {}", name, name.getClass());
        Person person = new Person(name.toString());
        super.channelRead(ctx, person);
    }
});
pipeline.addLast("h3", new ChannelInboundHandlerAdapter() {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        log.debug("Inbound3, value: {}, class: {}", msg, msg.getClass());
        ch.writeAndFlush(ctx.alloc().buffer().writeBytes("server...".getBytes()));
    }
});

运行结果:

17:48:00 [DEBUG] [nioEventLoopGroup-2-2] c.s.n.c.TestPipeline1 - Inbound1, value: PooledUnsafeDirectByteBuf(ridx: 0, widx: 6, cap: 1024), class: class io.netty.buffer.PooledUnsafeDirectByteBuf
17:48:00 [DEBUG] [nioEventLoopGroup-2-2] c.s.n.c.TestPipeline1 - Inbound2, value: sidiot, class: class java.lang.String
17:48:00 [DEBUG] [nioEventLoopGroup-2-2] c.s.n.c.TestPipeline1 - Inbound3, value: TestPipeline1.Person(name=sidiot), class: class com.sidiot.netty.c3.TestPipeline1$Person

在上述代码中,super.channelRead(ctx, msg) 方法用于将接收到的消息传递给下一个 ChannelInboundHandler 处理器进行处理,实现了消息在处理链条中的流转。

super.channelRead(ctx, msg) 源码如下:

@Skip  
@Override  
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {  
    ctx.fireChannelRead(msg);  
}

@Override  
public ChannelHandlerContext fireChannelRead(final Object msg) {  
    invokeChannelRead(findContextInbound(MASK_CHANNEL_READ), msg);  
    return this;  
}

通过 super.channelRead(ctx, msg) 的源码可以获知,在该方法中,通过调用 ctx.fireChannelRead(msg) 将数据传递给下一个 ChannelInboundHandler,从而实现事件的传播。

fireChannelRead 是在 ChannelHandlerContext 接口中定义的,默认实现是在当前 ChannelHandlerContext 中查找与 MASK_CHANNEL_READ 相应类型的 ChannelInboundHandler,并将数据传递给它的 channelRead 方法。这个方法返回当前的 ChannelHandlerContext 对象,可以链式调用其他方法。其中,invokeChannelRead() 方法在博文 从0到1(七):入门-EventLoop 进行过详细讲解。

Outbound

Outbound 是一种 ChannelHandler 的类型,它主要用于处理将数据发送到网络的操作。具体来说,当应用程序需要向网络发送数据时,会触发 Outbound 处理程序,并让其对数据进行编码、加密等处理后再发送出去。

Outbound 处理程序通常会执行以下操作:

  1. 编码:将 Java 对象转换为二进制数据。
  2. 加密:对数据进行加密以保证安全性。
  3. 写入:将处理后的数据写入网络中发送出去。

在处理完所有 Outbound 处理程序之后,Netty 应用程序通常会将数据传递给底层的传输层(如 TCP)并发送到远程端点。

这里需要注意的是,socketChannel.writeAndFlush()ctx.writeAndFlush() 这两个方法,两个方法的作用都是写入数据并刷新缓冲区,但还是有所不同的,接下来通过实例进行讲解,为了效果更加明显,将原先代码中的 h3 与 h4 互换:

pipeline.addLast("h3", new ChannelOutboundHandlerAdapter() {
    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        log.debug("Outbound4");
        super.write(ctx, msg, promise);
    }
});
pipeline.addLast("h4", new ChannelInboundHandlerAdapter() {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        log.debug("Inbound3, value: {}, class: {}", msg, msg.getClass());
        ch.writeAndFlush(ctx.alloc().buffer().writeBytes("server...".getBytes()));
    }
});

使用 socketChannel.writeAndFlush() 的运行结果:

23:22:17 [DEBUG] [nioEventLoopGroup-2-2] c.s.n.c.TestPipeline1 - Inbound1, value: PooledUnsafeDirectByteBuf(ridx: 0, widx: 6, cap: 1024), class: class io.netty.buffer.PooledUnsafeDirectByteBuf
23:22:17 [DEBUG] [nioEventLoopGroup-2-2] c.s.n.c.TestPipeline1 - Inbound2, value: sidiot, class: class java.lang.String
23:22:17 [DEBUG] [nioEventLoopGroup-2-2] c.s.n.c.TestPipeline1 - Inbound3, value: TestPipeline1.Person(name=sidiot), class: class com.sidiot.netty.c3.TestPipeline1$Person
23:22:17 [DEBUG] [nioEventLoopGroup-2-2] c.s.n.c.TestPipeline1 - Outbound6
23:22:17 [DEBUG] [nioEventLoopGroup-2-2] c.s.n.c.TestPipeline1 - Outbound5
23:22:17 [DEBUG] [nioEventLoopGroup-2-2] c.s.n.c.TestPipeline1 - Outbound4

使用 ctx.writeAndFlush() 的运行结果:

23:42:34 [DEBUG] [nioEventLoopGroup-2-2] c.s.n.c.TestPipeline1 - Inbound1, value: PooledUnsafeDirectByteBuf(ridx: 0, widx: 6, cap: 1024), class: class io.netty.buffer.PooledUnsafeDirectByteBuf
23:42:34 [DEBUG] [nioEventLoopGroup-2-2] c.s.n.c.TestPipeline1 - Inbound2, value: sidiot, class: class java.lang.String
23:42:34 [DEBUG] [nioEventLoopGroup-2-2] c.s.n.c.TestPipeline1 - Inbound3, value: TestPipeline1.Person(name=sidiot), class: class com.sidiot.netty.c3.TestPipeline1$Person
23:42:34 [DEBUG] [nioEventLoopGroup-2-2] c.s.n.c.TestPipeline1 - Outbound4

可以发现,socketChannel.writeAndFlush() 的运行结果包含了三个 Outbound,但是 ctx.writeAndFlush() 的运行结果只有一个 Outbound4,这是为什么呢?接下来我们通过源码进一步分析。

socketChannel.writeAndFlush() 的源码如下:

@Override  
public ChannelFuture writeAndFlush(Object msg) {  
    return pipeline.writeAndFlush(msg);  
}

@Override  
public final ChannelFuture writeAndFlush(Object msg) {  
    return tail.writeAndFlush(msg);  
}

ctx.writeAndFlush() 的源码如下:

@Override  
public ChannelFuture writeAndFlush(Object msg) {  
    return writeAndFlush(msg, newPromise());  
}

从源码中可以看出,socketChannel.writeAndFlush() 是从尾部开始向前寻找 Outbound,而 ctx.writeAndFlush() 则是从当前位置开始向前寻找 Outbound。

【Netty】从0到1(十):入门-Pipeline 与 ChannelHandler

因此,socketChannel.writeAndFlush() 的运行结果包含了三个 Outbound,而 ctx.writeAndFlush() 的运行结果只有一个 Outbound4。

EmbeddedChannel

EmbeddedChannel 是 Netty 提供的工具类,用于在单元测试中模拟 Netty Channel 的行为。它可以被用于测试 ChannelHandler、ChannelPipeline 等模块。

通常来说,在使用 Netty 进行网络编程的时候,我们需要连接远程服务器或者监听本地端口以接收请求。这个过程需要真实的网络环境,即需要实际建立连接和发送数据,这样会增加测试的复杂性和不稳定性。而使用 EmbeddedChannel,我们可以通过将 ChannelHandler 添加到 EmbeddedChannel 对象上,来模拟整个请求/响应的流程,从而达到测试的目的,避免了对网络环境的依赖。

另外,使用 EmbeddedChannel 还可以轻松测试多个 ChannelHandler 之间的协作情况。例如,在进行 WebSocket 消息处理的时候,我们可能需要多个 ChannelHandler 协同工作才能完成消息的解析和转发,此时使用 EmbeddedChannel 就非常方便。

完整代码:

public class TestEmbeddedChannel {
    public static void main(String[] args) {
        ChannelInboundHandlerAdapter h1 = new ChannelInboundHandlerAdapter() {
            @Override
            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                System.out.println("1");
                super.channelRead(ctx, msg);
            }
        };

        ChannelInboundHandlerAdapter h2 = new ChannelInboundHandlerAdapter() {
            @Override
            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                System.out.println("2");
                super.channelRead(ctx, msg);
            }
        };

        ChannelOutboundHandlerAdapter h3 = new ChannelOutboundHandlerAdapter() {
            @Override
            public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
                System.out.println("3");
                super.write(ctx, msg, promise);
            }
        };

        ChannelOutboundHandlerAdapter h4 = new ChannelOutboundHandlerAdapter() {
            @Override
            public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
                System.out.println("4");
                super.write(ctx, msg, promise);
            }
        };

        // 用于测试Handler的Channel
        EmbeddedChannel channel = new EmbeddedChannel(h1, h2, h3, h4);

        // 执行Inbound操作
        channel.writeInbound(ByteBufAllocator.DEFAULT.buffer().writeBytes("hello".getBytes(StandardCharsets.UTF_8)));

        // 执行Outbound操作
        channel.writeOutbound(ByteBufAllocator.DEFAULT.buffer().writeBytes("hello".getBytes(StandardCharsets.UTF_8)));
    }
}

运行结果:

1
2
4
3

后记

以上就是 从0到1(十):入门-Pipeline 与 ChannelHandler 的所有内容了,希望本篇博文对大家有所帮助!

参考:

📝 上篇精讲:从0到1(九):入门-Future 与 Promise

💖 我是 𝓼𝓲𝓭𝓲𝓸𝓽,期待你的关注;

👍 创作不易,请多多支持;

🔥 系列专栏:Netty

转载自:https://juejin.cn/post/7240116604692824120
评论
请登录