异步超时中断,知其然,也要知其所以然~
异步编排
在业务开发的过程中,我们为了降低接口耗时,经常会用到线程池,书写多线程数据获取、同步阻塞获取结果的业务逻辑。
常见的使用方法如下:
Future
@Slf4j
@SpringBootTest
public class OtherTest {
public static final ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 10, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(100));
public static void main(String[] args) {
Future<Integer> submit1 = executor.submit(() -> {
// 业务耗时逻辑1
return 1;
});
Future<Integer> submit2 = executor.submit(() -> {
// 业务耗时逻辑2
return 2;
});
Future<Integer> submit3 = executor.submit(() -> {
// 业务耗时逻辑3
return 3;
});
try {
Integer integer1 = submit1.get();
Integer integer2 = submit2.get();
Integer integer3 = submit3.get();
System.out.println(integer1);
System.out.println(integer2);
System.out.println(integer3);
} catch (Exception e) {
e.printStackTrace();
}
}
}
假设一个接口涉及到3个业务逻辑,如下:
- 业务逻辑1耗时: 50ms
- 业务逻辑2耗时: 30ms
- 业务逻辑3耗时: 70ms
那么如果是传统的串行调用,接口总耗时:150ms
但如果是上面的利用线程池的方式进行调用,那么该接口耗时取决于耗时最长的那个业务逻辑,即该接口耗时为: 70ms
可以看到,接口耗时是有明显降低的~
CompletableFuture
当然,上面虽然对接口进行异步编排后,接口耗时有着下降,但是如果说我们的耗时业务逻辑有着十几二十个?且业务逻辑之间存在依赖关系?那么我们怎么办?
很显然,上面的Future
就不能满足我们的需求了,所以从JDK8开始,JDK提供了CompletableFuture
工具类,为我们异步编排提供了很大的便利~
@Slf4j
@SpringBootTest
public class OtherTest {
public static final ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 10, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(100));
public static void main(String[] args) {
CompletableFuture<Integer> completableFuture1 = CompletableFuture.supplyAsync(() -> {
// 业务耗时逻辑1
return 1;
}, executor);
CompletableFuture<Integer> completableFuture2 = CompletableFuture.supplyAsync(() -> {
// 业务耗时逻辑2
return 2;
}, executor);
CompletableFuture<Integer> completableFuture3 = CompletableFuture.supplyAsync(() -> {
// 业务耗时逻辑3
return 3;
}, executor);
try {
// 等待任务全部执行完毕
CompletableFuture.allOf(completableFuture1, completableFuture2, completableFuture3).get();
System.out.println(completableFuture1.get());
System.out.println(completableFuture2.get());
System.out.println(completableFuture3.get());
} catch (Exception e) {
e.printStackTrace();
}
}
}
由于案例比较简单,无法突出CompletableFuture
编排能力相比于Future
的优势所在,这个在以后的文章里专门会为大家讲解,这不是本文的重点。
超时中断
在上面的案例中,细心的小伙伴可以发现,无论是CompletableFuture
还是Future
,我都是进行阻塞等待任务结束。
这,其实是一个非常危险的行为,如果下游rpc
接口出现波动,那么接口耗时会明显提升,而我们却进行阻塞获取,线程会被一直阻塞无法及时释放,那么随着不断的请求进来,线程池线程、队列很快都会被打满,新任务都会被拒绝掉,从而影响用户体验,从而影响你的工资,从而影响你的工作。
所以,为了杜绝这种情况出现,我们在获取任务结果的时候需要设置等待时间~
Future
和CompletableFuture
的get
方法都支持传入等待时间~
Future超时中断机制
Future提供了get
方法来供我们阻塞获取任务结果,也支持传入超时时间,下面来了解下源码
public V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
// 参数校验
if (unit == null)
throw new NullPointerException();
int s = state;
// 阻塞等待,如果超过超时时间任务还未完成,那么抛出超时异常
if (s <= COMPLETING &&
(s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
throw new TimeoutException();
return report(s);
}
阻塞等待,timed
为true
代表存在超时时间
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
long startTime = 0L;
WaitNode q = null;
boolean queued = false;
for (;;) {
int s = state;
// 任务状态 > COMPLETING说明已经执行完毕
if (s > COMPLETING) {
// 当前线程不用等待了,将等待节点里的Thread设置为null
if (q != null)
q.thread = null;
return s;
}
else if (s == COMPLETING)
// COMPLETING是任务执行完毕到真正将任务设置为完成态的一个中间状态
// 当任务的处于COMPLETING时,说明任务已经执行完了,但此时cpu时间不够没有继续执行
// 此时需要yield一下,让其他线程执行,从而将任务正确设置为完成状态
Thread.yield();
else if (Thread.interrupted()) {
// 如果当前线程被打断了,则把当前线程从等待该任务完成的阻塞线程链表中删除
removeWaiter(q);
// 抛出打断异常
throw new InterruptedException();
}
else if (q == null) {
// 如果是超时等待,且等待时间<=0,则直接返回当前任务状态
if (timed && nanos <= 0L)
return s;
// 初始化一个等待当前任务执行完的节点,内部包含
q = new WaitNode();
}
else if (!queued)
// 将WaitNode排队到线程等待链表中
queued = WAITERS.weakCompareAndSet(this, q.next = waiters, q);
else if (timed) {
// 阻塞等待,存在超时时间
final long parkNanos;
if (startTime == 0L) { // first time
startTime = System.nanoTime();
if (startTime == 0L)
startTime = 1L;
parkNanos = nanos;
} else {
long elapsed = System.nanoTime() - startTime;
if (elapsed >= nanos) {
removeWaiter(q);
return state;
}
parkNanos = nanos - elapsed;
}
if (state < COMPLETING)
LockSupport.parkNanos(this, parkNanos);
}
else
// 阻塞等待,没有超时时间
LockSupport.park(this);
}
}
上面源码注释已经比较完善了,但我们还是要总结一下
- 任务
COMPLETING
状态,是任务执行完毕到真正将任务设置为完成态的一个中间状态(见FutureTask的run方法
) get
方法无论是否存在超时时间,底层都是通过LockSupport
的park、unpark
方法来达到阻塞的目的- 对于每个任务,其内部会维护一个等待当前任务完成的线程链表
waiters
CompletableFuture超时中断机制
而从JDK 9
开始,CompletableFuture
也提供了 orTimeout
、completeTimeout
方法,来进行异步超时控制。
CompletableFuture.allOf(completableFuture1, completableFuture2, completableFuture3).orTimeout(1, TimeUnit.SECONDS).get();
根据上面代码,我们可以理解到,会等待completableFuture1, completableFuture2, completableFuture3
三个任务执行1秒钟
如果超过1秒,则会抛出java.util.concurrent.TimeoutException
源码如下:
public CompletableFuture<T> orTimeout(long timeout, TimeUnit unit) {
if (unit == null)
throw new NullPointerException();
if (result == null)
whenComplete(new Canceller(Delayer.delay(new Timeout(this),
timeout, unit)));
return this;
}
public CompletableFuture<T> completeOnTimeout(T value, long timeout,
TimeUnit unit) {
if (unit == null)
throw new NullPointerException();
if (result == null)
whenComplete(new Canceller(Delayer.delay(
new DelayedCompleter<T>(this, value),
timeout, unit)));
return this;
}
static final class Delayer {
static ScheduledFuture<?> delay(Runnable command, long delay,
TimeUnit unit) {
// 延时任务
return delayer.schedule(command, delay, unit);
}
static final ScheduledThreadPoolExecutor delayer;
static {
// 单线程
(delayer = new ScheduledThreadPoolExecutor(
1, new DaemonThreadFactory())).
setRemoveOnCancelPolicy(true);
}
}
static final class Timeout implements Runnable {
final CompletableFuture<?> f;
Timeout(CompletableFuture<?> f) { this.f = f; }
public void run() {
// 如果CompletableFuture不为null,且定时任务没有被取消
if (f != null && !f.isDone())
// 设置超时异常
f.completeExceptionally(new TimeoutException());
}
}
static final class DelayedCompleter<U> implements Runnable {
final CompletableFuture<U> f;
final U u;
DelayedCompleter(CompletableFuture<U> f, U u) { this.f = f; this.u = u; }
public void run() {
if (f != null)
// 将任务结果设置为我们给定的value
f.complete(u);
}
}
static final class Canceller implements BiConsumer<Object, Throwable> {
final Future<?> f;
Canceller(Future<?> f) { this.f = f; }
public void accept(Object ignore, Throwable ex) {
// 如果没有异常,且超时任务存在且没有被取消,那么则取消超时任务
// 因为此时说明,CompletableFuture的任务在超时时间内完成了,则不需要在监控超时
if (ex == null && f != null && !f.isDone())
f.cancel(false);
}
}
通过对上面源码的了解,我们可以知道
CompletableFuture
的orTimeout
、completeOnTimeout
底层其实都是通过ScheduledThreadPoolExecutor
来实现的
当我们对一个CompletableFuture
设置了超时时间后,底层其实会通过ScheduledThreadPoolExecutor
启动一个延时任务,延时时间就是我们设置的超时时间,此时有分为两种情况
- 任务在超时时间之内完成,那么在任务完成之后,会去通过
cancel(false)
取消延时任务 - 任务执行时间超过设定的超时时间,则为该任务设置
TimeoutException
,让主线程感知~
另外,我们还能看到,
CompletableFuture
的延时任务并没有进行try-catch
,此处可以了解下->ScheduledThreadPoolExecutor有坑嗷~
而orTimeout
和completeOnTimeout
的区别就在于
- 如果是
orTimeout
,那么超时后会抛出超时异常 - 如果是
completeOnTimeout
,不会抛出异常,则是将任务结果设置为我们传入的value
扩展知识点
在上面了解CompletableFuture
的orTimeout
、completeOnTimeout
时,我们知道了其底层是通过ScheduledThreadPoolExecutor
来实现的,但通过源码发现,ScheduledThreadPoolExecutor
只有一个线程去处理
static final ScheduledThreadPoolExecutor delayer;
static {
(delayer = new ScheduledThreadPoolExecutor(
1, new DaemonThreadFactory())).
setRemoveOnCancelPolicy(true);
}
那么,当出现大量设置了超时时间且时间个不一致的CompletableFuture
时,由于是单线程处理,可能我们给任务设置的超时时间是1000ms
,但实际可能因为队列排队,真正处理超时的超时时间会 > 1000ms
也就是说orTimeout
、completeOnTimeout
设置的超时时间并不会那么精确
结尾
我是 皮皮虾 ,会在以后的日子里跟大家一起学习,一起进步!
转载自:https://juejin.cn/post/7207563514650918973