手把手教你如何从源码到应用——Netty(二)
前言
首先需要说明的是,这篇文章不是原创,是在大佬的基础上修正了一点内容发出来的,算是我学习跟着大佬的文章一步一步走来的学习路程,他的文章里有一些初学者不理解的点,包括里面的笔误和分析源码的流程的失误,我都有修正,但是,不是原创,不是原创,不是原创!!!!原文链接在这里!!!不过看在我整理了这么多的份上,各位大哥能不能点个赞给小弟。
大佬没有更新第三篇,但是我会努力尽快肝出第三篇,紧跟大佬的步伐!
大家有什么想看的源码可以评论我,我会努力肝出来!
Spring源码以及Spring MVC源码已经在肝了!
Channel的register操作
在上一篇文章的最后,我们就介绍了Netty中的NioEventLoopGroup线程池的创建,以及线程池内的线程NioEventLoop的创建,但是NioEventLoop创建完成之后,并没有实例化其内部的Thread实例,创建线程Thread实例的时机就在于第一个任务提交过来的时候,那么第一个任务是什么呢?这就是我们将要说的channel的register操作了。
register
我们不论是从EchoClient的connect方法还是EchoServer的bind方法出发,最终都会走到initAndRegister这个方法:
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
// 1
channel = channelFactory.newChannel();
// 2 对于 Bootstrap 和 ServerBootstrap,这里面有些不一样
init(channel);
} catch (Throwable t) {
...
}
// 3 我们这里要说的是这行
ChannelFuture regFuture = config().group().register(channel);
if (regFuture.cause() != null) {
if (channel.isRegistered()) {
channel.close();
} else {
channel.unsafe().closeForcibly();
}
}
return regFuture;
}
这里我们回顾一下initAndRegister这个方法,前面已经说过这个方法了:Channel的实例化,实例化过程中会执行Channel内部Unsafe和Pipeline的实例化,以及在init(channel)方法中,会往pipeline中添加handler(pipeline此时是head+channelnitializer+tail)。
这节我们就要开始揭晓ChannelInitializer中的initChannel方法了。
现在,我们继续往下走源码,到register这一步:
ChannelFuture regFuture = config().group().register(channel);
register是十分关键的一步,回想一下channel中的一些情况,在之前,我们实例化了JDK底层的Channel,设置了非阻塞,实例化了Unsafe,实例化了Pipeline,并且往Pipeline中添加了head、一个ChannelInitializer实例、tail。
上面的config.group()方法会返回前面实例化的NioEventLoopGroup的实例,然后调用其register(channel)方法:
// MultithreadEventLoopGroup
@Override
public ChannelFuture register(Channel channel) {
return next().register(channel);
}
这里的next方法是不是很眼熟,就是chooseFactory,也就是选择一个NioEventLoop实例,所以这个时候我们就进入到了NioEventLoop中了,NioEventLoop的register(channel)的方法实现,在它的父类SingleThreadEventLoop中(SingleThreadEventLoop它的父类记住是一个SingleThreadEventExecutor)。
@Override
public ChannelFuture register(Channel channel) {
return register(new DefaultChannelPromise(channel, this));
}
上面的代码实例化了一个Promise,将当前channel传入其中:
@Override
public ChannelFuture register(final ChannelPromise promise) {
ObjectUtil.checkNotNull(promise, "promise");
// promise 关联了 channel channel 持有 Unsafe 实例 register 操作就封装在 Unsafe 中
promise.channel().unsafe().register(this, promise);
return promise;
}
拿到Channel中关联的Unsafe实例,然后调用它的register方法:
// AbstractChannel#AbstractUnsafe
@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
...
// 将这个 eventLoop 实例设置给这个 channel 从此这个 channel 就是有 eventLoop 的了
// 我觉得这一步其实挺关键的 因为后续该 channel 中的所有异步操作 都要提交给这个 eventLoop 来执行
AbstractChannel.this.eventLoop = eventLoop;
// 如果发起 register 动作的线程就是 eventLoop 实例中的线程,那么直接调用 register0(promise)
// 对于我们来说 它不会进入到这个分支,
// 之所以有这个分支 是因为我们是可以 unregister 然后再 register 的 后面再仔细看
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
try {
// 否则 提交任务给 eventLoop eventLoop 中的线程会负责调用 register0(promise)
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
} catch (Throwable t) {
...
}
}
}
对于我们前面过来的register操作,其实提交到了eventLoop后会直接返回一个Promise,里面的register0方法是异步操作,它会交由NioEventLoop实例去完成,这里先不再继续往下分析register0方法,先介绍清楚NioEventLoop中的线程(Thread)介绍清楚,再回来分析这个register0方法。
Channel实例一旦register到了NioEventLoopGroup实例中的某一个NioEventLoop实例上,那么后续该Channel的所有操作,都是由该NioEventLoop来完成的。
因为Selector实例是在NioEventLoop实例中的,Channel实例一旦注册到某个Selector实例中,当然也就只能在这个实例中处理NIO事件。
NioEventLoop工作流程
前面我们在分析NioEventLoopGroup线程池实例化的时候说过,NioEventLoop中并没有启动Java线程,也就是没有实例化其中的Thread实例,现在来分析register过程中调用的eventLoop.execute(runnable)这个方法,这个代码在父类的SingleThreadEventExecutor中:
@Override
public void execute(Runnable task) {
if (task == null) {
throw new NullPointerException("task");
}
// 判断添加任务的线程是否就是当前 EventLoop 中的线程
boolean inEventLoop = inEventLoop();
// 添加任务到之前介绍的 taskQueue 中,
// 如果 taskQueue 满了(默认大小 16) 根据我们之前说的 默认的策略是抛出异常
addTask(task);
if (!inEventLoop) {
// 如果不是 NioEventLoop 内部线程提交的 task 那么判断下线程是否已经启动 没有的话 就启动线程
startThread();
if (isShutdown() && removeTask(task)) {
reject();
}
}
if (!addTaskWakesUp && wakesUpForTask(task)) {
wakeup(inEventLoop);
}
}
所以这里就看到了启动NioEventLoop中的线程的方法在这里,另外register操作进入到了taskQueue中,它被归到了非IO操作的范畴。
下面是startThread方法的源码,判断线程是否已经启动来决定是否要进行启动操作:
private void startThread() {
if (state == ST_NOT_STARTED) {
if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
try {
doStartThread();
} catch (Throwable cause) {
STATE_UPDATER.set(this, ST_NOT_STARTED);
PlatformDependent.throwException(cause);
}
}
}
}
按照前面的思路,第一次进来肯定不是NioEventLoop内的线程提交的task,此时NioEventLoop内的线程Thread实例都为null,怎么提交task呢?根据线程没有启动的情况,来看看doStartThread()方法:
private void doStartThread() {
assert thread == null;
// 这里的 executor 大家是不是有点熟悉的感觉,它就是一开始我们实例化 NioEventLoop 的时候传进来的 ThreadPerTaskExecutor 的实例 它是每次来一个任务 创建一个线程的那种 executor
// 一旦我们调用它的 execute 方法 它就会创建一个新的线程 所以这里终于会创建 Thread 实例
executor.execute(new Runnable() {
@Override
public void run() {
// 看这里 将 “executor” 中创建的这个线程设置为 NioEventLoop 的线程!!!
thread = Thread.currentThread();
if (interrupted) {
thread.interrupt();
}
boolean success = false;
updateLastExecutionTime();
try {
// 执行 SingleThreadEventExecutor 的 run() 方法 它在 NioEventLoop 中实现了
SingleThreadEventExecutor.this.run();
success = true;
} catch (Throwable t) {
logger.warn("Unexpected exception from an event executor: ", t);
} finally {
// ... 我们直接忽略掉这里的代码
}
}
});
}
所以看到 这里,才知道什么时候NioEventLoop内的Thread属性被实例化,上面的这个线程启动之后,会去执行NioEventLoop的run方法,这个方法肯定是没有那么容易结束的,必然是像JDK线程池的Worker那样,不断循环获取新的任务的。它需要不断的做select操作和轮询taskQueue这个队列。
先来简单看一下它的源码:
@Override
protected void run() {
// 代码嵌套在 for 循环中
for (;;) {
try {
// selectStrategy 终于要派上用场了
// 它有两个值 一个是 CONTINUE 一个是 SELECT
// 针对这块代码 我们分析一下
// 1. 如果 taskQueue 不为空 也就是 hasTasks() 返回 true
// 那么执行一次 selectNow() 该方法不会阻塞
// 2. 如果 hasTasks() 返回 false 那么执行 SelectStrategy.SELECT 分支
// 进行 select(...) 这块是带阻塞的
// 这个很好理解 就是按照是否有任务在排队来决定是否可以进行阻塞
switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
case SelectStrategy.CONTINUE:
continue;
case SelectStrategy.SELECT:
// 如果 !hasTasks() 那么进到这个 select 分支 这里 select 带阻塞的
select(wakenUp.getAndSet(false));
if (wakenUp.get()) {
selector.wakeup();
}
default:
}
cancelledKeys = 0;
needsToSelectAgain = false;
// 默认地 ioRatio 的值是 50
final int ioRatio = this.ioRatio;
if (ioRatio == 100) {
// 如果 ioRatio 设置为 100 那么先执行 IO 操作 然后在 finally 块中执行 taskQueue 中的任务
try {
// 1. 执行 IO 操作 因为前面 select 以后 可能有些 channel 是需要处理的
processSelectedKeys();
} finally {
// 2. 执行非 IO 任务,也就是 taskQueue 中的任务
runAllTasks();
}
} else {
// 如果 ioRatio 不是 100 那么根据 IO 操作耗时 限制非 IO 操作耗时
final long ioStartTime = System.nanoTime();
try {
// 执行 IO 操作
processSelectedKeys();
} finally {
// 根据 IO 操作消耗的时间 计算执行非 IO 操作(runAllTasks)可以用多少时间
final long ioTime = System.nanoTime() - ioStartTime;
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
}
} catch (Throwable t) {
handleLoopException(t);
}
// Always handle shutdown even if the loop processing threw an exception.
try {
if (isShuttingDown()) {
closeAll();
if (confirmShutdown()) {
return;
}
}
} catch (Throwable t) {
handleLoopException(t);
}
}
}
上面这段代码就是NioEventLoop的核心了,这里介绍两点:
- 首先会根据hasTask的结果来决定执行selectNow()还是select(oldWakenUp),这个很好理解,只要有任务在等待,那么就应该使用无阻塞的selectNow方法,如果没有任务在等待,那么应该使用带阻塞的select操作等待任务。
- ioRatio 控制 IO 操作所占的时间比重:
-
- 如果设置为100%,那么先执行IO操作,然后再执行任务队列中的任务
- 如果不是100%,那么先执行IO操作,然后执行任务队列中的任务,但是需要控制执行任务的总时间,也就是说,非IO操作可以占用的时间,通过ioRatio以及这次IO操作耗时计算得出。
我们这里先不用去关心里面的实现细节,只需要理解它们分别做了什么事情就可以了。
现在回到register这里来,我们在前面register的时候提交了一个register任务给NioEventLoop,这是NioEventLoop接收到的第一个任务,所以会在这里实例化一个Thread并且启动,然后会进入到NioEventLoop中的run方法中。
继续register
回到我们前面的register0(promise)这个方法,我们知道这个register任务进入到了NioEventLoop的taskQueue中,然后会调用startThread方法去启动NioEventLoop中的线程,该线程会轮询这个taskQueue,然后执行这个register任务。
注意:此时是eventLoop中的线程:
// AbstractChannel
private void register0(ChannelPromise promise) {
try {
...
boolean firstRegistration = neverRegistered;
// *** 进行 JDK 底层的操作:Channel 注册到 Selector 上 ***
doRegister();
neverRegistered = false;
registered = true;
// 到这里 就算是 registered 了
// 这一步也很关键 因为这涉及到了 ChannelInitializer 的 init(channel)
// 我们之前说过 init 方法会将 ChannelInitializer 内部添加的 handlers 添加到 pipeline 中
pipeline.invokeHandlerAddedIfNeeded();
// 设置当前 promise 的状态为 success
// 因为当前 register 任务是在 eventLoop 中的线程(Thread)中执行的 需要通知提交 register 任务的线程
safeSetSuccess(promise);
// 当前的 register 任务已经成功 该事件应该被 pipeline 上
// 所有关心 register 事件的 handler 感知到 往 pipeline 中扔一个事件
pipeline.fireChannelRegistered();
// 这里 active 指的是 channel 已经打开
if (isActive()) {
// 如果该 channel 是第一次执行 register,那么 fire ChannelActive 事件
if (firstRegistration) {
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
// 该 channel 之前已经 register 过了
// 这里让该 channel 立马去监听通道中的 OP_READ 事件
beginRead();
}
}
} catch (Throwable t) {
...
}
}
首先说说doRegister方法,然后再说说pipeline:
@Override
protected void doRegister() throws Exception {
boolean selected = false;
for (;;) {
try {
// 附 JDK 中 Channel 的 register 方法:
// public final SelectionKey register(Selector sel, int ops, Object att) {...}
selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
return;
} catch (CancelledKeyException e) {
...
}
}
}
从源码中可以看到,这里做了JDK底层的register操作,将SocketChannel(或ServerSocketChannel)注册到Selector中,并且可以看到,这里将监听集合设置为了0,也就是什么都不监听。
后续肯定是有一个地方修改了这个selectionKey的监听集合,否则什么都干不了。
我们先重点来说说pipeline操作,我们在介绍NioSocketChannel的pipeline的时候,介绍到了,我们的pipeline长下面这个样子:
现在我们将会看到把LoggingHandler和EchoClientHandler添加到pipeline中,前面介绍的时候并没有真正的添加。
继续走源码,register成功之后,执行了以下操作:
pipeline.invokeHandlerAddedIfNeeded();
在这里进行跟踪的话,可以发现这一步会执行到pipeline中ChannelInitializer实例的handlerAdded方法,在这个方法中会执行它的init(context)方法。
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
if (ctx.channel().isRegistered()) {
initChannel(ctx);
}
}
然后我们看到initChannel(ctx),这里终于到了之前介绍过的init(channel)方法:
private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
if (initMap.putIfAbsent(ctx, Boolean.TRUE) == null) { // Guard against re-entrance.
try {
// 1. 将把我们自定义的 handlers 添加到 pipeline 中
initChannel((C) ctx.channel());
} catch (Throwable cause) {
...
} finally {
// 2. 将 ChannelInitializer 实例从 pipeline 中删除
remove(ctx);
}
return true;
}
return false;
}
前面也说到过,ChannelInitializer的init(channel)方法被执行之后,内部添加的handlers会进入到pipeline中,然后在finally块中会将ChannelInitializer的实例从pipeline中删除,那么此时pipeline就算建立起来了,如下图所示:
如果我们在ChannelInitializer中添加的是一个ChannelInitializer实例呢?这个怎么解决?——这个我会在后面的文章写一个流程出来,这个也很简单,只要自己在代码中写上一个添加ChannelInitializer实例,然后DEBUG走一遍就清楚了
pipeline建立了以后,我们继续往下走,就会执行到这一句:
pipeline.fireChannelRegistered();
我们只需要摸清楚了fireChannelRegistered()这个方法,以后碰到像 fireChannelActive()、fireXxx() 等就知道怎么回事了,它们都是类似的,来看看以下代码会发生什么:
// DefaultChannelPipeline
@Override
public final ChannelPipeline fireChannelRegistered() {
// 注意这里的传参是 head
AbstractChannelHandlerContext.invokeChannelRegistered(head);
return this;
}
可以看到,我们往pipeline中扔了一个channelRegistered事件,这里的register属于Inbound事件,pipeline接下来要做的就是执行pipeline中Inbound类型的handlers中的channelRegistered()方法。
从上面的代码可以看出,往pipeline中扔出channelRegistered事件以后,第一个处理的handler就是head。
接下来继续跟着代码走,此时我们来到了pipeline中第一个节点head的处理中:
//AbstractChannelHandlerContext
// next 此时是 head
static void invokeChannelRegistered(final AbstractChannelHandlerContext next) {
EventExecutor executor = next.executor();
// 执行 head 的 invokeChannelRegistered()
if (executor.inEventLoop()) {
next.invokeChannelRegistered();
} else {
executor.execute(new Runnable() {
@Override
public void run() {
next.invokeChannelRegistered();
}
});
}
}
也就是说,这里会先执行 head.invokeChannelRegistered() 方法,而且是放到 NioEventLoop 中的 taskQueue 中执行的:
// AbstractChannelHandlerContext
private void invokeChannelRegistered() {
if (invokeHandler()) {
try {
// handler() 方法此时会返回 head
((ChannelInboundHandler) handler()).channelRegistered(this);
} catch (Throwable t) {
notifyHandlerException(t);
}
} else {
fireChannelRegistered();
}
}
我们去看 head 的 channelRegistered 方法:
// HeadContext
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
// 1. 这一步是 head 对于 channelRegistered 事件的处理 没有我们要关心的
invokeHandlerAddedIfNeeded();
// 2. 向后传播 Inbound 事件
ctx.fireChannelRegistered();
}
然后head会执行fireChannelRegistered方法,向后传播Inbound事件:
// AbstractChannelHandlerContext
@Override
public ChannelHandlerContext fireChannelRegistered() {
// 这里很关键
// findContextInbound() 方法会沿着 pipeline 找到下一个 Inbound 类型的 handler
invokeChannelRegistered(findContextInbound());
return this;
}
注意:pipeline.fireChannelRegistered() 是将 channelRegistered 事件抛到 pipeline 中,pipeline 中的 handlers 准备处理该事件。而 context.fireChannelRegistered() 是一个 handler 处理完了以后,向后传播给下一个 handler。两个方法名字是一样的,但是来自不同的类。
findContextInbound方法会找到下一个Inbound类型的handler,然后重复上面的几个方法。
上面这块代码没必要太纠结,总之就是从 head 中开始,依次往下寻找所有 Inbound handler,执行其 channelRegistered(ctx) 操作。
到这里,register操作算是完成了,下面回到initAndRegister方法中:
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
channel = channelFactory.newChannel();
init(channel);
} catch (Throwable t) {
...
}
// 我们上面说完了这行
ChannelFuture regFuture = config().group().register(channel);
// 如果在 register 的过程中 发生了错误
if (regFuture.cause() != null) {
if (channel.isRegistered()) {
channel.close();
} else {
channel.unsafe().closeForcibly();
}
}
// 源码中说得很清楚 如果到这里 说明后续可以进行 connect() 或 bind() 了 因为两种情况:
// 1. 如果 register 动作是在 eventLoop 中发起的 那么到这里的时候 register 一定已经完成
// 2. 如果 register 任务已经提交到 eventLoop 中 也就是进到了 eventLoop 中的 taskQueue 中
// 由于后续的 connect 或 bind 也会进入到同一个 eventLoop 的 queue 中 所以一定是会先 register 成功 才会执行 connect 或 bind
return regFuture;
}
不论是服务端的NioServerSocketChannel还是客户端的NioSocketChannel,在bind或connect时,都会先进入initAndRegister这个方法,所以我们上面说的那些对于两者都是通用的。
一定要记住,register操作是十分重要的,要知道这一步大概做了哪些事情,register操作以后,将进入到bind或者connect操作中。
connect过程和bind过程分析
上节说的是register操作,它十分重要,它建立起来了很多的东西,它是 Netty 中 NioSocketChannel 和 NioServerSocketChannel 开始工作的起点。
这节主要说说 register 之后的 connect 操作和 bind 操作。
connect过程分析
对于客户端NioSocketChannel来说,完成了register以后,就要开始connect了,这一步将连接到服务端。
private ChannelFuture doResolveAndConnect(final SocketAddress remoteAddress, final SocketAddress localAddress) {
// 这里完成了 register 操作
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
// 这里我们不去纠结 register 操作是否 isDone()
if (regFuture.isDone()) {
if (!regFuture.isSuccess()) {
return regFuture;
}
// 看这里
return doResolveAndConnect0(channel, remoteAddress, localAddress, channel.newPromise());
} else {
....
}
}
这里最后会走到AbstractChannel 的 connect 方法:
@Override
public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
return pipeline.connect(remoteAddress, promise);
}
从源码中可以看到,connect操作是交给pipeline来执行的,进入pipeline中,我们会发现,connect这种Outbound类型的操作,是从pipeline的tail开始的:
@Override
public final ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
return tail.connect(remoteAddress, promise);
}
接下来就是pipeline的操作了,从tail开始,执行pipeline上的Outbound类型的handlers的connect()方法,那么真正的底层的connect是发生在哪里?首先来看看我们的pipeline图:
从 tail 开始往前找 Outbound 类型的 handlers,每经过一个 handler,都执行里面的 connect() 方法,最后会到 head 中,因为 head 也是 Outbound 类型的,我们需要的 connect 操作就在 head 中,它会负责调用 unsafe 中提供的 connect 方法:
// HeadContext
public void connect(
ChannelHandlerContext ctx,
SocketAddress remoteAddress, SocketAddress localAddress,
ChannelPromise promise) throws Exception {
unsafe.connect(remoteAddress, localAddress, promise);
}
接下来我们来看一看connect在unsafe类的底层操作:
// AbstractNioChannel.AbstractNioUnsafe
@Override
public final void connect(
final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
......
boolean wasActive = isActive();
// 大家自己点进去看 doConnect 方法
// 这一步会做 JDK 底层的 SocketChannel connect 然后设置 interestOps 为 SelectionKey.OP_CONNECT
// 返回值代表是否已经连接成功
if (doConnect(remoteAddress, localAddress)) {
// 处理连接成功的情况
fulfillConnectPromise(promise, wasActive);
} else {
connectPromise = promise;
requestedRemoteAddress = remoteAddress;
// 下面这块代码 在处理连接超时的情况 代码很简单
// 这里用到了 NioEventLoop 的定时任务的功能 这个我们之前一直都没有介绍过 因为我觉得也不太重要
int connectTimeoutMillis = config().getConnectTimeoutMillis();
if (connectTimeoutMillis > 0) {
connectTimeoutFuture = eventLoop().schedule(new Runnable() {
@Override
public void run() {
ChannelPromise connectPromise = AbstractNioChannel.this.connectPromise;
ConnectTimeoutException cause =
new ConnectTimeoutException("connection timed out: " + remoteAddress);
if (connectPromise != null && connectPromise.tryFailure(cause)) {
close(voidPromise());
}
}
}, connectTimeoutMillis, TimeUnit.MILLISECONDS);
}
promise.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isCancelled()) {
if (connectTimeoutFuture != null) {
connectTimeoutFuture.cancel(false);
}
connectPromise = null;
close(voidPromise());
}
}
});
}
} catch (Throwable t) {
promise.tryFailure(annotateConnectException(t, remoteAddress));
closeIfClosed();
}
}
如果上面的 doConnect 方法返回 false,那么后续是怎么处理的呢?
在上一节介绍的 register 操作中,channel 已经 register 到了 selector 上,只不过将 interestOps 设置为了 0,也就是什么都不监听,而在上面的 doConnect 方法中,我们看到它在调用底层的 connect 方法后,会设置 interestOps 为 SelectionKey.OP_CONNECT。
剩下的就是 NioEventLoop 的事情了,还记得 NioEventLoop 的 run() 方法吗?也就是说这里的 connect 成功以后,这个 TCP 连接就建立起来了,后续的操作会在 NioEventLoop.run() 方法中被 processSelectedKeys() 方法处理掉。
bind过程分析
说完 connect 过程,我们再来简单看下 bind 过程:
private ChannelFuture doBind(final SocketAddress localAddress) {
// **前面说的 initAndRegister**
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
if (regFuture.cause() != null) {
return regFuture;
}
if (regFuture.isDone()) {
// register 动作已经完成,那么执行 bind 操作
ChannelPromise promise = channel.newPromise();
doBind0(regFuture, channel, localAddress, promise);
return promise;
} else {
......
}
}
一直往里走代码,最终会发现bind 操作也是要由 pipeline 来完成的:
// AbstractChannel
@Override
public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
return pipeline.bind(localAddress, promise);
}
bind 操作和 connect 一样,都是 Outbound 类型的,所以都是 tail 开始:
@Override
public final ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
return tail.bind(localAddress, promise);
}
最后的 bind 操作和connect操作一样,又到了 head 中,由 head 来调用 unsafe 提供的 bind 方法:
@Override
public void bind(
ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise)
throws Exception {
unsafe.bind(localAddress, promise);
}
bind 方法,非常简单,bind 操作也不是什么异步方法,就暂时介绍到这里了。
转载自:https://juejin.cn/post/7228498721784954940