likes
comments
collection
share

线程池ThreadPoolExecutor源码解析

作者站长头像
站长
· 阅读数 12

本文主要内容

  • 什么是线程池
  • 线程池的使用
  • 线程池的原理
  • 线程池中的位运算
  • 源码解析

什么是线程池

如果频繁创建线程,也是会影响整体资源或效率的,线程池的产生是为了避免重复的创建线程和回收线程。线程池有以下几个作用:

  • 降低资源消耗。通过重复利用已创建的线程降低线程创建、销毁线程造成的消耗
  • 提高响应速度。当任务到达时,任务可以不需要等到线程创建就能立即执行。
  • 提高线程的可管理性。线程是稀缺资源,如果入限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配、调优和监控。

线程池的使用

先来看看线程池的核心构造方法:

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    //异常处理略过
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}

需要用户指定以下几个关键变量:

  • corePoolSize:核心线程数量
  • maximumPoolSize:最大线程数量
  • allowCoreThreadTimeOut:是否允许线程超时(设置为true时与keepAliveTime,TimeUnit一起起作用)
  • keepAliveTime:线程存活时间(当线程池允许线程超时且运行中的线程数量超过- corePoolSize时,会按照此变量设置时间关闭线程)
  • TimeUnit:单位(一般与keepAliveTime同时使用,供线程池判断是否满足关闭线程的条件)
  • workQueue:缓冲队列
  • RejectedExecutionHandler:拒绝处理任务类(默认:AbortPolicy 会抛异常,见下方实例)
  • threadFactory:线程工厂(默认:DefaultThreadFactory)

明白这些核心数据的作用,就可以随意使用线程池了

在Executors类中,封装好了几个接口用于初始化线程池,具体可参考源码,只处只列举其中一个,不再详述

ExecutorService executor = Executors.newFixedThreadPool(5);

线程池的原理

如果让我们自己来设计一个线程池,我们应该怎么做呢?线程池最大的功能在于线程重复使用,不需要每执行一个任务就重新构造新线程。

如果用户提交的任务比较多,显然我们需要一个缓存队列用于存储任务。

为了达到线程重复使用的目的,线程应该不停地从缓存队列中获取新的任务,并且执行它。

线程池ThreadPoolExecutor源码解析

线程池中的位运算

线程池中使用一个AtomicInteger对象来表征线程池的状态和大小,为了达到1个int型数据能保存2个数据的目的,源码采用了非常精妙的位运算来实现。

//线程池的各个状态
private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;
//两个常量
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;
//获取状态、线程池大小的方法以及一个或方法
private static int runStateOf(int c)     { return c & ~CAPACITY; }
private static int workerCountOf(int c)  { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
//关键变量
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

从源码可知,其实RUNNING等5个状态的值的前4位分别是:

  • 1110 、0000、 0010 、0100、 0110

各状态后面接了28个0。

CAPACITY的值,前4位为0,后28位为1。

如果要获取线程池状态,则调用runStateOf此方法,ctl的值将与 ~CAPACITY 作与操作,得到的正是ctl值的前4位。

获取当前线程池大小,则是与CAPACITY 作与操作,正是取ctl的低位值。

所以 ctl 的高位值用于表征线程池状态,而低们值用于表征线程池的大小。

ctl的初始化,调用ctlOf方法,正是拿RUNNING与0作或操作,结果还是RUNNING,所以可知,在线程池初始化的时候,默认ctl的状态值为RUNNING,而线程池大小为0。

ctlOf方法的含义也可知,正是组合状态值与线程池大小。

源码解析

一般来说,线程池添加任务有两种方式,一种是execute方法,另一种是submit方法,submit方法其实也是会调用execute,所以在此只研究execute方法即可。

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    //获取线程池状态
    int c = ctl.get();
    //如果线程池大小少于核心线程,则直接添加新的Worker并返回
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    //如果核心线程数量已满,则考虑将任务添加进入缓存队列,然后再次检查状态
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    //最后核心线程数量已满,缓存队列已满,那么直接交给拒绝策略处理
    else if (!addWorker(command, false))
        reject(command);
}

再来看非常关键的addWorker方法:

private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        //获取线程池状态
        int rs = runStateOf(c);
        // 对状态进行检测,如果线程池已经被调用shutDown方法时,返回false
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;

        for (;;) {
            //获取线程池的大小
            int wc = workerCountOf(c);
            //如果线程池大小已经超过核心线程池大小,那么也直接返回,这种情况应该把任务缓存到队列中去
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            //正常情况下,线程池大小加1即可
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // Re-read ctl
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }
    //新建一个Worker,在执行Worker构造方法时,创建了新的线程,即下面的t,具体可参见Worker的构造方法。
    Worker w = new Worker(firstTask);
    Thread t = w.thread;

    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        // Recheck while holding lock.
        // Back out on ThreadFactory failure or if
        // shut down before lock acquired.
        int c = ctl.get();
        int rs = runStateOf(c);

        if (t == null ||
            (rs >= SHUTDOWN &&
             ! (rs == SHUTDOWN &&
                firstTask == null))) {
            decrementWorkerCount();
            tryTerminate();
            return false;
        }
    
        workers.add(w);

        int s = workers.size();
        if (s > largestPoolSize)
            largestPoolSize = s;
    } finally {
        mainLock.unlock();
    }
    //启动线程
    t.start();
    // It is possible (but unlikely) for a thread to have been
    // added to workers, but not yet started, during transition to
    // STOP, which could result in a rare missed interrupt,
    // because Thread.interrupt is not guaranteed to have any effect
    // on a non-yet-started Thread (see Thread#interrupt).
    if (runStateOf(ctl.get()) == STOP && ! t.isInterrupted())
        t.interrupt();

    return true;
}

在addWorker方法中,启动了一个新的线程,那我们来看看线程中的run方法,其实即是runWorker方法:

final void runWorker(Worker w) {
    Runnable task = w.firstTask;
    w.firstTask = null;
    boolean completedAbruptly = true;
    try {
        //请注意,这是一个while循环,如果当前Worker的task不为空,则取当前Worker的task,执行任务
        //如果当前task为空,则从缓存队列中取任务来执行,这就是线程池中最重要概念线程复用的体现
        while (task != null || (task = getTask()) != null) {
            w.lock();
            clearInterruptsForTaskRun();
            try {
                beforeExecute(w.thread, task);
                Throwable thrown = null;
                try {
                    //执行任务
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

我们来看看getTask方法是否真的是从缓存队列中取任务:

private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?

    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }

        boolean timed;      // Are workers subject to culling?

        for (;;) {
            int wc = workerCountOf(c);
            timed = allowCoreThreadTimeOut || wc > corePoolSize;
            //正常情况下,线程池数量应该少于最大值,并且也不会timedOut,
            //如果真的大于了最大值,则应该删除一个线程
            if (wc <= maximumPoolSize && ! (timedOut && timed))
                break;
            if (compareAndDecrementWorkerCount(c))
                return null;
            c = ctl.get();  // Re-read ctl
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }

        try {
            //从缓存队列中取出任务
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

线程池分析到这,基本结束了,内中最多的细节,比如同步锁的使用,比如状态的判断,比如最后停止线程池等等,不再细说了,线程池介绍到这基本原理都清楚了,有点类似于android中的Looper,死循环中取消息,分发消息并执行对应操作等,线程池也是这样。

线程池中的位运算,刚开始可能看不懂,但在纸上写下各个数,基本就能明白各个方法的含义了。

最后再补充一个小细节,负数在计算机中的表示,为了更方便实现2进制数的加减法,负数使用补码表示,具体则是正数部分取反加1,而正数的补码则是正数本身。正因为如此,RUNNING 前4位才是1110,具体的各位可以计算下,补码的意义可以自行搜索。