likes
comments
collection
share

手把手教你如何从源码到应用——Netty(一)

作者站长头像
站长
· 阅读数 32

前言

首先需要说明的是,这篇文章不是原创,是在大佬的基础上修正了一点内容发出来的,算是我学习跟着大佬的文章一步一步走来的学习路程,他的文章里有一些初学者不理解的点,包括里面的笔误和分析源码的流程的失误,我都有修正,但是,不是原创,不是原创,不是原创!!!!原文链接在这里!!!不过看在我整理了这么多的份上,各位大哥能不能点个赞给小弟。

第二篇在这里: 手把手教你如何从源码到应用——Netty(二)

准备环境

首先就是使用maven搭建一个最简单的环境,使用到的maven依赖如下所示:

<dependency>
   <groupId>io.netty</groupId>
   <artifactId>netty-example</artifactId>
   <version>4.1.25.Final</version>
</dependency>

Echo例子

打开netty-example的源码,可以发现它下面有着一个叫做echo的包,把里面的代码复制出来,它就是客户端传什么值,服务端就原样返回什么值。

手把手教你如何从源码到应用——Netty(一)

我们先来看一下上述代码中涉及到的一些内容:

  • ServerBootstrap 类用于创建服务端实例,Bootstrap 用于创建客户端实例。
  • 两个 EventLoopGroup:bossGroup 和 workerGroup,它们涉及的是 Netty 的线程模型,可以看到服务端有两个 group,而客户端只有一个,它们就是 Netty 中的线程池。
  • Netty 中的 Channel,没有直接使用 Java 原生的 ServerSocketChannel 和 SocketChannel,而是包装了 NioServerSocketChannel 和 NioSocketChannel 与之对应。当然,也有对其他协议的支持,如支持 UDP 协议的 NioDatagramChannel,本文只关心 TCP 相关的。
  • 左边 handler(…) 方法指定了一个 handler(LoggingHandler),这个 handler 是给服务端收到新的请求的时候处理用的。右边 handler(...) 方法指定了客户端处理请求过程中需要使用的 handlers。如果你想在 EchoServer 中也指定多个 handler,也可以像右边的 EchoClient 一样使用 ChannelInitializer
  • 左边 childHandler(…) 指定了 childHandler,这边的 handlers 是给新创建的连接用的,我们知道服务端 ServerSocketChannel 在 accept 一个连接以后,需要创建 SocketChannel 的实例,childHandler(…) 中设置的 handler 就是用于处理新创建的 SocketChannel 的,而不是用来处理 ServerSocketChannel 实例的。
  • pipeline:handler 可以指定多个(需要上面的 ChannelInitializer 类辅助),它们会组成了一个 pipeline,它们其实就类似拦截器的概念,现在只要记住一点,每个 NioSocketChannel 或 NioServerSocketChannel 实例内部都会有一个 pipeline 实例。pipeline 中还涉及到 handler 的执行顺序。
  • ChannelFuture:这个涉及到 Netty 中的异步编程,和 JDK 中的 Future 接口类似。

Netty中的Channel

Netty中的Channel和JDK中的Channel是一一对应的,比如Netty中的NioSocketChannel与Java中底层的SocketChannel,Netty中的NioServerSocketChannel与Java底层的ServerSocketChannel。

在netty-example中的Echo示例中,存在着如下代码:

	// Server端代码
	try {
            ServerBootstrap b = new ServerBootstrap();
            ((ServerBootstrap)((ServerBootstrap)((ServerBootstrap)b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)).option(ChannelOption.SO_BACKLOG, 100)).handler(new LoggingHandler(LogLevel.INFO))).childHandler(new ChannelInitializer<SocketChannel>() {
                public void initChannel(SocketChannel ch) throws Exception {
                    ChannelPipeline p = ch.pipeline();
                    if (sslCtx != null) {
                        p.addLast(new ChannelHandler[]{sslCtx.newHandler(ch.alloc())});
                    }

                    p.addLast(new ChannelHandler[]{new EchoServerHandler()});
                }
            });
            ChannelFuture f = b.bind(PORT).sync();
            f.channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
	// client端代码
	try {
            Bootstrap b = new Bootstrap();
            ((Bootstrap)((Bootstrap)((Bootstrap)b.group(group)).channel(NioSocketChannel.class)).option(ChannelOption.TCP_NODELAY, true)).handler(new ChannelInitializer<SocketChannel>() {
                public void initChannel(SocketChannel ch) throws Exception {
                    ChannelPipeline p = ch.pipeline();
                    if (sslCtx != null) {
                        p.addLast(new ChannelHandler[]{sslCtx.newHandler(ch.alloc(), EchoClient.HOST, EchoClient.PORT)});
                    }

                    p.addLast(new ChannelHandler[]{new EchoClientHandler()});
                }
            });
            ChannelFuture f = b.connect(HOST, PORT).sync();
            f.channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully();
        }

在客户端和服务端中的启动过程中都会调用channel()方法,接下来我们来看到channel()方法的源码:

	// AbstractBootstrap
	public B channel(Class<? extends C> channelClass) {
        if (channelClass == null) {
            throw new NullPointerException("channelClass");
        } else {
            return this.channelFactory((io.netty.channel.ChannelFactory)(new ReflectiveChannelFactory(channelClass)));
        }
    }

可以看到源码中调用了this.channelFactory()方法,说明这个方法是用于设置类中的ChannelFactory实例的,并且传入了new ReflectiveChannelFactory(channelClass))实例作为参数,其中channelClass就是我们调用channel方法时,传入的NioServerSocketChannel | NioSocketChannel的类对象。

然后我们看到这个ReflectiveChannelFactory的源码,看看它到底是什么?

package io.netty.channel;

import io.netty.util.internal.StringUtil;

public class ReflectiveChannelFactory<T extends Channel> implements ChannelFactory<T> {
    private final Class<? extends T> clazz;

    public ReflectiveChannelFactory(Class<? extends T> clazz) {
        if (clazz == null) {
            throw new NullPointerException("clazz");
        } else {
            this.clazz = clazz;
        }
    }

    public T newChannel() {
        try {
            return (Channel)this.clazz.getConstructor().newInstance();
        } catch (Throwable var2) {
            throw new ChannelException("Unable to create Channel from class " + this.clazz, var2);
        }
    }

    public String toString() {
        return StringUtil.simpleClassName(this.clazz) + ".class";
    }
}

通过源码,我们重点需要了解newChannel方法,这个类就是一个工厂类,也就是使用了工厂模式,这个类的newChannel方法通过类的反射去调用Channel的无参构造方法去创建一个具体的Channel对象。我们只需要直到这个ChannelFactory的newChannel方法什么时候会被调用就好了。

  • 对于NioSocketChannel,它主要用作客户端,所以它的创建时机是在connect(...)方法的时候
  • 对于NioServerSocketChannel,它主要作用于服务端,所以它的创建时机是在绑定端口bind(...)的时候

接下来我们简单看看客户端的Bootstrap中的NioSocketChannel的创建过程,看看NioSocketChannel是怎么和JDK中的SocketChannel关联在一起的。

// Bootstrap类
public ChannelFuture connect(InetAddress inetHost, int inetPort) {
    return connect(new InetSocketAddress(inetHost, inetPort));
}

继续走代码:

public ChannelFuture connect(SocketAddress remoteAddress) {
    if (remoteAddress == null) {
        throw new NullPointerException("remoteAddress");
    // validate 方法用于校验各个参数是否正确设置了
    validate();
    return doResolveAndConnect(remoteAddress, config.localAddress());
}

继续:

private ChannelFuture doResolveAndConnect(final SocketAddress remoteAddress, final SocketAddress localAddress) {
    // 重点就在这部分了
    final ChannelFuture regFuture = initAndRegister();
    final Channel channel = regFuture.channel();
    ......
}

首先看到initAndRegister()方法:

final ChannelFuture initAndRegister() {
    Channel channel = null;
    try {
        // 这里就是客户端创建Channel实例的时机 也就是在connect方法内一步一步走到这里
        channel = channelFactory.newChannel();
        init(channel);
        ......
    } catch (Throwable t) {
        ......
    }
    ......
    return regFuture;
}

那么走到这里,就能看到Channel的创建时机了,前面看到channelFactory这个方法的源码的时候,知道它是通过反射调用无参构造方法创建对应的Channel实例的,所以这个时候去看看NioSocketChannel的无参构造方法源码就能知道NioSocketChannel与SocketChannel的联系了:

private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();

public NioSocketChannel() {
    // SelectorProvider 实例用于创建 JDK 的 SocketChannel 实例
    this(DEFAULT_SELECTOR_PROVIDER);
}

public NioSocketChannel(SelectorProvider provider) {
    // 这一行代码就是关键
    // 这行代码会使用provider去创建JDK的SocketChannel实例
    this(newSocket(provider));
}

继续往里走:

private static SocketChannel newSocket(SelectorProvider provider) {
    try {
        // 创建 SocketChannel 实例
        return provider.openSocketChannel();
    } catch (IOException e) {
        throw new ChannelException("Failed to open a socket.", e);
    }
}

NioServerSocketChannel的创建时机以及与ServerSocketChannel的联系也是一样的,从bind方法中一路点击就知道了。

所以我们现在知道了,在创建NioSocketChannel的时候,会先实例化JDK底层的SocketChannel实例,NioServerSocketChannel也是一样的,会先实例化ServerSocketChannel实例。

继续往下看NioSocketChannel的构造方法:

public NioSocketChannel(Channel parent, SocketChannel socket) {
    super(parent, socket);
    config = new NioSocketChannelConfig(this, socket.socket());
}

我们重点关注于第一行代码,第二行代码看着就知道是设置了Channel的一些配置信息,这里暂时跳过,第一行代码调用了父类构造器,除了设置属性之外,还设置了SocketChannel的非阻塞模式:

protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {
    // 客户端肯定关心的是 OP_READ 事件 等待读取服务端返回数据
    super(parent, ch, SelectionKey.OP_READ);
}

// 然后是到这里
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
    super(parent);
    this.ch = ch;
    // 我们看到这里使用 readInterestOp 保存了 SelectionKey.OP_READ 这个信息
    this.readInterestOp = readInterestOp;
    try {
        // 设置channel的非阻塞模式
        ch.configureBlocking(false);
    } catch (IOException e) {
        ......
    }
}

NioServerSocketChannel的构造方法类似,也是设置了非阻塞模式,然后设置服务端关心的SelectionKey.OP_ACCEPT 事件:

public NioServerSocketChannel(ServerSocketChannel channel) {
    // 对于服务端来说 关心的是 SelectionKey.OP_ACCEPT 事件 等待客户端连接
    super(null, channel, SelectionKey.OP_ACCEPT);
    config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}

总结

这一小节主要就是介绍了一下Channel,Netty中的Channel与JDK底层的Channel的联系,它们是一对一的,并且分析了channel方法的源码,发现它的底层是使用了工厂模式去创建Channel,并且是利用反射去调用相关channel的无参构造方法去创建channel实例,并且分别说明了channel的创建时机,分别如下所示:

  • NioServerSocketChannel:创建时机在于bind方法中
  • NioSocketChannel:创建时机在于connect方法中,本小节主要分析了它的创建流程

Netty中的Future、Promise

前面在介绍Echo例子的时候,已经使用到了ChannelFuture这个接口了:

手把手教你如何从源码到应用——Netty(一)

本节主要是搞清楚上面这几行划线部分是如何走的。

关于Future接口,我们最常接触到的就是在使用Java线程池ThreadPoolExecutor的时候了,通常在submit一个任务之后返回的就是一个Future实例,然后通过它来获取提交的任务的执行状态和执行结果,最常使用的方法就是isDone方法、get方法。

下面是JDK中Future接口的源码了:

public interface Future<V> {
    // 取消该任务
    boolean cancel(boolean mayInterruptIfRunning);
    // 任务是否已取消
    boolean isCancelled();
    // 任务是否已完成
    boolean isDone();
    // 阻塞获取任务执行结果
    V get() throws InterruptedException, ExecutionException;
    // 带超时参数的获取任务执行结果
    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

而Netty中的Future接口继承了JDK的Future接口,并且添加了一些方法:

public interface Future<V> extends java.util.concurrent.Future<V> {

    // 是否成功
    boolean isSuccess();

    // 是否可取消
    boolean isCancellable();

    // 如果任务执行失败 这个方法返回异常信息
    Throwable cause();

    // 添加 Listener 来进行回调
    Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);
    Future<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners);

    Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);
    Future<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners);

    // 阻塞等待任务结束 如果任务失败 将“导致失败的异常”重新抛出来
    Future<V> sync() throws InterruptedException;
    // 不响应中断的 sync() 这个大家应该都很熟了
    Future<V> syncUninterruptibly();

    // 阻塞等待任务结束 和 sync() 功能是一样的 不过如果任务失败 它不会抛出执行过程中的异常
    Future<V> await() throws InterruptedException;
    Future<V> awaitUninterruptibly();
    boolean await(long timeout, TimeUnit unit) throws InterruptedException;
    boolean await(long timeoutMillis) throws InterruptedException;
    boolean awaitUninterruptibly(long timeout, TimeUnit unit);
    boolean awaitUninterruptibly(long timeoutMillis);

    // 获取执行结果 不阻塞 我们都知道 java.util.concurrent.Future 中的 get() 是阻塞的
    V getNow();

    // 取消任务执行 如果取消成功 任务会因为 CancellationException 异常而导致失败
    //      也就是 isSuccess()==false 同时上面的 cause() 方法返回 CancellationException 的实例 
    // mayInterruptIfRunning 说的是:是否对正在执行该任务的线程进行中断(这样才能停止该任务的执行) 
    //       似乎 Netty 中 Future 接口的各个实现类 都没有使用这个参数
    @Override
    boolean cancel(boolean mayInterruptIfRunning);
}

上面的Netty中的Future接口,它添加了sync()和await()方法用于阻塞等待,还加了Listeners,只要任务结束就去回调Listener,所以我们不一定需要主动调用isDone()方法去获取任务的状态和get()方法阻塞的获取任务执行结果的值。

这里说一下sync()与await()方法的区别:sync方法内部会先调用await方法,等待await方法返回之后,再去检查一下这个任务是否失败,如果失败,则重新将导致失败的原因抛出来,如果使用await方法,任务抛出异常之后await方法会返回,但是不会抛出异常,而sync方法返回的同时会抛出异常。

这里可以看到Netty的Future接口还没有和IO操作关联在一起,所以接下来要介绍的就是Future接口的子接口ChannelFuture,这个接口用的最多,它将和IO操作关联在一起,用于异步处理Channel中的事件

public interface ChannelFuture extends Future<Void> {

    // ChannelFuture 关联的 Channel
    Channel channel();

    // 覆写以下几个方法,使得它们返回值为 ChannelFuture 类型 
    @Override
    ChannelFuture addListener(GenericFutureListener<? extends Future<? super Void>> listener);
    @Override
    ChannelFuture addListeners(GenericFutureListener<? extends Future<? super Void>>... listeners);
    @Override
    ChannelFuture removeListener(GenericFutureListener<? extends Future<? super Void>> listener);
    @Override
    ChannelFuture removeListeners(GenericFutureListener<? extends Future<? super Void>>... listeners);

    @Override
    ChannelFuture sync() throws InterruptedException;
    @Override
    ChannelFuture syncUninterruptibly();

    @Override
    ChannelFuture await() throws InterruptedException;
    @Override
    ChannelFuture awaitUninterruptibly();

    // 用来标记该 future 是 void 的,
    // 这样就不允许使用 addListener(...)  sync()  await() 以及它们的几个重载方法
    boolean isVoid();
}

ChannelFuture接口相对于Future接口,除了将channel关联进来,没有增加什么东西,其余都是重写了几个方法,让它们的返回值变成了ChannFuture,而不是原来的Future。

这里接下来要介绍Promise接口,它和ChannelFuture接口无关但是和前面的Future接口有关,Promise这个接口非常重要。

Promise接口和ChannelFuture接口一样,也继承了Future接口,然后添加了一些Promise的内容:

public interface Promise<V> extends Future<V> {

    // 标记该 future 成功及设置其执行结果 并且会通知所有的 listeners 
    // 如果该操作失败 将抛出异常(失败指的是该 future 已经有了结果了 成功的结果 或者失败的结果)
    Promise<V> setSuccess(V result);

    // 和 setSuccess 方法一样 只不过如果失败 它不抛异常 返回 false
    boolean trySuccess(V result);

    // 标记该 future 失败 及其失败原因 
    // 如果失败 将抛出异常(失败指的是已经有了结果了)
    Promise<V> setFailure(Throwable cause);

    // 标记该 future 失败 及其失败原因。
    // 如果已经有结果 返回 false 不抛出异常
    boolean tryFailure(Throwable cause);

    // 标记该 future 不可以被取消
    boolean setUncancellable();

    // 这里和 ChannelFuture 一样 对这几个方法进行覆写 目的是为了返回 Promise 类型的实例
    @Override
    Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);
    @Override
    Promise<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners);

    @Override
    Promise<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);
    @Override
    Promise<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners);

    @Override
    Promise<V> await() throws InterruptedException;
    @Override
    Promise<V> awaitUninterruptibly();

    @Override
    Promise<V> sync() throws InterruptedException;
    @Override
    Promise<V> syncUninterruptibly();
}

Promise 实例内部是一个任务,任务的执行往往是异步的,通常是一个线程池来处理任务。Promise 提供的 setSuccess(V result) 或 setFailure(Throwable t) 将来会被某个执行任务的线程在执行完成以后调用,同时那个线程在调用 setSuccess(result) 或 setFailure(t) 后会回调 listeners 的回调函数(当然,回调的具体内容不一定要由执行任务的线程自己来执行,它可以创建新的线程来执行,也可以将回调任务提交到某个线程池来执行)。而且,一旦 setSuccess(...) 或 setFailure(...) 后,那些 await() 或 sync() 的线程就会从等待中返回。

所以这里就有两种编程方式:

  1. 使用await()方法,等await()方法返回之后,得到promise的执行结果,然后处理它
  2. 提供Listener实例,我们不太关心这个任务什么时候执行完成,只要它执行完成之后去调用Listener中的处理方法就可以了

接下来就是介绍结合Promise接口和ChannelFuture接口的ChannelPromise接口了,它继承了前面介绍的ChannelFuture接口和Promise接口。

手把手教你如何从源码到应用——Netty(一)

/**
 * Special {@link ChannelFuture} which is writable.
 */
public interface ChannelPromise extends ChannelFuture, Promise<Void> {

    // 覆写 ChannelFuture 中的 channel() 方法 
    @Override
    Channel channel();

    // 下面几个方法是覆写 Promise 中的接口 为了返回值类型是 ChannelPromise
    @Override
    ChannelPromise setSuccess(Void result);
    ChannelPromise setSuccess();
    boolean trySuccess();
    @Override
    ChannelPromise setFailure(Throwable cause);

    // 到这里大家应该都熟悉了 下面几个方法的覆写也是为了得到 ChannelPromise 类型的实例
    @Override
    ChannelPromise addListener(GenericFutureListener<? extends Future<? super Void>> listener);
    @Override
    ChannelPromise addListeners(GenericFutureListener<? extends Future<? super Void>>... listeners);
    @Override
    ChannelPromise removeListener(GenericFutureListener<? extends Future<? super Void>> listener);
    @Override
    ChannelPromise removeListeners(GenericFutureListener<? extends Future<? super Void>>... listeners);

    @Override
    ChannelPromise sync() throws InterruptedException;
    @Override
    ChannelPromise syncUninterruptibly();
    @Override
    ChannelPromise await() throws InterruptedException;
    @Override
    ChannelPromise awaitUninterruptibly();

    /**
     * Returns a new {@link ChannelPromise} if {@link #isVoid()} returns {@code true} otherwise itself.
     */
    // 我们忽略这个方法 
    ChannelPromise unvoid();
}

看到它的源码就能知道,它没有增加什么新的功能,就是重写了几个方法将返回值全部变成ChannelPromise,下面一张图将这几个接口的方法列在一起,如下所示:

手把手教你如何从源码到应用——Netty(一)

接下来我们需要介绍一个实现类去直观的看出它们是如何使用的,因为上面的方法都是接口定义,所以接下来我们来看到DefaultPromise这个实现类,我们首先介绍关键的内容,然后介绍一个使用示例。

首先来看看DefaultPromise的源码,看看它有着哪些属性:

public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> {
    // 保存执行结果
    private volatile Object result;
    // 执行任务的线程池 promise 持有 executor(执行器) 的引用 这个其实有点奇怪了
    // 因为“任务”其实没必要知道自己在哪里被执行的
    private final EventExecutor executor;
      // 监听者 回调函数 任务结束后(正常或异常结束)执行
    private Object listeners;

    // 等待这个 promise 的线程数(调用sync()/await()进行等待的线程数量)
    private short waiters;

    // 是否正在唤醒等待线程 用于防止重复执行唤醒 不然会重复执行 listeners 的回调方法
    private boolean notifyingListeners;
    ......
}

通过查看源码可以看出,这个类继承自Promise但是没有和ChannelFuture联系起来,但是我们后面会碰到另一个类就是DefaultChannelPromise这个类是综合了ChannelFuture和Promise的,但是它的实现大部分继承自这个DefaultPromise类的。

说完上面的属性以后,可以看下 setSuccess(V result) 、trySuccess(V result) 和 setFailure(Throwable cause) 、 tryFailure(Throwable cause) 这几个方法:

手把手教你如何从源码到应用——Netty(一)

通过上面这张图就能很好的看出setSuccess(result)和trySuccess(result)的区别了。

上面的几个方法都十分的简单,先设置好相应的值,然后执行监听者们的回调方法,notifyListeners还设计到了Netty的线程池的一些内容,先暂时跳过,后面会介绍,上面的代码中在setSuccess0 或 setFailure0 方法中都会唤醒阻塞在 sync() 或 await() 的线程。

另外可以看看sync()和await()的区别:

@Override
public Promise<V> sync() throws InterruptedException {
    await();
    // 如果任务是失败的,重新抛出相应的异常
    rethrowIfFailed();
    return this;
}

接下来介绍DefaultPromise的使用示例:

public static void main(String[] args) {

    // 构造线程池
    EventExecutor executor = new DefaultEventExecutor();

    // 创建 DefaultPromise 实例
    Promise promise = new DefaultPromise(executor);

    // 下面给这个 promise 添加两个 listener
    promise.addListener(new GenericFutureListener<Future<Integer>>() {
        @Override
        public void operationComplete(Future future) throws Exception {
            if (future.isSuccess()) {
                System.out.println("任务结束,结果:" + future.get());
            } else {
                System.out.println("任务失败,异常:" + future.cause());
            }
        }
    }).addListener(new GenericFutureListener<Future<Integer>>() {
        @Override
        public void operationComplete(Future future) throws Exception {
            System.out.println("任务结束,balabala...");
        }
    });

    // 提交任务到线程池,五秒后执行结束,设置执行 promise 的结果
    executor.submit(new Runnable() {
        @Override
        public void run() {
            try {
                // 模拟执行任务的过程
                Thread.sleep(5000);
            } catch (InterruptedException e) {
            }
            // 设置 promise 的结果
            // promise.setFailure(new RuntimeException());
            promise.setSuccess(123456);
        }
    });

    // main 线程阻塞等待执行结果
    try {
        promise.sync();
    } catch (InterruptedException e) {
    }
}

这里可以试试sync方法和await方法的区别,在任务中调用promise.setFailure(new RuntimeException())试试。

这里需要清楚一个点,那就是具体的任务不一定需要在这个executor中被执行,任务结束之后,需要调用promise.setSuccess(result)作为通知。

通常来说,promise代表的future是不需要和线程池搅合在一起的,future只需要关心任务是否结束以及任务的执行结果,至于是哪个线程执行的任务,future是不关心的

接下来回到这一小节开头的那种图,看看是不是能够看懂它了:

手把手教你如何从源码到应用——Netty(一)

现在就能很好的理解上图中划线的部分了,首先说左边这张图,main线程会调用bind方法返回一个ChannelFuture,bind方法是一个异步方法,当某个执行线程执行了真正的绑定操作之后,那个执行线程一定会标记这个future成功,然后这里的sync方法(main线程)就会返回了。

如果bind方法执行失败,那么sync方法就会抛出异常到finally块了。

一旦bind方法绑定成功,那么进入到下面一行,那么f.channel()方法就会返回该future关联的channel。channel.closeFuture()也会返回一个ChannelFuture,然后调用sync方法,就会阻塞在这里,这个sync方法的返回条件是:有其他线程关闭了NioServerSocketChannel,往往是因为需要停掉服务了,然后那个线程会设置 future 的状态( setSuccess(result) 或 setFailure(cause) ),这个 sync() 方法才会返回。

ChannelPipeline、Inbound、Outbound

在使用Netty的时候,我们通常只需要写一些自定义的handler就可以了,我们定义的这些handler会组成一个pipeline,用于处理IO事件,就和平时接触到的Filter、Interceptor表达的差不多是一个意思。

每个Channel内有一个Pipeline,Pipeline由多个handler组成,handler之间的顺序是十分重要的,因为IO事件将按照顺序依次经过pipeline中的handler,这样每个handler可以专注于做一件事情,由多个handler组合起来完成一些复杂的逻辑。

手把手教你如何从源码到应用——Netty(一)

从图中可以知道这是一个双向链表,首先我们需要了解两个十分重要的概念,就是Inbound事件和Outbound事件。

  • Outbound:out指的是出去,比如connect、write、flush这些IO操作就是往外部方向进行的,它们属于Outbound事件。
  • Inbound:就是和Outbound事件相反的,是往内部方向进行的事件,比如accept、read这种就属于Inbound事件。

比如现在有这样的一个流程,客户端需要写一些数据到服务端并且读一些数据到客户端中,就会经历下面三个步骤:

  1. connect到服务器端
  2. write数据传到服务器端
  3. read服务器返回数据

这里的connect和write就是out事件,read就是in事件。

下面这段代码初学者会有点歧义,这段代码用于服务端的childHandler中:

1. pipeline.addLast(new StringDecoder());
2. pipeline.addLast(new StringEncoder());
3. pipeline.addLast(new BizHandler());

注意:这段代码是用于服务端中的,所以很多初学者任务应该是先进行decode客户端发送过来的数据,然后再使用BizHandler进行逻辑处理,处理完成之后再encode数据返回给客户端,所以handler的顺序应该为1->3->2,但是,这里的handler是分类的,分为Inbound(1和3)和OutBound(2):

1. pipeline.addLast(new StringDecoder());
2. pipeline.addLast(new StringEncoder());
3. pipeline.addLast(new BizHandler());

我们再来分析上面这三行代码:

  • 客户端连接进来的时候,读取(read)客户端请求数据的操作是Inbound的,所以会先使用1,然后使用3对其进行处理
  • 处理完数据之后,返回给客户端数据的write操作是Outbound,此时使用的是2

所以虽然添加的顺序十分奇怪,但是执行的顺序是按照1->3->2去执行的。

如果我们在上面的基础上,加上下面的第四行,这是一个OutboundHandler:

4. pipeline.addLast(new OutboundHandlerA());

那么执行顺序就应该是1 -> 3 -> 4 -> 2,而不是1 -> 3 -> 2 -> 4,对于Inbound操作,按照添加顺序执行每个Inbound类型的handler,而对于Outbound操作,是反着来的,从后往前,顺序执行Outbound类型的handler。

下面就来介绍一下它们的接口使用:

手把手教你如何从源码到应用——Netty(一)

通过上面这张图,猜也能猜出一个大概了,定义处理Inbound事件的handler需要实现ChannelInboundHandler,定义处理Outbound事件的handler需要实现ChannelOutboundHandler。最下面的三个类,是Netty提供的适配器,如果我们想要定义一个handler能够同时处理Inbound事件和Outbound事件,可以通过继承中间下面的那个ChannelDuplexHandler的方式,比如LoggingHandler这种既可以用来处理Inbound可以用来处理Outbound事件的handler。

有了Inbound和Outbound的概念以后,就可以开始介绍Pipeline的源码了,提一下前面说过的一点,一个Channel中包含一个pipeline,一个pipeline中包含多个handler,handler的顺序十分重要,其中handler分为处理Inbound事件的handler和处理Outbound事件的handler和同时处理Inbound、Outbound事件的handler

那么开始分析Pipeline,既然一个Channel关联一个Pipeline,那么肯定是在Channel创建的时候进行关联,所以NioSocketChannel和NioServerSocketChannel在执行构造方法的时候会走到它们的父类AbstractChannel构造方法中去

protected AbstractChannel(Channel parent) {
    this.parent = parent;
    // 给每个 channel 分配一个唯一 id
    id = newId();
    // 每个 channel 内部需要一个 Unsafe 的实例
    unsafe = newUnsafe();
    // 每个 channel 内部都会创建一个 pipeline
    pipeline = newChannelPipeline();
}

上面的三行代码中,id不怎么重要,unsafe实例挺重要的,这里进行简单介绍一下:


在 JDK 的源码中,sun.misc.Unsafe 类提供了一些底层操作的能力,它设计出来是给 JDK 中的源码使用的,比如 AQS、ConcurrentHashMap 等,这个 Unsafe 类不是给我们的代码使用的,是给 JDK 源码使用的(需要的话,我们也是可以获取它的实例的)。

Unsafe类的构造方法是private的,但是它提供了getUnsafe()方法,通过这个方法去获取Unsafe实例:

Unsafe unsafe = Unsafe.getUnsafe();

但是上面这行代码直接使用会抛java.lang.SecurityException异常,因为它就不是给我们的代码使用的,所以我们需要通过特殊手段去获取Unsafe实例,比如反射,进行暴力破解:

Field f = Unsafe.class.getDeclaredField("theUnsafe");
f.setAccessible(true);
Unsafe unsafe = (Unsafe) f.get(null);

Netty中的Unsafe也是同样的意思,它封装了Netty中会使用到的JDK提供的NIO接口,比如将channel注册到selector上、bind操作、connect操作等,这些操作都是偏底层一些Netty同样也是不希望我们的业务代码使用Unsafe的实例,它是提供给Netty中的源码使用的

这里只需要暂时记住Unsafe封装了大部分需要访问JDK的NIO接口的操作就好了,这里继续把重心放在pipeline上:

protected DefaultChannelPipeline newChannelPipeline() {
    return new DefaultChannelPipeline(this);
}

这里开始调用DefaultChannelPipeline的构造方法,并且把当前channel的引用传入:

protected DefaultChannelPipeline(Channel channel) {
    this.channel = ObjectUtil.checkNotNull(channel, "channel");
    succeededFuture = new SucceededChannelFuture(channel, null);
    voidPromise =  new VoidChannelPromise(channel, true);

    tail = new TailContext(this);
    head = new HeadContext(this);

    head.next = tail;
    tail.prev = head;
}

这里实例化了tail和head这两个handler,其中tail实现了ChannelInboundHandler接口,而head实现了ChannelOutboundHandler和ChannelInboundHandler两个接口,并且最后两行代码将这两个节点连接了起来,组成了双向链表:

手把手教你如何从源码到应用——Netty(一)

注意:在不同的版本中,源码也有差异,head不一定是in+out。

pipeline中的每个元素都是ChannelHandlerContext的实例,而不是ChannelHandler的实例,context包装了一下handler,但是后面我们都会使用handler来描述一个pipeline上的节点,而不是使用context

这里只是构造了pipeline,并且添加了两个固定的handler到其中,还不设计到自定义的handler代码执行。现在回过头来看这段代码:

手把手教你如何从源码到应用——Netty(一)

这里的childHandler中指定的handler不是给NioServerSocketChannel使用的,是给NioSocketChannel使用的,所以这里暂时不看它。

这里调用handler方法指定了LoggingHandler实例,然后我们再进入下面的bind方法中去看看这个LoggingHandler实例是怎么注册到我们之前构造的pipeline内的。

顺着bind方法一直往里走,bind() -> doBind() -> initAndRegister():

final ChannelFuture initAndRegister() {
    Channel channel = null;
    try {
        // 1. 构造 channel 实例,同时会构造 pipeline 实例,
        // 现在 pipeline 中有 head 和 tail 两个 handler 了
        channel = channelFactory.newChannel();
        // 2. 看这里
        init(channel);
    } catch (Throwable t) {
    ......
}

上面的两行代码,第一行实现了构造channel和channel内部的pipeline,我们来看第二行的init代码:

// 这个方法是抽象方法 我们在它的实现类 ServerBootstrap 中查看相应的步骤
@Override
void init(Channel channel) throws Exception {
    ......
    // 拿到刚刚创建的 channel 内部的 pipeline 实例
    ChannelPipeline p = channel.pipeline();
    ...
    // 开始往 pipeline 中添加一个 handler 这个 handler 是 ChannelInitializer 的实例
    p.addLast(new ChannelInitializer<Channel>() {

        // 我们以后会看到 下面这个 initChannel 方法何时会被调用
        @Override
        public void initChannel(final Channel ch) throws Exception {
            final ChannelPipeline pipeline = ch.pipeline();
            // 这个方法返回我们最开始指定的 LoggingHandler 实例
            ChannelHandler handler = config.handler();
            if (handler != null) {
                // 添加 LoggingHandler
                pipeline.addLast(handler);
            }

            // 先不用管这里的 eventLoop
            ch.eventLoop().execute(new Runnable() {
                @Override
                public void run() {
                    // 添加一个 handler 到 pipeline 中:ServerBootstrapAcceptor
                    // 从名字可以看到 这个 handler 的目的是用于接收客户端请求
                    pipeline.addLast(new ServerBootstrapAcceptor(
                            ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                }
            });
        }
    });
}

在上述代码中,涉及到了pipeline中的辅助类ChannelInitializer,它本身就是一个handler类型(Inbound类型),但是它的作用和普通的handler有一点不一样,它纯粹是为了辅助将其他的handler加入到pipeline中的,这一点多看看源码就能知道了

简单的看一下 ChannelInitializer 的 initChannel 方法,看看它到底干了一些什么,它将我们指定的LoggingHandler和ServerBootstrapAcceptor这两个handler加入到了pipeline中,此时的pipeline如下图所示:

手把手教你如何从源码到应用——Netty(一)

ChannelInitializer 的 initChannel(channel) 方法被调用的时候,会往pipeline内添加我们最开始指定的LoggingHandler,并且还向内添加了一个ServerBootstrapAcceptor,但是现在不知道这个initChannel方法何时会被调用。

以上就是我们说的作为服务端的NioServerSocketChannel的pipeline,NioSocketChannel也是差不多的,我们可以看一下Bootstrap类的init方法:

void init(Channel channel) throws Exception {
    ChannelPipeline p = channel.pipeline();
    p.addLast(config.handler());
    ...
}

手把手教你如何从源码到应用——Netty(一)

它和服务端 ServerBootstrap 要添加 ServerBootstrapAcceptor 不一样,它只需要将 EchoClient 类中的 ChannelInitializer 实例加进来就可以了,它的 ChannelInitializer 中添加了两个 handler,LoggingHandler 和 EchoClientHandler:

手把手教你如何从源码到应用——Netty(一)

那么可以看出,我们需要的是LoggingHandler、EchoClientHandler,但是它们现在还不在pipeline中,那么什么时候会真正的进入到pipeline中呢?后面我们再揭晓。

注意:为什么Server端我们指定的是一个handler实例,而Client端指定的是一个ChannelInitializer实例呢?其实它们是可以随意搭配使用的,甚至可以在ChannelInitializer实例中添加ChannelInitializer的实例。

到目前为止,这里需要断开来了,下面需要开始介绍线程池了,大家需要记住pipeline现在长什么样子:head+channelInitializer+tail

这节没有介绍handler的向后传播,就是一个handler处理完成之后怎么传递给另一个handler来处理,比如Java EE中的Filter是采用在一个Filter实例中调用chain.doFilter(request,response)来传递给下一个Filter这种方式的。

下图展示了传播的方法,但是更加需要关注哪些事件是Inbound类型的,哪些是Outbound类型的:

手把手教你如何从源码到应用——Netty(一)

注意:bind方法也是Outbound类型的。

Netty 中的线程池 EventLoopGroup

接下来我们来分析Netty中的线程池,Netty中的线程池比较不好理解,因为它的类比较多,而且关系错综复杂,如下图,感受一下NioEventLoop类和NioEventLoopGroup类的继承结构:

手把手教你如何从源码到应用——Netty(一)

上图是按照继承关系整理而来,大家仔细看看就会发现涉及到的类有很多,本节就来给大家盘一盘这部分内容。

首先,我们说的线程池,指的是左图中的NioEventLoopGroup的实例,线程池中的线程指的就是NioEventLoop的实例。

在我们第一节介绍的Echo的例子中,不论是客户端还是服务端,我们可以发现我们总是先实例化NioEventLoopGroup:

// EchoClient 代码最开始:
EventLoopGroup group = new NioEventLoopGroup();

// EchoServer 代码最开始:
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();

下面就开始对NioEventLoopGroup的源码开始分析:

首先看到NioEventLoopGroup的构造方法源码,可以发现它有着多个构造方法用于参数设置,比如上面的代码new NioEventLoopGroup(1)中,我们只在实例化 bossGroup 的时候指定了参数,代表该线程池需要一个线程,源码如下所示:

public NioEventLoopGroup() {
    this(0);
}
public NioEventLoopGroup(int nThreads) {
    this(nThreads, (Executor) null);
}

...

// 参数最全的构造方法
public NioEventLoopGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory,
                         final SelectorProvider selectorProvider,
                         final SelectStrategyFactory selectStrategyFactory,
                         final RejectedExecutionHandler rejectedExecutionHandler) {
    // 调用父类的构造方法
    super(nThreads, executor, chooserFactory, selectorProvider, selectStrategyFactory, rejectedExecutionHandler);
}

现在来介绍一下构造方法中的各个参数:

  • nThreads:线程池中的线程个数,也就是NioEventLoop的实例数量
  • executor:这里的executor实例是给NioEventLoop用的,现在就暂时只要记住这点就好了
  • chooserFactory:当我们提交一个任务到线程池的时候,线程池需要选择一个线程去执行这个任务,这个就是用来实现选择策略的
  • selectorProvider:通过它去实例化JDK的Selector,可以看到每个线程池都持有一个selectorProvider实例
  • selectStrategyFactory:这个是线程池中线程的工作流程,这个会在介绍NioEventLoop的时候会说
  • rejectedExecutionHandler:用于处理线程池中没有可用的线程去执行任务的情况,这个是给NioEventLoop实例使用的

通过上面的参数,可以发现里面是有几个参数是给NioEventLoop使用的,下面通过NioEventLoopGroup的其中一个参构造方法开始,走源码:

public NioEventLoopGroup(int nThreads, ThreadFactory threadFactory) {
    this(nThreads, threadFactory, SelectorProvider.provider());
}
public NioEventLoopGroup(
        int nThreads, ThreadFactory threadFactory, final SelectorProvider selectorProvider) {
    this(nThreads, threadFactory, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);
}

然后一步一步会跳到下面这个构造方法中:

public NioEventLoopGroup(int nThreads, ThreadFactory threadFactory,
    final SelectorProvider selectorProvider, final SelectStrategyFactory selectStrategyFactory) {
    super(nThreads, threadFactory, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
}

通过源码可以发现,下面这几个参数被设置了默认值:

  1. selectorProvider = SelectorProvider.provider()

这个就是调用了JDK提供的方法

  1. selectStrategyFactory = DefaultSelectStrategyFactory.INSTANCE

这个是线程在做select操作和执行任务过程中的策略选择问题,在介绍NioEventLoop的时候会用到

  1. rejectedExecutionHandler = RejectedExecutionHandlers.reject()

其中reject方法的源码如下:

private static final RejectedExecutionHandler REJECT = new RejectedExecutionHandler() {
        @Override
        public void rejected(Runnable task, SingleThreadEventExecutor executor) {
            throw new RejectedExecutionException();
        }
};

public static RejectedExecutionHandler reject() {
        return REJECT;
}

可以看出Netty默认的拒绝策略就是抛出异常。

回到NioEventLoopGroup的构造方法中,跟着源码走,会发现最后会走到父类 MultithreadEventLoopGroup 的构造方法中:

protected MultithreadEventLoopGroup(int nThreads, ThreadFactory threadFactory, Object... args) {
    super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, threadFactory, args);
}

可以发现,如果我们使用默认的构造函数,到这里的时候线程数会被设置为DEFAULT_EVENT_LOOP_THREADS,它的值就是当前CPU核心数 * 2,下面是它的取值逻辑:

private static final int DEFAULT_EVENT_LOOP_THREADS;

static {
    DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
            "io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));

    if (logger.isDebugEnabled()) {
        logger.debug("-Dio.netty.eventLoopThreads: {}", DEFAULT_EVENT_LOOP_THREADS);
    }
}

继续跟着源码走:

protected MultithreadEventExecutorGroup(int nThreads, ThreadFactory threadFactory, Object... args) {
    this(nThreads, threadFactory == null ? null : new ThreadPerTaskExecutor(threadFactory), args);
}

发现到这一步的时候,new ThreadPerTaskExecutor(threadFactory) 会构造一个 executor。


首先我们看一下ThreadPerTaskExecutor的源码:

public final class ThreadPerTaskExecutor implements Executor {
    private final ThreadFactory threadFactory;

    public ThreadPerTaskExecutor(ThreadFactory threadFactory) {
        if (threadFactory == null) {
            throw new NullPointerException("threadFactory");
        }
        this.threadFactory = threadFactory;
    }

    @Override
    public void execute(Runnable command) {
        // 为每个任务新建一个线程
        threadFactory.newThread(command).start();
    }
}

Executor作为线程池中最顶层的接口,它只有一个execute方法,从上述源码中就可以看出ThreadPerTaskExecutor的逻辑就是每来一个任务,就新建一个线程去执行


上一步设置完了executor,继续往下走源码:

protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
    this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
}

这一步设置了chooseFactory,用来实现从线程池中选择一个线程的选择策略。


ChooseFactory的逻辑很简单,我们看看DefaultEventExecutorChooserFactory的相关源码:

@Override
public EventExecutorChooser newChooser(EventExecutor[] executors) {
    if (isPowerOfTwo(executors.length)) {
        return new PowerOfTwoEventExecutorChooser(executors);
    } else {
        return new GenericEventExecutorChooser(executors);
    }
}

private static final class PowerOfTwoEventExecutorChooser implements EventExecutorChooser {
    private final AtomicInteger idx = new AtomicInteger();
    private final EventExecutor[] executors;

    PowerOfTwoEventExecutorChooser(EventExecutor[] executors) {
        this.executors = executors;
    }

    @Override
    public EventExecutor next() {
        return executors[idx.getAndIncrement() & executors.length - 1];
    }
}

private static final class GenericEventExecutorChooser implements EventExecutorChooser {
    private final AtomicInteger idx = new AtomicInteger();
    private final EventExecutor[] executors;

    GenericEventExecutorChooser(EventExecutor[] executors) {
        this.executors = executors;
    }

    @Override
    public EventExecutor next() {
        return executors[Math.abs(idx.getAndIncrement() % executors.length)];
    }
}

这里的设置策略就是:

  • 如果线程池的线程个数是2^n个,就使用PowerOfTwoEventExecutorChooser实例,这个类中有一个next方法,就是它的选择策略,相反,如果不是2^n个,那么就是用GenericEventExecutorChooser实例,类中的next方法就是按照取模的方式选择线程。

走到最后一个最后一个构造方法了,源码如下所示:

protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                        EventExecutorChooserFactory chooserFactory, Object... args) {
    if (nThreads <= 0) {
        throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
    }

    // executor 如果是 null 做一次和前面一样的默认设置 就是来一个任务就新建立一个线程
    if (executor == null) {
        executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
    }

    // 这里的 children 数组非常重要 它就是线程池中的线程数组 这么说不太严谨 但是就大概这个意思
    children = new EventExecutor[nThreads];

    // 下面这个 for 循环将实例化 children 数组中的每一个元素
    for (int i = 0; i < nThreads; i ++) {
        boolean success = false;
        try {
            // 实例化!!!!!!
            children[i] = newChild(executor, args);
            success = true;
        } catch (Exception e) {
            // TODO: Think about if this is a good exception type
            throw new IllegalStateException("failed to create a child event loop", e);
        } finally {
            // 如果有一个 child 实例化失败 那么 success 就会为 false 然后进入下面的失败处理逻辑
            if (!success) {
                // 把已经成功实例化的“线程” shutdown shutdown 是异步操作
                for (int j = 0; j < i; j ++) {
                    children[j].shutdownGracefully();
                }

                // 等待这些线程成功 shutdown
                for (int j = 0; j < i; j ++) {
                    EventExecutor e = children[j];
                    try {
                        while (!e.isTerminated()) {
                            e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
                        }
                    } catch (InterruptedException interrupted) {
                        // 把中断状态设置回去 交给关心的线程来处理 
                        Thread.currentThread().interrupt();
                        break;
                    }
                }
            }
        }
    }
    // ================================================
    // === 到这里,就是代表上面的实例化所有线程已经成功结束 ===
    // ================================================

    // 通过之前设置的 chooserFactory 来实例化 Chooser 把线程池数组传进去 
    //     这就不必再说了吧 实现线程选择策略
    chooser = chooserFactory.newChooser(children);

    // 设置一个 Listener 用来监听该线程池的 termination 事件
    // 下面的代码逻辑是:给池中每一个线程都设置这个 listener 当监听到所有线程都 terminate 以后 这个线程池就算真正的 terminate 了 
    final FutureListener<Object> terminationListener = new FutureListener<Object>() {
        @Override
        public void operationComplete(Future<Object> future) throws Exception {
            if (terminatedChildren.incrementAndGet() == children.length) {
                terminationFuture.setSuccess(null);
            }
        }
    };
    for (EventExecutor e: children) {
        e.terminationFuture().addListener(terminationListener);
    }

    // 设置 readonlyChildren 它是只读集合 以后用到再说
    Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
    Collections.addAll(childrenSet, children);
    readonlyChildren = Collections.unmodifiableSet(childrenSet);
}

上述源码看看注释应该就都理解了,接下来我们主要看这行代码children[i] = newChild(executor, args);关注于newChild()方法,它就是用于创建线程池中的线程。

注意:这里说的线程不是说Thread,指的是NioEventLoop,但是每个NioEventLoop内部都会持有一个线程实例Thread,后面会看到这个Thread在何时会被实例化。

newChild方法在NioEventLoopGroup被重写了,源码如下所示:

@Override
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
    return new NioEventLoop(this, executor, (SelectorProvider) args[0],
        ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
}

可以看到它调用了NioEventLoop的构造方法,源码如下所示:

NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
             SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
    // 调用父类构造器
    super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
    if (selectorProvider == null) {
        throw new NullPointerException("selectorProvider");
    }
    if (strategy == null) {
        throw new NullPointerException("selectStrategy");
    }
    provider = selectorProvider;
    // 开启 NIO 中最重要的组件:Selector
    final SelectorTuple selectorTuple = openSelector();
    selector = selectorTuple.selector;
    unwrappedSelector = selectorTuple.unwrappedSelector;
    selectStrategy = strategy;
}

我们先对上面进行一个简单的总结,再继续往下走。

  • 在Netty中NioEventLoopGroup就是线程池,线程池中的一个个线程,就是NioEventLoop
  • 线程池NioEventLoopGroup是线程池中每个线程NioEventLoop的parent,看构造方法就能看出来
  • 每个NioEventLoop都有自己的Selector
  • executor、selectStrategy 和 rejectedExecutionHandler 从 NioEventLoopGroup 中一路传到了 NioEventLoop 中

这个时候,我们来看看这个线程NioEventLoop的源码,看看它拥有哪些属性:

private Selector selector;
private Selector unwrappedSelector;
private SelectedSelectionKeySet selectedKeys;

private final SelectorProvider provider;

private final AtomicBoolean wakenUp = new AtomicBoolean();

private final SelectStrategy selectStrategy;

private volatile int ioRatio = 50;
private int cancelledKeys;
private boolean needsToSelectAgain;

结合它的构造方法,来进行一个总结:

  1. provider:它由 NioEventLoopGroup 传进来,前面我们说了一个线程池有一个 selectorProvider,用于创建 Selector 实例
  2. selector:这里需要注意的是和Java中的NIO还是有一定的区别的,通过selector属性就能知道,Netty中的selector是线程池中的每一个线程都持有一个selector实例
  3. selectStrategy:select操作的策略
  4. ioRatio:这个属性是IO任务的执行时间比例,因为每个线程都可能有着IO任务,也可能有非IO任务,所以这个参数是保证线程有足够的时间是给IO任务的

然后继续走NioEventLoop的构造方法,上面看到它会调用父类的构造方法,也就是会走到SingleThreadEventLoop类中:

protected SingleThreadEventLoop(EventLoopGroup parent, Executor executor,
                                boolean addTaskWakesUp, int maxPendingTasks,
                                RejectedExecutionHandler rejectedExecutionHandler) {
    super(parent, executor, addTaskWakesUp, maxPendingTasks, rejectedExecutionHandler);

    // 我们可以直接忽略这个东西 以后我们也不会再介绍它
    tailTasks = newTaskQueue(maxPendingTasks);
}

然后它的构造方法又调用了父类SingleThreadEventExecutor的构造方法:

protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
                                    boolean addTaskWakesUp, int maxPendingTasks,
                                    RejectedExecutionHandler rejectedHandler) {
    super(parent);
    this.addTaskWakesUp = addTaskWakesUp;
    this.maxPendingTasks = Math.max(16, maxPendingTasks);
    this.executor = ObjectUtil.checkNotNull(executor, "executor");
    // taskQueue 这个东西很重要 提交给 NioEventLoop 的任务都会进入到这个 taskQueue 中等待被执行
    // 这个 queue 的默认容量是 16
    taskQueue = newTaskQueue(this.maxPendingTasks);
    rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
}

这里我们进行捋捋,NioEventLoop的父类是SingleThreadEventLoop,通过名字可以看出这是一个单例线程,而SingleThreadEventLoop的父类是SingleThreadEventExecutor,注意,这里有一个关键字,Executor,线程池,说明它是一个线程池,并且是Single Thread 单线程的线程池。

也就是说,线程池NioEventLoopGroup中的每一个线程NioEventLoop其实可以说是一个线程池,但是这个线程池中只有一个线程。

上面的构造函数看起来还是十分简单的:

  • 设置了parent,也就是前面实例化线程池的NioEventLoopGroup实例
  • executor:它就是我们之前实例化的ThreadPerTaskExecutor,这个东西在线程池NioEventLoopGroup中是没有作用的,它主要是用在NioEventLoop中的,后面会说到它主要是用于NioEventLoop中创建线程实例(Thread实例)的
  • taskQueue:它就是任务队列,这个也比较好理解,NioEventLoop需要负责IO事件和非IO事件,通常它都在执行selector的select方法,或者正在处理selectedKeys,如果此时我们submit一个任务给它,任务就会被放到taskQueue中,等它来轮询,该队列是线程安全的LinkedBlockingQueue,默认容量为16
  • rejectedExecutionHandler:拒绝策略,taskQueue的默认容量是16,如果submit的任务超过了16个,那么再往里提交任务就会触发这个handler的拒绝策略,前面我们也提高过,Netty默认就是抛出异常

然后回到NioEventLoop的构造方法中:

NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
             SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
    // 我们刚刚说完了这个
    super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
    if (selectorProvider == null) {
        throw new NullPointerException("selectorProvider");
    }
    if (strategy == null) {
        throw new NullPointerException("selectStrategy");
    }
    provider = selectorProvider;
    // 创建 selector 实例
    final SelectorTuple selectorTuple = openSelector();
    selector = selectorTuple.selector;
    unwrappedSelector = selectorTuple.unwrappedSelector;

    selectStrategy = strategy;
}

可以看到里面最重要的方法就是openSelector方法了,它创建了NIO中最重要的一个组件Selector,在这个方法中,Netty也做了一些优化,到这里为止,线程池NioEventLoopGroup已经创建完成了,并且线程池中的每一个线程NioEventLoop都已经实例化好了,但是NioEventLoop中的Thread实例还没有被实例化!后面会说到,创建线程(Thread实例)的时机在第一个任务提交过来的时候,也就是 下一篇文章会讲到的 channel 的 register 操作。

转载自:https://juejin.cn/post/7228440310463709221
评论
请登录