likes
comments
collection
share

彻底搞懂ScheduledThreadPoolExecutor

作者站长头像
站长
· 阅读数 7

介绍

ScheduledThreadPoolExecutor继承ThreadPoolExecutor来重用线程池的功能

彻底搞懂ScheduledThreadPoolExecutor

ScheduledThreadPoolExecutor 内部构造了两个内部类 ScheduledFutureTaskDelayedWorkQueue:

  • ScheduledFutureTask: ScheduledThreadPoolExecutor使用专门的任务类型ScheduledFutureTask 来执行周期任务,它继承了FutureTask,还分别实现了Runnable、Future、Delayed接口,说明它是一个可以延迟执行的异步运算任务。

  • DelayedWorkQueue: 这是 ScheduledThreadPoolExecutor 为存储周期或延迟任务专门定义的一个延迟队列。它内部只允许存储 RunnableScheduledFuture 类型的任务。

源码解析

ScheduledFutureTask内部类

构造函数
private class ScheduledFutureTask<V>
        extends FutureTask<V> implements RunnableScheduledFuture<V> {
 
    /** 顺序编号,当两个任务有相同的延迟时间时,按照 FIFO 的顺序入队*/
    private final long sequenceNumber;

    /** 下次任务执行时的时间 */
    private long time;

    /** 执行周期时间*/
    private final long period;

    /** 重新入队的任务*/
    RunnableScheduledFuture<V> outerTask = this;

    /** 延迟队列的索引 */
    int heapIndex;

    ScheduledFutureTask(Runnable r, V result, long ns) {
        super(r, result);
        this.time = ns;
        this.period = 0;
        this.sequenceNumber = sequencer.getAndIncrement();
    }

    ScheduledFutureTask(Runnable r, V result, long ns, long period) {
        super(r, result);
        this.time = ns;
        this.period = period;
        this.sequenceNumber = sequencer.getAndIncrement();
    }

    ScheduledFutureTask(Callable<V> callable, long ns) {
        super(callable);
        this.time = ns;
        this.period = 0;
        this.sequenceNumber = sequencer.getAndIncrement();
    }

ScheduledFutureTask继承自FutureTask,可以通过返回Future对象来获取执行的结果,将 Runnable/Callable 封装为 ScheduledFutureTask

run方法
public void run() {
    // 是否为周期任务
    boolean periodic = isPeriodic();
    // 当前线程池运行状态下如果不可以执行任务,取消该任务
    if (!canRunInCurrentRunState(periodic))
        cancel(false);
    else if (!periodic)
        // 不是周期性任务,调用FutureTask中的run方法执行
        ScheduledFutureTask.super.run();
    // 周期任务    
    else if (ScheduledFutureTask.super.runAndReset()) {
        // 设置下次执行时间
        setNextRunTime();
        // 重新入队
        reExecutePeriodic(outerTask);
    }
}

setNextRunTime()方法 用来设置下一次运行的时间

private void setNextRunTime() {
    long p = period;
    if (p > 0)
     // 固定频率,上次执行时间加上周期时间
        time += p;
    else
       // 固定延迟执行,使用当前系统时间加上周期时间
        time = triggerTime(-p);
}

period正数表示固定速率执行(为scheduleAtFixedRate提供服务),负数表示固定延迟执行(为scheduleWithFixedDelay提供服务),0表示不重复任务。

cancel方法
public boolean cancel(boolean mayInterruptIfRunning) {
    // 调用父类 FutureTask.cancel 
    boolean cancelled = super.cancel(mayInterruptIfRunning);
    // 是否从队列中移除此任务
    if (cancelled && removeOnCancel && heapIndex >= 0)
        remove(this);
    return cancelled;
}

ScheduledThreadPoolExecutor线程池

构造函数
public ScheduledThreadPoolExecutor(int corePoolSize,
                                   ThreadFactory threadFactory,
                                   RejectedExecutionHandler handler) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue(), threadFactory, handler);
}

由于ScheduledThreadPoolExecutor继承了ThreadPoolExecutor,ScheduledThreadPoolExecutor的构造方法都是直接调用其父类ThreadPoolExecutor类的构造方法。

Schedule延迟任务
// 无返回值的延迟任务
public ScheduledFuture<?> schedule(Runnable command,
                                   long delay,
                                   TimeUnit unit) {
    if (command == null || unit == null)
        throw new NullPointerException();
    // decorateTask是一个钩子函数,子类利用它可以对任务进行重构过滤等操作
    RunnableScheduledFuture<?> t = decorateTask(command,new ScheduledFutureTask<Void>(command, null,triggerTime(delay, unit)));
    // 执行任务
    delayedExecute(t);
    return t;
}

// 有返回值的延迟任务
public <V> ScheduledFuture<V> schedule(Callable<V> callable,
                                       long delay,
                                       TimeUnit unit) {
    if (callable == null || unit == null)
        throw new NullPointerException();
    // decorateTask是一个钩子函数,子类利用它可以对任务进行重构过滤等操作
    RunnableScheduledFuture<V> t = decorateTask(callable,new ScheduledFutureTask<V>(callable,triggerTime(delay, unit)));
    // 执行任务
    delayedExecute(t);
    return t;
}

schedule主要用于执行一次性(延迟)任务,可以看到这两个方法的入参一个是Runnable对象,一个Callable对象。传入的任务会封装成一个RunnableScheduledFuture对象,其实也就是ScheduledFutureTask对象。

scheduleAtFixedRate和scheduleWithFixedDelay周期任务
// 创建一个周期执行的任务,下一次执行时间相当于是上一次的执行时间加上period
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                              long initialDelay,
                                              long period,
                                              TimeUnit unit) {
    if (command == null || unit == null)
        throw new NullPointerException();
    if (period <= 0)
        throw new IllegalArgumentException();
    ScheduledFutureTask<Void> sft =
        new ScheduledFutureTask<Void>(command,
                                      null,
                                      triggerTime(initialDelay, unit),
                                      unit.toNanos(period));
    RunnableScheduledFuture<Void> t = decorateTask(command, sft);
    sft.outerTask = t;
    delayedExecute(t);
    return t;
}

// 创建一个周期执行的任务,下一次执行时间相当于是上一次任务执行完的系统时间加上period
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                 long initialDelay,
                                                 long delay,
                                                 TimeUnit unit) {
    if (command == null || unit == null)
        throw new NullPointerException();
    if (delay <= 0)
        throw new IllegalArgumentException();
    ScheduledFutureTask<Void> sft =
        new ScheduledFutureTask<Void>(command,
                                      null,
                                      triggerTime(initialDelay, unit),
                                      unit.toNanos(-delay));
    RunnableScheduledFuture<Void> t = decorateTask(command, sft);
    sft.outerTask = t;
    delayedExecute(t);
    return t;
}

可以看到这两个方法大体上没什么区别,主要区别在于 unit.toNanos方法,scheduleAtFixedRate传的是正值,而scheduleWithFixedDelay传的则是负值,这个值就是 ScheduledFutureTask 的period属性

我们上面介绍延迟任务和周期任务,最后都通过调用delayedExecute()方法来延时执行任务,返回一个ScheduledFuture对象。

private void delayedExecute(RunnableScheduledFuture<?> task) {
    // 线程池已关闭,执行拒绝策略
    if (isShutdown())
        reject(task);
    else {
        // 任务入队
        super.getQueue().add(task);
        if (isShutdown() &&
            !canRunInCurrentRunState(task.isPeriodic()) &&
            remove(task))
            // 移除任务
            task.cancel(false);
        else
            // 启动一个新的线程等待任务
            ensurePrestart();
    }
}

ensurePrestart()方法

void ensurePrestart() {
    int wc = workerCountOf(ctl.get());
    if (wc < corePoolSize)
        addWorker(null, true);
    else if (wc == 0)
        addWorker(null, false);
}

该方法在父类ThreadPoolExecutor中定义,调用了addWorker方法。线程池中的工作线程是通过该方法来启动并执行任务的。

shutdown取消任务
public void shutdown() {
    super.shutdown();
}

@Override void onShutdown() {
    BlockingQueue<Runnable> q = super.getQueue();
    // shutdown的情况下是否继续执行现有延迟任务
    boolean keepDelayed =
        getExecuteExistingDelayedTasksAfterShutdownPolicy();
    // shutdown的情况下是否继续执行现有周期任务
    boolean keepPeriodic =
        getContinueExistingPeriodicTasksAfterShutdownPolicy();
    if (!keepDelayed && !keepPeriodic) {
        for (Object e : q.toArray())
            if (e instanceof RunnableScheduledFuture<?>)
                // 取消任务
                ((RunnableScheduledFuture<?>) e).cancel(false);
        // 清除等待队列
        q.clear();
    }
    else {
        // Traverse snapshot to avoid iterator exceptions
        for (Object e : q.toArray()) {
            if (e instanceof RunnableScheduledFuture) {
                RunnableScheduledFuture<?> t =
                    (RunnableScheduledFuture<?>)e;
                if ((t.isPeriodic() ? !keepPeriodic : !keepDelayed) ||
                    t.isCancelled()) { 
                    // 如果任务已经取消,移除队列中的任务
                    if (q.remove(t))
                        t.cancel(false);
                }
            }
        }
    }
    // 终止线程
    tryTerminate();
}

线程池关闭方法调用了父类ThreadPoolExecutor的shutdown()方法,执行shutdown方法时调用了 onShutdown方法。onShutdown方法是ThreadPoolExecutor中的钩子方法,在ThreadPoolExecutor中什么都没有做。

通过这里我们可以看到通过参数(continueExistingPeriodicTasksAfterShutdownexecuteExistingDelayedTasksAfterShutdown)来决定线程池关闭后是否关闭已经存在的任务

总结

ScheduledThreadPoolExecutor继承自 ThreadPoolExecutor,使用了专门的任务类型(ScheduledFutureTask)和专门存储延迟队列(DelayedWorkQueue),为任务提供延迟或周期执行。

ScheduledThreadPoolExecutor被关闭(shutdown)之后支持可选的逻辑来决定是否继续运行周期或延迟任务。