FutureTask源码解析,搞懂它的“前世今生”!
本文我们将介绍一下FutureTask的“前世今生”。
前言
Java中一般通过继承Thread类或者实现Runnable接口这两种方式来创建多线程,但这两种方式都不会返回结果,也不能抛出被检查的异常。基于这个缺陷,在JDK1.5之后出现了Callable
和Future
接口,通过它们就可以在任务执行完毕之后得到任务的执行结果。
1.Callable接口
public interface Callable<V> {
V call() throws Exception;
}
- 可以看到Callable是个泛型接口,泛型
V
就是要call()方法返回的类型; - 类似于
Runnable
接口实现run()方法,而对于Callable接口,需要实现call()
方法; - 不能使用Callable创建线程,只能使用Runnable创建线程;【重点】
- call()方法可以引发异常,而run()则不能。
2.Future接口
public interface Future<V> {
boolean cancel(boolean mayInterruptIfRunning);
boolean isCancelled();
boolean isDone();
V get() throws InterruptedException, ExecutionException;
V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
}
Future接口定义了操作异步任务执行的一些方法,如获取异步任务的执行结果、取消任务的执行、判断任务是否被取消、判断任务执行是否完毕等。
-
cancel()
尝试取消当前任务的执行。 如果任务已经完成、已被取消或由于其他原因无法取消,则此尝试将失败,返回false。 如果成功,并且调用cancel方法时当前任务尚未启动,则此任务永远不会运行。 如果任务已经开始,则
mayInterruptIfRunning
参数确定是否应中断执行此任务的线程以尝试停止任务。【true→中断执行任务的线程并返回true;false→不会中断任务执行d的线程并返回true】 -
isCancelled()
判断任务是否被取消,如果此任务在正常完成之前被取消,则返回true。
注意:若cancel()返回true,调用isCancelled()将返回true
-
isDone()
判断当前任务是否已经完成,完成返回true,未完成返回false。
注意:完成可能是由于正常终止、异常或取消,在这些情况下,同样会返回true。若cancel()返回true,调用isDone()将返回true。
-
get()
等待任务计算完成,若任务没有完成,阻塞等待直到任务执行完成,然后获取其结果。
如果任务被取消会抛出
CancellationException
异常;如果任务执行过程发生异常则会抛出
ExecutionException
异常;如果当前线程在等待时被中断则会抛出
InterruptedException
异常; -
get(long timeout, TimeUnit unit)
与get()类似,只不过可以指定阻塞等待的时间。如果在阻塞等待过程中超时则会抛出
TimeoutException
异常。
Callable与Runnable类似,封装了要在一个线程A上运行的任务,而Future用于存储从另一个线程A获得的结果。因此,Future也可以与Runnable一起使用。
而FutureTask【Future实现类】,该类型实现Runnable和Future,并方便地将两种功能组合在一起。
引入FutureTask
综上,提出三个特点:多线程/有返回/异步任务
Thread构造方法只接受实现了Runnable接口的线程,但Runnable接口的run方法无返回,只有Callable接口的call方法有返回值;因此我们如果能够找到一个接口/类能够满足上面三个特点的话,岂不美哉!
我们在Runnable接口下面找到了一个继承此接口的RunnableFuture
的接口,同时也继承了Future接口,此时如果实现类也能够满足有返回值就可以了,看到一个实现类FutureTask
,但是没有实现Callable接口;发现FutureTask的构造方法支持Callable接口的注入【适配器设计模式】—完美!!!
public class FutureTask<V> implements RunnableFuture<V>;
public interface RunnableFuture<V> extends Runnable, Future<V>;
可见FutureTask除了实现Future接口外,还实现了Runnable接口。因此,FutureTask可以作为Runnable交给Executor执行,也可以获取Callable的执行结果。

FutureTask的使用
可以把FutureTask交给Executor执行;也可以通过ExecutorService.submit()方法返回一个 FutureTask,然后执行FutureTask.get()方法或FutureTask.cancel())方法。
ExecutorService threadPool = Executors.newFixedThreadPool(5);
Future<Object> submit = threadPool.submit(new Callable<Object>() {
@Override
public Object call() throws Exception {
return null;
}
});
进入到ExecutorService实现类中查看
// AbstractExecutorService类
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}
public <T> Future<T> submit(Runnable task, T result) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task, result);
execute(ftask);
return ftask;
}
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
--------------------------------------------------------------------------
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return new FutureTask<T>(callable);
}
当一个线程需要等待另一个线程把某个任务执行完后它才能继续执行,此时可以使用 FutureTask。假设有多个线程执行若干任务,每个任务最多只能被执行一次。当多个线程试图 同时执行同一个任务时,只允许一个线程执行任务,其他线程需要等待这个任务执行完后才 能继续执行。下面是对应的示例代码。
private final ConcurrentMap<Object, Future<String>> taskCache =
new ConcurrentHashMap<Object, Future<String>>();
private String executionTask(final String taskName)
throws ExecutionException, InterruptedException {
while (true) {
Future<String> future = taskCache.get(taskName); // 1.1,2.1
if (future == null) {
Callable<String> task = new Callable<String>() {
public String call() throws InterruptedException {
return taskName;
}
};
FutureTask<String> futureTask = new FutureTask<String>(task);
future = taskCache.putIfAbsent(taskName, futureTask); // 1.3
if (future == null) {
future = futureTask;
futureTask.run(); // 1.4执行任务
}
}
try {
return future.get(); // 1.5,2.2线程在此等待任务执行完成
} catch (CancellationException e) {
taskCache.remove(taskName, future);
}
}
}

当两个线程试图同时执行同一个任务时,如果Thread 1执行1.3后Thread 2执行2.1,那么接 下来Thread 2将在2.2等待,直到Thread 1执行完1.4后Thread 2才能从2.2(FutureTask.get())返回。
FutureTask源码解析
0.FutureTask的状态
-
state由volatile修饰,保证可见性,确保只要有任何一个线程修改了这个变量,那么其它所有的线程都会知道最新的值。
-
可能的状态转移
NEW -> COMPLETING -> NORMAL NEW -> COMPLETING -> EXCEPTIONAL NEW -> CANCELLED NEW -> INTERRUPTING -> INTERRUPTED

private volatile int state;
//尚未执行完的任务,初始状态。
private static final int NEW = 0;
// 一种临界状态,正在结束,outcome还没有保存正常结束/异常结束的结果
private static final int COMPLETING = 1;
// 任务正常结束,并保存了结果到outcome中
private static final int NORMAL = 2;
// 任务执行过程中发生了异常,结果异常信息保存到了outcome中
private static final int EXCEPTIONAL = 3;
// 任务执行完成前[还没执行,或执行过程中],该任务就被取消,调用cancel(false)
private static final int CANCELLED = 4;
// 任务执行完成前[还没执行,或执行过程中],调用cancel(true),但还没有完成中断,中断中...
private static final int INTERRUPTING = 5;
// 当前任务已经中断
private static final int INTERRUPTED = 6;
所有值大于COMPLETING
【1】的状态都表示任务已经执行完成(任务正常执行完成,任务执行异常或者任务被取消)。

1.解析构造方法
FutureTask有两个构造方法:
注入Callable
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
// 设置初始状态:尚未执行
this.state = NEW;
}
注入Runnable
public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result);
// 设置初始状态:尚未执行
this.state = NEW;
}
-----------------------------------------------------
// Executors类
public static <T> Callable<T> callable(Runnable task, T result) {
if (task == null)
throw new NullPointerException();
return new RunnableAdapter<T>(task, result);
}
static final class RunnableAdapter<T> implements Callable<T> {
final Runnable task;
final T result;
RunnableAdapter(Runnable task, T result) {
this.task = task;
this.result = result;
}
public T call() {
task.run();
return result;
}
}
- 可见,第二种构造方法就是利用适配器设计模式将runnable转换为callable,并封装传过来的result(没传则为null)→【看FutureTask的使用中的内容】
2.解析run()方法
public void run() {
// 条件一:说明该任务已经执行过或被cancel,非NEW状态,就不处理了
// 条件二:CAS原子比较this对象指定位置字段,若是期望值null,则更新该字段为当前线程,失败就返回【说明其它线程已经抢占】
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
// 走到这,说明满足NEW状态,并当前线程抢占成功
try {
// 注入的Callable或者由适配器将Runnable转换后的Callable
Callable<V> c = callable;
// c!=null,防止调用call出现空指针异常
// state==NEW,防止外部线程将当前任务取消掉,谨慎操作!
if (c != null && state == NEW) {
// 结果引用
V result;
// 表明任务执行是否成功
boolean ran;
try {
// 执行任务逻辑
result = c.call();
// 未抛异常,执行成功
ran = true;
} catch (Throwable ex) {
// 出现异常
result = null;
ran = false;
// 保存异常信息
setException(ex);
}
if (ran)
// 保存结果到outcome中
set(result);
}
} finally {
runner = null;
int s = state;
if (s >= INTERRUPTING)
// 如果任务被中断,执行中断处理
handlePossibleCancellationInterrupt(s);
}
}
3.解析setException()方法
protected void setException(Throwable t) {
// CAS原子比较this对象指定位置字段,若是期望值NEW,则更新该字段为COMPLETING[完成中,设置outcome值,上面有提及]
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
// 保存异常信息
outcome = t;
// 懒惰原子操作把当前任务状态从COMPLETING变更为EXCEPTIONAL
UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
// 下面会解析
finishCompletion();
}
}
4.解析set()方法
protected void set(V v) {
// CAS原子比较this对象指定位置字段,若是期望值NEW,则更新该字段为COMPLETING[完成中,设置outcome值,上面有提及]
// 其它线程可能会调用cancel(),故使用CAS操作,小概率事件
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
// 保存正常返回值
outcome = v;
// 懒惰原子操作把当前任务状态从COMPLETING变更为NORMAL,正常结束状态
UNSAFE.putOrderedInt(this, stateOffset, NORMAL);
// 下面会解析
finishCompletion();
}
}
5.解析handlePossibleCancellationInterrupt()方法
- 若一直处于中断中[cancel(true)]...就释放CPU。
private void handlePossibleCancellationInterrupt(int s) {
if (s == INTERRUPTING)
while (state == INTERRUPTING)
Thread.yield(); // wait out pending interrupt
}
6.解析get()方法
任务发起线程可以调用get()方法来获取任务执行结果,如果此时任务已经执行完毕则会直接返回任务结果,如果任务还没执行完毕,则调用方会阻塞直到任务执行结束返回结果为止。get()方法实现如下:
public V get() throws InterruptedException, ExecutionException {
// 获取当前任务状态
int s = state;
// 成立条件:未执行,正在执行,正在结束,调用awaitDone()方法阻塞调用get方法的外部线程
if (s <= COMPLETING)
// 获得线程状态
s = awaitDone(false, 0L);
// 把不同的任务状态映射成任务执行结果
return report(s);
}
7.解析get(long timeout, TimeUnit unit)方法
与get()类似,只不过可以指定阻塞等待的时间。如果在阻塞等待过程中超时则会抛出TimeoutException
异常。
- timeout:时间长度
- unit:时间单位
public V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
// 该字段不能为空
if (unit == null)
throw new NullPointerException();
// 获取当前任务状态
int s = state;
// 未执行,正在执行,正在结束的条件下调用awaitDone()方法阻塞调用get方法的外部线程,判断结果
if (s <= COMPLETING &&
(s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
throw new TimeoutException();
return report(s);
}
8.解析awaitDone()方法
当调用get()获取任务结果但是任务还没执行完成的时候,调用线程会调用awaitDone()方法进行阻塞等待,该方法定义如下:
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
// 计算等待截止时间。System.nanoTime():获取表征当前时间的数值,
final long deadline = timed ? System.nanoTime() + nanos : 0L;
// 下面会将当前线程封装为WaitNode对象
WaitNode q = null;
// 表示封装后的WaitNode有没有入队
boolean queued = false;
// 自旋操作
for (;;) {
// 1.当前进程已唤醒:被其它线程通过中断方式[interrupted()]唤醒,进入下面逻辑,将中断标记重置回false
if (Thread.interrupted()) {
// 将q出队
removeWaiter(q);
// get()方法抛出中断异常
throw new InterruptedException();
}
// 获取当前任务的最新状态
int s = state;
// 2.说明任务已经结束(要么正常结束,要么异常结束,要么被取消)
// 则把thread置空[help GC],并返回当前状态
if (s > COMPLETING) {
if (q != null)
q.thread = null;
return s;
}
// 3. 如果状态处于中间状态COMPLETING
// 表示任务已经结束但是任务执行线程还没来得及给outcome赋值
// 这个时候让出CPU执行权让其他线程优先执行,很快会变为下一个状态,抢占到CPU后再进行自旋
else if (s == COMPLETING)
Thread.yield();
// 4.第一次自旋:将当前线程封装进一个WaitNode中
else if (q == null)
q = new WaitNode();
// 5.第二次自旋:创建了WaitNode,但还未入队
else if (!queued)
// 当前WaitNode节点的next指向队列的头结点
// CAS原子比较this对象指定位置字段,若是期望值为q.next = waiters,则更新该将首节点替换原来waiters(waiters始终指向队列头结点)
queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
q.next = waiters, q);
// 6.第三次自旋
else if (timed) {
// 如果需要等待特定时间,则先计算要等待的时间
// 如果已经超时,则删除对应节点并返回对应的状态
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
removeWaiter(q);
return state;
}
// 阻塞等待特定时间
LockSupport.parkNanos(this, nanos);
}
else
// 7.阻塞等待直到被其他线程唤醒
LockSupport.park(this);
}
}
流程
- 第一轮for循环,执行的逻辑是q == null,所以这时候会新建一个节点q。第一轮循环结束。
- 第二轮for循环,执行的逻辑是!queue,这个时候会把第一轮循环中生成的节点的netx指针指向waiters,然后CAS的把节点q替换waiters。也就是把新生成的节点添加到waiters链表的首节点。如果替换成功,queued=true。第二轮循环结束。
- 第三轮for循环,进行阻塞等待。要么阻塞特定时间,要么一直阻塞直到被其他线程唤醒。
- 若通过finishCompletion()【下面会解析】中的LockSupport.unpark()唤醒的话,会执行2/3;若被其它线程通过中断方式[interrupted()]唤醒,下次自旋进入到1中。

9.解析removeWaiter()方法
private void removeWaiter(WaitNode node) {
if (node != null) {
// 将节点中封装的thread置空,根据node.thread是否为空在自旋中完成删除
node.thread = null;
retry:
// 自旋
for (;;) { // restart on removeWaiter race
// pred→当前节点前驱;q→当前节点;s→当前节点的下一个节点
for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
// 拿到当前节点的下一个节点
s = q.next;
// 当前节点线程不为空,一直向后面寻找
if (q.thread != null)
// 前驱指向当前节点
pred = q;
// 当前节点【待删除节点】不是头结点的情况
else if (pred != null) {
// 删除操作,前驱指向当前节点的下一个节点
pred.next = s;
// 前驱结点也打算出队,重新自旋操作执行出队操作
if (pred.thread == null) // check for race
continue retry;
}
// ②当前节点【待删除节点】是头结点的情况
// CAS原子比较this对象指定位置字段,若是期望值q,则更新该字段为s,完成删除
else if (!UNSAFE.compareAndSwapObject(this, waitersOffset,
q, s))
// 重头自旋看是否有待删除的节点
continue retry;
}
break;
}
}
}
- node.thread = null是为了在下面自旋中找到对应节点,并将其删除
- 两个else if是两种情况:1.当前节点是头结点;2.当前节点不是头结点
10.解析finishCompletion()方法
- 任务执行异常,任务正常执行完毕或者取消任务,最后都会调用finishCompletion()方法
- 唤醒所有等待线程并发出信号。
private void finishCompletion() {
// assert state > COMPLETING;
for (WaitNode q; (q = waiters) != null;) {
// CAS原子比较this对象指定位置字段,若是期望值q,则更新该字段为null,完成置空操作
// CAS是因为其它线程调用cancel()取消当前任务,取消同样会触发finishCompletion方法
if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
// 自旋
for (;;) {
// 第一次自旋,会拿到等待队列中的头结点的线程
Thread t = q.thread;
// 当前线程不为空
if (t != null) {
q.thread = null; // help gc
// 唤醒当前节点的线程,awaitDone中park的该线程就会唤醒
LockSupport.unpark(t);
}
// 拿到当前节点的下一个节点
WaitNode next = q.next;
// 若next为空,则等待队列全部处理完毕
if (next == null)
break;
q.next = null; // help gc
q = next;
}
break;
}
}
// 扩展用的
done();
// 将callable设置为null,help gc
callable = null;
}
11.解析cancel()方法
public boolean cancel(boolean mayInterruptIfRunning) {
// 条件一:处于运行中或处于等待(任务)队列中,这两种状态才可以被取消
// 条件二:CAS原子比较this对象指定位置字段,若是期望值NEW,则更新该字段为INTERRUPTING(mayInterruptIfRunning→true)/CANCELLED(mayInterruptIfRunning→false)
if (!(state == NEW &&
UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
return false;
try {
// true→表明需要给当前线程发送一个中断信号
if (mayInterruptIfRunning) {
try {
// ①若t为null,说明当前任务还在等待(任务)队列中;不为空,说明正在执行
Thread t = runner;
if (t != null)
// 发送中断信号
t.interrupt();
} finally {
// CAS懒惰原子操作把任务状态修改为INTERRUPTED【中断完成】
UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
}
}
} finally {
// 唤醒所有get()阻塞的线程
finishCompletion();
}
return true;
}
get方法和cancel方法的执行示意图如下所示

12.解析report()方法
private V report(int s) throws ExecutionException {
Object x = outcome;
// 1.正常执行结束
if (s == NORMAL)
return (V)x;
// 2.任务取消,抛出异常
if (s >= CANCELLED)
throw new CancellationException();
// 3. 其他状态,抛出执行异常ExecutionException
throw new ExecutionException((Throwable)x);
}

Future优缺点分析
1.优点
-
Future+线程池异步多线程任务配合,能显著提升程序的执行效率
public static void main(String[] args) { long startTime = System.currentTimeMillis(); // 三个任务暂停毫秒 try { TimeUnit.MILLISECONDS.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } try { TimeUnit.MILLISECONDS.sleep(300); } catch (InterruptedException e) { e.printStackTrace(); } try { TimeUnit.MILLISECONDS.sleep(200); } catch (InterruptedException e) { e.printStackTrace(); } long endTime = System.currentTimeMillis(); System.out.println("花费时间为:" + (endTime - startTime) + " 毫秒"); }
使用线程池优化之后
public static void main(String[] args) throws ExecutionException, InterruptedException { long startTime = System.currentTimeMillis(); // 创建线程池 ExecutorService threadPool = Executors.newFixedThreadPool(3); // 开启多个异步线程 FutureTask<String> task1 = new FutureTask<String>(() -> { try { TimeUnit.MILLISECONDS.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } return "task1 over"; }); // 在这里如果多次new的话,会降低性能,有new就会有垃圾回收,因此可以使用线程池复用线程 // new Thread(task1).start(); threadPool.submit(task1); System.out.println(task1.get()); FutureTask<String> task2 = new FutureTask<String>(() -> { try { TimeUnit.MILLISECONDS.sleep(300); } catch (InterruptedException e) { e.printStackTrace(); } return "task2 over"; }); threadPool.submit(task2); System.out.println(task2.get()); FutureTask<String> task3 = new FutureTask<String>(() -> { try { TimeUnit.MILLISECONDS.sleep(200); } catch (InterruptedException e) { e.printStackTrace(); } return "task3 over"; }); threadPool.submit(task3); System.out.println(task3.get()); long endTime = System.currentTimeMillis(); System.out.println("花费时间为:" + (endTime - startTime) + " 毫秒"); threadPool.shutdown(); }
2.缺点
-
get()阻塞
futureTask.get()方法一定得等到结果才会继续向下执行,容易程序阻塞
public static void main(String[] args) throws ExecutionException, InterruptedException { FutureTask<String> futureTask = new FutureTask<>(() -> { System.out.println(Thread.currentThread().getName() + "\t ----- come in"); // 暂停几秒钟 TimeUnit.SECONDS.sleep(5); return "task over"; }); new Thread(futureTask).start(); System.out.println(futureTask.get()); System.out.println(Thread.currentThread().getName() + "\t 主线程忙其它任务了。。。。。"); }
比如上面这段代码就会等待结果打印出来之后,才能向下执行主线程。因此我们一般将get()放到最后,不要影响其它线程的执行,如果我们不想等待太久的时间,希望会自动离开,过时不候。我们可以使用下面这个方法
futureTask.get(3, TimeUnit.SECONDS)
在指定的时间没有获取到结果就会报出异常java.util.concurrent.TimeoutException
但是这种处理方法不优雅,不健壮
另一种方法就是使用轮询去获取结果
while(true) {
if(futureTask.isDone()) {
System.out.println(futureTask.get());
break;
} else {
// 暂停一段时间之后再次轮询访问
TimeUnit.MILLISECONDS.sleep(500);
System.out.println("正在处理中,不要再催了。。。。");
}
}
- 轮询的方式会耗费无谓的CPU资源,而且也不见得能及时地得到计算结果。
- 如果想要异步获取结果,通常都会以轮询的方式去获取结果,尽量不要阻塞
结论:
- Future对于结果的获取不是很友好,只能通过阻塞或轮询的方式得到任务的结果。
- 上面这条结论也就是为什么
JDK8
会引入CompletableFuture
这个类。
转载自:https://juejin.cn/post/7194424656421584955