likes
comments
collection
share

ScheduledThreadPoolExecutor 介绍

作者站长头像
站长
· 阅读数 1

ScheduledThreadPoolExecutorThreadPoolExecutor 的子类,提供了对定时任务,周期任务的支持。 要理解 ScheduledThreadPoolExecutor 需要先熟悉

Executors 提供静态方法创建 ScheduledThreadPoolExecutor


private static final long DEFAULT_KEEPALIVE_MILLIS = 10L;

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
    return new ScheduledThreadPoolExecutor(corePoolSize);
}

public ScheduledThreadPoolExecutor(int corePoolSize) {
    super(corePoolSize, Integer.MAX_VALUE,
          DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
          new DelayedWorkQueue()); // 最大可以存储 Integer.MAX_VALUE 个的 Runnable 
}

ScheduledThreadPoolExecutor 构造函数 ScheduledThreadPoolExecutor 介绍

ScheduledThreadPoolExecutor 没有提供

  • 更改线程池最大线程数 maximumPoolSize(默认为 Integer.MAX_VALUE )
  • 更改非核心线程没有任务时线程保留时间 keepAliveTime (默认为 10 毫秒)
  • 更改阻塞队列 workQueue (默认 DelayedWorkQueue) 的构造函数

接口介绍


public interface ScheduledExecutorService extends ExecutorService {
    
    // 提交一个延迟 delay 时间的执行一次性任务 Runnable 
    public ScheduledFuture<?> schedule(Runnable command,
                                       long delay, TimeUnit unit);
    // 提交一个延迟 delay 时间的执行一次性任务 Callable 
    public <V> ScheduledFuture<V> schedule(Callable<V> callable,
                                           long delay, TimeUnit unit);

    // 提交一个周期性任务,会按照固定的时间间隔来执行任务。
    // 会按照固定的周期进行调度,跟任务的执行时间无关(除非执行时间大于周期时间,延误了下一个周期的执行)任务之间开始执行的时间是固定的
    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                  long initialDelay,
                                                  long period,
                                                  TimeUnit unit);
    // 提交一个周期性任务,方法会在每次任务执行完成后,再等待固定的延迟时间后再次执行任务。
    // 即任务的执行间隔是根据上一次任务完成的时间来确定的,而不是按照固定的周期。 
    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                     long initialDelay,
                                                     long delay,
                                                     TimeUnit unit);
}

// ScheduledFuture 继承了 Delayed ,Future
public interface ScheduledFuture<V> extends Delayed, Future<V> {
}

// Delayed 继承 Comparable
public interface Delayed extends Comparable<Delayed> {
    long getDelay(TimeUnit unit);
}

// RunnableScheduledFuture 继承了 RunnableFuture ,ScheduledFuture
public interface RunnableScheduledFuture<V> extends RunnableFuture<V>, ScheduledFuture<V> {
    boolean isPeriodic();
}

提交任务

ScheduledThreadPoolExecutor 提交任务都会包装成 ScheduledFutureTask


public class ScheduledThreadPoolExecutor extends ThreadPoolExecutor 
             implements ScheduledExecutorService {
    // 覆写 execute 方法 
    public void execute(Runnable command) {
        schedule(command, 0, NANOSECONDS); // 调用 schedule 方法 
    }
    
    public ScheduledFuture<?> schedule(Runnable command,
                                       long delay,
                                       TimeUnit unit) {
        if (command == null || unit == null)
            throw new NullPointerException();
         // 创建  ScheduledFutureTask 对象 
        RunnableScheduledFuture<Void> t = decorateTask(command,
            new ScheduledFutureTask<Void>(command, null,
                                          triggerTime(delay, unit),
                                          // 每次添加一个 command 序列号 +1 
                                          sequencer.getAndIncrement())); 
        delayedExecute(t); // 延时执行
        return t;
    }
    // 
    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                  long initialDelay,
                                                  long period,
                                                  TimeUnit unit) {
        // 检查参数正确性                                           
        if (command == null || unit == null)
            throw new NullPointerException();
        if (period <= 0L)
            throw new IllegalArgumentException();
        // 创建  ScheduledFutureTask     
        ScheduledFutureTask<Void> sft =
            new ScheduledFutureTask<Void>(command,
                                          null,
                                          triggerTime(initialDelay, unit),
                                          unit.toNanos(period),
                                          sequencer.getAndIncrement());
        RunnableScheduledFuture<Void> t = decorateTask(command, sft);
        sft.outerTask = t;
        delayedExecute(t); // 延时执行
        return t;
    }

    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                     long initialDelay,
                                                     long delay,
                                                     TimeUnit unit) {
         // 检查参数正确性                                                  
        if (command == null || unit == null)
            throw new NullPointerException();
        if (delay <= 0L)
            throw new IllegalArgumentException();
        // 创建  ScheduledFutureTask  ,请注意 period 为负数 
        ScheduledFutureTask<Void> sft =
            new ScheduledFutureTask<Void>(command,
                                          null,
                                          triggerTime(initialDelay, unit),
                                          -unit.toNanos(delay),
                                          sequencer.getAndIncrement());
        RunnableScheduledFuture<Void> t = decorateTask(command, sft);
        sft.outerTask = t; 
        delayedExecute(t); // 延时执行
        return t;
    }


    
    // 换算成纳秒 
    private long triggerTime(long delay, TimeUnit unit) {
        return triggerTime(unit.toNanos((delay < 0) ? 0 : delay));
    }
    // 当前时间 加上 延迟时间 ,就是任务应该执行的时间 
   long triggerTime(long delay) {
     return System.nanoTime() +
        ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
    }


    // 直接返回 task ,用于子类覆写 decorateTask
    protected <V> RunnableScheduledFuture<V> decorateTask(
        Runnable runnable, RunnableScheduledFuture<V> task) {
        return task;
    }
    
    // 延迟执行任务 
    private void delayedExecute(RunnableScheduledFuture<?> task) {
        if (isShutdown()) // 如果线程池关闭了,直接拒绝 task ,SHUTDOWN 状态时不添加任务 
            reject(task);
        else {
            super.getQueue().add(task); // 把任务添加到任务队列之中 
            // 检查线程池状态,如果当前状态不应运行,移除任务 
            if (!canRunInCurrentRunState(task) && remove(task))
                task.cancel(false);
            else
                ensurePrestart(); // 开启线程  
        }
    }
    
    boolean canRunInCurrentRunState(RunnableScheduledFuture<?> task) {
        if (!isShutdown()) // 当  RUNNING 状态时 不取消任务运行 
            return true; 
        if (isStopped()) // 当 STOP ,TIDYING ,TERMINATED 状态时 ,取消任务运行 
            return false;
        // 在 SHUTDOWN 状态时 
        return task.isPeriodic() // 判断是否周期性任务 
              // 在 SHUTDOWN 状态时 是否继续周期性任务,默认为 false 
            ? continueExistingPeriodicTasksAfterShutdown  
             // 在 SHUTDOWN 状态时 是否继续执行延迟任务 默认为 true 
            : (executeExistingDelayedTasksAfterShutdown 
               || task.getDelay(NANOSECONDS) <= 0); 
    }
    
    
    void ensurePrestart() {
        int wc = workerCountOf(ctl.get()); // 获取当前的线程数量 
        if (wc < corePoolSize) // 如果当前线程数小于核心线程数 
            addWorker(null, true); // 创建新核心线程 
        else if (wc == 0)  // 如果当前线程数为 0 
            addWorker(null, false); // 创建一个非核心线程,用于完成任务 
    }
}

ensurePrestart() 方法 会调用 addWorker(关于 addWorker 方法请查看 ThreadPoolExecutor ) 开启了线程,就会调用 getTask() 获取 Runnable 进行执行 ,getTask() 会去阻塞队列中 workQueue.take() 获取 ,ScheduledThreadPoolExecutor 的 workQueue 是一个优先队列,take() 时总是会拿距离执行时间最近的任务

DelayedWorkQueue 数据结构

DelayedWorkQueue是一个优先队列,使用二叉堆的数据结构 ,二叉堆是一棵完全或近似完全二叉树 ,满足特性每节点的父节点都小于等于两个子节点,请注意左右子节点没有大小顺序

二叉堆使用数组进行存储,数组索引 0 存储的根节点 a ,索引 1 和 2 分别存储根节点的两个左右孩子 e 和 g,索引 3 和 4 存储 e 的左右孩子 n 和 h ,以此类推 ...

存储结构如下图

ScheduledThreadPoolExecutor 介绍

那么可以得出以下结果

  • 当一个节点存储的索引为 k 那么这个节点的父节点的索引的 k-1 / 2
  • 当一个节点存储的索引为 k ,那么这个节点的两个左右子节点分别为 2k + 1 ,2k+2

优先队列有两个操作

  • 插入数据
    • 当我们要在上图数据中添加一个数据为 x, 当前数组常长度为 11 ,要新插入的索引为 11 ,找到该索引的父节点索引为 (11-1)/2 = 5 ,索引值为 5 的健值为 o , o<x 满足二叉堆的特性 queue [11] = x 就行 如下图

ScheduledThreadPoolExecutor 介绍

  • 当我们要在上图数据中添加一个数据为 c, 当前数组常长度为 11 ,要新插入的索引为 11 ,找到该索引的父节点索引为 (11-1)/2 = 5 ,索引值为 5 的健值为 o , o>c 不满足二叉堆的特性,需要进行替换 queue [11] = o ,再查找索引为5 的父节点索引为2 ,索引2的健值为 g , g>c 还是不满足二叉堆的特性 设置 queue [5] = g ,再查找索引为2 的父节点索引为 1 索引1的健值为 a ,a<c 满足二叉堆的特性 设置 queue [2] = c ,整个添加过程就完成了(此过程称为 siftUp ) 如下图

ScheduledThreadPoolExecutor 介绍

  • 删除最小数据(根节点)
    • 当我们要在上图数据中删除根节点 a,当前数组常长度为 11 ,取出当前最后一个节点 queue [10] 的值 r ,数组长度 -1 ,再把 r 放置合适的位置 ,查找根节点的两个子节点 找出两个节点中最小的为 e (索引为 1) 放在根节点 ,再查找 索引为 1 中的两个节点中最小的为 h (索引为 4) 把 h 放到 索引为 1 的位置 再次查找 索引为 4 的子节点 只有一个为 t 比较 t 和 r的值 r < t 把 r 放到索引为 4 的位置 整个添加过程就完成了(此过程称为 siftDown ) 如下图

ScheduledThreadPoolExecutor 介绍


static class DelayedWorkQueue extends AbstractQueue<Runnable>
    implements BlockingQueue<Runnable> {

    private static final int INITIAL_CAPACITY = 16;
    private RunnableScheduledFuture<?>[] queue =
        new RunnableScheduledFuture<?>[INITIAL_CAPACITY]; // 阻塞队列默认为 16 
    private final ReentrantLock lock = new ReentrantLock();
    private int size;
   
    // Leader-Follower模式的一般思想是有一组线程,其中一个线程(领导者)主动执行工作,而其他线程(追随者)等待。
    // 当领导者完成工作时,它会通知其中一个追随者接管成为新的领导者。这种模式可以帮助减少争用并在某些场景中提高性能。
    private Thread leader;
    private final Condition available = lock.newCondition();
    
    // Runnable 添加到队列
    public boolean add(Runnable e) {
        return offer(e);
    }
    
    public boolean offer(Runnable x) {
        if (x == null)
            throw new NullPointerException();
        RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            int i = size;
            if (i >= queue.length)  // 如果队列的大小大于等于数组长度 
                grow();  // 需要扩大数组的长度 
            size = i + 1;
            if (i == 0) { // 当数组中没有任务时 
                queue[0] = e; // 根节点
                setIndex(e, 0);
            } else {
                siftUp(i, e); // 插入数据 ,上浮 
            }
            if (queue[0] == e) {
                leader = null;
                available.signal(); 
            }
        } finally {
            lock.unlock();
        }
        return true;
    }
    // 更新 RunnableScheduledFuture 的索引数 
    private static void setIndex(RunnableScheduledFuture<?> f, int idx) {
        if (f instanceof ScheduledFutureTask)
            ((ScheduledFutureTask)f).heapIndex = idx;
    }

    // 这是一个不断上浮的过程
    // k 为插入数据的索引 ,key 要插入的健值 
    private void siftUp(int k, RunnableScheduledFuture<?> key) {
        while (k > 0) { // k 不为的根节点索引 
            int parent = (k - 1) >>> 1;  // 找到该节点的父节点索引  
            RunnableScheduledFuture<?> e = queue[parent];
            // 如果要插入的健值,大于等于该索引节点的父节点的健值 
            // 说明符合二叉堆的规则,找到了 key 插入的索引值 退出循环 
            if (key.compareTo(e) >= 0) 
                break;  
            // 如果要插入的健值,小于该索引节点的父节点的健值,需要把父子节点的健值存储在 k 索引   
            queue[k] = e;  
            setIndex(e, k);
            k = parent;  // 然后 k 赋值为父节点的索引 ,继续查找父节点 
        }
        queue[k] = key; // 赋值 
        setIndex(key, k);
    }
    
    // 当数组中任务存满时,扩展数组长度 
    private void grow() {
        int oldCapacity = queue.length;
        int newCapacity = oldCapacity + (oldCapacity >> 1); // grow 50%
        if (newCapacity < 0) // overflow
            newCapacity = Integer.MAX_VALUE;
        queue = Arrays.copyOf(queue, newCapacity); // 把旧任务复制到新数组里面 
    }
    
 // 获取任务
 public RunnableScheduledFuture<?> take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        for (;;) {
            // 获取队头任务(根节点),延迟时间最小的任务 
            RunnableScheduledFuture<?> first = queue[0]; 
            if (first == null) // 任务为空时  
                available.await(); // 阻塞 
            else {
                 // 如果任务不为 null ,获取当前还需要延迟多长时间执行 
                long delay = first.getDelay(NANOSECONDS);
                if (delay <= 0L)  // 如果小于等于 0 ,已经到了执行时间了 
                    return finishPoll(first); // 取出任务执行 
                first = null; // don't retain ref while waiting
                // leader 不为 null ,说明已有其他线程是 leader线程 Followers无限时等待  
                if (leader != null)   
                    available.await();  // 阻塞  
                else {
                    // 设置当前获取任务的线程为 leader线程 
                    Thread thisThread = Thread.currentThread();
                    leader = thisThread;
                    try {
                        available.awaitNanos(delay); // leader线程 限时等待 
                    } finally {
                       // 被唤醒,或者等待时间到 ,leader 置为 null 
                        if (leader == thisThread) 
                            leader = null;
                    }
                }
            }
        }
    } finally {
        // leader 为 null ,并且队列不为 null,唤醒其他 Followers 线程作为leader线程 
        if (leader == null && queue[0] != null)
            available.signal();
        lock.unlock();
    }
}

  // 取出任务 
 private RunnableScheduledFuture<?> finishPoll(RunnableScheduledFuture<?> f){
        int s = --size;
        // 取出数组中最后一个任务  x 
        RunnableScheduledFuture<?> x = queue[s];
        queue[s] = null; 
        if (s != 0)
            siftDown(0, x); // 下沉,要移除的索引 0(根节点) 
        setIndex(f, -1);
        return f;
    }
    
     // k 需要移除的数组索引 ,key 优先队列中队尾的键值  
    private void siftDown(int k, RunnableScheduledFuture<?> key) {
        // 一开始选择的节点k是堆的根节点,随着操作的进行,k会沿着堆的树形结构向下移动。
        // 当k向下移动到大于等于堆大小的一半时,它已经是叶子节点了,叶子节点不需要向下移动
        // siftDown过程就结束了 这样,可以避免不必要的计算和移动操作,提高代码的性能。
        int half = size >>> 1; 
        while (k < half) {
            int child = (k << 1) + 1; // k 节点的左孩子节点 
            RunnableScheduledFuture<?> c = queue[child];
            int right = child + 1; // k 节点的右孩子节点 
            // 如果右孩子节点小于堆大小,左节点大于右节点 
            if (right < size && c.compareTo(queue[right]) > 0)
               // c 取较小的节点键值 ,并把 child 赋值为较小节点的索引 
                c = queue[child = right];  
            if (key.compareTo(c) <= 0) // 如果要插入任务的键值小于等于 c 跳出循序 
                break;
            queue[k] = c;  // k 索引的置为较小的孩子节点的键值 
            setIndex(c, k);
            k = child;   // 更新 k 索引 
        }
        queue[k] = key;  // key 找到正确的索引位置 
        setIndex(key, k);
    }

}

执行 ScheduledFutureTask

private class ScheduledFutureTask<V>
        extends FutureTask<V> implements RunnableScheduledFuture<V> {

    private final long sequenceNumber;

    private volatile long time;

    private final long period;

    RunnableScheduledFuture<V> outerTask = this;

    int heapIndex;
    
    ScheduledFutureTask(Runnable r, V result, long triggerTime,
                        long sequenceNumber) {
        super(r, result);  // 调用 FutureTask 构造函数 
        this.time = triggerTime;
        this.period = 0; // 非周期性任务 period 为 0 
        this.sequenceNumber = sequenceNumber;
    }

    ScheduledFutureTask(Runnable r, V result, long triggerTime,
                        long period, long sequenceNumber) {
        super(r, result);  // 调用 FutureTask 构造函数 
        this.time = triggerTime;
        this.period = period;  // 下次运行的周期 
        this.sequenceNumber = sequenceNumber;
    }

    // 获取距当前时间还需延迟多长时间执行  
    public long getDelay(TimeUnit unit) {
        return unit.convert(time - System.nanoTime(), NANOSECONDS);
    }

    // 实现比较器 
    public int compareTo(Delayed other) {
        if (other == this) // compare zero if same object
            return 0;
         // 如果是 ScheduledFutureTask 直接通过执行时间进行比较 
        if (other instanceof ScheduledFutureTask) {
            ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
            long diff = time - x.time;
            if (diff < 0)
                return -1;
            else if (diff > 0)
                return 1;
            else if (sequenceNumber < x.sequenceNumber)
                return -1;
            else
                return 1;
        }
        // 通过 Delayed接口 进行比较
        long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
        return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
    }

    // 不为 0 就是周期性任务 
    public boolean isPeriodic() {
        return period != 0;
    }
    // 下次运行时间 
    private void setNextRunTime() {
        long p = period;
        if (p > 0) // scheduleAtFixedRate 添加的任务 
            time += p;  // 下一个任务执行时间是 上个任务的执行时间加上延迟时间 
        else  // scheduleWithFixedDelay 添加的任务 
            time = triggerTime(-p); // 下一个任务执行时间是 当前时间加上延迟时间 
    }
    // 取消任务 
    public boolean cancel(boolean mayInterruptIfRunning) {
        boolean cancelled = super.cancel(mayInterruptIfRunning);
        if (cancelled && removeOnCancel && heapIndex >= 0)
            remove(this);
        return cancelled;
    }

    public void run() {
        // 如果当前状态不能运行,取消任务 
        if (!canRunInCurrentRunState(this)) 
            cancel(false);
        else if (!isPeriodic())
            super.run(); // 如果不是周期性任务直接运行 
        else if (super.runAndReset()) { // 周期性任务 ,运行任务 ,并把状态重置 
            setNextRunTime(); // 设置周期性任务下次运行时间 
            reExecutePeriodic(outerTask); // 再把当前任务添加添加到线程池 
        }
    }
}

 void reExecutePeriodic(RunnableScheduledFuture<?> task) {
     if (canRunInCurrentRunState(task)) { // 检查当前线程池状态是否能执行
         super.getQueue().add(task); // 添加到任务队列 
         if (canRunInCurrentRunState(task) || !remove(task)) {
             ensurePrestart(); // 确保线程已创建 
             return;
         }
     }
     task.cancel(false);
 }

总结

  1. ScheduledThreadPoolExecutor 最大线程数 maximumPoolSize 没有作用 ,不会增加非核心线程

    • ScheduledThreadPoolExecutor 核心线程数设置为 0 ,会创建一个非核心线程数,进行完成任务。
    • ScheduledThreadPoolExecutor 核心线程数大于 0 ,只会创建核心线程 。
  2. ScheduledThreadPoolExecutor 的工作队列DelayedWorkQueue 是按执行时间作为比较多优先队列,使用二叉堆的数据结构,每次 take 任务时都是拿根节点的任务(距离执行时间最近的)

  3. scheduleAtFixedRate 和 scheduleWithFixedDelay 是通过设置 period 进行判断 一个大于 0 ,一个小于 0 。scheduleAtFixedRate 按照第一个任务设置的执行时间按照 period 时间累加执行,不考虑执行任务花费的时间,scheduleWithFixedDelay 是按上次任务执行完成的时间延迟 period 时间执行