对于ThreadPoolExecutor 代码的一些思考
基本介绍
ThreadPoolExecutor,我们通常用来创建线程池进行执行我们的任务,构建出来后执行excecte或者submit方法来执行我们的任务。 里面的原理及其设计思想到底是什么呢,我们接下来看下。 接下来的源码部分都是基于jdk8.
线程池状态
线程池在不同时期是有对应状态的。
以下为源码
public class ThreadPoolExecutor extends AbstractExecutorService {
//32-3 = 29
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// 11100000 00000000 00000000 00000000
private static final int RUNNING = -1 << COUNT_BITS;
// 00000000 00000000 00000000 00000000
private static final int SHUTDOWN = 0 << COUNT_BITS;
// 00100000 00000000 00000000 00000000
private static final int STOP = 1 << COUNT_BITS;
// 01000000 00000000 00000000 00000000
private static final int TIDYING = 2 << COUNT_BITS;
// 01100000 00000000 00000000 00000000
private static final int TERMINATED = 3 << COUNT_BITS;
}
直接使用 32 位的高3位来表示对应线程池状态状态
低29位来表示线程池的最大容量 CAPACITY = 00011111 11111111 11111111 11111111
线程池基本属性
public class ThreadPoolExecutor extends AbstractExecutorService {
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static int ctlOf(int rs, int wc) { return rs | wc; }
private final BlockingQueue<Runnable> workQueue;
private static final RejectedExecutionHandler defaultHandler =
new AbortPolicy();
private volatile long keepAliveTime;
private final ReentrantLock mainLock = new ReentrantLock();
private int largestPoolSize;
private long completedTaskCount;
private volatile ThreadFactory threadFactory;
private volatile boolean allowCoreThreadTimeOut;
private volatile int maximumPoolSize;
}
ctl
初始值默认为 11100000 00000000 00000000 00000000
存储的是线程池状态及当前线程数,高三位为状态,低29位为线程数。
源码表明线程池创建时状态为running,线程数为0
线程池的方法
基本使用
public class TestThreadPoolExecutor {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ThreadPoolExecutor threadPoolExecutor
= new ThreadPoolExecutor(1, 1, 100L,
TimeUnit.SECONDS,new ArrayBlockingQueue(100));
threadPoolExecutor.execute(new Runnable() {
@Override
public void run() {
System.out.println("test");
}
});
}
}
execute()
public class ThreadPoolExecutor extends AbstractExecutorService {
private final BlockingQueue<Runnable> workQueue;
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException()
int c = ctl.get();
//现存线程数小于核心线程数
if (workerCountOf(c) < corePoolSize) {
// 新建线程
if (addWorker(command, true))
return;
c = ctl.get();
}
//判断当前线程池状态,加入到阻塞队列中
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// 二次检查,如果不是运行中,从队列中剔除
if (!isRunning(recheck) && remove(command))
//执行拒绝策略
reject(command);
//工作线程为空则添加工作线程,但添加的是一个空值
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
//状态不是运行中,尝试addWork
} else if (!addWorker(command, false))
//拒绝策略
reject(command);
}
// CAPACITY =`00011111 11111111 11111111 11111111`
// 0 与上任何值都为0,所以这里只是判断高32位的值
private static int workerCountOf(int c) { return c & CAPACITY; }
private static boolean isRunning(int c) {
return c < SHUTDOWN;
}
}
功能:提交任务给线程池进行执行。
基本流程:
- 判断是否达到核心线程数,如果还未达到,则新建核心线程加入线程池中
- addWork后直接返回
- 注意,这里只是单纯addWork,并没有加入到阻塞队列中
- 当新增核心线程失败或者新增非核心线程时
- 会多次判断线程池状态,保证线程池为运行时状态才会加入到队列中
- 加入阻塞队列后会二次判断线程池状态,如果不是运行时,则将刚才加入的从队列中剔除,并执行拒绝策略
- 如果加入阻塞队列后,为运行中状态,且线程池线程数为0,则也执行addWork,但执行的command为null
- 会多次判断线程池状态,保证线程池为运行时状态才会加入到队列中
- 非核心线程如果在线程池状态为非running状态addwork,失败后也是执行reject
如果二次判断时,线程池运行中,并且线程数为0,则这时候也会addWork,但是加入的是command为null,这是为什么?之后再说
submit()
public class TestThreadPoolExecutor {
public static void main(String[] args) {
Future<String> test_call = threadPoolExecutor.submit(new Callable<String>() {
@Override
public String call() throws Exception {
System.out.println("test Call");
return "callable has result";
}
});
System.out.println(test_call.get());
}
}
源码
public class ThreadPoolExecutor extends AbstractExecutorService {
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}
}
功能:提交任务,能够返回结果,实现callable,赋值给RunnableFuture,最终执行execute();
addWork()
public class ThreadPoolExecutor extends AbstractExecutorService {
private final HashSet<Worker> workers = new HashSet<Worker>();
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// >= SHUTDOWN,也就是状态非运行中。
//且只有状态不为SHUTDOWN,或command为空,或队列为空,才不允许再新增
//也就是如果为SHUTDOWN状态时,如果队列不为空,且我们传入的command为空时要执行新增
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
//判断线程数是不是已经超过了
//如果是要新增核心线程数则跟核心线程数比
//如果新增的不是核心线程数则跟最大线程数比
//线程数超过都直接返回false
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
//cas 对ctl设值,设置成功直接跳出所有循环
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get();
//线程池状态已经更改了,再次重新进来
if (runStateOf(c) != rs)
//跳出第二层循环,再次进来
continue retry;
}
}
//上面给线程池线程数+1 ,执行创建work
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
//加锁,保证并发时的安全性
mainLock.lock();
try {
int rs = runStateOf(ctl.get());
//线程池状态为STOP|TIDYING|TERMINATED
//或者线程池为SHUTDOWN,但传入的command 为null
//这时候如果执行的线程还是存活状态,则直接抛出异常
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
//否则直接addWork成功,works其实是个set
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
//addWork 成功。开启线程
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
private boolean compareAndIncrementWorkerCount(int expect) {
return ctl.compareAndSet(expect, expect + 1);
}
}
addWok方法包含了新增及除法work运行的功能。
成功将线程池数通过cas加1 后,将新增的work加入works是会执行一次加锁操作防止并发,加入成功后,会触发work对应线程的run方法来执行。
在创建work前,会判断能否addWork
- 当如果线程数大于我们规定的对应线程数还有大于我们的ThreadPoolExecutor规定的最大线程数时,是不允许再新增work的
- 当线程池不处于running状态时,也有特殊情况不允许addWork
- 状态为SHUTDOWN,如果队列中有数据则继续往下执行,但不允许新增任务
- TERMINATED、TIDYING、STOP 状态,都不允许addWork
Work
work是什么?
public class ThreadPoolExecutor extends AbstractExecutorService {
private volatile ThreadFactory threadFactory;
private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
final Thread thread;
Runnable firstTask;
volatile long completedTasks;
Worker(Runnable firstTask) {
//初始化aqs中资源state值为-1
setState(-1);
this.firstTask = firstTask;
// work 中的thread 通过线程池设置的工厂进行创建
this.thread = getThreadFactory().newThread(this);
}
//实现的Runnable 中的run方法
public void run() {
runWorker(this);
}
//实现了设置资源的方式
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
//实现了释放资源的方式
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
}
public ThreadFactory getThreadFactory() {
return threadFactory;
}
final void runWorker(Worker w) {
// 先省略这部分代码
}
}
根据work源码,可以看出,worker实现了Runable的 run 方法,并且将 thread设置为woker的属性。
一个work代表一个线程,workers 的size其实也是对应我们线程池的线程数,addWork 的其中一个主要功能,也就是为我们的线程池创建对应的线程来执行任务。
线程池、线程池,也是就是对应创建出的work保存到的works,HashSet<Worker> workers
就是我们用到的线程池。
Work 继承 AbstractQueuedSynchronizer,具有AQS特性(用于对work 设置状态值方面防止出现并发问题,像我们对work的线程进行设置中断等操作)
在work中,我们重写了AQS中的tryAcquire(int unused)
及tryRelease(int unused)
方法,跟ReentrantLock不同的的一个大点是,这里是不可重入的。
runWork()
我们看addWork()与Work的代码,可以看出,addWork()中,获取work的thread属性执行run方法,其实就是执行Work中所实现的run 方法,最终也去到了runWorker这个方法。
public class ThreadPoolExecutor extends AbstractExecutorService {
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
//如果线程池状态是stop/TIDYING/TERMINATED,确保线程标记为中断
//如果不是,则保证线程不被标识为中断
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
// 将work退出,当中途发生异常时,completedAbruptly=true
processWorkerExit(w, completedAbruptly);
}
}
private void processWorkerExit(Worker w, boolean completedAbruptly) {
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
completedTaskCount += w.completedTasks;
workers.remove(w);
} finally {
mainLock.unlock();
}
//尝试转换成TERMINATED
tryTerminate();
int c = ctl.get();
//SHUTDOWN 或者running状态 才执行下面的规则
if (runStateLessThan(c, STOP)) {
//为false才进去这里,根据调用链路查看道,只有runWork正常结束才是false
if (!completedAbruptly) {
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
// 当前工作线程数>=核心线程数则直接返回
if (workerCountOf(c) >= min)
return; // replacement not needed
}
//异常结束情况,则新增任务为null,来触发执行队列中的任务
addWorker(null, false);
}
}
protected void beforeExecute(Thread t, Runnable r) { }
protected void afterExecute(Runnable r, Throwable t) { }
}
从源码可以看出,当task也就是我们addWorker(Runnable firstTask, boolean core)
中的Runnable属性,为空时,会执行getTask()。前面也说过,当线程池状态为运行中,但工作线程为空时,加入的执行任务为null。
看到这里,其实有点能get到execute()中的,加入队列后,当前线程池中线程数为0,我们执行addWorker(null, false);
操作。因为他需要触发队列的执行。
while (task != null || (task = getTask()) != null)
中,第一个判断,其实是基于创建核心线程的时候执行的,因为核心线程创建出来然后触发运行时完全不需要走队列,第二个判断,是从队列中获取任务进行执行。
在runWork过程中如果出错,最后也是会addWorker(null, false);
来重新触发新增work来消费队列中的任务,因为runWork 的finally 操作所调用的processWorkerExit
,无论你结果怎样,他都会先帮你把原有work给remove掉。按源码,全部任务执行完后,所有的work都会被remove掉,核心线程数都归0。
runWork 获取到任务后会进行加锁,因为我们work里的实现就是一个不可重入锁,也表明,一个work一次只能执行一次任务。
在执行task.run();
的前后,提供了两个抽象方法给实现具体操作。
public class ThreadPoolExecutor extends AbstractExecutorService {
protected void beforeExecute(Thread t, Runnable r) { }
protected void afterExecute(Runnable r, Throwable t) { }
}
getTask方法中,从队列获取线程这里,是用一个while包围起来,当前work从队列中拿去任务并执行完后,会继续拿取。
getTask()
public class ThreadPoolExecutor extends AbstractExecutorService {
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// 状态不是running,只要队列没有东西,直接返回null
// 状态为TERMINATED 或TIDYING 或 STOP,无论队列有没有东西,直接返回null
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
//线程池线程数-1
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// Are workers subject to culling?
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
// 当前线程数超过最大线程数 或者 等待超过keepalive时间
if ((wc > maximumPoolSize || (timed && timedOut))
// 当前线程数大于1 或者 阻塞队列为空
&& (wc > 1 || workQueue.isEmpty())) {
//线程池线程数-1
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
Runnable r = timed ?
//超时了的处理
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
//未超时时的处理
workQueue.take(); //未超时,从队列中拿任务
if (r != null)
return r;
timedOut = true;//拿到的任务是空,则设置状态未超时
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
}
getTask()
方法,一进来,一个大大的自旋for (;;)
,要么拿到对应任务返回,要么继续循环尝试获取。
getTask()
里会有关于当前线程池的状态判断来决定是否要继续执行获取队列中的任务来进行执行- 只要线程池状态为running,如果队列为空,则直接返回null
- 如果状态为TIDYING、TERMINATED,无论队列中有没有东西,直接返回null,这也表明了状态为TIDYING、TERMINATED、STOP,就不再执行任务了
从代码可以看出,每次addwork就是执行创建work,并且将work加入到队列中,
而 getTask()
方法的具体实现,就是从队列中获取 runable 任务,来进行执行,也就是runWorker方法主要做两件事,一件就是根据传入的 worker 来执行,一件是触发从队列中获取任务执行,因为runWorker()
的获取任务的实现,是基于 for(;;)
,也就是会一直执行到队列为空为止。
getTask()
中也作了对非核心线程空闲时的对应处理,结合了 keepAliveTime
进行判断超时。
tryTerminate()
在执行完队列任务后,work退出后,会执行tryTerminate()
public class ThreadPoolExecutor extends AbstractExecutorService {
final void tryTerminate() {
for (;;) {
int c = ctl.get();
//运行中
//或者状态为TIDYING或者TERMINATED状态
//或者当前状态为SHUTDOWN但是队列有任务
//则直接返回
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
return;
if (workerCountOf(c) != 0) {
//当works不为0,随机挑一个空闲线程标记线程中断
interruptIdleWorkers(ONLY_ONE);
return;
}
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//尝试将线程池状态标记成TIDYING
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
terminated();
} finally {
//将线程池状态设置成 TERMINATED
ctl.set(ctlOf(TERMINATED, 0));
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
// else retry on failed CAS
}
}
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers) {
Thread t = w.thread;
if (!t.isInterrupted() && w.tryLock()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}
protected void terminated() { }
}
- 以下会有几个可能不会被标记中断
- 还在运行中
- 已经被标记成TIDYING或者TERMINATED
- 被标记成SHUTDOWN,但是队列还有数据
作者提供了一个protected void terminated() { }
抽象方法给我们自己实现,当我们进行将线程池设置成终止状态的时候,也就是线程池要关闭的时候可以作为后置处理一些逻辑。
TIDYING 跟 TERMINATED 的关系其实就是一个过渡的关系,其实也都是表明,线程池限制在关闭。
但当works数量不为0 时,为什么会随机选一个来标记中那段?这个还不知道为啥 = =
shutdown
不再接受新任务,但继续执行旧任务,执行shutdown方法会状态会变为shutdown
public class ThreadPoolExecutor extends AbstractExecutorService {
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
//标记一下线程池状态为shutDown
advanceRunState(SHUTDOWN);
//给workers里所有的thead设置中断状态
interruptIdleWorkers();
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
//执行尝试终止
tryTerminate();
}
private void interruptIdleWorkers() {
interruptIdleWorkers(false);
}
void onShutdown() { }
}
shutdown这个方法只是将状态值设置成shutDown,然后进入tryTerminate方法。 tryTerminate 方法上面已经讲过,当线程状态值等于shutDown但队列不为空,不会执行任何操作,也就是等到runWork里的getTask都执行完后,因为runWork执行完后也会执行tryTerminate,这时候队列就可能为空了,最后走向TERMINATED。 这也是验证了我们常说的,shutdown不会马上关闭,会等到所有任务都执行完了才关闭。
shutdownNow()
public class ThreadPoolExecutor extends AbstractExecutorService {
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(STOP);
interruptWorkers();
tasks = drainQueue();
} finally {
mainLock.unlock();
}
tryTerminate();
return tasks;
}
private List<Runnable> drainQueue() {
BlockingQueue<Runnable> q = workQueue;
ArrayList<Runnable> taskList = new ArrayList<Runnable>();
q.drainTo(taskList);
if (!q.isEmpty()) {
for (Runnable r : q.toArray(new Runnable[0])) {
if (q.remove(r))
taskList.add(r);
}
}
return taskList;
}
}
shutdownNow() 就很直接,标记STOP状态,把队列中所有的任务全删掉。
线程池核心参数
在我们创建线程池时,我们需要设置对应的一些属性参数。从上面的源码,我们可以得出对应参数所对应的意义
public class ThreadPoolExecutor extends AbstractExecutorService {
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
}
corePoolSize
核心线程数,线程池基本的一个线程数量,当存在线程数小于核心线程数时,新进一个线程就创建一个新线程,即使线程池存在其他空闲线程
maximunPoolSize
最大线程数,线程池允许的最大线程数量。 活动线程大于核心线程数,并且池中正在运行的线程数小于maximumPoolSize,对于新加入的任务,新建线程。
keepAliveTime
非核心线程的存活时间,就是maximunPoolSize - corePoolSize 的那几个线程
BlockingQueue
堵塞队列/工作队列,执行任务前保存任务的地方
threadFactory
创建线程的工厂
handler
拒绝策略,当队列里的任务数达到上限,并且池中正在运行的线程数等于maximumPoolSize,对于 新加入的任务, 执行拒绝策略(线程池默认的拒绝策略是抛异常)
设计思想
线程池的整个大致的流程其实就是用个容器set保存多个线程,然后我们的任务放置队列中,线程各自一个个从队列中获取任务,处理任务,执行完再从队列中获取,直到没有。有点异步处理的意思,有个队列放着我们所需要处理的任务,让线程各自拿取处理,线程池外部我们的主线程继续做自己的事情。
状态总结
从上源码解析可知
- TERMINATED、TIDYING、STOP,表明线程池已经在关闭的过程了,TERMINATED表明是已关闭状态
- 不允许addWork
- 当队列中还有任务,也不再执行了
- STOP,执行方法ShutDownNow的时候的所设置的状态,表明要马上关闭
- TERMINATED、TIDYING 是线程池执行关闭时的状态值,TIDYING->TERMINATED
- 所有的work执行完都会去尝试将线程池转换到TERMINATED状态,也就是关闭线程池
- SHUTDOWN,执行方法ShutDown的时候的所设置的状态
- 不允许新增任务
- 如果队列中存在任务,会去执行完队列中所有任务。
转载自:https://juejin.cn/post/7227363129457066021