likes
comments
collection
share

(九)Java网络编程无冕之王-这回把大名鼎鼎的Netty框架一网打尽!

作者站长头像
站长
· 阅读数 13

引言

   现如今的开发环境中,分布式/微服务架构大行其道,而分布式/微服务的根基在于网络编程,而Netty恰恰是Java网络编程领域的无冕之王。Netty这个框架相信大家定然听说过,其在Java网络编程中的地位,好比JavaEE中的Spring

   当然,这样去聊它大家可能无法实际感受出它的重要性,那先来看看基于Netty构建的应用: (九)Java网络编程无冕之王-这回把大名鼎鼎的Netty框架一网打尽! 观察上述列出的开源组件,一眼望去几乎全是各个领域中大名鼎鼎的框架,而这些组件都是基于Netty构建的,涵盖中间件、大数据、离线计算、分布式、RPC、No-SQL等各个方向....,很明显的可感知出Netty的地位之高,因此如果要打造一款Java高性能的网络通信程序、想要真正熟知分布式架构的底层原理,Netty成为了每个Java开发进阶必须要掌握的核心技术之一。

Netty的重要性不言而知,但网上相关的大部分视频、文章、书籍等资料却五花八门,很难真正帮助大家构建出一套完整的体系,本文的目的就是带诸位走入基于Netty的网络世界,在真正意义上为诸君构建一套Netty的知识储备。

一、初识Netty的基础概念与快速入门

(九)Java网络编程无冕之王-这回把大名鼎鼎的Netty框架一网打尽! 注意看:上图中右边这位黑眼圈堪比熊猫眼的哥们,从他头顶的发量就能明显感受出其技术强度,他!!!旁边的这位才是Netty框架的原作者Trustin Lee(韩国人),同时他也是另一个著名网络框架Mina的核心主程之一,现任职于Apple苹果集团......,不过多介绍作者了,总之是一位网络方面的大牛。

   重点来聊聊我们的主角:Netty框架,其实这个框架是基于Java原生NIO技术的进一步封装,在其中对Java-NIO技术做了进一步增强,作者充分结合了Reactor线程模型,将Netty变为了一个基于异步事件驱动的网络框架,Netty从诞生至今共发布了五个大版本,但目前最常用的反而并非是最新的5.x系列,而是4.x系列的版本,原因在于Netty本身就是基于Java-NIO封装的,而JDK本身又很稳定,再加上5.x版本并未有太大的性能差异,因此4.x系列才是主流。

   再回过头来思考一个问题:为什么Netty要二次封装原生NIO呢?相信看过NIO源码的小伙伴都清楚,原生的NIO设计的特别繁琐,而且还存在一系列安全隐患,因此Netty则是抱着简化NIO、解决隐患、提升性能等目的而研发的。

不过有意思的一点在于:Netty虽然是基于Java-NIO封装的框架,但实际使用起来却跟之前聊到的Java-AIO(NIO2)技术有些相似。

1.1、Netty的入门实例

   上面扯了不少Netty的概念,现在就直接先实操一番快速入门,毕竟编程讲究施展出真理,首先第一步则是添加对应的依赖,如下:

<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.43.Final</version>
</dependency>

然后先创建NettyServer服务端,代码如下:

public class NettyServer {
    public static void main(String[] args) throws InterruptedException {
        // 创建两个EventLoopGroup,boss:处理连接事件,worker处理I/O事件
        EventLoopGroup boss = new NioEventLoopGroup();
        EventLoopGroup worker = new NioEventLoopGroup();
        // 创建一个ServerBootstrap服务端(同之前的ServerSocket类似)
        ServerBootstrap server = new ServerBootstrap();
        try {
            // 将前面创建的两个EventLoopGroup绑定在server上
            server.group(boss,worker)
                    // 指定服务端的通道为Nio类型
                    .channel(NioServerSocketChannel.class)
                    // 为到来的客户端Socket添加处理器
                    .childHandler(new ChannelInitializer<NioSocketChannel>() {
                        // 这个只会执行一次(主要是用于添加更多的处理器)
                        @Override
                        protected void initChannel(NioSocketChannel ch) {
                            // 添加一个字符解码处理器:对客户端的数据解码
                            ch.pipeline().addLast(
                                new StringDecoder(CharsetUtil.UTF_8));
                            // 添加一个入站处理器,对收到的数据进行处理
                            ch.pipeline().addLast(
                                new SimpleChannelInboundHandler<String>() {
                                // 读取事件的回调方法
                                @Override
                                protected void channelRead0(ChannelHandlerContext 
                                    ctx,String msg) {
                                    System.out.println("收到客户端信息:" + msg);
                                }
                            });
                        }
                    });
            // 为当前服务端绑定IP与端口地址(sync是同步阻塞至连接成功为止)
            ChannelFuture cf = server.bind("127.0.0.1",8888).sync();
            // 关闭服务端的方法(之后不会在这里关闭)
            cf.channel().closeFuture().sync();
        }finally {
            // 优雅停止之前创建的两个Group
            boss.shutdownGracefully();
            worker.shutdownGracefully();
        }
    }
}

紧接着再构建一个NettyClient客户端,代码如下:

public class NettyClient {
    public static void main(String[] args) {
        // 由于无需处理连接事件,所以只需要创建一个EventLoopGroup
        EventLoopGroup worker = new NioEventLoopGroup();
        // 创建一个客户端(同之前的Socket、SocketChannel)
        Bootstrap client = new Bootstrap();
        try {
            client.group(worker)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel sc)
                            throws Exception {
                            // 添加一个编码处理器,对数据编码为UTF-8格式
                            sc.pipeline().addLast(new
                                StringEncoder(CharsetUtil.UTF_8));
                        }
                    });
            // 与指定的地址建立连接
            ChannelFuture cf = client.connect("127.0.0.1", 8888).sync();
            // 建立连接成功后,向服务端发送数据
            System.out.println("正在向服务端发送信息......");
            cf.channel().writeAndFlush("我是<竹子爱熊猫>!");
        } catch (Exception e){
          e.printStackTrace();
        } finally {
            worker.shutdownGracefully();
        }
    }
}

先看运行结果吧,控制台输出如下:

NettyServer控制台输出:
    收到客户端信息:我是<竹子爱熊猫>!

NettyClient控制台输出:
    正在向服务端发送信息......

从结果中很容易看出这个案例中做了什么事情,其实无非就是利用Netty实现了简单的对端通信,实现的功能很简单,但对于未学习过Netty技术的小伙伴,在代码方面估计有些许懵,那么接下来简单的解释一下代码。

但在此之前先声明一点:Netty是支持链式编程的一个框架,也就是如上述中的代码调用,所有的方法都可以一直用.连下去,所以在Netty的应用中会见到大量的这类写法。

  • EventLoopGroup:可以理解成之前的Selector选择器,但结合了线程池(后续详细分析)。
  • ServerBootstrap/Bootstrap:类似于之前的ServerSocketChannel/SocketChannel
  • childHandler:这个是新概念,可以理解成过滤器,在之前的Servlet编程中,新请求到来都会经过一个个的过滤器,而这个处理器也类似于之前的过滤器,新连接到来时,也会经过添加好的一系列处理器。

OK~,对于上述几个新概念有了简单认知后,接着把上面案例的完整流程分析一下:

  • ①先创建两个EventLoopGroup事件组,然后创建一个ServerBootstrap服务端。
  • ②将创建的两个事件组boss、worker绑定在服务端上,并指定服务端通道为NIO类型。
  • ③在server上添加处理器,对新到来的Socket连接进行处理,在这里主要分为两类:
    • ChannelInitializer:连接到来时执行,主要是用于添加更多的处理器(只触发一次)。
    • addLast():通过该方式添加的处理器不会立马执行,而是根据处理器类型择机执行。
  • ④为创建好的服务端绑定IP及端口号,调用sync()意思是阻塞至绑定成功为止。
  • ⑤再创建一个EventLoopGroup事件组,并创建一个Bootstrap客户端。
  • ⑥将事件组绑定在客户端上,由于无需处理连接事件,所以只需要一个事件组。
  • ⑦指定Channel通道类型为NIO、添加处理器.....(同服务端类似)
  • ⑧与前面服务端绑定的地址建立连接,由于默认是异步的,也要调用sync()阻塞。
  • ⑨建立连接后,客户端将数据写入到通道准备发送,首先会先经过添加好的编码处理器,将数据的格式设为UTF-8
  • ⑩服务器收到数据后,会先经过解码处理器,然后再去到入站处理,执行对应的Read()方法逻辑。
  • ⑪客户端完成数据发送后,先关闭通道,再优雅关闭创建好的事件组。
  • ⑫同理,服务端工作完成后,先关闭通道再停止事件组。

结合上述的流程,再去看一遍给出的案例源码,相信诸位应该可以彻底理解。不过需要注意的一点是:Netty的大部分操作都是异步的,比如地址绑定、客户端连接等。好比调用connect()方法与服务端建立连接时,主线程会把这个工作交给事件组中的线程去完成,所以此刻如果主线程直接去向通道中写入数据,有几率会出现报错,因为实际生产环境中,可能由于网络延迟导致连接建立的时间有些长,此时通道并未建立成功,因此尝试发送数据时就会有问题,这点与之前的Java-AIO通信案例中,客户端建立连接要调用.get()方法是同理。

到这里,你对Netty框架已经入门了,接着咱们重点聊聊Netty中的一些核心组件。

二、Netty框架核心组件:启动器与事件组

   对于Netty有了基本的认知后,接下来慢慢的熟悉这个框架吧,先依次来看看其中的一些核心组件,了解这些组件及作用后,才能真正意义上的“玩转Netty”。

2.1、启动器-ServerBootstrap、Bootstrap

   ServerBootstrap、Bootstrap这两个组件应该无需过多解释,上个表格对比大家就理解了:

对比项服务端客户端
BIOServerSocketSocket
NIOServerSocketChannelSocketChannel
AIOAsynchronousServerSocketChannelAsynchronousSocketChannel
NettyServerBootstrapBootstrap

从上表中能明显感觉出它俩在Netty中的作用,无非就是服务端与客户端换了个叫法而已。

2.2、事件组-EventLoopGroup、EventLoop

   这两个东西比较重要,但同时也比较抽象,EventLoop这东西翻译过来就是事件循环的意思,你可以把它理解成NIO中的Selector选择器,实际它本质上就是这玩意儿,因为内部会维护一个Selector,然后由一条线程会循环处理Channel通道上发生的所有事件,所以每个EventLoop对象都可以看成一个单线程执行器。

   EventLoopGroup可以将其理解成AIO中的AsynchronousChannelGroup可能会更合适,在AIOACG(前面那玩意儿的缩写)中,我们需要手动指定一个线程池,然后AIO的所有客户端工作都会使用线程池中的线程进行管理,而Netty中的EventLoopGroup就类似于AIO-ACG这玩意儿,只不过不需要我们管理线程池了,而是Netty内部维护。

EventLoopGroup、EventLoop有了基本认知后,你再点进它们的源码实现,其实能够观测到:其实它们继承了两个类,一个是Netty自己实现的有序线程池OrderedEventExecutor类,另一个则JDK提供的原生定时调度线程池ScheduledExecutorService类(源码篇会详细分析,这里先简单了解)。

public static void main(String[] args) {
    EventLoopGroup threadPool = new NioEventLoopGroup();
    // 递交Runnable类型的普通异步任务
    threadPool.execute(()->{
        System.out.println("execute()方法提交的任务....");
    });
    // 递交Callable类型的有返回异步任务
    threadPool.submit(() -> {
        System.out.println("submit()方法提交的任务....");
        return "我是执行结果噢!";
    });
    // 递交Callable类型的延时调度任务
    threadPool.schedule(()->{
        System.out.println("schedule()方法提交的任务,三秒后执行....");
        return "调度执行后我会返回噢!";
    },3,TimeUnit.SECONDS);
    // 递交Runnable类型的延迟间隔调度任务
    threadPool.scheduleAtFixedRate(()->{
        System.out.println("scheduleAtFixedRate()方法提交的任务....");
    },3,1,TimeUnit.SECONDS);
}

/* ~~~~~~~~~~~~~~~~~~我是性感的分割线~~~~~~~~~~~~~~~~~~ */
执行结果如下:
    立即执行:
        execute()方法提交的任务....
        submit()方法提交的任务....
    
    延时三秒后执行:
        schedule()方法提交的任务....
        scheduleAtFixedRate()方法提交的任务....
    
    之后没间隔一秒执行:
        scheduleAtFixedRate()方法提交的任务....
        scheduleAtFixedRate()方法提交的任务....

上述我们创建了一个EventLoopGroup事件循环组,然后通过之前JDK线程池提供的一系列的提交任务的方法,向其递交了几个异步任务,然后运行该程序,答案显而易见,EventLoopGroup确实可以当做JDK原生线程池来使用。

当然,这些并非分析的重点,重点来看看EventLoopGroup如何在Netty中合理运用。

在了解它们的Netty用法之前,先来看看除原生线程池之外所提供的方法:

  • EventLoop.inEventLoop(Thread):判断一个线程是否属于当前EventLoop
  • EventLoop.parent():判断当前EventLoop属于哪一个事件循环组。
  • EventLoopGroup.next():获取当前事件组中的下一个EventLoop(线程)。

这些方法我们简单了解即可,因为大多数情况下在Netty源码中才会用到,暂且无需关注太多,我们先把目光移到前面给出的Netty使用案例中,还记得最开始定义的两个事件组吗?

EventLoopGroup boss = new NioEventLoopGroup();
EventLoopGroup worker = new NioEventLoopGroup();

为什么在服务端要定义两个组呢?一个难道不行吗?其实也是可以的,但定义两个组的好处在于:可以让Group中的每个EventLoop分工更加明确,不同的Group分别处理不同类型的事件,各司其职。

在前面案例中,为服务端绑定了两个事件循环组,也就代表着会根据ServerSocketChannel上触发的不同事件,将对应的工作分发到这两个Group中处理,其中boss主要负责客户端的连接事件,而worker大多数情况下负责处理客户端的IO读写事件。

当客户端的SocketChannel连接到来时,首先会将这个注册事件的工作交给boss处理,boss会调用worker.register()方法,将这条客户端连接注册到worker工作组中的一个EventLoop上。前面提到过:EventLoop内部会维护一个Selector选择器,因此实际上也就是将客户端通道注册到其内部中的选择器上。

注意:将一个Socket连接注册到一个EventLoop上之后,这个客户端连接则会和这个EventLoop绑定,以后这条通道上发生的所有事件,都会交由这个EventLoop处理。

到这里大家应该也理解了为何要拆出两个EventLoopGroup,主要目的就在于分工更为明细。当然,由于EventLoopGroup本质上可以理解成一个线程池,其中存在的线程资源自然是有限的,那此时如果到来的客户端连接大于线程数量怎么办呢?这是不影响的,因为Netty本身是基于Java-NIO封装的,而NIO底层又是基于多路复用模型实现的,天生就能实现一条线程管理多个连接的功能,所以就算连接数大于线程数,也完全可以Hold住。

OK~,除开可以根据事件类型划分Group之外,也可以根据为每个处理器划分不同的事件组,如下:

// 创建EventLoopGroup和JDK原生的线程池一样,可以指定线程数量
EventLoopGroup extra = new NioEventLoopGroup(2);
sc.pipeline().addLast(extra, new xxxChannelHandler());

这样做的好处在于什么呢?因为前面提到过:一个连接注册到EventLoop,之后所有的工作都会由这个EventLoop处理,而一个EventLoop又有可能同时管理多个连接,因此假设一条连接上的某个处理器,执行过程非常耗时,此时必然就会影响到这个EventLoop管理的其他连接,因此对于一些较为耗时的Handler,可以专门指派给一个额外的extra事件组处理,这样就不会影响到所管理的其他连接。

当然,这个功能其实也略微有些鸡肋,一般多个Handler之间都会存在耦合关系,下一个Handler需要依赖上一个Handler的处理结果执行,因此也很难拆出来单独放到另一个事件组中执行。

看到这里,相信你对于EventLoopGroup、EventLoop这两个组件应该有了基本认知,简单来说可以EventLoop理解成有一条线程专门维护的Selector选择器,而EventLoopGroup则可以理解成一个有序的定时调度线程池,负责管理所有的EventLoop。举个生活案例来加深印象:

现在有个工厂,其中分为了不同的片区,一个片区中有很多条流水线,由每个工人负责一部分流水线的作业。开始工作后,流水线的传输带会源源不断的将货物传递过来,这些货物最终会等待工人进行加工。

(九)Java网络编程无冕之王-这回把大名鼎鼎的Netty框架一网打尽! 在上述这个例子中,工厂就是ServerBootstrap服务端,而一个个片区就是不同的EventLoopGroup事件组,一条流水线则可以理解成一个SocketChannel客户端通道,而负责多条流水线的工人就是EventLoop单线程执行器,加工的动作其实就是处理通道上发生的事件。

大家可以将这个例子套进去想象一下,相信这会让你印象更加深刻。

三、Netty中的增强版通道(ChannelFuture)

   对于通道这个概念,相信诸位都不陌生,这也是Java-NIO、AIO中的核心组件之一,而在Netty中也对其做了增强和拓展。首先来看看通道类型,Netty根据不同的多路复用函数,分别拓展出了不同的通道类型: (九)Java网络编程无冕之王-这回把大名鼎鼎的Netty框架一网打尽!

  • NioServerSocketChannel:通用的NIO通道模型,也是Netty的默认通道。
  • EpollServerSocketChannel:对应Linux系统下的epoll多路复用函数。
  • KQueueServerSocketChannel:对应Mac系统下的kqueue多路复用函数。
  • OioServerSocketChannel:对应原本的BIO模型,用的较少,一般用原生的。

当然,对于客户端的通道也可以选择TCP、UDP...类型的,就不再介绍了,重点来看看Netty中是如何对于通道类做的增强。

其实在Netty中,主要结合了JDK提供的Future接口,对通道类做了进一步增强。

Bootstrap client = new Bootstrap();
client.connect("127.0.0.1", 8888);

但这个connect()连接方法,本质上是一个异步方法,返回的并不是Channel对象,而是一个ChannelFuture对象,如下:

public ChannelFuture connect(String inetHost, int inetPort);

也包括ServerBootstrap绑定地址的bind()也相同,返回的并非ServerChannel,也是一个ChannelFuture对象。这是因为在Netty的机制中,绑定/连接工作都是异步的,因此如果要用Netty创建一个客户端连接,为了确保连接建立成功后再操作,通常情况下都会再调用.sync()方法同步阻塞,直到连接建立成功后再使用通道写入数据,如下:

// 与服务端建立连接
ChannelFuture cf = client.connect("127.0.0.1", 8888);
// 同步阻塞至连接建立成功为止
cf.sync(); 
// 连接建立成功后再获取对应的Socket通道写入数据
cf.channel().writeAndFlush("...");

上述这种方式能够确保连接建立成功后再写数据,但既然Netty中的绑定、连接等这些操作都是异步的,有没有办法让整个过程都是异步的呢?

答案是当然有,如何操作呢?

我们可以向ChannelFuture中添加回调处理器,然后异步处理,如下:

ChannelFuture cf = client.connect("127.0.0.1", 8888);
cf.addListener((ChannelFutureListener) cfl -> {
    // 这里可以用cf,也可以用cfl,返回的都是同一个channel通道
    cf.channel().writeAndFlush("...");
});

当通过connect()方法与服务端建立连接时,Netty会将这个任务交给当前Bootstrap绑定的EventLoopGroup中的线程执行,因此建立连接的过程是异步的,所以会返还一个ChannelFuture对象给我们,而此时可以通过该对象的addListener()方法编写成功回调逻辑,当连接建立成功后,会由对应的线程来执行其中的代码,因此可以实现全过程的异步操作。

这样做,似乎的确实现了整个过程的异步,甚至关闭通道的过程也可以换成异步的,如下:

// 异步关闭Channel通道
ChannelFuture closeCF = cf.channel().closeFuture();
// 通道关闭后,添加对应的回调函数
closeCF.addListener((ChannelFutureListener) cfl -> {
    // 关闭前面创建的EventLoopGroup事件组,也可以在这里做其他善后工作
    worker.shutdownGracefully();
});

Netty中为何要将大量的操作都抽象成异步执行呢?这不是反而让逻辑更加复杂化吗?让发起连接、建立连接、发送数据、接收数据、关闭连接等一系列操作,全部交由调用的那条线程执行不可以吗?答案是可以的,但异步能在一定程度上提升性能,尤其是并发越高,带来的优势更为明显。

对于这段话大家估计会有疑惑,为什么能提升性能呢?下面举个例子理解。

3.1、为何Netty所有API都是异步式操作?

相信大家一定在生活中见过这样的场景:医院看病/体检、银行开户、政府办事、法院起诉、保险公司买保险等等,各类办理业务的地方,都会拿号办理,然后经过一个个的窗口办理不同的业务,那为什么要这么做呢?就拿常规的医院看病来说,为什么会分为如下步骤呢?

  • 导诊处:先说明大致情况,导诊人员根据你的病理,指导你挂什么科的号。
  • 挂号处:去到对应的病理科排队挂号(暂且不考虑缴费,假设网上缴挂号费)。
  • 诊断室:跟着挂的号找到对应的科室,医生根据你的情况进行诊断。
  • 化验处:从你身上提取一些标本,然后去到化验处等待化验结果。
  • 缴费处:医生根据化验结果分析病情,然后给出具体的治疗方案,让你来缴费。
  • 拿药/治疗处:交完相关的费用后,根据治疗方案进行拿药/治疗等处理措施。

有上述这些步骤实际上并不奇怪,问题是在于每个步骤都分为了专门的科室处理,因此以上述流程为例,至少需要有六个医生提供服务,那么为什么不专门由这六位医生专门提供全系列服务呢?如下: (九)Java网络编程无冕之王-这回把大名鼎鼎的Netty框架一网打尽! 我们分析一下,假设此时每个步骤平均要五分钟,一个病人的完整流程下来就需要半小时,而下一批预约看病的其他病人,则需要等待半小时后才能被受理,而把这些步骤拆开之后再来看看: (九)Java网络编程无冕之王-这回把大名鼎鼎的Netty框架一网打尽! 此时有六位医生各司其职,每位医生负责单一的工作,这样做的好处在于:每个挂号的病人只需要等待五分钟,就能够被受理,通过这种方式就将之前批次式看病,转变为了流水线式看病。

Netty框架中的异步处理方式,也具备异曲同工之妙,将API的操作从批处理转变成了流式处理。套入实际的业务中,也就是主线程(调用API的线程)无需等待操作完成后再执行,而是调用某个API后可继续往下执行,相较而言,在并发情况下能很大程度上提升程序性能。

但上述这个例子估计有些小伙伴照旧会犯迷糊,那接着再举个更加形象化的例子,好比快递小哥送货,如果以同步模式工作,将一个货物送达指定地点后,需要等待客户签收才能去送下个货物,这无疑会让下个客户等很久很久,并且也极其影响快递小哥的工作效率。

而采用异步模式工作,快递小哥将一个货物送达指定地点后,给对应客户发个信息后,就立马赶往下个客户的货物地点,前面的客户拿到货物后,再给快递小哥回个信息即可。在这种异步工作模式中,小哥无需在原地“阻塞”等待客户签收,只需要将手中一个个货物送达指定地点就行,这在很大程度上提升了整体工作效率,每个客户之间拿到货物的时间也大大缩短了,Netty框架中的异步思想也是同理。

3.2、ChannelFuture、Netty-Future、JDK-Future的关系

当大家试图翻阅ChannelFuture的实现时,会发现该类继承了Future接口:

public interface ChannelFuture extends Future<Void> {
    // 省略内部方法.....
}

但要注意,这个Future接口并非是JDK原生的Future接口,而是Netty框架中的Future接口:

package io.netty.util.concurrent;

public interface Future<V> extends java.util.concurrent.Future<V> {
    // 省略内部方法.....
}
方法名方法作用
isDone()判断当前异步任务是否结束
cancel()取消当前异步任务
isCancel()判断当前异步任务是否被取消
get()阻塞等待当前异步任务执行完成

JDK-Future接口中,想要获取一个异步任务的执行结果,此时只能调用get()方法,但该方法是一个阻塞方法,调用后会阻塞主线程直到任务结束为止,这显然依旧会导致异步变为同步执行,所以这种方式是一种“伪异步”,此时再来看看Netty-Future中增强的核心方法:

方法名方法作用
getNow()非阻塞式获取任务结果,任务未执行完成时返回null
sync()阻塞等待至异步任务执行结束,执行出错时会抛出异常
await()阻塞等待至异步任务执行结束,执行出错时不会抛出异常
isSuccess()判断任务是否执行成功,如果为true代表执行成功
cause()获取任务执行出错时的报错信息,如果执行未出错,则返回null
addLinstener()添加回调方法,异步任务执行完成后会主动执行回调方法中的代码

在原生JDK-Future的基础上,Netty-Future新增了一个异常检测机制,当异步任务执行出错时,可以通过cause()方法处理异常,同时也基于回调模式,可通过addLinstener()方法添加异步执行后的回调逻辑,从而让主线程创建任务后永远不会阻塞,做到了真正意义上的异步执行。

当然,除开基本的Future接口外,Netty框架中还有一个Promise接口,该接口继承自Netty-Future接口:

public interface Promise<V> extends Future<V> {
    // 省略内部方法.....
}

这个接口中主要多拓展了两个方法:

方法名方法作用
setSuccess()设置任务的执行状态为成功
setFailure()设置任务的执行状态为失败

这两个方法可以用来设置异步任务的执行状态,因此Promise接口除开具备Netty-Future的功能外,还能作为多个线程之间传递异步任务结果的容器。

3.3、不同Future的效果测试

public class FutureDemo {
    
    // 测试JDK-Future的方法
    public static void jdkFuture() throws Exception {
        System.out.println("--------JDK-Future测试--------");
        // 创建一个JDK线程池用于执行异步任务
        ExecutorService threadPool = Executors.newSingleThreadExecutor();

        System.out.println("主线程:步骤①");

        // 向线程池提交一个带有返回值的Callable任务
        java.util.concurrent.Future<String> task =
                threadPool.submit(() ->
                    "我是JDK-Future任务.....");
        // 输出获取到的任务执行结果(阻塞式获取)
        System.out.println(task.get());

        System.out.println("主线程:步骤②");
        // 关闭线程池
        threadPool.shutdownNow();
    }

    // 测试Netty-Future的方法
    public static void nettyFuture(){
        System.out.println("--------Netty-Future测试--------");
        // 创建一个Netty中的事件循环组(本质是线程池)
        NioEventLoopGroup group = new NioEventLoopGroup();
        EventLoop eventLoop = group.next();

        System.out.println("主线程:步骤①");

        // 向线程池中提交一个带有返回值的Callable任务
        io.netty.util.concurrent.Future<String> task =
                eventLoop.submit(() ->
                    "我是Netty-Future任务.....");

        // 添加一个异步任务执行完成之后的回调方法
        task.addListener(listenerTask ->
                System.out.println(listenerTask.getNow()));

        System.out.println("主线程:步骤②");
        // 关闭事件组(线程池)
        group.shutdownGracefully();
    }

    // 测试Netty-Promise的方法
    public static void nettyPromise() throws Exception {
        System.out.println("--------Netty-Promise测试--------");
        // 创建一个Netty中的事件循环组(本质是线程池)
        NioEventLoopGroup group = new NioEventLoopGroup();
        EventLoop eventLoop = group.next();

        // 主动创建一个传递异步任务结果的容器
        DefaultPromise<String> promise = new DefaultPromise<>(eventLoop);
        // 创建一条线程执行,往结果中添加数据
        new Thread(() -> {
            try {
                // 主动抛出一个异常
                int i = 100 / 0;
                // 如果异步任务执行成功,向容器中添加数据
                promise.setSuccess("我是Netty-Promise容器:执行成功!");
            }catch (Throwable throwable){
                // 如果任务执行失败,将异常信息放入容器中
                promise.setFailure(throwable);
            }
        }).start();
        // 输出容器中的任务结果
        System.out.println(promise.get());
    }

    public static void main(String[] args) throws Exception {
        jdkFuture();
        nettyFuture();
        nettyPromise();
    }
}

在上述的测试类中,存在三个测试方法:

  • jdkFuture():测试JDK-Future的方法。
  • nettyFuture():测试Netty-Future的方法。
  • nettyPromise():测试Netty-Promise的方法。

接着启动对应的类,来看看控制台的输出结果:

--------JDK-Future测试--------
主线程:步骤①
我是JDK-Future任务.....
主线程:步骤②

--------Netty-Future测试--------
主线程:步骤①
主线程:步骤②
我是Netty-Future任务.....

--------Netty-Promise测试--------
Exception in thread "main" java.util.concurrent.ExecutionException:
    java.lang.ArithmeticException: / by zero
    ........

首先来对比一下JDK-Future、Netty-Future两者之间的差别,在使用JDK-Future时,想要获取异步任务的执行结果,调用get()方法后会阻塞主线程,也就是主线程的步骤②,需要等到异步任务执行完成后才会继续执行,因此输出结果为:

--------JDK-Future测试--------
主线程:步骤①
我是JDK-Future任务.....
主线程:步骤②

但此时再来看看Netty-Future,因为在内部咱们提交异步任务后,就立即通过addListener()添加了一个回调,这个回调方法会在异步任务执行结束后调用,咱们将获取任务结果的工作,放入到了回调方法中完成,此时会观测到,获取Netty-Future的执行结果并不会阻塞主线程:

--------Netty-Future测试--------
主线程:步骤①
主线程:步骤②
我是Netty-Future任务.....

而对于Netty-Promise的使用就无需过多讲解,也就是可以根据异步任务的执行状态,向Promise对象中设置不同的结果,在前面的多线程中,由于主动制造了异常,所以最终会进入catch代码块,执行setFailure()向容器中填充异常信息。

四、核心组件 - 通道处理器(Handler)

   Handler可谓是整个Netty框架中最为重要的一部分,它的职责主要是用于处理Channel通道上的各种事件,所有的处理器都可被大体分为两类:

  • 入站处理器:一般都是ChannelInboundHandlerAdapter以及它的子类实现。
  • 出站处理器:一般都是ChannelOutboundHandlerAdapter以及它的子类实现。

在系统中网络操作都通常会分为入站和出站两种,所谓的入站即是指接收请求,反之,所谓的出站则是指返回响应,而Netty中的入站处理器,会在客户端消息到来时被触发,而出站处理器则会在服务端返回数据时被触发,接着来展开聊一聊。

4.1、入站处理器与出站处理器

前面讲明白了入站、出站的基本概念,接着来简单认识一下Netty中的入站处理器,这里先上个案例:

// 服务端
public class HandlerServer {
    public static void main(String[] args) {
        // 0.准备工作:创建一个事件循环组、一个ServerBootstrap服务端
        EventLoopGroup group = new NioEventLoopGroup();
        ServerBootstrap server = new ServerBootstrap();

        server
            // 1.绑定前面创建的事件循环组
            .group(group)
            // 2.声明通道类型为服务端NIO通道
            .channel(NioServerSocketChannel.class)
            // 3.通过ChannelInitializer完成通道的初始化工作
            .childHandler(new ChannelInitializer<NioSocketChannel>() {
                @Override
                protected void initChannel(NioSocketChannel nsc) throws Exception {
                    // 4.获取通道的ChannelPipeline处理器链表
                    ChannelPipeline pipeline = nsc.pipeline();
                    // 5.基于pipeline链表向通道上添加入站处理器
                    pipeline.addLast("In-①",new ChannelInboundHandlerAdapter(){
                        @Override
                        public void channelRead(ChannelHandlerContext ctx, Object msg)
                                                throws Exception {
                            System.out.println("俺是第一个入站处理器...");
                            super.channelRead(ctx, msg);
                        }
                    });
                    pipeline.addLast("In-②",new ChannelInboundHandlerAdapter(){
                        @Override
                        public void channelRead(ChannelHandlerContext ctx, Object msg)
                                                throws Exception {
                            System.out.println("我是第二个入站处理器...");
                            super.channelRead(ctx, msg);
                        }
                    });
                    pipeline.addLast("In-③",new ChannelInboundHandlerAdapter(){
                        @Override
                        public void channelRead(ChannelHandlerContext ctx, Object msg)
                                                throws Exception {
                            System.out.println("朕是第三个入站处理器...");
                        }
                    });
                }
            })
            // 为当前启动的服务端绑定IP和端口地址
            .bind("127.0.0.1",8888);
    }
}

// 客户端
public class HandlerClient {
    public static void main(String[] args) {
        // 0.准备工作:创建一个事件循环组、一个Bootstrap启动器
        EventLoopGroup group = new NioEventLoopGroup();
        Bootstrap client = new Bootstrap();
        try {
            client
                // 1.绑定事件循环组
                .group(group)
                // 2.声明通道类型为NIO客户端通道
                .channel(NioSocketChannel.class)
                // 3.初始化通道,添加一个UTF-8的编码器
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel sc)
                            throws Exception {
                        // 添加一个编码处理器,对数据编码为UTF-8格式
                        ChannelPipeline pipeline = sc.pipeline();
                        pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8));
                    }
                });

            // 4.与指定的地址建立连接
            ChannelFuture cf = client.connect("127.0.0.1", 8888).sync();
            // 5.建立连接成功后,向服务端发送数据
            System.out.println("正在向服务端发送信息......");
            cf.channel().writeAndFlush("我是<竹子爱熊猫>!");
        } catch (Exception e){
            e.printStackTrace();
        } finally {
            // 6.最后关闭事件循环组
            group.shutdownGracefully();
        }
    }
}

在上述案例的服务端代码中,启动服务端时为其添加了In-①、In-②、In-③这三个入站处理器,接着编写了一个客户端,其内部主要是向服务端发送了一条数据,运行结果如下:

俺是In-①入站处理器...
我是In-②入站处理器...
朕是In-③入站处理器...

此时大家观察结果会发现,入站处理器的执行顺序,会按照添加的顺序执行,两个过滤器之间,依靠super.channelRead(ctx, msg);这行代码来实现向下调用的逻辑,这和之前Servlet中的过滤器相差无几。

除开上述重写的channelRead()方法外,入站处理器中还有很多其他方法可以重写,每个方法都对应着一种事件,会在不同时机下被触发,如下:

// 会在当前Channel通道注册到选择器时触发(与EventLoop绑定时触发)
public void channelRegistered(ChannelHandlerContext ctx) ...
// 会在选择器移除当前Channel通道时触发(与EventLoop解除绑定时触发)
public void channelUnregistered(ChannelHandlerContext ctx) ...
// 会在通道准备就绪后触发(Pipeline处理器添加完成、绑定EventLoop后触发)
public void channelActive(ChannelHandlerContext ctx) ...
// 会在通道关闭时触发
public void channelInactive(ChannelHandlerContext ctx) ...
// 会在收到客户端数据时触发(每当有数据时都会调用该方法,表示有数据可读)
public void channelRead(ChannelHandlerContext ctx, Object msg) ...
// 会在一次数据读取完成后触发
public void channelReadComplete(ChannelHandlerContext ctx) ...
// 当通道上的某个事件被触发时,这个方法会被调用
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) ...
// 当通道的可写状态发生改变时被调用(一般在发送缓冲区超出限制时调用)
public void channelWritabilityChanged(ChannelHandlerContext ctx) ...
// 当通道在读取过程中抛出异常时,当前方法会被触发调用
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) ...

接着再来看看出站处理器,这回基于上述案例做些许改造即可,也就是再通过pipeline.addLast()方法多添加几个处理器,但处理器的类型为ChannelOutboundHandlerAdapter,如下:

// 基于pipeline链表向通道上添加出站处理器
pipeline.addLast("Out-A",new ChannelOutboundHandlerAdapter(){
    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
            throws Exception {
        System.out.println("在下是Out-A出站处理器...");
        super.write(ctx, msg, promise);
    }
});
pipeline.addLast("Out-B",new ChannelOutboundHandlerAdapter(){
    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
            throws Exception {
        System.out.println("鄙人是Out-B出站处理器...");
        super.write(ctx, msg, promise);
    }
});
pipeline.addLast("Out-C",new ChannelOutboundHandlerAdapter(){
    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
            throws Exception {
        System.out.println("寡人是Out-C出站处理器...");
        super.write(ctx, msg, promise);
    }
});

根据原本入站处理器的执行逻辑,是不是理论上执行顺序为Out-A、Out-B、Out-C?先看运行结果: (九)Java网络编程无冕之王-这回把大名鼎鼎的Netty框架一网打尽! 此时观察结果可明显看到,在通道上添加的出站处理器压根没被触发呀,这是为何呢?这要说回前面聊到的出站概念,出站是指响应过程,也意味着出站处理器是在服务端返回数据时被触发的,而案例中并未向客户端返回数据,显然就不会触发出站处理器,所以此时咱们在In-③入站处理器中,多加几行代码:

// 利用通道向客户端返回数据
ByteBuf resultMsg = ctx.channel().alloc().buffer();
resultMsg.writeBytes("111".getBytes());
nsc.writeAndFlush(resultMsg);

此时再运行案例,就会看到如下结果:

俺是In-①入站处理器...
我是In-②入站处理器...
朕是In-③入站处理器...
寡人是Out-C出站处理器...
鄙人是Out-B出站处理器...
在下是Out-A出站处理器...

此时注意看,结果和预料的不同,呈现的顺序并非Out-A、Out-B、Out-C,而是Out-C、Out-B、Out-A,这是啥原因呢?为什么与添加顺序反过来了?这其实跟pipeline处理器链表有关,等会儿再聊聊pipeline这个概念,先来看看出站处理器中的其他方法:

// 当通道调用bind()方法时触发(当Channel绑定端口地址时被调用,一般用于客户端通道)
public void bind(...) ...
// 当通道调用connect()方法,连接到远程节点/服务端时触发(一般也用于客户端通道)
public void connect(...) ...
// 当客户端通道调用disconnect()方法,与服务端断开连接时触发
public void disconnect(...) ...
// 当客户端通道调用close()方法,关闭连接时触发
public void close(...) ...
// 当通道与EventLoop解除绑定时触发
public void deregister(...) ...
// 当通道中读取多次数据时被调用触发
public void read(...) ...
// 当通道中写入数据时触发
public void write(...) ...
// 当通道中的数据被Flush给对端节点时调用
public void flush(...) ...

对于出站/入站处理器的这些其他方法/事件,大家可根据业务的不同,选择重写不同的方法,其中每个不同的方法,其触发时机也不同,因此可以在适当的位置重写方法,作为业务代码的切入点。

4.2、pipeline处理器链表

如果接触Netty框架的小伙伴应该对这玩意儿不陌生,如果没接触过也无关紧要,其实它也并非是特别难懂的概念,一个处理器被称为Handler,而一个Handler添加到一个通道上之后,则被称之为ChannelHandler,而一个通道上的所有ChannelHandler全部连接起来,则被称之为ChannelPipeline处理器链表。

以上述给出的案例来说,其内部形成的ChannelPipeline链表如下: (九)Java网络编程无冕之王-这回把大名鼎鼎的Netty框架一网打尽! pipeline本质上是一个双向链表,同时具备head、tail头尾节点,每当调用pipeline.addLast()方法添加一个处理器时,就会将处理器封装成一个节点,然后加入pipeline链表中:

  • 当接收到客户端的数据时,Netty会从Head节点开始依次往后执行所有入站处理器。
  • 而当服务端返回数据时,Netty会从Tail节点开始依次向前执行所有入站处理器。

理解上述过程后,大家应该就理解了之前出站处理器的执行顺序,为何是Out-C、Out-B、Out-A,因为出站处理器是以Tail尾节点开始,向前依次执行的原因造成的,那处理器的作用是干嘛的呢?举个例子大家就懂了。

这里假设Netty的服务端是一个饲料加工厂,客户端则是原料供应商,连接两者之间的通道就相当于一条条的流水线,而客户端发送的数据相当于原料。 在一条流水线上,玉米、豆粕、小麦....等原料不可能啥也不干,直接从头传到尾,如果原料想要加工成某款私聊,显然需要经过一道道工序,而处理器则是这一道道工序。 比如原料刚传进来时,首先要将其粉碎成颗粒,接着需要将其碾压成粉末,最后需要按照配方比例进行混合,才能形成按配方制成的饲料。在这个过程中,原料进入加工厂后,经过的一道道工序则可以被称为入站处理器。 而原料被加工成饲料后,想要对外出售,还需要先装入一个个的饲料袋,然后将饲料袋进行封口,最后印上生产日期与厂家,才能打包成最终的商用饲料对外出售。而该过程中的一道道工序,则可被理解成是一个个出站处理器。

在上述的例子中,一个加工厂的流水线上,存在着一道道工序,经过依次处理后,能够将原料加工成最终商品。Netty中亦是同理,对于客户端和服务端之间的数据,可以通过处理器,完成一系列核心处理,如转换编码格式、对数据进行序列化、对数据进行加/解密等操作。

4.3、自定义出/入站处理器

前面简单讲明白了一些关于Netty处理器的知识,但实际开发过程中,为了更好的代码阅读性,以及代码的维护性,通常pipeline.addLast并不会直接new接口,而是自己定义处理器类,然后继承对应的父类,如下:

// 自定义的入站处理器
public class ZhuziHandler extends ChannelInboundHandlerAdapter {
    public ZhuziHandler() {
        super();
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        // 在这里面编写处理入站msg的核心代码.....
        // (如果要自定义msg的处理逻辑,请记住去掉下面这行代码)
        super.channelRead(ctx, msg);
    }
}

对于入站处理器而言,主要重写其channelRead()方法即可,该方法会在消息入站时被调用,可以在其中完成对数据的复杂处理,而自定义处理器完成后,想要让该处理器生效,请记得将其绑定到对应的通道上,如下:

pipeline.addLast("In-X", new ZhuziHandler());

与入站处理器相反的出站处理器亦是同理,只不过将父类实现换成ChannelInboundHandlerAdapter,并且重写其write()方法即可,这样所有消息(数据)出站时,都会调用该方法。

最后,不仅仅处理器可以单独抽出来实现,而且对于通道的初始化器,也可以单独抽出来实现,如下:

// 自定义的通道初始化器
public class ServerInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        // 设置编码器、解码器、处理器
        ChannelPipeline pipeline = socketChannel.pipeline();
        pipeline.addLast("decoder", new StringDecoder());
        pipeline.addLast("encoder", new StringEncoder());
        pipeline.addLast("handler", new ZhuziHandler());
    }
}

这样写能够让代码的整洁性更强,并且可以统一管理通道上的所有出/入站处理器,而服务端的代码改成下述方式即可:

server
    .group(group)
    .channel(NioServerSocketChannel.class)
    .childHandler(new ServerInitializer())
    .bind("127.0.0.1",8888);

五、Netty重构后的缓冲区(ByteBuf)

  • ByteBufAllocator.DEFAULT.heapBuffer(cap):使用堆内存来创建ByteBuf对象。
  • ByteBufAllocator.DEFAULT.directBuffer(cap):使用本地内存来创建ByteBuf对象。

基于堆内存创建的ByteBuf对象会受到GC机制管理,在发生GC时需要来回移动Buffer对象,同时之前在NIO中也聊到过堆、本地内存的区别,如下: (九)Java网络编程无冕之王-这回把大名鼎鼎的Netty框架一网打尽! 通常本地内存的读写效率都会比堆内存高,因为OS可以直接操作本地内存,而堆内存在读写数据时,则需要多出一步内存拷贝的动作,总结如下:

  • 堆内存因为直接受到JVM管理,所以在Java程序中创建时,分配效率较高,但读写效率低。
  • 本地内存因为OS可直接操作,所以读写效率高,但由于创建时,需要向OS额外申请,分配效率低。

但上述聊到的这些特征,NIOBuffer也具备,那Netty对于Buffer缓冲区到底增强了什么呢?主要是三方面:Buffer池化技术、动态扩容机制、零拷贝实现。

5.1、ByteBuf缓冲区池化技术

池化这个词汇大家应该都不陌生,Java线程池、数据库连接池,这些都是池化思想的产物,一般系统中较为珍贵的资源,都会采用池化技术来缓存,以便于下次需要时可直接使用,而无需经过繁琐的创建过程。

前面聊到过,Netty默认会采用本地内存创建ByteBuf对象,而本地内存因为不是操作系统分配给Java程序使用的,所以基于本地内存创建对象时,则需要额外单独向OS申请,这个过程自然开销较大,在高并发情况下,频繁的创建、销毁ByteBuf对象,一方面会导致性能降低,同时还有可能造成OOM的风险(使用完没及时释放,内存未归还给OS的情况下会出现内存溢出)。

而使用池化技术后,一方面能有效避免OOM问题产生,同时还可以省略等待创建缓冲区的时间,那Netty中的池化技术,什么时候会开启呢?这个要分平台!

  • Android系统默认会采用非池化技术,而其他系统,如Linux、Mac、Windows等会默认启用。

但上述这条原则是Netty4.1版本之后才加入的,因为4.1之前的版本,其内部的池化技术还不够完善,所以4.1之前的版本默认会禁用池化技术。当然,如果你在某些平台下想自行决定是否开启池化,可通过下述参数控制:

  • -Dio.netty.allocator.type=unpooled:关闭池化技术。
  • -Dio.netty.allocator.type=pooled:开启池化技术。

这两个参数直接通过JVM参数的形式,在启动Java程序时指定即可。如果你想要查看自己创建的ByteBuf对象,是否使用了池化技术,可直接打印对象的Class即可,如下:

// 查看创建的缓冲区是否使用了池化技术
private static void byteBufferIsPooled(){
    ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(16);
    System.out.println(buffer.getClass());
}

public static void main(String[] args) {
    byteBufferIsPooled();
}

/* *
 * 输出结果:
 *    class io.netty.buffer.PooledUnsafeDirectByteBuf
 * */

从输出的结果中的类名可看出,如果是以Pooled开头的类名,则表示当前ByteBuf对象使用池化技术,如若是以Unpooled开头的类名,则表示未使用池化技术。

5.2、ByteBuf动态扩容机制

由于Java-NIO中的Buffer设计有些缺德,因此在使用NIO的原生Buffer对象时,就显得额外麻烦,必须要遵从如下步骤:

  • ①先创建对应类型的缓冲区
  • ②通过put这类方法往缓冲区中写入数据
  • ③调用flip()方法将缓冲区转换为读模式
  • ④通过get这类方法从缓冲区中读取数据
  • ⑤调用clear()、compact()方法清空缓冲区数据

而正是由于Java-NIO原生的Buffer设计的不合理,因此Netty中直接重构了整个缓冲区组件,在Netty-ByteBuf中,存在四个核心属性:

  • initialCapacity:初始容量,创建缓冲区时指定的容量大小,默认为256字节。
  • maxCapacity:最大容量,当初始容量不足以供给使用时,ByteBuf的最大扩容限制。
  • readerIndex:读取指针,默认为0,当读取一部分数据时,指针会随之移动。
  • writerIndex:写入指针,默认为0,当写入一部分数据时,指针会随之移动。

首先来说说和NIO-Buffer的两个主要区别:首先将原本一根指针变为了两根,分别对应读/写操作,这样就保障了使用ByteBuf时,无需每次读写数据时手动翻转模式。同时加入了一个最大容量限制,在创建的ByteBuf无法存下数据时,允许在最大容量的范围内,对ByteBuf进行自动扩容,下面上个图理解: (九)Java网络编程无冕之王-这回把大名鼎鼎的Netty框架一网打尽! 上图中模拟了使用ByteBuf缓冲区的过程,在创建时会先分配一个初始容量,这个容量可以自己指定,不指定默认为256,接着会去创建出对应容量的缓冲区,最初读写指针都为0,后续会随着使用情况不断变化。

这里重点观察最后一个状态,在真正使用过程中,一个ByteBuf会被分为四个区域:

  • 已废弃区域:这是指已经被读取过的数据区域,因为其中的数据已被使用,所以属于废弃区域。
  • 可读取区域:这主要是指被写入过数据,但还未读取的区域,这块区域的数据都可被读取使用。
  • 可写入区域:这主要是指写入指针和容量之间的区域,意味着这块区域是可以被写入数据的。
  • 可扩容区域:这主要是指容量和最大容量之间的区域,代表当前缓冲区可扩容的范围。

ByteBuf的主要实现位于AbstractByteBuf这个子类中,但内部还有两根markedReaderIndex、markedWriterIndex标记指针,这两根指针就类似于NIO-Buffer中的mark指针,这里就不做重复赘述。下面上个案例简单实验一下BtyeBuf的自动扩容特性,代码如下:

// 测试Netty-ByteBuf自动扩容机制
private static void byteBufCapacityExpansion() {
    // 不指定默认容量大小为16
    ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(16);
    System.out.println("测试前的Buffer容量:" + buffer);
    // 使用StringBuffer来测试ByteBuf的自动扩容特性
    StringBuffer sb = new StringBuffer();
    // 往StringBuffer中插入17个字节的数据
    for (int i = 0; i < 17; i++) {
        sb.append("6");
    }
    // 将17个字节大小的数据写入缓冲区
    buffer.writeBytes(sb.toString().getBytes());
    printBuffer(buffer);
}

在这个测试自动扩容的方法中,最后用到了一个printBuffer()方法来打印缓冲区,这是自定义的一个输出方法,也就基于Netty自身提供的Dump方法实现的,如下:

// 打印ByteBuf中数据的方法
private static void printBuffer(ByteBuf buffer) {
    // 读取ByteBuffer已使用的字节数
    int byteSize = buffer.readableBytes();
    // 基于byteSize来计算显示的行数
    int rows = byteSize / 16 + (byteSize % 15 == 0 ? 0 : 1) + 4;
    // 创建一个StringBuilder用来显示输出
    StringBuilder sb = new StringBuilder(rows * 80 * 2);
    // 获取缓冲区的容量、读/写指针信息放入StringBuilder
    sb.append("ByteBuf缓冲区信息:{");
    sb.append("读取指针=").append(buffer.readerIndex()).append(", ");
    sb.append("写入指针=").append(buffer.writerIndex()).append(", ");
    sb.append("容量大小=").append(buffer.capacity()).append("}");

    // 利用Netty框架自带的格式化方法、Dump方法输出缓冲区数据
    sb.append(StringUtil.NEWLINE);
    ByteBufUtil.appendPrettyHexDump(sb, buffer);
    System.out.println(sb.toString());
}

接着在main方法中调用并运行,如下:

public static void main(String[] args) {
    byteBufCapacityExpansion();
}

/* *  运行结果:
* 
* 测试前的Buffer容量:PooledUnsafeDirectByteBuf(ridx: 0, widx: 0, cap: 16)
* ByteBuf缓冲区信息:{读取指针=0, 写入指针=17, 容量大小=64}
*          +-------------------------------------------------+
*          |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
* +--------+-------------------------------------------------+----------------+
* |00000000| 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 |6666666666666666|
* |00000010| 36                                              |6               |
* +--------+-------------------------------------------------+----------------+
* */

先来观察最初的容量:cap=16,因为这是咱们显示指定的初始容量,接着向该ByteBuf中插入17个字节数据后,会发现容量自动扩展到了64,但如果使用NIO-Buffer来进行这样的操作,则会抛出异常。同时最后还把缓冲区中具体的数据打印出来了,这个是利用Netty自带的appendPrettyHexDump()方法实现的,中间是字节值,后面是具体的值,这里就不做过多阐述~

5.3、 Netty中的读写API

首先在讲述Netty-ByteBuf的读写API之前,咱们再说清楚一点与NIO-Buffer的区别,不知大家是否还记得我在之前NIO中聊到的一点: (九)Java网络编程无冕之王-这回把大名鼎鼎的Netty框架一网打尽! 其实这也是NIO-Buffer设计不合理的一个地方,当你想要向缓冲区中写入不同类型的数据,要么得自己手动转换成Byte字节类型,要么得new一个对应的子实现,所以整个实现就较为臃肿,大家可以点进Java.nio包看一下,你会看到下述场景: (九)Java网络编程无冕之王-这回把大名鼎鼎的Netty框架一网打尽! 这里的类关系,大家一眼看过去明显会感觉头大,基本上实现都大致相同,但针对于每个数据类型,都编写了对应的实现类,而Netty的作者显然意识到了这点,因此并未提供多种数据类型的缓冲区,仅提供了ByteBuf这一种缓冲区,Why

其实道理十分简单,因为计算机上的所有数据资源,在底层本质上都是0、1形成的字节数据,所以只提供Byte类型的ByteBuf缓冲区就够了,毕竟它能够存储所有类型的数据,同时为了便于写入其他类型的数据,如Int、boolean、long....Netty框架中也对外提供了相关的写入API,接着一起来看看。

// Netty-ByteBuf抽象类
public abstract class ByteBuf implements ReferenceCounted, Comparable<ByteBuf> {
    // 写入boolean数据的方法,内部使用一个字节表示,0=false、1=true
    public abstract ByteBuf writeBoolean(boolean var1);
    // 写入字节数据的方法
    public abstract ByteBuf writeByte(int var1);
    // 大端写入Short数据的方法
    public abstract ByteBuf writeShort(int var1);
    // 小端写入Short数据的方法
    public abstract ByteBuf writeShortLE(int var1);
    
    // 下述方法和写Short类型的方法仅类型不同,都区分了大小端,不再重复注释
    public abstract ByteBuf writeMedium(int var1);
    public abstract ByteBuf writeMediumLE(int var1);
    public abstract ByteBuf writeInt(int var1);
    public abstract ByteBuf writeIntLE(int var1);
    public abstract ByteBuf writeLong(long var1);
    public abstract ByteBuf writeLongLE(long var1);
    public abstract ByteBuf writeChar(int var1);
    public abstract ByteBuf writeFloat(float var1);
    public ByteBuf writeFloatLE(float value) {
        return this.writeIntLE(Float.floatToRawIntBits(value));
    }
    public abstract ByteBuf writeDouble(double var1);
    public ByteBuf writeDoubleLE(double value) {
        return this.writeLongLE(Double.doubleToRawLongBits(value));
    }
    
    // 将另一个ByteBuf对象写入到当前缓冲区
    public abstract ByteBuf writeBytes(ByteBuf var1);
    // 将另一个ByteBuf对象的前N个长度的数据,写入到当前缓冲区
    public abstract ByteBuf writeBytes(ByteBuf var1, int var2);
    // 将另一个ByteBuf对象的指定范围数据,写入到当前缓冲区
    public abstract ByteBuf writeBytes(ByteBuf var1, int var2, int var3);
    // 向缓冲区中写入一个字节数组
    public abstract ByteBuf writeBytes(byte[] var1);
    // 向缓冲区中写入一个字节数组中,指定范围的数据
    public abstract ByteBuf writeBytes(byte[] var1, int var2, int var3);
    // 将一个NIO的ByteBuffer数据写入到当前ByteBuf对象
    public abstract ByteBuf writeBytes(ByteBuffer var1);
    // 将一个输入流中的数据写入到当前缓冲区
    public abstract int writeBytes(InputStream var1, int var2) 
                                                throws IOException;
    // 将一个NIO的ScatteringByteChannel通道中的数据写入当前缓冲区
    public abstract int writeBytes(ScatteringByteChannel var1, int var2)
                                                        throws IOException;
    // 将一个NIO的文件通道中的数据写入当前缓冲区
    public abstract int writeBytes(FileChannel var1, long var2, int var4)
                                                        throws IOException;
    // 将一个任意字符类型的数据写入缓冲区(CharSequence是所有字符类型的老大)
    public abstract int writeCharSequence(CharSequence var1, Charset var2);
    
    // 省略其他写入数据的API方法........
}

上面列出了Netty-ByteBuf中常用的写入方法,其实大家在这里就能明显观察出与NIO的区别,NIO是为不同数据类型提供了不同的实现类,而Netty则仅仅只是为不同类型,提供了不同的API方法,显然后者的做法更佳,因为整体的代码结构会更为优雅。

这里主要说一下大端写入和小端写入的区别,从前面的API列表中,大家可以看到,Netty为每种数据类型,都提供了一个结尾带LE的写入方法,这个带LE的方法则是小端写入方法,那么大小端之间有何差异呢?

大小端写入是网络编程中的通用概念,因为网络数据传输过程中,所有的数据都是以二进制的字节格式传输的,而所谓的大端(Big Endian)写入,是指先写高位,再写低位,高低位又是什么意思呢?

  • 高位写入:指从前往后写,例如1这个数字,比特位形式为00000000 00000001
  • 低位写入:指从后往前写,依旧是1这个数字,比特位形式为00000001 00000000

这里不了解的小伙伴又会疑惑:为啥高位写入时,1在最后面呀?这是因为要先写0,再写1的原因导致的。而反过来。所谓的小端(Little Endian)写入,也就是指先写低位,再写高位。默认情况下,网络通信会采用大端写入的模式。

简单了解Netty-ByteBuf写入数据的API后,接着再来看一些读取数据的API方法,如下:

// 一系列read开头的读取方法,这种方式会改变读取指针(区分大小端)
public abstract boolean readBoolean();
public abstract byte readByte();
public abstract short readUnsignedByte();
public abstract short readShort();
public abstract short readShortLE();
public abstract int readUnsignedShort();
public abstract int readUnsignedShortLE();
public abstract int readMedium();
public abstract int readMediumLE();
public abstract int readUnsignedMedium();
public abstract int readUnsignedMediumLE();
public abstract int readInt();
public abstract int readIntLE();
public abstract long readUnsignedInt();
public abstract long readUnsignedIntLE();
public abstract long readLong();
public abstract long readLongLE();
public abstract char readChar();
public abstract float readFloat();
// 省略其他的read方法.....

// 一系列get开头的读取方法,这种方式不会改变读取指针(区分大小端)
public abstract boolean getBoolean(int var1);
public abstract byte getByte(int var1);
public abstract short getUnsignedByte(int var1);
public abstract short getShort(int var1);
public abstract short getShortLE(int var1);
public abstract int getUnsignedShort(int var1);
public abstract int getUnsignedShortLE(int var1);
public abstract int getMedium(int var1);
public abstract int getMediumLE(int var1);
public abstract int getUnsignedMedium(int var1);
public abstract int getUnsignedMediumLE(int var1);
public abstract int getInt(int var1);
public abstract int getIntLE(int var1);
public abstract long getUnsignedInt(int var1);
public abstract long getUnsignedIntLE(int var1);
public abstract long getLong(int var1);
public abstract long getLongLE(int var1);
public abstract char getChar(int var1);
public abstract float getFloat(int var1);
// 省略其他的get方法.....

在上面列出的一系列读取方法中,主要可分为read、get两大类方法:

  • readXXX():这种方式读取数据后,会导致ByteBuf内部的读取指针随之移动。
  • getXXX():这种方式读取数据后,不会改变ByteBuf内部的读取指针。

那么读取指针改变之后会出现什么影响呢?大家还记得前面聊到的ByteBuf的四部分嘛?前面讲过,读取指针之前的数据部分,都会被标记为废弃部分,这也就意味着通过read系列的方式读取一段数据后,会导致这些数据无法再次被读取到,这里来做个实验:

// 测试ByteBuf的read、get、mark功能
private static void bufferReader(){
    // 分配一个初始容量为10的缓冲区
    ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(10);

    // 向缓冲区中写入10个字符(占位十个字节)
    StringBuffer sb = new StringBuffer();
    for (int i = 0; i < 10; i++) {
        sb.append(i);
    }
    buffer.writeBytes(sb.toString().getBytes());

    // 使用read方法读取前5个字节数据
    printBuffer(buffer);
    buffer.readBytes(5);
    printBuffer(buffer);

    // 再使用get方法读取后五个字节数据
    buffer.getByte(5);
    printBuffer(buffer);
}

public static void main(String[] args) {
    bufferReader();
}

在上面的循环中,我是通过StringBuffer来作为缓冲区的数据,但为何不直接写入int数据呢?这是因为int默认会占四个字节,而StringBuffer底层是char,一个字符只占用一个字节~,这里是一个小细节,接着来看看运行结果:

ByteBuf缓冲区信息:{读取指针=0, 写入指针=10, 容量大小=10}
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 30 31 32 33 34 35 36 37 38 39                   |0123456789      |
+--------+-------------------------------------------------+----------------+
ByteBuf缓冲区信息:{读取指针=5, 写入指针=10, 容量大小=10}
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 35 36 37 38 39                                  |56789           |
+--------+-------------------------------------------------+----------------+

ByteBuf缓冲区信息:{读取指针=5, 写入指针=10, 容量大小=10}
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 35 36 37 38 39                                  |56789           |
+--------+-------------------------------------------------+----------------+

从上述结果中可看出,使用readBytes()方法读取五个字节后,读取指针会随之移动到5,接着看看前后的数据变化,此时会发现数据从0123456789变成了56789,这是因为前面五个字节的数据,已经属于废弃部分了,所以printBuffer()方法无法读取显示。

接着再看看后面,通过getByte()读取五个字节后,此时ByteBuf对象的读取指针,显然不会随之移动,也就是通过get系列方法读取缓冲区数据,并不会导致读过的数据废弃。

那如果使用read系列方法读取数据后,后续依旧想要读取数据该怎么办呢?这里可以使用ByteBuf内部的标记指针实现,如下:

// 在上述方法的最后继续追加下述代码:

// 使用mark标记一下读取指针,然后再使用read方法读取数据
buffer.markReaderIndex();
buffer.readBytes(5);
printBuffer(buffer);

// 此时再通过reset方法,使读取指针恢复到前面的标记位置
buffer.resetReaderIndex();
printBuffer(buffer);

此时再次查询运行结果,如下:

ByteBuf缓冲区信息:{读取指针=10, 写入指针=10, 容量大小=10}

ByteBuf缓冲区信息:{读取指针=5, 写入指针=10, 容量大小=10}
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 35 36 37 38 39                                  |56789           |
+--------+-------------------------------------------------+----------------+

从结果中可以明显看到,对读取指针做了标记后,再次使用read系列方法读取数据,依旧会导致读过的部分变为废弃数据,但后续可以通过reset方法,将读取指针恢复到前面的标记位置,然后再次查看缓冲区的数据,就会发现数据又可以重复被读取啦~

其实除开可以通过markReaderIndex()、resetReaderIndex()方法标记、恢复读取指针外,还可以通过markWriterIndex()、resetWriterIndex()方法来标记、恢复写入指针。标记读取指针后,可以让缓冲区中的一段数据被多次read读取,而标记写入指针后,可以让缓冲区的一段区间被反复写入,但每次后面的写入会覆盖前面写入的数据。

OK~,对于ByteBufAPI操作就介绍到这里,其实内部提供了一百多个API方法,但我就不一一去做说明啦,大家点进源码后就能看到,感兴趣的小伙伴可以自行调试!

5.4、ByteBuf的内存回收

在前面聊到过,Netty-ByteBuf在除安卓平台外,都会使用池化技术来创建,那一个已创建出的ByteBuf对象,其占用的内存在什么情况下会归还给内存池呢?想要聊明白这点,得先理解ByteBuf的引用释放。

首先来看看Netty-ByteBuf的类关系:

public abstract class ByteBuf implements ReferenceCounted, Comparable<ByteBuf>

从上面的类定义中可明显看到,ByteBuf实现了ReferenceCounted接口,该接口翻译过来的含义则是引用计数,该接口中提供的方法列表如下:

public interface ReferenceCounted {
    // 查看一个对象的引用计数统计值
    int refCnt();
    
    // 对一个对象的引用计数+1
    ReferenceCounted retain();
    // 对一个对象的引用计数+n
    ReferenceCounted retain(int var1);
    
    // 记录当前对象的当前访问位置,内存泄漏时会返回该方法记录的值
    ReferenceCounted touch();
    ReferenceCounted touch(Object var1);
    
    // 对一个对象的引用计数-1
    boolean release();
    // 对一个对象的引用计数-n
    boolean release(int var1);
}

重点关注retain()、release()方法,这两个方法分别对应加/减一个对象的引用计数,把ByteBuf套入进来,当一个缓冲区对象的引用计数为0时,会清空当前缓冲区中的数据,并且将占用的内存归还给内存池,所有尝试再次访问该ByteBuf对象的操作,都会被拒绝。简单来说,一句话总结就是:当一个ByteBuf对象的引用计数变为0时,该缓冲区就会变为外部不可访问的状态

综上所述,在使用完一个ByteBuf对象后,明确后续不会用到该对象时,一定要记得手动调用release()清空引用计数,否则会导致该缓冲区长久占用内存,最终引发内存泄漏。

这里拓展一点小细节,似乎在Netty-Channel中,都会采用ByteBuf来发送/接收数据,那这些通道传输数据用的ByteBuf对象,其占用的内存会在何时回收呢?这会牵扯到前面的ChannelPipeline链表。

  • Head处理器:
    • 如果通道上只有入站处理器,它会作为整个处理器链表的第一个处理器调用。
    • 如果通道上只有出站处理器,它会作为整个处理器链表的最后一个处理器调用。
    • 如果通道上入/出站处理器都有,它会作为入站的第一个处理调用,出站的最后一个处理器调用。
  • Tail处理器:
    • 如果通道上只有入站处理器,Tail节点会作为整个链表的最后一个处理器调用。
    • 如果通道上只有出站处理器,Tail节点会作为整个链表的第一个处理器调用。
    • 如果通道上入/出站处理器都有,它会作为出站的第一个调用、入站的最后一个调用。

结合上面所说的内容,Head、Tail处理器在任何情况下,其中至少会有一个,作为通道上的最后一个处理器调用,而在这两个头尾处理器中,会自动释放ByteBuf的工作,先来看看Head处理器,源码如下:

// ChannelPipeline处理器链表的默认实现类
public class DefaultChannelPipeline implements ChannelPipeline {
    // Head处理器的实现类:同时实现了入站、出站处理器接口
    final class HeadContext extends AbstractChannelHandlerContext 
            implements ChannelOutboundHandler, ChannelInboundHandler {
        
        // 作为入站链表第一个处理器时,会调用的方法
        public void channelRead(ChannelHandlerContext ctx, Object msg) {
            // 继续往下调用其他自定义的入站处理器
            ctx.fireChannelRead(msg);
        }
        
        // 作为出站链表的最后一个处理器时,会调用的方法
        public void write(ChannelHandlerContext ctx, Object msg, 
                                            ChannelPromise promise) {
            // unsafe.write()最终会调用到AbstractUnsafe.write()方法
            this.unsafe.write(msg, promise);
        }
    }
    // 省略其他方法....
}

public abstract class AbstractChannel extends DefaultAttributeMap 
                                            implements Channel {
    protected abstract class AbstractUnsafe implements Unsafe {
        public final void write(Object msg, ChannelPromise promise) {
            this.assertEventLoop();
            ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
            
            // 这里先不需要理解,后续源码篇会聊
            if (outboundBuffer == null) {
                this.safeSetFailure(promise, this.newClosedChannelException(AbstractChannel.
                                this.initialCloseCause));
                
                // 最终在这里,依旧调用了引用计数工具类的release方法
                ReferenceCountUtil.release(msg);
            } else {
                int size;
                try {
                    msg = AbstractChannel.this.filterOutboundMessage(msg);
                    size = AbstractChannel.this.pipeline.estimatorHandle().size(msg);
                    if (size < 0) {
                        size = 0;
                    }
                } catch (Throwable var6) {
                    this.safeSetFailure(promise, var6);
                    // 这里也会调用了引用计数工具类的release方法
                    ReferenceCountUtil.release(msg);
                    return;
                }
    
                outboundBuffer.addMessage(msg, size, promise);
            }
        }
        // 省略其他方法....
    }
    // 省略其他类与方法....
}

// 引用计数工具类
public final class ReferenceCountUtil {
    public static boolean release(Object msg) {
        // 这里会先判断一下对应的msg对象是否实现了引用计数接口,
        // 只有对应的msg实现了ReferenceCounted接口时,才会释放引用
        return msg instanceof ReferenceCounted ? 
                ((ReferenceCounted)msg).release() : false;
    }
    
    // 省略其他方法.....
}

Head节点会作为出站链表的最后一个处理器调用,因此在所有自定义出站处理器执行完成后,最终调用该节点的write()方法,在这个方法内部,最终调用了AbstractUnsafe.write()方法,对应的方法实现中,咱们仅需关注ReferenceCountUtil.release(msg)这行代码即可,最终会在该工具类中释放msg对象的引用计数。

接着再来看看Tail节点的实现源码:

// ChannelPipeline处理器链表的默认实现类
public class DefaultChannelPipeline implements ChannelPipeline {

    // Tail处理器的实现类:实现了入站处理器接口,作为入站调用链最后的处理器
    final class TailContext extends AbstractChannelHandlerContext 
                                    implements ChannelInboundHandler {
        // 所有自定义的入站处理器执行完成后,会调用的方法
        public void channelRead(ChannelHandlerContext ctx, Object msg) {
            DefaultChannelPipeline.this.onUnhandledInboundMessage(ctx, msg);
        }
        
        // 省略其他方法.....
    }
    
    // 前面Tail、Head调用的释放方法
    protected void onUnhandledInboundMessage(ChannelHandlerContext ctx, Object msg) {
        // 调用释放ByteBuf缓冲区的方法
        this.onUnhandledInboundMessage(msg);
        // 记录日志
        if (logger.isDebugEnabled()) {
            logger.debug("Discarded message pipeline :" + 
                "{}. Channel : {}.", ctx.pipeline().names(), ctx.channel());
        }
    }
    protected void onUnhandledInboundMessage(Object msg) {
        try {
            logger.debug("Discarded inbound message {} that reached " + 
                "at the tail of the pipeline. Please check your pipeline" + 
                "configuration.", msg);
        } finally {
            // 最终调用了引用计数工具类的release方法
            ReferenceCountUtil.release(msg);
        }
    }
}

Tail节点会作为入站链表的最后一个处理器调用,所以在执行Tail处理器时,最终会调用它的channelRead()方法,而在相应的方法内部,调用了onUnhandledInboundMessage()方法,跟着源码继续走,此时也会发现,最终也调用了ReferenceCountUtil.release(msg)方法来释放引用。

根据源码中的推断,似乎Netty框架发送/接收数据用的ByteBuf,都会由头尾处理器来释放,但答案确实如此吗?NO,为什么呢?再次将目光放到ReferenceCountUtil.release(msg)这处代码:

// 引用计数工具类
public final class ReferenceCountUtil {
    public static boolean release(Object msg) {
        // 这里会先判断一下对应的msg对象是否实现了引用计数接口,
        // 只有对应的msg实现了ReferenceCounted接口时,才会释放引用
        return msg instanceof ReferenceCounted ? 
                ((ReferenceCounted)msg).release() : false;
    }
    
    // 省略其他方法.....
}

// ByteBuf的类定义
public abstract class ByteBuf implements ReferenceCounted, Comparable<ByteBuf>

此时大家注意看,ReferenceCountUtil.release()在执行前,会先判断一下当前的msg是否实现了ReferenceCounted接口,而ByteBuf是实现了的,因此如果执行到Head/Tail处理器时,msg数据依旧为ByteBuf类型,头尾处理器自然可以完成回收工作,但如若是下面这种情况呢?

pipeline.addLast("In-①",new ChannelInboundHandlerAdapter(){
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg)
                            throws Exception {
        System.out.println("俺是In-①入站处理器...");
        
        // 在第一个入站处理器中,将接收到的ByteBuf数据转换为String向下传递
        ByteBuf buffer = (ByteBuf) msg;
        String message = buffer.toString(Charset.defaultCharset());

        super.channelRead(ctx, message);
    }
});
pipeline.addLast("In-②",new ChannelInboundHandlerAdapter(){
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg)
                            throws Exception {
        System.out.println("我是In-②入站处理器...");
        super.channelRead(ctx, msg);
    }
});

在上述这个案例中,咱们在第一个入站处理器中,将接收到的ByteBuf数据转换为String向下传递,也就意味着从In-②处理器开始,后面所有的处理器收到的msg都为String类型,当自定义的两个处理器执行完成后,最终会调用Tail处理器完成收尾工作,但问题来了!

因为在In-①msg类型发生了改变,所以当Tail处理器中调用ReferenceCountUtil.release()时,由于String并未实现ReferenceCounted接口,所以Tail无法对该msg进行释放,最终就会造成内存泄漏问题。

但此时内存泄漏,发生在哪个位置呢?答案是位于In-①中,因为In-①处理器中就已经将ByteBuf用完了,将其中的数据转换成了String类型,而ByteBuf后续处理器都不会用到,因此该ByteBuf占用的内存永远不会被释放,所以一定要注意:在使用处理器的过程中,如果明确ByteBuf不会继续使用,那请一定要记得手动调用release()方法释放引用,以上述案例说明:

ByteBuf buffer = (ByteBuf) msg;
String message = buffer.toString(Charset.defaultCharset());
buffer.release();

当明确不使用该ByteBuf值时,请记住调用对应的release()方法释放引用!这样能够有效避免内存泄漏的问题出现,有人也许会说,JVM不是有GC机制吗?为什么会出现内存泄漏呀?

关于上述问题的道理十分简单,因为Netty默认采用本地内存来创建缓冲区,并且会利用池化技术管理所有缓冲区,如果一个ByteBuf对象的引用不为0,那么该ByteBuf会永久的占用内存资源,Netty无法主动将其占用的内存回收到池中。

5.5、 Netty中的零拷贝技术

想要讲清楚Netty-ByteBuf中的零拷贝技术,那首先得先明白零拷贝到底是个啥,因此咱们先讲明白零拷贝的概念,再讲清楚操作系统的零拷贝技术,然后再说说Java-NIO中的零拷贝体现,最后再来聊Netty-ByteBuf中的零拷贝技术。

六、随处可见的零拷贝技术

   零拷贝这个词,在很多地方都有出现,例如Kafka、Nginx、Tomcat、RocketMQ...的底层都使用了零拷贝的技术,那究竟什么叫做零拷贝呢?其实所谓的零拷贝,并不是不需要经过数据拷贝,而是减少内存拷贝的次数,上个例子来理解,比如Nginx向客户端提供文件下载的功能。

客户端要下载的文件都位于Nginx所在的服务器磁盘中,如果当一个客户端请求下载某个资源文件时,这时需要经过的步骤如下: (九)Java网络编程无冕之王-这回把大名鼎鼎的Netty框架一网打尽! 先来简单聊一聊文件下载时,Nginx服务器内部的数据传输过程:

  • ①客户端请求下载服务器上的某个资源,Nginx解析请求并得知客户端要下载的具体文件。
  • NginxOS发起系统IO调用,调用内核read(fd)函数,应用上下文切态至内核空间。
  • read()函数通过DMA控制器,将目标文件的数据从磁盘读取至内核缓冲区。
  • DMA传输数据完成后,CPU将数据从内核缓冲区拷贝至用户缓冲区(程序的内存空间)。
  • CPU拷贝数据完成后,read()调用结束并返回,上下文从内核态切回用户态。
  • Nginx再次向OS发起内核write(fd)函数的系统调用,应用上下文再次切到内核态。
  • ⑦接着CPU将用户缓冲区中的数据,写入到Socket网络套接字的缓冲区。
  • ⑧数据复制到Socket缓冲区后,DMA控制器将Socket缓冲区的数据传输到网卡设备。
  • DMA控制器将数据拷贝至网卡设备后,write()函数调用结束,再次切回用户态。
  • ⑩文件数据抵达网卡后,Nginx准备向客户端响应数据,组装报文返回数据......

从上述流程大家可得知,一次文件下载传统的IO流程,需要经过四次切态,四次数据拷贝(CPU、DMA各两次),而所谓的零拷贝,并不是指不需要经过数据拷贝,而是指减少其中的数据拷贝次数。

6.1、操作系统中的零拷贝技术

我这里指的操作系统默认是Linux,因为MacOS、Windows系统相对闭源,因此对于这两个操作系统中的零拷贝技术个人并不熟悉。在Linux中提供了多种零拷贝的实现:

  • MMAP共享内存 + write()系统函数。
  • sendfile()内核函数。
  • ③结合DMA-Scatter/Gather Copy收集拷贝功能实现的sendfile()函数。
  • splice()内核函数。

6.1.1、MMAP共享内存

什么又叫做内存映射技术呢?这个其实很好理解,就好比Linux中的软链接、Windows中的快捷方式一样,拿大家熟悉的Windows系统来说,一般在安装一个程序后,为了方便后续使用,通常都会默认在桌面上生成快捷方式(图标),这个快捷方式其实并不是一个真正的程序,而是指向安装目录下xxx.exe的链接。

Windows系统上安装一个程序后,咱们可以通过点击桌面图标打开,亦可双击安装目录下的xxx.exe文件启动,而操作系统中的共享内存也是同样的思路。

在主流操作系统中都有一种名为虚拟内存的机制,这是指可以分配多个虚拟内存地址,指向同一个物理内存地址,此时内核态程序和用户态程序,可以通过不同的虚拟地址,来操纵同一块物理内存,这也就是MMAP共享内存技术的真正实现。

MMAP的系统定义如下:

void *mmap(void *addr, size_t length, int prot, int flags, int fd, off_t offset);
  • addr:指定映射的虚拟内存地址。
  • length:映射的内存空间长度。
  • prot:映射内存的保护模式。
  • flags:指定映射的类型。
  • fd:进行映射的文件句柄。
  • offset:文件偏移量。

还是之前那幅图(不要问我为什么,因为懒的画~),重点看图中圈出来的区域: (九)Java网络编程无冕之王-这回把大名鼎鼎的Netty框架一网打尽! 如果内核缓冲区和用户缓冲区使用了MMAP共享内存,那当DMA控制器将数据拷贝至内核缓冲区时,因为这里的内核缓冲区,本质是一个虚拟内存地址指向用户缓冲区,所以DMA会直接将磁盘数据拷贝至用户缓冲区,这就减少了一次内核缓冲区到用户缓冲区的CPU拷贝过程,后续直接调用write()函数把数据写到Socket缓冲区即可,因此这也是一种零拷贝的体现。

6.1.2、sendfile()内核函数

sendfile()Linux2.1版本中推出的一个内核函数,系统调用的原型如下:

ssize_t sendfile(int fd_in, int fd_out, off_t *offset, size_t count);
  • fd_in:待写入数据的文件描述符(一般为Socket网络套接字的描述符)。
  • fd_out:待读取数据的文件描述符(一般为磁盘文件的描述符)。
  • offset:磁盘文件的文件偏移量。
  • count:声明在fd_outfd_in之间,要传输的字节数。
  • ①客户端请求下载服务器上的某个资源,Nginx解析请求并得知客户端要下载的具体文件。
  • NginxOS发起系统IO调用,调用内核sendfile()函数,上下文切态至内核空间。
  • sendfile()函数通过DMA控制器,将目标文件的数据从磁盘读取至内核缓冲区。
  • DMA传输数据完成后,CPU将数据从内核缓冲区拷贝至Socket缓冲区。
  • CPU拷贝数据完成后,DMA控制器将数据从Socket缓冲区拷贝至网卡设备。
  • ⑥数据拷贝到网卡后,sendfile()调用结束,应用上下文切回用户态空间。
  • Nginx准备向客户端响应数据,组装报文返回数据......

相较于原本的MMAP+write()的方式,使用sendfile()函数来处理IO请求,这显然性能更佳,因为这里不仅仅减少了一次CPU拷贝,而且还减少了两次切态的过程。

6.1.3、DMA-Scatter/Gather Copy - sendfile()函数

前面聊了Linux2.1版本中的sendfile()函数,而到了Linux2.4版本中,又对sendfile()做了升级,引入了S/G-DMA技术支持,也就是在DMA拷贝阶段,如果硬件支持的情况下,会加入Scatter/Gather操作,这样就省去了仅有的一次CPU拷贝过程,如下: (九)Java网络编程无冕之王-这回把大名鼎鼎的Netty框架一网打尽! 优化后的sendfile()函数,拷贝数据时只需要告知out_fd、in_fd、count即可,然后DMA控制器会直接将数据从磁盘拷贝至网卡,而无需经过CPU将数据拷贝至Socket缓冲区这一步。

6.1.4、splice()内核函数

前面聊到的sendfile()函数只适用于将数据从磁盘文件拷贝到Socket套接字或网卡上,所以这也限制了它的使用范围,因此在Linux2.6版本中,引入了splice()函数,其系统调用的原型如下:

ssize_t splice(int fd_in, loff_t *off_in, int fd_out, loff_t *off_out, 
                                            size_t len, unsigned int flags);
  • fd_in:等待写入数据的文件描述符。
  • off_in:如果fd_in是一个管道文件(如Socket),该值必须为NULL,否则为文件的偏移量。
  • fd_out:等待读取数据的文件描述符。
  • off_out:作用同off_in参数。
  • len:指定fd_in、fd_out之间传输数据的长度。
  • flags:控制数据传输的模式:
    • SPLICE_F_MOVE:如果数据合适,按标准页大小移动数据(2.6.21版本后被废弃)。
    • SPLICE_F_NONBLOCK:以非阻塞式模式执行splice(),实际依旧会受FD状态影响。
    • SPLICE_F_MORE:给内核一个提示,后续splice()还会继续传输更多的数据。
    • SPLICE_F_GIFT:没有效果的选项。

使用splice函数时,fd_in、fd_out中必须至少有一个是管道文件描述符,套到网络编程中的含义即是指:必须要有一个文件描述符是Socket类型,如果两个磁盘文件进行复制,则无法使用splice函数。

splice()函数的作用和DMA-Scatter/Gather版的sendfile()函数完全相同,但与其不同的是:splice()函数不仅不需要硬件支持,而且能够做到两个文件描述符之间的数据零拷贝,实现的过程是基于一端的管道文件描述符,在两个FD之间搭建pipeline管道,从而实现两个FD之间的数据零拷贝。

6.2、另类的零拷贝技术

前面聊到了四种Linux系统中的零拷贝技术,而除开Linux系统中的零拷贝技术外,还有一些另类的零拷贝实现,先来聊一聊缓冲区共享技术,然后再聊聊应用程序中的零拷贝体现。

6.2.1、缓冲区共享

缓冲区共享技术类似于Linux中的MMAP共享内存,但缓冲区共享则是真正意义上的内存共享技术,内核缓冲区和用户缓冲区共享同一块内存,如下: (九)Java网络编程无冕之王-这回把大名鼎鼎的Netty框架一网打尽! 操作系统一般为了系统的安全性,在运行期间都会分为用户态和内核态,无法直接访问用户态程序内核态空间,所以Linux中的MMAP是基于虚拟内存实现的,而想要实现真正意义上的内存共享,这也就意味着需要重写内核结构,目前比较成熟的只有Solaris系统上的Fast Buffer技术,但大家只需了解即可,因为这个也很少用到。

6.2.2、程序数据的零拷贝

前面聊到的零拷贝技术,都是在减少磁盘文件和网络套接字之间的数据拷贝次数,而程序中也会存在很多的数据拷贝过程,比如将一个大集合拆分为两个小集合、将多个小集合合并成一个大集合等等,传统的做法如下:

List<Integer> a = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
List<Integer> b = new ArrayList<>();
List<Integer> c = new ArrayList<>();

for (Integer num : a) {
    int index = a.indexOf(num);
    if (index < 5){
        b.add(num);
    } else {
        c.add(num);
    }
}

而这种做法显然会牵扯到数据拷贝,但上述这个做法,会从a中将数据拷贝到b、c集合中,而所谓的零拷贝,即是无需发生拷贝动作,也能够将a拆分成b、c两个集合。

关于具体如何实现,这点待会儿在Netty-ByteBuf中演示,因为Netty中的零拷贝技术,也实现了程序数据的零拷贝。

6.3、Java-NIO中的零拷贝体现

Java-NIO中,主要有三个方面用到了零拷贝技术:

  • MappedByteBuffer.map():底层调用了操作系统的mmap()内核函数。
  • DirectByteBuffer.allocateDirect():可以直接创建基于本地内存的缓冲区。
  • FileChannel.transferFrom()/transferTo():底层调用了sendfile()内核函数。

观察上述给出的三处位置,其实本质也就是在调用操作系统内核提供的零拷贝函数,以此减少数据的拷贝次数。

6.4、再聊Netty中的零拷贝体现

Netty中的零拷贝与前面操作系统层面的零拷贝不同,它是一种用户进程级别的零拷贝体现,主要也包含三方面:

Netty的发送、接收数据的ByteBuf缓冲区,默认会使用堆外本地内存创建,采用直接内存进行Socket读写,数据传输时无需经过二次拷贝。如果使用传统的堆内存进行Socket网络数据读写,JVM需要先将堆内存中的数据拷贝一份到直接内存,然后才写入Socket缓冲区中,相较于堆外直接内存,消息在发送过程中多了一次缓冲区的内存拷贝。

Netty的文件传输采用了transferTo()/transferFrom()方法,它可以直接将文件缓冲区的数据发送到目标Channel(Socket),底层就是调用了sendfile()内核函数,避免了文件数据的CPU拷贝过程。

Netty提供了组合、拆解ByteBuf对象的API,咱们可以基于一个ByteBuf对象,对数据进行拆解,也可以基于多个ByteBuf对象进行数据合并,这个过程中不会出现数据拷贝,下面重点聊一聊这个!

其中前两条就不过多赘述了,毕竟前面都唠叨过好几次,重点说说第三种零拷贝技术,这是一种Java级别的零拷贝技术,ByteBuf中主要有slice()、composite()这两个方法,用于拆分、合并缓冲区,先来聊聊拆分缓冲区的方法,案例如下:

// 测试Netty-ByteBuf的slice零拷贝方法
private static void sliceZeroCopy(){
    // 分配一个初始容量为10的缓冲区
    ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(10);

    // 写入0~9十个字节数据
    byte[] numData = {'0','1','2','3','4','5','6','7','8','9'};
    buffer.writeBytes(numData);
    printBuffer(buffer);

    // 从下标0开始,向后截取五个字节,拆分成一个新ByteBuf对象
    ByteBuf b1 = buffer.slice(0, 5);
    printBuffer(b1);
    // 从下标5开始,向后截取五个字节,拆分成一个新ByteBuf对象
    ByteBuf b2 = buffer.slice(5, 5);
    printBuffer(b2);

    // 证明切割出的两个ByteBuf对象,是共享第一个ByteBuf对象数据的
    // 这里修改截取后的b1对象,然后查看最初的buffer对象
    b1.setByte(0,'a');
    printBuffer(buffer);
}

public static void main(String[] args) {
    sliceZeroCopy();
}

在上述方法中,首先创建了一个buffer对象,往其中写入了0~9这十个字符,接着将其拆分成了b1、b2这两个ByteBuf对象,b1、b2都具备独立的读写指针,但却并未真正的从buffer中拷贝新的数据出来,而是基于buffer这个对象,进行了数据截取,运行结果如下:

ByteBuf缓冲区信息:{读取指针=0, 写入指针=10, 容量大小=10}
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 30 31 32 33 34 35 36 37 38 39                   |0123456789      |
+--------+-------------------------------------------------+----------------+

ByteBuf缓冲区信息:{读取指针=0, 写入指针=5, 容量大小=5}
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 30 31 32 33 34                                  |01234           |
+--------+-------------------------------------------------+----------------+
ByteBuf缓冲区信息:{读取指针=0, 写入指针=5, 容量大小=5}
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 35 36 37 38 39                                  |56789           |
+--------+-------------------------------------------------+----------------+

ByteBuf缓冲区信息:{读取指针=0, 写入指针=10, 容量大小=10}
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 61 31 32 33 34 35 36 37 38 39                   |a123456789      |
+--------+-------------------------------------------------+----------------+

观察上述第二、三个ByteBuf缓冲区信息,与前面说的毫无差异,明显都具备独立的读写指针,但我为什么说:b1、b2没有拷贝数据呢?接着看方法中的最后一步,我对b1的第一个元素做了修改,然后输出了buffer对象,看上述结果中的第四个ByteBuf缓冲区信息,其实会发现:buffer对象中下标为0的数据,也被改成了a!由此即可证明前面的观点。

不过这种零拷贝方式,虽然减少了数据复制次数,但也会有一定的局限性: ①使用slice()方法拆分出的ByteBuf对象,不支持扩容,也就是切割的长度为5,最大长度也只能是5,超出长度时会抛出下标越界异常。 ②由于拆分出的ByteBuf对象,其数据依赖于原ByteBuf对象,因此当原始ByteBuf对象被释放时,拆分出的缓冲区也会不可用,所以在使用slice()方法时,要手动调用retain()/release()来增加引用计数(这个后面细聊)。

除开上述的slice()方法外,还有其他一个叫做duplicate()的零拷贝方法,它的作用是完全克隆原有ByteBuf对象,但读写指针都是独立的,并且支持自动扩容,大家感兴趣可以自行实验。

接着聊一聊合并ByteBuf缓冲区的零拷贝方法,该方法的使用方式与前面的方法并不同,如下:

// 测试Netty-ByteBuf的composite零拷贝方法
private static void compositeZeroCopy(){
    // 创建两个小的ByteBuf缓冲区,并往两个缓冲区中插入数据
    ByteBuf b1 = ByteBufAllocator.DEFAULT.buffer(5);
    ByteBuf b2 = ByteBufAllocator.DEFAULT.buffer(5);
    byte[] data1 = {'a','b','c','d','e'};
    byte[] data2 = {'n','m','x','y','z'};
    b1.writeBytes(data1);
    b2.writeBytes(data2);

    // 创建一个合并缓冲区的CompositeByteBuf对象
    CompositeByteBuf buffer = ByteBufAllocator.DEFAULT.compositeBuffer();
    // 将前面两个小的缓冲区,合并成一个大的缓冲区
    buffer.addComponents(true,b1,b2);
    printBuffer(buffer);
}

public static void main(String[] args) {
    compositeZeroCopy();
}

/* * 运行结果:
ByteBuf缓冲区信息:{读取指针=0, 写入指针=10, 容量大小=10}
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 61 62 63 64 65 6e 6d 78 79 7a                   |abcdenmxyz      |
+--------+-------------------------------------------------+----------------+
* */

案例中,想要将多个缓冲区合并成一个大的缓冲区,需要先创建一个CompositeByteBuf对象,接着调用它的addComponent()/addComponents()方法,将小的缓冲区添加进去即可。但在合并多个缓冲区时,addComponents()方法中的第一个参数必须为true,否则不会自动增长读写指针。

其实说到底,Netty-ByteBuf缓冲区的零拷贝方法,实际上也可以被称之为“一种特殊的浅拷贝”,与之对应的是“深拷贝”,而ByteBuf中的“深拷贝”,则是一系列以Copy开头的方法,通过这类方法复制缓冲区,会完全分配新的内存地址、读写指针。

最后,在Netty内部还提供了一个名为Unpooled的工具类,这主要是针对于非池化缓冲区的工具类,内部也提供了一系列wrappend开头的方法,可以用来组合、包装多个ByteBuf对象或字节数组,调用对应方法时,内部也不会发生拷贝动作,这也是一类零拷贝的方法。

七、Netty入门篇小结

   经过上述一系列的叨叨絮絮后,对于Netty框架的基本概念,以及Netty框架中大多数核心组件做了介绍,但对于一些粘包、半包、解码器、长连接、心跳机制等内容未阐述,原本打算将这些内容一篇写完,但本章的字数实在太多,严重超出单章限制: (九)Java网络编程无冕之王-这回把大名鼎鼎的Netty框架一网打尽! 因此对于后续一些进阶的知识,会再开设一篇讲述,预计Netty的文章会有4~5篇左右,大体顺序为《Netty入门篇》、《Netty进阶篇》、《Netty实战篇》、《Netty应用篇》、《Netty源码篇》,但具体的篇幅会在后续适当调整。

本篇的内容就到这里啦,如若对你有帮助,请记得点个小赞支持一下~