ThreadPoolExecutor线程池核心原理
大家好,我是小趴菜,在面试中,我们经常会被面试官问到线程池的一些问题,比如
- 1:线程池执行流程
- 2:线程池是如何回收非核心线程
- 3:线程池如何维护核心线程
- 4:.........
今天我们来分析一下线程池的整体核心流程,帮助大家彻底了解线程池的底层核心原理
关于线程池的用法如下,我们要分析的入口就是submit()方法,这是将任务提交到线程池的方法
ThreadPoolExecutor threadPoolExecutor =
new ThreadPoolExecutor(1,2,3000,TimeUnit.MILLISECONDS,new ArrayBlockingQueue<>(1),Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());
threadPoolExecutor.submit(() -> {
try {
TimeUnit.SECONDS.sleep(120);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
TimeUnit.SECONDS.sleep(1);
threadPoolExecutor.submit(() -> {
try {
TimeUnit.SECONDS.sleep(120);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
TimeUnit.SECONDS.sleep(1);
threadPoolExecutor.submit(() -> {
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
threadPoolExecutor.shutdown();
public Future<?> submit(Runnable task) {
//判断提交的任务是否为空,为空就直接抛出空指针异常
if (task == null) throw new NullPointerException();
//将我们的任务封装成一个RunnableFuture,后续可以通过Future.get()方法获取我们任务的返回结果
RunnableFuture<Void> ftask = newTaskFor(task, null);
//核心方法,将任务提交给线程池执行
execute(ftask);
return ftask;
}
execute(ftask);最后进入ThreadPoolExecutor这个类中
public void execute(Runnable command) {
//对我们的任务再次进行判空处理
if (command == null)
throw new NullPointerException();
//拿到我们线程池中此时存活的线程数
int c = ctl.get();
//如果存活的线程数小于核心线程数
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
//如果此时线程池存活的线程数不小于核心线程数
//isRunning(c):判断此时线程池的状态,如果是Running,就可以接收新的任务
//workQueue.offer(command): 将任务放入队列
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
//线程池的拒绝策略,也就是拒绝了这个任务
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
//如果队列满了,也就是存活线程数不小于核心线程数并且队列也已经满了
//也就是判断线程数是否大于线程池最大的线程数
else if (!addWorker(command, false))
//线程池的拒绝策略,也就是拒绝了这个任务
reject(command);
}
在execute(ftask);方法中,出现最多的就是addWorker()这个方法,接下来我们看下这个方法的作用是什么
//firstTask:是我们要提交给线程池的任务
//core:true表示的是这个线程是核心线程,也就是线程数小于核心线程数时候创建的线程,
false,表示是可以回收的线程,表示此时线程数已经不小于核心线程数了
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
//对于我们要了解线程池核心原理,这个for(;;)其实没必要去深入了解,我们可以直接忽略
//我们先把整体的流程搞明白,对于一些不重要的步骤我们可以直接忽略
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// 判断线程池的状态,有RUNNING,STOP,SHUTDOWN等一些状态,只要不是RUNNING状态
//那么线程池就不会接收新的任务,所以这里直接会返回false,但是我们此时的
//线程池状态是RUNNING,所以不会返回进入这个if分支
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
//拿到此时线程池存活的线程数
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
//我们直接从这里开始分析
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
//将我们的任务封装成一个Worker对象,此时这里就创建了一个新的线程
w = new Worker(firstTask);
//从Worker对象中拿到执行当前任务的线程
final Thread t = w.thread;
//判断线程是否为空
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
int rs = runStateOf(ctl.get());
//这里还是判断线程池的状态
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
//将我们创建的新的线程放入到workers中
//private final HashSet<Worker> workers = new HashSet<Worker>();
//其实workers就是一个Set集合
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
//释放锁
mainLock.unlock();
}
if (workerAdded) {
//调用线程的start()方法去真正执行我们的业务
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
因为我们的任务被封装成一个Worker的对象,而Worker又实现了Runnable接口,所以在调用start()方法时候,就会执行它的run()方法
// Worker中的run()方法
public void run() {
runWorker(this);
}
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
//拿到我们提交给线程池的任务,注意:此时task不为空!!!!!!
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
//因为此时的task不为空,所以会直接进入到while循环中去
while (task != null || (task = getTask()) != null) {
w.lock();
//判断线程是否被中断
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
//执行任务的前置业务逻辑,因为我们没有实现,所以这里不做任务处理
beforeExecute(wt, task);
Throwable thrown = null;
try {
//真正去调用我们的业务方法
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
//执行任务的后置业务逻辑,因为我们没有实现,所以这里不做任务处理
afterExecute(task, thrown);
}
} finally {
//此时将task设置为null
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
第一次进入while循环就结束了,结束之后将task设置为null,因为这是个while循环,所以会再次进入这个while循环中去
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
//再次进入到这个while循环中来,此时的task已经为null了,所以第一个判断为false
//接下来就会执行task = getTask()方法
while (task != null || (task = getTask()) != null) {
w.lock();
//判断线程是否被中断
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
//执行任务的前置业务逻辑,因为我们没有实现,所以这里不做任务处理
beforeExecute(wt, task);
Throwable thrown = null;
try {
//真正去调用我们的业务方法
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
//执行任务的后置业务逻辑,因为我们没有实现,所以这里不做任务处理
afterExecute(task, thrown);
}
} finally {
//此时将task设置为null
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
//这里是个死循环
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// 如果此时线程池状态等于RUNING,并且队列已经为空了,就直接返回null
//但是此时我们的线程池状态是RUNNING,所以不会进入到这个if分支
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
//将线程池的线程数减1
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
//allowCoreThreadTimeOut:false 表示如果没有任务,那么核心线程在超时时间达到之后就会回收
// true:表示核心线程不会被回收
// 我们可以通过 threadPoolExecutor.allowCoreThreadTimeOut(false);来设置
// allowCoreThreadTimeOut默认值就是false,所以timed的值就是由wc > corePoolSize来控制
//如果此时存活的线程数大于核心线程数,那么就是true,否则就是false
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
// 如果此时存活的线程数大于最大线程数并且队列为空
// 就执行compareAndDecrementWorkerCount(c),将线程数减1,直接返回null,
// 返回null之后,上层while循环就直接退出,然后将这个线程回收
// 这个if的作用就是如果此时线程数大于最大线程数了,此时队列又没有任务,这时候就将
// 那么就直接将这些线程回收
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
//如果timed = true:表示此时的线程数大于核心线程数,但是小于最大线程数,此时直接调用
// 阻塞队列的poll方法,并在我们设置的存活时间之后如果还没有获取到元素,就直接回收这条线程
// 如果timed = false:表示此时的线程数小于或等于核心线程数,那么此时就调用阻塞队列的
// take()阻塞方法,知道队列中有任务
//所以核心线程之所以能存活,就是因为调用了take()方法一直阻塞在这里,
//非核心线程是有阻塞时间的,超过这个时间没有任务就会被回收
try {
Runnable r = timed ?
//非核心线程会回收的核心原理
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
//核心线程存活的核心原理
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
转载自:https://juejin.cn/post/7258266800739237947