likes
comments
collection
share

并发-AQS之ThreadPoolExecutor源码解读(一)

作者站长头像
站长
· 阅读数 13

ThreadPoolExecutor是Java中一个线程池的实现类,可以用于管理和调度多个线程执行任务。线程池中维护了一个线程池容量,当有任务需要执行时,线程池中的线程会被分配执行任务,执行完毕后又会返回线程池等待下一次执行任务。 整体UML类图如下

并发-AQS之ThreadPoolExecutor源码解读(一)

基础概念

核心参数

构造线程池需要7个参数,下面是ThreadPoolExecutor提供的构造方法

ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue)

ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory)

ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler)

ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,RejectedExecutionHandler handler)

corePoolSize

线程池核心线程数

private volatile int corePoolSize;

maximumPoolSize

线程池最大线程数

private volatile int maximumPoolSize;

keepAliveTime

空闲线程等待工作的超时时间(以纳秒为单位)。当存在超过corePoolSize数量的线程或者允许核心线程超时allowCoreThreadTimeOut时,线程使用此超时时间。否则,它们将永远等待新的工作任务。 它的单位是是由unit确定 线程池成员变量keepAliveTime的单位是纳秒nanoseconds

private volatile long keepAliveTime;

unit

主要讲构造参数keepAliveTime转换成单位为

this.keepAliveTime = unit.toNanos(keepAliveTime);

workQueue

阻塞队列,用来存放任务线程 其提供方法如下

public interface BlockingQueue<E> extends Queue<E> {
    // 如果队列已满,无法插入新元素,则会抛出IllegalStateException异常。否则,将元素插入到队列的尾部,并返回true
    boolean add(E e);

    // 如果队列已满,无法插入新元素,则会返回false。否则,将元素插入到队列的尾部,并返回true
    boolean offer(E e);

    // 用于将指定元素插入到队列尾部。如果队列已满,则会阻塞当前线程,直到有空间可用为止。
    void put(E e) throws InterruptedException;

    // 用于将指定元素插入到队列尾部。如果队列已满,则会等待指定的时间,在超时时间内如果仍无法插入元素则返回false
    boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException;

    // 用于从队列头部取出并移除一个元素。如果队列为空,则会阻塞当前线程,直到有元素可用为止。
    E take() throws InterruptedException;

    // 用于从队列头部取出并移除一个元素。如果队列为空,则会等待指定的时间,在超时时间内如果仍无法取出元素则返回null
    E poll(long timeout, TimeUnit unit)
        throws InterruptedException;

    // 用于获取当前队列可用空间的大小
    int remainingCapacity();
    
    boolean remove(Object o);

    public boolean contains(Object o);

    // 将队列中所有元素移除,并将它们添加到指定的集合中。移除的元素数量会作为方法的返回值返回。
    int drainTo(Collection<? super E> c);

    // 将队列中最多 maxElements 个元素移除,并将它们添加到指定的集合中。移除的元素数量会作为方法的返回值返回
    int drainTo(Collection<? super E> c, int maxElements);
}

其在JUC包中的实现类如下

  • ArrayBlockingQueue:一个基于数组的阻塞队列实现,有固定的容量,当队列满时,会阻塞插入操作,直到队列中有空间可用
  • LinkedBlockingQueue:一个基于链表的阻塞队列实现,可以指定容量,如果未指定容量,则默认容量为Integer.MAX_VALUE。插入操作在队列已满时会阻塞,取出操作在队列为空时会阻塞
  • PriorityBlockingQueue:一个具有优先级的阻塞队列实现,元素按照它们的优先级顺序被插入到队列中。如果两个元素具有相同的优先级,则它们将按照它们被插入到队列中的顺序进行比较
  • SynchronousQueue:一个不存储元素的阻塞队列实现,它的每个插入操作必须等待一个对应的删除操作,否则插入操作会一直阻塞
  • DelayQueue:一个具有延迟时间的阻塞队列实现,插入的元素必须实现Delayed接口,队列会按照延迟时间的大小来进行排序,延迟时间最小的元素将被首先取出

threadFactory

线程工厂,主要用于创建线程

public interface ThreadFactory {
    Thread newThread(Runnable r);
}

handler

RejectedExecutionHandler 饱和策略,默认情况下使用AbortPolicy

public interface RejectedExecutionHandler {
    void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}

其有四个实现策略

  • AbortPolicy:默认的拒绝策略,当 Executor 无法接受新任务时,会抛出 RejectedExecutionException 异常
public static class AbortPolicy implements RejectedExecutionHandler {
    public AbortPolicy() { }

    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        throw new RejectedExecutionException("Task " + r.toString() +
                                             " rejected from " +
                                             e.toString());
    }
}
  • CallerRunsPolicy:当 Executor 无法接受新任务时,会使用调用线程来执行该任务。
public static class CallerRunsPolicy implements RejectedExecutionHandler {
    public CallerRunsPolicy() { }

    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (!e.isShutdown()) {
            r.run();
        }
    }
}
  • DiscardPolicy:当 Executor 无法接受新任务时,会直接丢弃该任务,不会抛出任何异常。
public static class DiscardPolicy implements RejectedExecutionHandler {
    public DiscardPolicy() { }

    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    }
}
  • DiscardOldestPolicy:当 Executor 无法接受新任务时,会丢弃队列中最早的任务,并尝试重新提交新的任务。
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
    public DiscardOldestPolicy() { }

    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (!e.isShutdown()) {
            e.getQueue().poll();
            e.execute(r);
        }
    }
}

其他成员变量

ctl

一个原子整型变量,用于记录线程池的状态和线程数目。 ctl 的高 3 位表示线程池的状态,低 29 位表示线程数目

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

线程池状态

  • RUNNING:线程池正在运行中,可以接受新的任务。
  • SHUTDOWN:线程池正在关闭中,不再接受新的任务,但会继续处理队列中的任务。
  • STOP:线程池已经停止,不再接受新的任务,也不再处理队列中的任务,并会中断正在执行的任务。
  • TIDYING:线程池正在整理线程,即将所有线程都终止,转换为 TERMINATED 状态。
  • TERMINATED:线程池已经终止,不再接受新的任务,也不再处理队列中的任务。
private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;

RUNNING 状态:当线程池被创建时,初始状态为 RUNNING。 SHUTDOWN 状态:当调用 shutdown() 方法时,线程池会进入 SHUTDOWN 状态。此时线程池不再接受新的任务,但会继续执行队列中的任务。 STOP 状态:当调用 shutdownNow() 方法时,线程池会进入 STOP 状态。此时线程池不再接受新的任务,并中断正在执行的任务。 TIDYING 状态:当线程池完成所有任务后,会进入 TIDYING 状态。在此状态下,线程池会将所有线程终止,并执行一些清理操作,如将工作队列置为 null。 TERMINATED 状态:当线程池完成清理操作后,会进入 TERMINATED 状态。此时线程池已经终止,不再接受新的任务。

allowCoreThreadTimeOut

一个布尔型参数,用于设置是否允许核心线程超时。如果设置为 true,则在任务执行完毕后,空闲的核心线程会在 keepAliveTime 时间内超时退出。

默认情况下,ThreadPoolExecutor 中的核心线程不会超时。即使在没有新任务可执行时,核心线程也会一直保持活动状态。这是为了保证线程池的响应速度,因为创建和销毁线程都需要一定的时间 通过下列方法设置

public boolean allowsCoreThreadTimeOut() {
    return allowCoreThreadTimeOut;
}

largestPoolSize

用于记录线程池中曾经出现过的最大线程数。具体来说,largestPoolSize 记录了线程池中在任何时刻的最大线程数(包括核心线程数和非核心线程数) largestPoolSize 可以用来记录线程池中曾经创建的最大线程数,以便于监控线程池的状态和性能

completedTaskCount

用于记录线程池中已经完成执行的任务数量。具体来说,completedTaskCount 记录了线程池中已经成功执行完成的任务数量,包括已经从工作队列中移除的任务和已经执行完毕的任务。 获取总的执行完成任务需要加上每个work的执行数量

public long getCompletedTaskCount() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        long n = completedTaskCount;
        for (Worker w : workers)
            n += w.completedTasks;
        return n;
    } finally {
        mainLock.unlock();
    }
}

workers

线程池中的工作线程。 Worker 继承自 AQS类,实现了 Runnable 接口。每个 Worker 对象都对应着一个工作线程。 Worker 类实现了 AQS 类的 acquire() 和 release() 方法,用于实现工作线程的锁定和解锁。这些方法的主要作用是保证同一时间只有一个工作线程在执行任务,从而保证线程池的安全性。

final Thread thread;

Runnable firstTask;

volatile long completedTasks;

Worker(Runnable firstTask) {
    setState(-1); // inhibit interrupts until runWorker
    this.firstTask = firstTask;
    this.thread = getThreadFactory().newThread(this);
}

public void run() {
    runWorker(this);
}

protected boolean isHeldExclusively() {
    return getState() != 0;
}

protected boolean tryAcquire(int unused) {
    if (compareAndSetState(0, 1)) {
        setExclusiveOwnerThread(Thread.currentThread());
        return true;
    }
    return false;
}

protected boolean tryRelease(int unused) {
    setExclusiveOwnerThread(null);
    setState(0);
    return true;
}

public void lock()        { acquire(1); }
public boolean tryLock()  { return tryAcquire(1); }
public void unlock()      { release(1); }
public boolean isLocked() { return isHeldExclusively(); }

void interruptIfStarted() {
    Thread t;
    if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
        try {
            t.interrupt();
        } catch (SecurityException ignore) {
        }
    }
}

关键方法

execute(Runnable command)

将任务异步提交到线程池处理

public void execute(Runnable command) {
    //如果传入的任务是空,抛异常
    if (command == null)
        throw new NullPointerException();
    //获取当前线程池状态
    int c = ctl.get();
    //1. 如果活动线程数<核心线程数
    if (workerCountOf(c) < corePoolSize) {
        //创建核心线程来处理传入的任务
        if (addWorker(command, true))
            return;
        //如果创建失败,说明有其他线程介入
        //再次获取最新线程池状态
        c = ctl.get();
    }
    //2. 如果当前线程池是运行态,并且成功将任务放入工作队列
    if (isRunning(c) && workQueue.offer(command)) {
        //添加任务队列成功
        //再次检查最新线程状态
        int recheck = ctl.get();
        //如果当我们将任务放入队列后出现了:
        //1. 线程池不再是运行态 --> 将任务从队列中剔除,并执行拒绝策略(非运行状态,拒绝接受任务)
        //2. 活动线程数为0 --> 创建非核心线程,处理的任务由非核心线程自主去工作队列中获取
        if (! isRunning(recheck) && remove(command))
            //线程池状态不正确,执行拒绝策略
            reject(command);
        else if (workerCountOf(recheck) == 0)
            //情况1:RUNNING状态,但设置核心线程数为0,此时任务放入了工作队列,但没人处理
            //情况2:SHUTDOWN状态,但remove任务失败。在此之前工作线程处理完所有任务后全部停止工作,为了保证队列为空,需要再创建一个空任务非核心线程,处理这个队列中的剩余任务
            addWorker(null, false);
    }
    //3. 如果线程池不是运行态,或者是运行态,但任务队列已满,尝试添加非核心线程处理当前任务
    else if (!addWorker(command, false))
        //添加非核心线程失败,执行拒绝策略
        reject(command);
}

整体流图基本如下

并发-AQS之ThreadPoolExecutor源码解读(一)

第一步:如果活动线程数<核心线程数,创建核心线程来处理传入的任务

计算线程数量

private static int workerCountOf(int c)  { 
    return c & CAPACITY; 
}

添加核心线程数addWorker(command, true)

private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        //如果出现以下情况,将不添加工作线程 
        //1. 线程池非运行态 
        //2. 在1的条件下,线程池为STOP状态(既不是运行态也不是SHUTDOWN态),且传入了任务 
        //3. 在2的条件下,工作队列不为空
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;
        //如果线程池状态为运行态,进入该循环,自旋cas尝试增加工作线程数
        for (;;) {
            int wc = workerCountOf(c);
            //创建失败条件: 
            //1. 如果当前活动线程数已经到达了最大线程数 
            //2. 如果请求创建核心线程,但核心线程数已经满了 
            //3. 如果请求非核心线程数,但总线线程数已经满了
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            //如果还可以创建,先自旋cas尝试增加工作线程数
            if (compareAndIncrementWorkerCount(c))
                //中断循环退出
                break retry;
            //如果还可以创建,先自旋cas尝试增加工作线程数
            c = ctl.get();  // Re-read ctl
            if (runStateOf(c) != rs)
                //如果线程池运行状态变化(变成非运行态)重新进入retry循环查看条件
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }

    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        //新的Worker,Worker封装了线程、任务 
        //初始Worker的state是-1
        w = new Worker(firstTask);
        final Thread t = w.thread;
        //健壮性判断
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int rs = runStateOf(ctl.get());
                //1. 如果是运行态 
                //2. SHUTDOWN态且传入任务为空,这个情况创建线程的目的是处理工作队列中剩余任务。
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

addWorker主要是添加工作线程,然后再让线程跑起来,我们看下Worker的run方法

public void run() {
    runWorker(this);
}

最终调用的runWorker方法

final void runWorker(Worker w) {
    //获取当前线程环境
    Thread wt = Thread.currentThread();
    //取出Worker封装的初始任务
    Runnable task = w.firstTask;
    //将Worker封装的初始任务清空
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        //循环获取可执行的任务,进入循环的条件:
        //1. 如果worker中有初始任务
        //2. 获取到工作队列中的任务
        while (task != null || (task = getTask()) != null) {
            //线程是临界区资源,处理任务的过程需要将线程上锁
            w.lock();
            //1.如果线程为STOP状态,或者TIDYING、TERMINATED状态,那么就将线程中断(将中断标记设为true)
            //2.如果线程中断标记为真,且状态此时变为上述状态,那么就将线程中断
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                //如果线程没被中断,为正常运行态
                //执行任务前的钩子函数,注意,钩子函数抛出的异常是没人管的,会直接进入到finally代码块
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    //在当前线程环境执行任务的run()方法
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    //执行任务后的钩子函数
                    afterExecute(task, thrown);
                }
            } finally {
                //处理完任务,将task置空,等待GC
                task = null;
                //累计完成任务数+1
                w.completedTasks++;
                //处理任务完成,将线程资源释放
                w.unlock();
            }
        }
        //如果执行到这里之前有人抛出异常,且无人捕获,则不执行这句,直接进入finally
        //(工作线程 task.run() 即使抛了异常,也被捕获了。只有可能在 getTask,w.lock(), 钩子函数 等无人捕获异常的地方抛出异常,才有可能跳过这句话的执行,直接进入finally)
        //如果执行到这里都没人抛出异常,或者抛出的异常全都被捕获过,则执行这一句
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

上述方法有几个重要的方法,我们先看getTask()

private Runnable getTask() {
    boolean timedOut = false; 

    for (;;) {
        //获取线程池最新状态
        int c = ctl.get();
        int rs = runStateOf(c);

        //如果线程非运行态
        //如果
        //1. 是STOP、TIDYING、TERMINATED状态
        //2. 是SHUTDOWN状态,且工作队列为空
        //则自旋cas减小工作线程数,并return null,让所在线程获取任务失败,退出获取任务的循环,从而结束线程。
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }
	//如果线程为运行态
        //或者是SHUTDOWN状态,但工作队列不为空
        //就需要将任务取出处理
        int wc = workerCountOf(c);
        
	//allowCoreThreadTimeOut 一般都是 false
        //判断当前线程是否有限时,是需要判断当前线程是否为核心线程的,这里用了很聪明的方法:判断工作线程数是否大于核心线程数
        //如果工作线程数小于核心线程数,说明当前线程一定是核心线程,那么不限时
        //反之,就直接认定进入到当前判断的是非核心线程,让它限时。
        //注意,这里没有上锁
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
	//如果工作线程超过了最大值,或者当前线程超时了,那么当前线程应当被废弃
        //额外条件:且此时 有别的工作线程(有别人处理任务) 或者 工作队列为空(没有任务需要处理)
        //那么当前线程就确定可以废弃,return null,结束当前线程。
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            //CAS减小工作线程数,不取出任务
            if (compareAndDecrementWorkerCount(c))
                return null;
            //自旋
            continue;
        }
        //如果当前线程可用
        try {
            //如果限时,就通过限时方式poll()获取任务
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
            //如果不限时,直接阻塞获取任务
            workQueue.take();
            if (r != null)
                //如果拿到了任务,直接返回执行
                return r;
            //进到这里说明超时了还没拿到任务
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

如果线程获取工作队列任务失败,或者被判定为当前线程应当结束,将会退出 runWorker() 的 while 循环,并进入 processWorkerExit() 处理线程结束的工作

//移除当前工作线程
private void processWorkerExit(Worker w, boolean completedAbruptly) {
    //如果执行了 completedAbruptly=false 语句,说明当前线程异常退出了。需要将线程池工作线程数更新-1
    if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
        decrementWorkerCount();

    //有线程退出的时候,将要操作 workers 这个非线程安全集合,它是临界区资源
    //对临界区资源的 检查+修改,需要原子性操作,需要上锁。对 mainLock 上锁。 
    // mainLock 逻辑上用于 workers 资源的上锁, 这里顺便同时也对 completedTaskCount 这个互斥资源进行逻辑上的上锁
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        //工作记录,统计线程池总完成任务数
        completedTaskCount += w.completedTasks;
        //将该线程从 workers 集合中取出
        workers.remove(w);
    } finally {
        //释放对 workers 资源的 mainLock 锁
        mainLock.unlock();
    }

    //释放线程之后,尝试终止线程池
    //终止条件:SHUTDOWN状态且工作线程数为0且工作队列为空 || STOP状态且工作线程数为0
    tryTerminate();

    //获取当前线程池状态
    int c = ctl.get();
    //如果线程池状态为 RUNNING 或者 SHUTDOWN
    if (runStateLessThan(c, STOP)) {
        //如果是正常状态溢出当前线程,进入if方法块
        if (!completedAbruptly) {
            // 核心线程数最小值允许多少?
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            //工作队列不为空,设置工作线程的最小值为1
            if (min == 0 && ! workQueue.isEmpty())
                min = 1;
            //如果工作线程数大于等于工作线程最小值,说明还有工作线程在线程池中,那就return,什么也不干。
            if (workerCountOf(c) >= min)
                return; // replacement not needed
        }
        //1. 如果是不正常的方式移除了当前工作线程,再补一个工作线程!
        //2. 线程池工作队列不为空,并且没有工作线程,再添加一个工作线程。
        addWorker(null, false);
    }
}

上述在线程退出后尝试终止线程池tryTerminate 如果当前状态为 SHUTDOWN 或者 STOP ,而且满足变为 TIDYING 状态的条件,就CAS一次尝试将状态变为 TIDYING 。如果变为 TIDYING 成功,将继续变为 TERMINATED。在真正变为 TERMINATED 之前,会给一个钩子函数 terminated() 由外界/子类 实现,处理变为 TERMINATED 状态之前的最后操作。

final void tryTerminate() {
    for (;;) {
        int c = ctl.get();
        //如果是运行态,就不终止线程
        if (isRunning(c) ||
            //如果已经是TIDYING状态了,就不作处理
            runStateAtLeast(c, TIDYING) ||
            //如果是SHUTDOWN,但是工作队列还有任务,不作处理( SHUTDOWN需要到工作队列为空的时候,才可以变成TIDYING)
            (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
            return;
        //如果是SHUTDOWN或者STOP状态,但是工作线程还没清空
        if (workerCountOf(c) != 0) { // Eligible to terminate
            //中断一个当前运行的线程
            interruptIdleWorkers(ONLY_ONE);
            return;
        }

        //如果可以变为TIDYING状态,上锁,因为terminated中的操作可能会是临界区资源
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                try {
                    //自定义处理线程池终止前的最后操作
                    terminated();
                } finally {
                    //最终设置为TERMINATED线程池状态
                    ctl.set(ctlOf(TERMINATED, 0));
                    termination.signalAll();
                }
                return;
            }
        } finally {
            mainLock.unlock();
        }
        // else retry on failed CAS
    }
}

最后我们看下addWorker中添加工作线程失败的addWorkerFailed方法

private void addWorkerFailed(Worker w) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        if (w != null)
            //移除工作线程
            workers.remove(w);
        //工作线程数量减一
        decrementWorkerCount();
        tryTerminate();
    } finally {
        mainLock.unlock();
    }
}

第二步:活动线程数>核心线程数且是运行态,添加队列中

关键方法前面已经贴了,再贴一下

if (isRunning(c) && workQueue.offer(command)) {
    int recheck = ctl.get();
    if (! isRunning(recheck) && remove(command))
        reject(command);
    else if (workerCountOf(recheck) == 0)
        addWorker(null, false);
}

其他方法前面已经解读过,我们看下reject(command)

final void reject(Runnable command) {
    //调用丢弃策略处理
    handler.rejectedExecution(command, this);
}

第三步:如果线程池不是运行态,或者是运行态,但任务队列已满,尝试添加非核心线程处理当前任务

else if (!addWorker(command, false))
    reject(command);

submit(Runnable task, T result)

用于提交一个 Runnable 任务,并返回一个 Future 对象,该对象可以用于获取任务执行的结果。 主要包含下面3个方法

//AbstractExecutorService
public Future<?> submit(Runnable task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<Void> ftask = newTaskFor(task, null);
    execute(ftask);
    return ftask;
}

public <T> Future<T> submit(Runnable task, T result) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<T> ftask = newTaskFor(task, result);
    execute(ftask);
    return ftask;
}
public <T> Future<T> submit(Callable<T> task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<T> ftask = newTaskFor(task);
    execute(ftask);
    return ftask;
}

其核心方法是newTaskFor,最主要实现是FutureTask

//AbstractExecutorService
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
    return new FutureTask<T>(runnable, value);
}
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
    return new FutureTask<T>(callable);
}

FutureTask 是 Java 中的一个类,实现了 Future 和 Runnable 接口,可以用于异步执行一个任务,并在任务完成后获取其结果。FutureTask 可以用于手动创建一个 Future 对象,或者作为 ThreadPoolExecutor 的任务提交到线程池中执行。 后续再阅读FutureTask

本文篇幅较长,后续内容请在下面网页查看 并发-AQS之ThreadPoolExecutor源码解读(二)

JDK中实现AQS简介

同步工具与AQS关联详细介绍
AQS原理讲解AQS原理介绍并发-AQS原理讲解
ReentrantLock使用AQS保存锁重复持有的次数。当一个线程获取锁时,ReentrantLock记录当前获得锁的线程标识,用于检测是否重复获取,以及错误线程试图解锁操作时异常情况的处理。AQS之Reentrantlonk源码解读
Semaphore使用AQS同步状态来保存信号量的当前计数。tryRelease会增加计数,acquireShared会减少计数。Semaphore 源码分析以及AQS共享加解锁
CountDownLatch在多线程并发执行任务时,有时需要让某些线程等待某些条件达成后再开始执行,这时就可以使用CountDownLatch来实现CountDownLatch 源码分析
ThreadPoolExecutor创建线程池中的工作线程worker继承AQS,实现独占资源参考 并发-AQS之ThreadPoolExecutor源码解读(一)
CyclicBarrier多个线程等待彼此达到一个共同的屏障点,然后同时继续执行。并发-AQS之CyclicBarrier源码解读
ReentrantReadWriteLock可重入读写锁,它允许多个线程同时读取一个共享资源,但只允许一个线程写入该共享资源。参考 并发-AQS之ReentrantReadWriteLock源码解读(一)