likes
comments
collection
share

ThreadPoolExecutor源码解析

作者站长头像
站长
· 阅读数 2

使用案例

ThreadFactory threadFactory = ThreadFactoryBuilder.create().get();
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
                3, 5, 200, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(100), threadFactory, new ThreadPoolExecutor.AbortPolicy());
        for (int i = 0; i < 10; i++) {
            threadPoolExecutor.execute(() -> {
                System.out.println("ThreadPoolExecutor");
            });
        }
        threadPoolExecutor.shutdown();

上面代码是线程池的使用方式,下面我就把代码一步一步拆解,从不同的维度去分析线程池的底层运作。

线程池的结构

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}

无关紧要的判断不用管它,主要看他涉及到的几个参数:

  • corePoolSize:核心线程数,也就是一直存在的线程数量
  • maximumPoolSize:最大线程数,核心线程数 + 非核心线程数 = 最大线程数
  • keepAliveTime:当非核心线程空闲要一定时间之后要被回收(也就是你的老板允许你多久不干活,到了这个时间就要开除你了)
  • unit:空闲时间单位,用来修饰keepAliveTime的
  • workQueue:当核心线程都被占用的时候,用来存放任务的队列
  • threadFactory:用来创建线程的工厂(不重要)
  • handler:当任务队列满并且没有空闲的线程可以去处理你提交的任务的时候,要触发的拒绝策略,我这里传的是AbortPolicy,会报错处理。

OK,介绍完这些参数之后,我们还要搞清楚线程池里面用来处理逻辑的几个重要参数:

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int COUNT_MASK = (1 << COUNT_BITS) - 1;

// runState is stored in the high-order bits
private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;

这些参数涉及到了二进制的位运算,你如果是小白的话我这只言片语也讲不明白,如果你是大白,那我也没必要讲这个。但是你先别急,这个位运算的具体的值你没必要知道,因为这几个值的主要作用就是用来比大小的,我只需要告诉你哪个大哪个小就行了啊。先看下第1行代码: private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); 在这行代码的上方有作者给我们加的注释,你们可以直接看源码。我这里就用大白话来解释了: 这个参数是用来表示两个意思,一部分用来表示当前线程池中已存在的线程数量,另一部分表示当前线程池的状态。

再看下5 ~ 10行代码,表示线程池中一共有几个状态:

  • RUNNING:线程池正在运行,可以正常的接受新任务;可以处理任务队列中的任务
  • SHUTDOWN:线程池正在被外部安全关闭,不能接受新的任务了;可以处理任务队列中的任务
  • STOP:线程池被强制关闭,不能接受新的任务,同时也不能处理队列中的任务
  • TIDYING:队列中已经没有任务,或者线程池中已经没有线程了,线程池马上就要被终止了的过渡状态
  • TERMINATED: 线程池被终止,不能再干活了。

上面的5种状态前面的比后面的小:RUNNING < SHUTDOWN < STOP < TIDYING < TERMINATED。 最后再看两个重要的方法:

private static int runStateOf(int c)     { return c & ~COUNT_MASK; }
private static int workerCountOf(int c)  { return c & COUNT_MASK; }

不要去关注里面的位运算逻辑,你只需要知道runStateOf方法是用来获取当前线程池的状态的;workerCountOf方法是用来获取工作线程个数的。好,概念性的东西看到这里就可以了,再看就乱了,下面开始进入今天的重点。

用线程池执行任务的时候是调用的execute方法,通过execute方法其实就能把线程池的整个执行流程树立出来,我们先解析execute方法梳理出来整个流程,然后再一层一层的往底层分析到每一个细节。

线程池的执行流程

执行线程池的入口是execute方法,在这个方法中串联了线程池执行流程。下面是execute方法的源码

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false))
        reject(command);
}

我们按顺序先看一下第5行的if,在第4行获取了全局变量ctl,接着调用了workerCountOf方法从ctl中获取了线程的数量判断是不是小于核心线程数,如果小于的话,就调用addWorker(command, true)方法创建核心线程。如果addWorker方法返回false,说明核心线程满了无法创建了,那么就看第8行代码,又重新获取了一遍ctl值,为什么这里又重新获取一次呢?你想,他在代码的第5行判断当前线程数是否小于核心线程数的时候判断通过了,说明核心线程数还没满对吧,但是当他在创建核心线程的时候发现满了,也就意味着此时此刻有其他任务在跟我同时创建线程,并且其他任务已经先我一步创建满核心线程了,那么此时ctl的值肯定变了,所以要再重新获取一遍最新的ctl值。

再看第10行的if,先判断当前线程池是否是RUNNING状态,再把当前的任务放到队列里面。如果入队成功了会再次获取ctl值判断线程池是否是RUNING状态,如果不是RUNNING状态的话会把任务从队列中删掉,然后再触发拒绝策略。 如果是RUNNING状态则会判断当前的线程数量是不是0,如果是0的话就调用addWorker(null,false)。这里有没有疑惑,前面核心线程都满了,这里为啥又没线程了?别急,这里留个问题,后面会分析到,你只需要知道这种情况是会发生的;

如果入队失败说明队列满了,在第17行又调用了一次addWorker(command, false)方法创建非核心线程,如果非核心线程也创建失败了呢?那就会触发拒绝策略,也就是执行第18行。

我们来梳理一下他的整个流程:

  1. 在执行任务的时候先判断当先线程池内的线程是否大设置的于核心线程数,如果不大于就创建核心线程。
  2. 如果大于核心线程或这个在创建核心线程失败的时候会把任务入队。
  3. 如果入队失败,就去创建非核心线程。
  4. 如果非核心线程也创建失败,触发我们设置的拒绝策略。

以上的流程中会涉及到addWorker(command, true)addWorker(null,false)addWorker(command, false)reject(command)这几个方法,其中addWorker方法调用次数最多,但是每次传的参数都不一样。下面就带着今天的案例来点来分析一下这个方法。

addWorker()方法解析

这个方法有三种场景,一种是核心线程,一种是非核心线程,还有一个是参数传null的。

核心线程流程

private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (int c = ctl.get();;) {
        // Check if queue empty only if necessary.
        if (runStateAtLeast(c, SHUTDOWN)
            && (runStateAtLeast(c, STOP)
                || firstTask != null
                || workQueue.isEmpty()))
            return false;

        for (;;) {
            if (workerCountOf(c)
                >= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))
                return false;
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // Re-read ctl
            if (runStateAtLeast(c, SHUTDOWN))
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }

    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int c = ctl.get();

                if (isRunning(c) ||
                    (runStateLessThan(c, STOP) && firstTask == null)) {
                    if (t.getState() != Thread.State.NEW)
                        throw new IllegalThreadStateException();
                    workers.add(w);
                    workerAdded = true;
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

一上来就是嵌套的两层死循环,咱们一行一行的看:

ThreadPoolExecutor源码解析

在外层循环获取了ctl值调用runStateAtLeast方法判断当前线程的状态是否大于等于SHUTDOWN,我们才刚开始执行任务什么都没干了,状态肯定是RUNNING,if不满足,再往下走进入内层循环,调用workerCountOf方法获取到线程数量判断是否大于等于核心线程数3,不满足,再看绿色框内的代码,通过CAS的方式将线程数+1,如果CAS失败了则继续内循环,如果成功了就跳出外循环,继续走后面的逻辑。这里有一点要注意,什么情况下会导致CAS失败呢?那肯定是出现了并发安全问题嘛,肯定是多个线程都在同时CAS,成功的那个线程就继续走,不成功的那个线程就继续重试,目的就是要保证线程数量是正确的。再看循环后面的代码:

ThreadPoolExecutor源码解析

在第1段代码中new了一个Worker对象,将你要执行的任务传了进去。

在第2段代码中判断线程池的状态是否是RUNNING,如果是,就把worker对象放到了一个workers容器中,看一下workers具体是个啥:

ThreadPoolExecutor源码解析 这个HashSet其实就是用来保存线程的,我们总说线程池线程池的,这个池子到底长什么样子,其实他就是个HashSet。

再看第3段代码:t.start(),这个t是什么?往上看,t是从Worker对象里面拿出来的一个Thread。先不管Worker长啥样,敢肯定的一点是这个Thread就是最后执行任务的线程,调用t.start()之后肯定会执行他里面的runnable对象的run方法。这里涉及到Worker对象

Worker对象

Worker对象的结构

private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
    final Thread thread;
    Runnable firstTask;
}

通过这段代码我们可以发现几个点:

  1. Worker实现了Runnable接口,是一个可被Thread执行的任务。
  2. 在Worker对象内部维护了两个重要的属性,一个是执行任务的线程,另外一个是线程要执行的任务 再来看一下构造器:
Worker(Runnable firstTask) {
    setState(-1); // inhibit interrupts until runWorker
    this.firstTask = firstTask;
    this.thread = getThreadFactory().newThread(this);
}

在new Worker的时候会创建一个线程,把自己作为Runnable传给了thread。也就是说当执行t.start()方法的时候,会调到Worker的run()方法。那就看一下run方法做了什么,跳过套娃的代码,直接点到runWorker方法: ThreadPoolExecutor源码解析 第1段代码从当前的Worker对象中拿到要执行的任务,注意,这个任务是在创建Worker对象的时候就赋予的,不是任务队列里面的。你就理解为,你公司把你招过去,直接就丢给了你一个任务,这个任务属于手头上的任务,后面分配的任务在公司的一个需求清单里面等着呢。你手头上的任务就相当于这里的firstTask,需求清单就是任务队列。

第2段代码先校验task对象是否为null,此时task的值是从Worker里面拿出来的,肯定不是null,然后就到了第3段代码,调用task的run方法,也就是调到了我们自己写的这段代码:

threadPoolExecutor.execute(() -> {
    System.out.println("ThreadPoolExecutor");
});

在执行完我们自己传进去的任务之后,在第4段代码的地方把局部变量task改为null了,也就是说在下次while循环的时候会执行(task = getTask()) != null这段逻辑,如果getTask方法返回null的话就会结束循环,这个线程就结束了。我们要知道一个点,就是线程池里面缓存了很多线程,可以达到线程复用,所以不可能就这么轻易的结束掉,那么怎么能让线程不结束呢?要么处理任务,要么一直让他阻塞处于等待状态。

getTask:

ThreadPoolExecutor源码解析 在代码段1的位置还是先拿到ctl的值,在代码2中获取线程数量,代码 boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; 中涉及到一个allowCoreThreadTimeOut变量,我们没有给他赋任何值,所以默认是false。回去判断当前的线程数量是否大于核心线程数。现在肯定是不大于的,所以timed是false。 代码3目前还不会走,先忽略,直接看代码4,因为timed是false,所以会执行workQueue.take(),从队列中获取任务,这个队列就是我们在创建线程池的时候传的那个阻塞队列,还有印象吧,就是这个代码: ThreadPoolExecutor源码解析 既然是阻塞队列,那么让队列中没有任务的时候执行workQueue.take()肯定会阻塞线程,直到获取到任务为止。

非核心线程流程

如果你真的把创建核心线程场景的流程看明白的话,非核心线程场景看起来就容易了很多,他们的整体流程其实都一样,不同的是中间会有几个参数判断不一样

ThreadPoolExecutor源码解析 当core为false的时候,在addWorker方法中唯一的区别就是工作线程数量是和最大线程数量做判断的,如果小于最大线程数则通过CAS的方式把线程数量+1,不成功就一直重试。后面的就都一样了,当CAS成功之后就创建一个Worker对象,里面封装任务和线程,最后调用t.start()

getTask()

其实从这里也发现一个点,就是当释放非核心线程的时候其实是通过CAS竞争,竞争成功的就被释放,竞争失败的重试,最后留下来的就是核心线程。在线程池内部是没有对核心线程和非核心线程做明确区分。

任务为null的线程流程

在文章的最开始分析execute方法的时候我们留了一个问题:调用addWorker(null,false)的时候第一个参数传了null,也就是说我要创建一个Worker,但是没有给他分配任务。那他是如何执行的呢?如果你真的把前面的内容看的很明白的话,这个问题相信都不用解析了,其实答案就在runWorker方法里面: ThreadPoolExecutor源码解析 当创建了线程但是没有给他分配任务的时候,就会调用getTask()方法从队列中去拿任务。

线程池的关闭

shoutdown()

public void shutdown() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        advanceRunState(SHUTDOWN);
        interruptIdleWorkers();
        onShutdown(); // hook for ScheduledThreadPoolExecutor
    } finally {
        mainLock.unlock();
    }
    tryTerminate();
}

当我们调用shutdown方法来关闭线程池的时候会

  1. 在第6行将线程池的状态以CAS的方式从RUNNING改为SHUTDOWN
  2. 在第7行给线程池中的所有能拿到锁的Worker发送中断信号(拿不到锁说明有Worker正在执行任务,拿到锁说明这个Worker是空闲的):t.interrupt()

此时再有新的任务提交进来的时候,会先判断线程池的状态: ThreadPoolExecutor源码解析 如果状态是大于等于SHUTDOWN并且firstTask不为null的话说明线程池正在被关闭,但是还有新的任务进来,那我就不处理了。直接返回false。

这个过程不影响正在执行的线程,因为shoutdown操作会和runWorker竞争同一把锁,如果shoutdown拿到锁说明这个Worker是空闲的;如果runWorker拿到了锁说明Worker正在执行任务,无法发送中断信号interrupt。

ThreadPoolExecutor源码解析

ThreadPoolExecutor源码解析

以上加锁的方式是针对线程还在运行的场景,但是在getTask()方法中,假设线程正在workQueue.take()workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS)的时候就会不起作用。所以在代码中有一个异常的包裹: ThreadPoolExecutor源码解析 当发起中断信号interrupt的时候,正在阻塞的线程会触发InterruptedException异常,把timedOut改为false,重新循环

ThreadPoolExecutor源码解析 此时再执行到这个if的时候会判断线程池的状态是否大于等于SHUTDOWN并且队列是否为空,如果队列有任务还会继续处理,如果队列没有任务则return null,线程结束。

也就是说当线程池处于shutdown状态的时候不会处理新的任务,但是会把队列中已有的任务处理完毕。

shoutdownNow()

ThreadPoolExecutor源码解析

ThreadPoolExecutor源码解析

ThreadPoolExecutor源码解析

当我们调用shutdownNow方法来关闭线程池的时候会

  1. 在第6行将线程池的状态以CAS的方式从RUNNING改为STOP
  2. 在第7行给线程池中的所有Worker发送中断信号:t.interrupt()

ThreadPoolExecutor源码解析 当正在workQueue.take()workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS)的线程被interrupt唤醒后,再次进入这个if会return null, 线程结束,此时队列中可能还有任务没有处理完,他也不管了。

也就是说当线程池处于STOP状态的时候不会处理新的任务,也不会处理队列中的任务

补充

在线程池执行原理的小节中,遗留了一个问题: ThreadPoolExecutor源码解析 上面的if已经判断出来当前的线程数量是大于核心线程数量的,为什么到了这里会出现当前线程数量为0的情况呢?

其实从整个线程池的执行流程和线程池的生命周期结合起来看的话,就会发现,这个判断是是有线程池状态不是RUNNING的时候才会进来,因为execute方法是没有加锁的,所以此时正在执行execute方法的同时外部有人调用了shutdown或shutdownNow方法更改了线程池状态,释放了全部线程。

现实和理想之间,不变的是跋涉;暗淡与辉煌之间,不变的是开拓。