likes
comments
collection
share

手把手教你如何从源码到应用——Netty(二)

作者站长头像
站长
· 阅读数 44

前言

首先需要说明的是,这篇文章不是原创,是在大佬的基础上修正了一点内容发出来的,算是我学习跟着大佬的文章一步一步走来的学习路程,他的文章里有一些初学者不理解的点,包括里面的笔误和分析源码的流程的失误,我都有修正,但是,不是原创,不是原创,不是原创!!!!原文链接在这里!!!不过看在我整理了这么多的份上,各位大哥能不能点个赞给小弟。

大佬没有更新第三篇,但是我会努力尽快肝出第三篇,紧跟大佬的步伐!

大家有什么想看的源码可以评论我,我会努力肝出来!

Spring源码以及Spring MVC源码已经在肝了!

Channel的register操作

在上一篇文章的最后,我们就介绍了Netty中的NioEventLoopGroup线程池的创建,以及线程池内的线程NioEventLoop的创建,但是NioEventLoop创建完成之后,并没有实例化其内部的Thread实例,创建线程Thread实例的时机就在于第一个任务提交过来的时候,那么第一个任务是什么呢?这就是我们将要说的channel的register操作了。

register

我们不论是从EchoClient的connect方法还是EchoServer的bind方法出发,最终都会走到initAndRegister这个方法:

final ChannelFuture initAndRegister() {
    Channel channel = null;
    try {
        // 1
        channel = channelFactory.newChannel();
        // 2 对于 Bootstrap 和 ServerBootstrap,这里面有些不一样
        init(channel);
    } catch (Throwable t) {
        ...
    }
    // 3 我们这里要说的是这行
    ChannelFuture regFuture = config().group().register(channel);
    if (regFuture.cause() != null) {
        if (channel.isRegistered()) {
            channel.close();
        } else {
            channel.unsafe().closeForcibly();
        }
    }
    return regFuture;
}

这里我们回顾一下initAndRegister这个方法,前面已经说过这个方法了:Channel的实例化,实例化过程中会执行Channel内部Unsafe和Pipeline的实例化,以及在init(channel)方法中,会往pipeline中添加handler(pipeline此时是head+channelnitializer+tail)。

这节我们就要开始揭晓ChannelInitializer中的initChannel方法了。

现在,我们继续往下走源码,到register这一步:

ChannelFuture regFuture = config().group().register(channel);

register是十分关键的一步,回想一下channel中的一些情况,在之前,我们实例化了JDK底层的Channel,设置了非阻塞,实例化了Unsafe,实例化了Pipeline,并且往Pipeline中添加了head、一个ChannelInitializer实例、tail。

上面的config.group()方法会返回前面实例化的NioEventLoopGroup的实例,然后调用其register(channel)方法:

// MultithreadEventLoopGroup
@Override
public ChannelFuture register(Channel channel) {
    return next().register(channel);
}

这里的next方法是不是很眼熟,就是chooseFactory,也就是选择一个NioEventLoop实例,所以这个时候我们就进入到了NioEventLoop中了,NioEventLoop的register(channel)的方法实现,在它的父类SingleThreadEventLoop中(SingleThreadEventLoop它的父类记住是一个SingleThreadEventExecutor)。

@Override
public ChannelFuture register(Channel channel) {
    return register(new DefaultChannelPromise(channel, this));
}

上面的代码实例化了一个Promise,将当前channel传入其中:

@Override
public ChannelFuture register(final ChannelPromise promise) {
    ObjectUtil.checkNotNull(promise, "promise");
    // promise 关联了 channel channel 持有 Unsafe 实例 register 操作就封装在 Unsafe 中
    promise.channel().unsafe().register(this, promise);
    return promise;
}

拿到Channel中关联的Unsafe实例,然后调用它的register方法:

// AbstractChannel#AbstractUnsafe
@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
    ...
    // 将这个 eventLoop 实例设置给这个 channel 从此这个 channel 就是有 eventLoop 的了
    // 我觉得这一步其实挺关键的 因为后续该 channel 中的所有异步操作 都要提交给这个 eventLoop 来执行
    AbstractChannel.this.eventLoop = eventLoop;

    // 如果发起 register 动作的线程就是 eventLoop 实例中的线程,那么直接调用 register0(promise)
    // 对于我们来说 它不会进入到这个分支,
    //     之所以有这个分支 是因为我们是可以 unregister 然后再 register 的 后面再仔细看
    if (eventLoop.inEventLoop()) {
        register0(promise);
    } else {
        try {
            // 否则 提交任务给 eventLoop eventLoop 中的线程会负责调用 register0(promise)
            eventLoop.execute(new Runnable() {
                @Override
                public void run() {
                    register0(promise);
                }
            });
        } catch (Throwable t) {
            ...
        }
    }
}

对于我们前面过来的register操作,其实提交到了eventLoop后会直接返回一个Promise,里面的register0方法是异步操作,它会交由NioEventLoop实例去完成,这里先不再继续往下分析register0方法,先介绍清楚NioEventLoop中的线程(Thread)介绍清楚,再回来分析这个register0方法。


Channel实例一旦register到了NioEventLoopGroup实例中的某一个NioEventLoop实例上,那么后续该Channel的所有操作,都是由该NioEventLoop来完成的。

因为Selector实例是在NioEventLoop实例中的,Channel实例一旦注册到某个Selector实例中,当然也就只能在这个实例中处理NIO事件。


NioEventLoop工作流程

前面我们在分析NioEventLoopGroup线程池实例化的时候说过,NioEventLoop中并没有启动Java线程,也就是没有实例化其中的Thread实例,现在来分析register过程中调用的eventLoop.execute(runnable)这个方法,这个代码在父类的SingleThreadEventExecutor中:

@Override
public void execute(Runnable task) {
    if (task == null) {
        throw new NullPointerException("task");
    }
    // 判断添加任务的线程是否就是当前 EventLoop 中的线程
    boolean inEventLoop = inEventLoop();

    // 添加任务到之前介绍的 taskQueue 中,
    //     如果 taskQueue 满了(默认大小 16) 根据我们之前说的 默认的策略是抛出异常
    addTask(task);

    if (!inEventLoop) {
        // 如果不是 NioEventLoop 内部线程提交的 task 那么判断下线程是否已经启动 没有的话 就启动线程
        startThread();
        if (isShutdown() && removeTask(task)) {
            reject();
        }
    }

    if (!addTaskWakesUp && wakesUpForTask(task)) {
        wakeup(inEventLoop);
    }
}

所以这里就看到了启动NioEventLoop中的线程的方法在这里,另外register操作进入到了taskQueue中,它被归到了非IO操作的范畴。

下面是startThread方法的源码,判断线程是否已经启动来决定是否要进行启动操作:

private void startThread() {
    if (state == ST_NOT_STARTED) {
        if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
            try {
                doStartThread();
            } catch (Throwable cause) {
                STATE_UPDATER.set(this, ST_NOT_STARTED);
                PlatformDependent.throwException(cause);
            }
        }
    }
}

按照前面的思路,第一次进来肯定不是NioEventLoop内的线程提交的task,此时NioEventLoop内的线程Thread实例都为null,怎么提交task呢?根据线程没有启动的情况,来看看doStartThread()方法:

private void doStartThread() {
    assert thread == null;
    // 这里的 executor 大家是不是有点熟悉的感觉,它就是一开始我们实例化 NioEventLoop 的时候传进来的 ThreadPerTaskExecutor 的实例 它是每次来一个任务 创建一个线程的那种 executor 
    // 一旦我们调用它的 execute 方法 它就会创建一个新的线程 所以这里终于会创建 Thread 实例
    executor.execute(new Runnable() {
        @Override
        public void run() {
            // 看这里 将 “executor” 中创建的这个线程设置为 NioEventLoop 的线程!!!
            thread = Thread.currentThread();

            if (interrupted) {
                thread.interrupt();
            }

            boolean success = false;
            updateLastExecutionTime();
            try {
                // 执行 SingleThreadEventExecutor 的 run() 方法 它在 NioEventLoop 中实现了
                SingleThreadEventExecutor.this.run();
                success = true;
            } catch (Throwable t) {
                logger.warn("Unexpected exception from an event executor: ", t);
            } finally {
                // ... 我们直接忽略掉这里的代码
            }
        }
    });
}

所以看到 这里,才知道什么时候NioEventLoop内的Thread属性被实例化,上面的这个线程启动之后,会去执行NioEventLoop的run方法,这个方法肯定是没有那么容易结束的,必然是像JDK线程池的Worker那样,不断循环获取新的任务的。它需要不断的做select操作和轮询taskQueue这个队列。

先来简单看一下它的源码:

@Override
protected void run() {
    // 代码嵌套在 for 循环中
    for (;;) {
        try {
            // selectStrategy 终于要派上用场了
            // 它有两个值 一个是 CONTINUE 一个是 SELECT
            // 针对这块代码 我们分析一下 
            // 1. 如果 taskQueue 不为空 也就是 hasTasks() 返回 true 
            //         那么执行一次 selectNow() 该方法不会阻塞
            // 2. 如果 hasTasks() 返回 false 那么执行 SelectStrategy.SELECT 分支 
            //    进行 select(...) 这块是带阻塞的
            // 这个很好理解 就是按照是否有任务在排队来决定是否可以进行阻塞
            switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
                case SelectStrategy.CONTINUE:
                    continue;
                case SelectStrategy.SELECT:
                    // 如果 !hasTasks() 那么进到这个 select 分支 这里 select 带阻塞的
                    select(wakenUp.getAndSet(false));
                    if (wakenUp.get()) {
                        selector.wakeup();
                    }
                default:
            }


            cancelledKeys = 0;
            needsToSelectAgain = false;
            // 默认地 ioRatio 的值是 50
            final int ioRatio = this.ioRatio;

            if (ioRatio == 100) {
                // 如果 ioRatio 设置为 100 那么先执行 IO 操作 然后在 finally 块中执行 taskQueue 中的任务
                try {
                    // 1. 执行 IO 操作 因为前面 select 以后 可能有些 channel 是需要处理的 
                    processSelectedKeys();
                } finally {
                    // 2. 执行非 IO 任务,也就是 taskQueue 中的任务
                    runAllTasks();
                }
            } else {
                // 如果 ioRatio 不是 100 那么根据 IO 操作耗时 限制非 IO 操作耗时
                final long ioStartTime = System.nanoTime();
                try {
                    // 执行 IO 操作
                    processSelectedKeys();
                } finally {
                    // 根据 IO 操作消耗的时间 计算执行非 IO 操作(runAllTasks)可以用多少时间 
                    final long ioTime = System.nanoTime() - ioStartTime;
                    runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                }
            }
        } catch (Throwable t) {
            handleLoopException(t);
        }
        // Always handle shutdown even if the loop processing threw an exception.
        try {
            if (isShuttingDown()) {
                closeAll();
                if (confirmShutdown()) {
                    return;
                }
            }
        } catch (Throwable t) {
            handleLoopException(t);
        }
    }
}

上面这段代码就是NioEventLoop的核心了,这里介绍两点:

  • 首先会根据hasTask的结果来决定执行selectNow()还是select(oldWakenUp),这个很好理解,只要有任务在等待,那么就应该使用无阻塞的selectNow方法,如果没有任务在等待,那么应该使用带阻塞的select操作等待任务。
  • ioRatio 控制 IO 操作所占的时间比重:
    • 如果设置为100%,那么先执行IO操作,然后再执行任务队列中的任务
    • 如果不是100%,那么先执行IO操作,然后执行任务队列中的任务,但是需要控制执行任务的总时间,也就是说,非IO操作可以占用的时间,通过ioRatio以及这次IO操作耗时计算得出。

我们这里先不用去关心里面的实现细节,只需要理解它们分别做了什么事情就可以了。

现在回到register这里来,我们在前面register的时候提交了一个register任务给NioEventLoop,这是NioEventLoop接收到的第一个任务,所以会在这里实例化一个Thread并且启动,然后会进入到NioEventLoop中的run方法中。

继续register

回到我们前面的register0(promise)这个方法,我们知道这个register任务进入到了NioEventLoop的taskQueue中,然后会调用startThread方法去启动NioEventLoop中的线程,该线程会轮询这个taskQueue,然后执行这个register任务。

注意:此时是eventLoop中的线程:

// AbstractChannel
private void register0(ChannelPromise promise) {
    try {
        ...
        boolean firstRegistration = neverRegistered;
        // *** 进行 JDK 底层的操作:Channel 注册到 Selector 上 ***
        doRegister();

        neverRegistered = false;
        registered = true;
        // 到这里 就算是 registered 了

        // 这一步也很关键 因为这涉及到了 ChannelInitializer 的 init(channel)
        // 我们之前说过 init 方法会将 ChannelInitializer 内部添加的 handlers 添加到 pipeline 中
        pipeline.invokeHandlerAddedIfNeeded();

        // 设置当前 promise 的状态为 success
        //   因为当前 register 任务是在 eventLoop 中的线程(Thread)中执行的 需要通知提交 register 任务的线程
        safeSetSuccess(promise);

        // 当前的 register 任务已经成功 该事件应该被 pipeline 上
        //   所有关心 register 事件的 handler 感知到 往 pipeline 中扔一个事件
        pipeline.fireChannelRegistered();

        // 这里 active 指的是 channel 已经打开
        if (isActive()) {
            // 如果该 channel 是第一次执行 register,那么 fire ChannelActive 事件
            if (firstRegistration) {
                pipeline.fireChannelActive();
            } else if (config().isAutoRead()) {
                // 该 channel 之前已经 register 过了
                // 这里让该 channel 立马去监听通道中的 OP_READ 事件
                beginRead();
            }
        }
    } catch (Throwable t) {
        ...
    }
}

首先说说doRegister方法,然后再说说pipeline:

@Override
protected void doRegister() throws Exception {
    boolean selected = false;
    for (;;) {
        try {
            // 附 JDK 中 Channel 的 register 方法:
            // public final SelectionKey register(Selector sel, int ops, Object att) {...}
            selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
            return;
        } catch (CancelledKeyException e) {
            ...
        }
    }
}

从源码中可以看到,这里做了JDK底层的register操作,将SocketChannel(或ServerSocketChannel)注册到Selector中,并且可以看到,这里将监听集合设置为了0,也就是什么都不监听。

后续肯定是有一个地方修改了这个selectionKey的监听集合,否则什么都干不了。

我们先重点来说说pipeline操作,我们在介绍NioSocketChannel的pipeline的时候,介绍到了,我们的pipeline长下面这个样子:

手把手教你如何从源码到应用——Netty(二)

现在我们将会看到把LoggingHandler和EchoClientHandler添加到pipeline中,前面介绍的时候并没有真正的添加。

继续走源码,register成功之后,执行了以下操作:

pipeline.invokeHandlerAddedIfNeeded();

在这里进行跟踪的话,可以发现这一步会执行到pipeline中ChannelInitializer实例的handlerAdded方法,在这个方法中会执行它的init(context)方法

@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
    if (ctx.channel().isRegistered()) {
        initChannel(ctx);
    }
}

然后我们看到initChannel(ctx),这里终于到了之前介绍过的init(channel)方法:

private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
    if (initMap.putIfAbsent(ctx, Boolean.TRUE) == null) { // Guard against re-entrance.
        try {
            // 1. 将把我们自定义的 handlers 添加到 pipeline 中
            initChannel((C) ctx.channel());
        } catch (Throwable cause) {
            ...
        } finally {
            // 2. 将 ChannelInitializer 实例从 pipeline 中删除
            remove(ctx);
        }
        return true;
    }
    return false;
}

前面也说到过,ChannelInitializer的init(channel)方法被执行之后,内部添加的handlers会进入到pipeline中,然后在finally块中会将ChannelInitializer的实例从pipeline中删除,那么此时pipeline就算建立起来了,如下图所示:

手把手教你如何从源码到应用——Netty(二)

如果我们在ChannelInitializer中添加的是一个ChannelInitializer实例呢?这个怎么解决?——这个我会在后面的文章写一个流程出来,这个也很简单,只要自己在代码中写上一个添加ChannelInitializer实例,然后DEBUG走一遍就清楚了

pipeline建立了以后,我们继续往下走,就会执行到这一句:

pipeline.fireChannelRegistered();

我们只需要摸清楚了fireChannelRegistered()这个方法,以后碰到像 fireChannelActive()、fireXxx() 等就知道怎么回事了,它们都是类似的,来看看以下代码会发生什么:

// DefaultChannelPipeline
@Override
public final ChannelPipeline fireChannelRegistered() {
    // 注意这里的传参是 head
    AbstractChannelHandlerContext.invokeChannelRegistered(head);
    return this;
}

可以看到,我们往pipeline中扔了一个channelRegistered事件,这里的register属于Inbound事件,pipeline接下来要做的就是执行pipeline中Inbound类型的handlers中的channelRegistered()方法。

从上面的代码可以看出,往pipeline中扔出channelRegistered事件以后,第一个处理的handler就是head。

接下来继续跟着代码走,此时我们来到了pipeline中第一个节点head的处理中:

//AbstractChannelHandlerContext
// next 此时是 head
static void invokeChannelRegistered(final AbstractChannelHandlerContext next) {

    EventExecutor executor = next.executor();
    // 执行 head 的 invokeChannelRegistered()
    if (executor.inEventLoop()) {
        next.invokeChannelRegistered();
    } else {
        executor.execute(new Runnable() {
            @Override
            public void run() {
                next.invokeChannelRegistered();
            }
        });
    }
}

也就是说,这里会先执行 head.invokeChannelRegistered() 方法,而且是放到 NioEventLoop 中的 taskQueue 中执行的:

// AbstractChannelHandlerContext
private void invokeChannelRegistered() {
    if (invokeHandler()) {
        try {
            // handler() 方法此时会返回 head
            ((ChannelInboundHandler) handler()).channelRegistered(this);
        } catch (Throwable t) {
            notifyHandlerException(t);
        }
    } else {
        fireChannelRegistered();
    }
}

我们去看 head 的 channelRegistered 方法:

// HeadContext
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
    // 1. 这一步是 head 对于 channelRegistered 事件的处理 没有我们要关心的
    invokeHandlerAddedIfNeeded();
    // 2. 向后传播 Inbound 事件
    ctx.fireChannelRegistered();
}

然后head会执行fireChannelRegistered方法,向后传播Inbound事件:

// AbstractChannelHandlerContext
@Override
public ChannelHandlerContext fireChannelRegistered() {
    // 这里很关键
    // findContextInbound() 方法会沿着 pipeline 找到下一个 Inbound 类型的 handler
    invokeChannelRegistered(findContextInbound());
    return this;
}

注意:pipeline.fireChannelRegistered() 是将 channelRegistered 事件抛到 pipeline 中,pipeline 中的 handlers 准备处理该事件。而 context.fireChannelRegistered() 是一个 handler 处理完了以后,向后传播给下一个 handler。两个方法名字是一样的,但是来自不同的类。

findContextInbound方法会找到下一个Inbound类型的handler,然后重复上面的几个方法。

上面这块代码没必要太纠结,总之就是从 head 中开始,依次往下寻找所有 Inbound handler,执行其 channelRegistered(ctx) 操作。

到这里,register操作算是完成了,下面回到initAndRegister方法中:

final ChannelFuture initAndRegister() {
    Channel channel = null;
    try {
        channel = channelFactory.newChannel();
        init(channel);
    } catch (Throwable t) {
        ...
    }

    // 我们上面说完了这行
    ChannelFuture regFuture = config().group().register(channel);

    // 如果在 register 的过程中 发生了错误
    if (regFuture.cause() != null) {
        if (channel.isRegistered()) {
            channel.close();
        } else {
            channel.unsafe().closeForcibly();
        }
    }

    // 源码中说得很清楚 如果到这里 说明后续可以进行 connect() 或 bind() 了 因为两种情况:
    // 1. 如果 register 动作是在 eventLoop 中发起的 那么到这里的时候 register 一定已经完成
    // 2. 如果 register 任务已经提交到 eventLoop 中 也就是进到了 eventLoop 中的 taskQueue 中 
    //    由于后续的 connect 或 bind 也会进入到同一个 eventLoop 的 queue 中 所以一定是会先 register 成功 才会执行 connect 或 bind
    return regFuture;
}

不论是服务端的NioServerSocketChannel还是客户端的NioSocketChannel,在bind或connect时,都会先进入initAndRegister这个方法,所以我们上面说的那些对于两者都是通用的。

一定要记住,register操作是十分重要的,要知道这一步大概做了哪些事情,register操作以后,将进入到bind或者connect操作中。

connect过程和bind过程分析

上节说的是register操作,它十分重要,它建立起来了很多的东西,它是 Netty 中 NioSocketChannel 和 NioServerSocketChannel 开始工作的起点。

这节主要说说 register 之后的 connect 操作和 bind 操作。

connect过程分析

对于客户端NioSocketChannel来说,完成了register以后,就要开始connect了,这一步将连接到服务端。

private ChannelFuture doResolveAndConnect(final SocketAddress remoteAddress, final SocketAddress localAddress) {
    // 这里完成了 register 操作
    final ChannelFuture regFuture = initAndRegister();
    final Channel channel = regFuture.channel();

    // 这里我们不去纠结 register 操作是否 isDone()
    if (regFuture.isDone()) {
        if (!regFuture.isSuccess()) {
            return regFuture;
        }
        // 看这里
        return doResolveAndConnect0(channel, remoteAddress, localAddress, channel.newPromise());
    } else {
        ....
    }
}

这里最后会走到AbstractChannel 的 connect 方法:

@Override
public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
    return pipeline.connect(remoteAddress, promise);
}

从源码中可以看到,connect操作是交给pipeline来执行的,进入pipeline中,我们会发现,connect这种Outbound类型的操作,是从pipeline的tail开始的:

@Override
public final ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
    return tail.connect(remoteAddress, promise);
}

接下来就是pipeline的操作了,从tail开始,执行pipeline上的Outbound类型的handlers的connect()方法,那么真正的底层的connect是发生在哪里?首先来看看我们的pipeline图:

手把手教你如何从源码到应用——Netty(二)

从 tail 开始往前找 Outbound 类型的 handlers,每经过一个 handler,都执行里面的 connect() 方法,最后会到 head 中,因为 head 也是 Outbound 类型的,我们需要的 connect 操作就在 head 中,它会负责调用 unsafe 中提供的 connect 方法:

// HeadContext
public void connect(
        ChannelHandlerContext ctx,
        SocketAddress remoteAddress, SocketAddress localAddress,
        ChannelPromise promise) throws Exception {
    unsafe.connect(remoteAddress, localAddress, promise);
}

接下来我们来看一看connect在unsafe类的底层操作:

// AbstractNioChannel.AbstractNioUnsafe
@Override
public final void connect(
        final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
        ......

        boolean wasActive = isActive();
        // 大家自己点进去看 doConnect 方法
        // 这一步会做 JDK 底层的 SocketChannel connect 然后设置 interestOps 为 SelectionKey.OP_CONNECT
        // 返回值代表是否已经连接成功
        if (doConnect(remoteAddress, localAddress)) {
            // 处理连接成功的情况
            fulfillConnectPromise(promise, wasActive);
        } else {
            connectPromise = promise;
            requestedRemoteAddress = remoteAddress;

            // 下面这块代码 在处理连接超时的情况 代码很简单
            // 这里用到了 NioEventLoop 的定时任务的功能 这个我们之前一直都没有介绍过 因为我觉得也不太重要
            int connectTimeoutMillis = config().getConnectTimeoutMillis();
            if (connectTimeoutMillis > 0) {
                connectTimeoutFuture = eventLoop().schedule(new Runnable() {
                    @Override
                    public void run() {
                        ChannelPromise connectPromise = AbstractNioChannel.this.connectPromise;
                        ConnectTimeoutException cause =
                                new ConnectTimeoutException("connection timed out: " + remoteAddress);
                        if (connectPromise != null && connectPromise.tryFailure(cause)) {
                            close(voidPromise());
                        }
                    }
                }, connectTimeoutMillis, TimeUnit.MILLISECONDS);
            }

            promise.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    if (future.isCancelled()) {
                        if (connectTimeoutFuture != null) {
                            connectTimeoutFuture.cancel(false);
                        }
                        connectPromise = null;
                        close(voidPromise());
                    }
                }
            });
        }
    } catch (Throwable t) {
        promise.tryFailure(annotateConnectException(t, remoteAddress));
        closeIfClosed();
    }
}

如果上面的 doConnect 方法返回 false,那么后续是怎么处理的呢?

在上一节介绍的 register 操作中,channel 已经 register 到了 selector 上,只不过将 interestOps 设置为了 0,也就是什么都不监听,而在上面的 doConnect 方法中,我们看到它在调用底层的 connect 方法后,会设置 interestOps 为 SelectionKey.OP_CONNECT。

剩下的就是 NioEventLoop 的事情了,还记得 NioEventLoop 的 run() 方法吗?也就是说这里的 connect 成功以后,这个 TCP 连接就建立起来了,后续的操作会在 NioEventLoop.run() 方法中被 processSelectedKeys() 方法处理掉。

bind过程分析

说完 connect 过程,我们再来简单看下 bind 过程:

private ChannelFuture doBind(final SocketAddress localAddress) {
    // **前面说的 initAndRegister**
    final ChannelFuture regFuture = initAndRegister();

    final Channel channel = regFuture.channel();
    if (regFuture.cause() != null) {
        return regFuture;
    }

    if (regFuture.isDone()) {
        // register 动作已经完成,那么执行 bind 操作
        ChannelPromise promise = channel.newPromise();
        doBind0(regFuture, channel, localAddress, promise);
        return promise;
    } else {
        ......
    }
}

一直往里走代码,最终会发现bind 操作也是要由 pipeline 来完成的:

// AbstractChannel
@Override
public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
    return pipeline.bind(localAddress, promise);
}

bind 操作和 connect 一样,都是 Outbound 类型的,所以都是 tail 开始:

@Override
public final ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
    return tail.bind(localAddress, promise);
}

最后的 bind 操作和connect操作一样,又到了 head 中,由 head 来调用 unsafe 提供的 bind 方法:

@Override
public void bind(
        ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise)
        throws Exception {
    unsafe.bind(localAddress, promise);
}

bind 方法,非常简单,bind 操作也不是什么异步方法,就暂时介绍到这里了。

转载自:https://juejin.cn/post/7228498721784954940
评论
请登录