likes
comments
collection
share

Java线程池详解一:Future的使用和实现

作者站长头像
站长
· 阅读数 16

提交到线程池中执行的异步任务都会返回一个任务的 Future,所以这里先介绍一下 Future 的使用和实现。

异步任务通常会被提交到线程池中去执行,但任务并非提交到线程池后就不管不顾了,在某些时刻我们希望能够取消任务,同时也希望在任务执行完成后获取到任务的执行结果。

Java 提供了 Future 代表了一个异步任务的结果,可以用来等待任务执行完成并获取任务的执行结果和取消任务。Future 中生命周期只能前进不能后退,某个任务完成后,它将永远停留在“完成”状态上。

Java 中表示线程池的 ExecutorService 接口中定义的用于提交任务的方法都会返回提交的任务的 Future。

 public interface ExecutorService extends Executor {
    // 提交Callable任务并返回任务的Future
      Future submit(Callable task);
    // 从返回的Future中获取到执行结果为result
      Future submit(Runnable task, T result);
    // 从返回的Future中获取到执行结果为null
    Future submit(Runnable task);
}

Future 相关的类的类图如下:

Java线程池详解一:Future的使用和实现

  • Future 接口表示一个异步任务的执行结果。
  • RunnableFuture 接口表示一个 Runnable 的任务和其执行结果。
  • FutureTask 是 RunnableFuture 的具体实现,它可以发起任务的执行、等待任务的执行结果、取消任务。

1. 使用 Future

1.1. 获取任务执行结果

Future 提供可一个不带参数 get 方法和一个带超时参数的 get 方法用于获取任务的执行结果:

package java.util.concurrent;

    public interface Future {    
        /**
         * 等待任务执行完成,并获取其结果。
         * Throws:
         *  CancellationException - 如果任务被取消
         *  ExecutionException - 如果任务发生异常
         *  InterruptedException - 如果当前线程在等待时被中断
         */
        V get() throws InterruptedException, ExecutionException;
        
        // 等待指定时间任,并获取其结果。若指定时间内任务还在执行则抛出TimeoutException
        V get(long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException;
    }
}

调用 get 方法时,若任务已经执行结束,那么 get 会立即返回或者抛出一个 Exception,如果任务没有完成,那么 get 将阻塞直到任务结束。也可以使用带超时的 get 方法,如果在指定时间内任务没有执行完成,那么将抛出 TimeoutException。

ExecutorService es = Executors.newFixedThreadPool(1);
Future f = es.submit(new Callable() {
    @Override
    public Date call() throws Exception {
        Thread.sleep(5000);
        return new Date();
    }
});
try {
    f.get(2000, TimeUnit.MILLISECONDS);
} catch (TimeoutException e) {
    System.out.println("timeout");
}

// 输出:
timeout

如果任务抛出了异常,那么 get 方法会将异常重新封装为 ExecutionException 并抛出,可以通过 getCause 来获得被封装的初始异常。如果任务被取消,那么 get 将抛出 CancellationException。

ExecutorService es = Executors.newFixedThreadPool(1);
Future f = es.submit(new Runnable() {
    @Override
    public void run() {
        throw new RuntimeException("task throw error test");
    }
});
//  等待任务执行完成,并获取其结果。
try {
    f.get();
} catch (ExecutionException e) {
    System.out.println(e.getCause().getMessage());
}

输出:
task throw error test

1.2. 取消任务

通过 Future 可以在需要的时候通过 cancel 方法来取消任务,例如 get 超时后抛弃任务时应该立即停止这些任务,从而避免继续为一个不再需要的结果浪费计算资源。

package java.util.concurrent;

public interface Future { 
    // 如果任务无法取消,通常是因为它已经完成,则返回false;否则返回true。
    boolean cancel(boolean mayInterruptIfRunning);
}

尝试取消执行的任务时,会有以下三种可能:

  • 任务已经完成、已被取消或由于其他原因无法取消,则此尝试将失败,返回false。
  • 任务还未启动,则该任务将不会被执行, 同时返回 true。
  • 任务已经启动,则 mayInterruptIfRunning 参数确定是否应中断执行此任务的线程以尝试停止该任务。

通过 cancel 来取消任务时若任务还未发起则任务将不会被执行,但若任务已经发起了执行则只能通过中断来尝试终止任务,但这依赖于任务自身要响应中断并退出。如下:

Future f = es.submit(new Runnable() {
@Override
public void run() {
    Thread thread = Thread.currentThread();
    // 未被中断则一直执行
    while (!thread.isInterrupted()) {
        System.out.println(thread.getName() + ": running");
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            // sleep方法会影响中断抛出异常并清除中断状态,所以这里恢复中断状态供while判断
            thread.interrupt();
        }
    }
    System.out.println(thread.getName() + ": end");
}
});
// 等待一会确保任务被发起执行
Thread.sleep(2_000);
// 取消任务,通过发送中断来取消
f.cancel(true);

1.3.  获取任务状态

Future 提供了 isDone 和 isCancelled 两个方法来获取任务的状态。

 /*
 * 任务是否已完成。
 * 完成可能是由于正常终止、异常或取消,在所有这些情况下,该方法将返回true。
 */
boolean isDone();

/*
 * Future是否已经被取消
 * 在调用cancel后,无论cancel是否返回true改返回都将始终返回true。
 */
boolean isCancelled();

2.  FutureTask

FutureTask 是 Future 的一个具体实现,它实现了 RunnableFuture 接口。

Executor 框架下的线程池基本都使用的 FutureTask。在 AbstractExecutorService 中所有提交的任务都会被先封装为 FutureTask,然后在提交到 execute,基本上大部分线程数都继承自 AbstractExecutorService,使用的也都是其实现的提交方法,各个线程池更专注于 execute 方法的实现。如下:

Java线程池详解一:Future的使用和实现

newTaskFor 统一为任务生成 FutureTask:

protected  RunnableFuture newTaskFor(Runnable runnable, T value) {
    return new FutureTask(runnable, value);
}

除了提交到线程池的任务会生成 FutureTask 外,我们也可以直接为 Runnable 和 Callable 的任务创建一个 FutureTask并使用,使用时也可以再当前线程中执行它。如下:

FutureTask f = new FutureTask<>(new Callable() {
    @Override
    public Date call() throws Exception {
        return new Date();
    }
});
f.run();
Date d = f.get();

不使用线程而直接在当前线程使用 FutureTask 的效果与同步执行相同,唯一用处是可以通过它获取到同步执行的 Future,与其它异步任务的 Future 形成统一的视图方便统一处理。

当在其它线程中异步执行时,往往是在任务执行前先将任务封装为 FutureTask 并返回,然后在异步线程中执行 FutureTask 的 run方法,在其它线程中执行 FutureTask 的 get 方法等待执行结果。

2.1. FutureTask 的状态

FutureTask 中的 state 字段记录了任务的当前状态,FutureTask 中的操作基本上发起前都要先校验状态,操作后又要更新状态。

public class FutureTask implements RunnableFuture {
    private volatile int state;

    private static final int NEW          = 0;
    // 任务执行结束,更新执行结果中 
    private static final int COMPLETING   = 1;
    // 任务正常结束
    private static final int NORMAL       = 2;
    // 任务抛出异常而结束
    private static final int EXCEPTIONAL  = 3;
    // 任务被取消
    private static final int CANCELLED    = 4;
    // 通过中断取消任务中
    private static final int INTERRUPTING = 5;
    // 任务被中断取消
    private static final int INTERRUPTED  = 6;
}

初始化时为 NEW 状态,NORMAL、EXCEPTIONAL、CANCELLED、INTERRUPTED 为终态,分别表示正常执行结束、执行抛出异常而结束、被取消、被中断。

状态更新流程如下:

Java线程池详解一:Future的使用和实现

通常调用 get 方法获取任务结果的线程和执行任务线程并非同一个线程中,所以为保证 state 字段的线程安全性,state 字段的更新都是通过 Unsafe的compareAndSwapObject 方法使用 CAS 算法进行更新的。

2.2.  get 方法的实现

当调用 FutureTask 的 get 方法时会阻塞直到任务执行完成或等待超时。其实现为 在 get 方法中通过一个 for(;;) 的循环一直循环到任务结束或者超时,循环中操作:

  • 第一次循环时若任务还未结束则会为当前线程创建一个 WaitNode 的等待节点。
  • 第二次循环时若任务还是未结束则会将当前线程的 WaitNode 解答加入到等待队列中去。
  • 第三次循环时若任务还是为结束则会将当前线程通过 LockSupport 阻塞,若 get 指定了超时则阻塞的指定的超时时间,否则一直阻塞直到被主动唤醒。

当任务正常或异常执行完成后,会唤醒阻塞队列中的所有线程。线程被唤醒后会返回任务的状态。

这期间若调用 get 的线程被中断则会中断时立即抛出 InterruptedException。若任务被取消则会抛出 CancellationException。

get的实现如下,在get 中调用 awaitDone 等待任务执行结果,并调用 report 方法根据状态返回对应的结果:

public V get() throws InterruptedException, ExecutionException {
    int s = state;
    if (s <= COMPLETING)
        s = awaitDone(false, 0L);
    return report(s);
}

await的实现如下:

Java线程池详解一:Future的使用和实现

2.3. FutureTask 的执行

生成 FutureTask 时需要指定异步任务,如指定的任务类型是 Runnable 类型的则会被转换为 Callable 类型并将任务保存在 callable 字段中。

public class FutureTask implements RunnableFuture {
    // 为Callable任务生成FutureTask
    public FutureTask(Callable callable) {
        if (callable == null)
            throw new NullPointerException();
        this.callable = callable;
        this.state = NEW;
    }
    // 为Runnable任务生成FutureTask
    public FutureTask(Runnable runnable, V result) {
        this.callable = Executors.callable(runnable, result);
        this.state = NEW;
    }
}

同时也可以看到创建 FutureTask 时其初始状态为 NEW。

通常 FutureTask 的任务会在一个异步线程中执行,即在异步线程中执行其 run 方法。而 FutureTask 的 run 方法会调用其持有的异步任务的 call方法,即 callable 字段的 call 方法,获取 call 的执行结果来更新 FutureTask 的结果和状态。

实现如下: Java线程池详解一:Future的使用和实现

初始时任务的状态为 NEW 和执行任务的线程(runner 字段保存了执行任务的线程)为 null,所以若 run 开始时非 NEW 状态或 runner 非空则说明任务已经被执行或者在执行中,为避免重复发起执行这里会直接返回。

call 正常执行结束后或者抛出异常而结束时都会使用返回的结果或异常去更新状态。

若执行正常完成,则将 outcome 字段将设置为执行返回的结果,FutureTask 状态最终更新为正常结束 ‘NORMAL’。

Java线程池详解一:Future的使用和实现

若执行时抛出异常而结束则将 outcome 设置为抛出的异常,FutureTask 状态最终更新为非正常结束‘EXCEPTIONAL’。

Java线程池详解一:Future的使用和实现

任务结果更新完成后就会通知到阻塞等待结果的线程。任务执行结束后遍历等待队列中的所有节点,将节点的被阻塞等待的线程逐个唤醒,并移除节点。这些操作由 finishCompletion 实现:

Java线程池详解一:Future的使用和实现

finishCompletion 中会调用 done 方法,done方法为一个空方法,我们可以继承 FutureTask 覆盖 done 方法定制任务执行结束后的操作。

public class FutureTask implements RunnableFuture {
    protected void done() { }
}

2.4.  cancel 方法实现

canncel 可以取消任务,取消后未发起的任务将不会被发起,对于已经发起的任务可以选择是否尝试中断。

FutureTask 中只有状态为 NEW 时才可以 cancel,因为状态为 NEW 的任务表示还未执行或者还在执行中,若是其它状态则表示任务已经正常或异常执行结束。

取消时若选择了中断则先状态更新为 INTERRUPTING,然后对执行任务的线程发出中断,之后再将任务状态更新为 INTERRUPTED。

在中断和状态更新完成后,则表示任务已经被取消,所以最后也需要调用 finishCompletion 唤醒所有等待这个任务执行结束的线程。

实现如下:

Java线程池详解一:Future的使用和实现