likes
comments
collection
share

【Netty】「源码解析」(三)设置连接超时:深入分析 ChannelFuture.sync() 的执行过程

作者站长头像
站长
· 阅读数 58

前言

本篇博文是《从0到1学习 Netty》中源码系列的第三篇博文,主要内容是深入分析连接超时的实现原理,包括了 connect 方法的源码解析和 ChannelFuture.sync() 执行过程的解析。,往期系列文章请访问博主的 Netty 专栏,博文中的所有代码全部收集在博主的 GitHub 仓库中;

介绍

在实际应用中,当客户端尝试连接服务器时,可能会面临多种原因导致连接失败的情况。为了避免无限等待,我们可以在客户端代码中设置一个超时连接时间 CONNECT_TIMEOUT_MILLIS,该时间表示客户端尝试连接服务器的最长时间限制,如果在指定的超时时间内未能成功建立连接,客户端应该主动抛出连接超时的异常。

.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 1000)

上述代码的作用是设置连接超时时间为 1000 毫秒,这个选项用于指定连接建立的最大时间,如果超过该时间仍未建立连接,则会放弃连接尝试。

运行结果:

【Netty】「源码解析」(三)设置连接超时:深入分析 ChannelFuture.sync() 的执行过程

然而,当服务器没有启动时,且连接超时时间大于 2 秒钟时,则会抛出连接被拒绝的异常,运行结果如下所示:

【Netty】「源码解析」(三)设置连接超时:深入分析 ChannelFuture.sync() 的执行过程

这是 Java 底层的网络异常。

需要完整代码的读者请访问博主的 Github:TestConnectionTimeout

接下来让我们探索 connect()ChannelFuture.sync() 的执行过程。

connect 源码解析

我们先来探究成功执行连接超时所进行的过程,核心方法 connect() 的部分源码如下所示:

@Override
public final void connect(
        final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
    ...
    try {
        ...
        // Schedule connect timeout.
        int connectTimeoutMillis = config().getConnectTimeoutMillis();
        if (connectTimeoutMillis > 0) {
            connectTimeoutFuture = eventLoop().schedule(new Runnable() {
                @Override
                public void run() {
                    ChannelPromise connectPromise = AbstractNioChannel.this.connectPromise;
                    ConnectTimeoutException cause =
                            new ConnectTimeoutException("connection timed out: " + remoteAddress);
                    if (connectPromise != null && connectPromise.tryFailure(cause)) {
                        close(voidPromise());
                    }
                }
            }, connectTimeoutMillis, TimeUnit.MILLISECONDS);
        }
        ...
    } catch (Throwable t) {
        promise.tryFailure(annotateConnectException(t, remoteAddress));
        closeIfClosed();
    }
}

上述代码的主要内容是根据配置获取连接超时时间,并使用事件循环调度一个定时任务,在指定的时间内检查连接是否超时。如果连接超时,会触发一个 ConnectTimeoutException 异常,并尝试向 connectPromise 发送连接超时的失败信息;否则,连接超时任务被取消,通道关闭。

那主线程是如何知道消息的呢?其实是通过 connectPromise 进行传递消息,我们可以在主线程中标记一下 future,如下图所示:

【Netty】「源码解析」(三)设置连接超时:深入分析 ChannelFuture.sync() 的执行过程

然后切换至 NIO 线程,可以发现 connectPromise 也被标记了,说明他们共属于一个主体,如下图所示:

【Netty】「源码解析」(三)设置连接超时:深入分析 ChannelFuture.sync() 的执行过程

如果不是很了解 FuturePromise 之间的联系的话,可以阅读博主的另一篇文章:异步编程模型:利用 Future 和 Promise 提高性能与响应能力

在上述事例中,我们设置了两秒钟的连接超时时间,由于两秒钟内客户端并没有与服务器建立连接,因此触发了定时任务,执行了 run() 方法,抛出了连接超时异常 ConnectTimeoutException

ChannelFuture.sync() 执行过程解析

下面是 ChannelFuture.sync() 方法的执行过程:

  1. 调用 ChannelFuture.sync() 方法将当前线程阻塞,直到对应的操作完成或发生异常。

  2. sync() 方法内部,会获取当前线程绑定的 EventLoop 对象,然后将当前任务包装成一个特殊的 Promise 对象。

  3. Promise 对象会被注册到 EventLoop 中的任务队列中,等待执行。EventLoop 会按顺序从任务队列中取出任务并执行。

  4. 一旦 Promise 执行完成,即异步操作完成或发生异常,sync() 方法会解除当前线程的阻塞状态,并返回操作的结果或抛出异常。

  5. 操作成功完成,可以通过 ChannelFuture.isSuccess() 方法检查操作是否成功。如果成功,可以继续执行后续的操作;如果失败,可以通过 ChannelFuture.cause() 方法获取失败的原因。

【Netty】「源码解析」(三)设置连接超时:深入分析 ChannelFuture.sync() 的执行过程

需要注意的是,由于 ChannelFuture.sync() 是一个同步阻塞方法,如果在事件循环线程中调用该方法,可能会导致死锁或性能问题。因此,通常建议在其他线程中使用 ChannelFuture.addListener() 方法注册监听器来处理异步操作的结果,而不是直接使用 sync() 方法。

sync 源码解析

首先使用 super.sync() 调用了父类的 sync() 方法,将当前对象作为结果返回。

@Override  
public ChannelPromise sync() throws InterruptedException {  
    super.sync();  
    return this;  
}

上述代码的目的是在执行特定的同步操作后,返回当前的 ChannelPromise 对象。在这种情况下,子类通过调用父类的 sync() 方法来实现同步操作,并在执行完成后返回当前对象,以便支持链式调用或其他需要获取该对象的操作。

然后在父类的 sync() 方法中,调用 await()rethrowIfFailed() 来实现同步等待和异常检查,并返回当前对象。

@Override  
public Promise<V> sync() throws InterruptedException {  
    await();  
    rethrowIfFailed();  
    return this;  
}

在之后的几个方法中,就不对子类做过多的介绍了。

await 源码解析

await 方法是一种等待机制的实现,它通过检查承诺是否已完成,处理中断异常以及使用同步块和等待机制来让线程等待承诺的完成。

@Override  
public Promise<V> await() throws InterruptedException {  
    if (isDone()) {  
        return this;  
    }  
    // 处理线程中断
    if (Thread.interrupted()) {  
        throw new InterruptedException(toString());  
    }  
    // 检查死锁
    checkDeadLock();  

    synchronized (this) {  
        while (!isDone()) {  
            incWaiters();  
            try {  
                wait();  
            } finally {  
                decWaiters();  
            }  
        }  
    }  
    return this;  
}

在上述代码中,如果 isDone() 方法返回 true,说明该承诺已经完成,直接返回当前对象。

Thread.interrupted() 用于检查当前线程是否被中断,如果是,则抛出 InterruptedException 异常,并将当前对象的字符串表示作为异常消息。

checkDeadLock() 方法用于检查是否存在死锁情况。

对于 synchronized (this) {...} 代码块,使用当前对象作为同步锁,确保在多线程环境下只有一个线程可以进入代码块。其中,该代码块核心为当承诺未完成时,一直执行循环。

在循环内部,调用 incWaiters() 方法增加等待中的线程计数器。同时,调用 wait() 方法,使当前线程进入等待状态,直到其他线程调用该对象的 notify()notifyAll() 方法唤醒。但无论如何,最终都会执行 decWaiters() 方法来减少等待中的线程计数器。

接下来,我们看看 isDone() 方法的具体实现。

isDone 源码解析

private static boolean isDone0(Object result) {  
    return result != null && result != UNCANCELLABLE;  
}

上述代码主要作用是判断给定的 result 是否满足完成的条件。

后记

我们深入分析了 ChannelFuture.sync() 方法的执行过程,通过对 connect 源码的解析,我们了解到它在超时连接设置中的作用。而在 ChannelFuture.sync() 的执行过程中,我们进一步解析了 syncawaitisDone 的源码。

这些源码解析的过程帮助我们更好地理解了 ChannelFuture.sync() 方法的执行流程,并且使我们能够更好地降低意外情况的发生率,并提高系统的稳定性和可靠性。

以上就是 设置连接超时:深入分析 ChannelFuture.sync() 的执行过程 的所有内容了,希望本篇博文对大家有所帮助!

参考:

📝 上篇精讲:「项目实战」(三)序列化算法选型对聊天室可扩展性的影响

💖 我是 𝓼𝓲𝓭𝓲𝓸𝓽,期待你的关注,创作不易,请多多支持;

👍 公众号:sidiot的技术驿站

🔥 系列专栏:探索 Netty:源码解析与应用案例分享

转载自:https://juejin.cn/post/7252989921441824826
评论
请登录