深入理解Java线程池,ThreadPoolExecutor源码剖析
Java线程池
什么是线程池
线程池是一种线程资源,我们通过把线程池化,可以带来许多好处
线程池的好处
- 创建/销毁线程需要消耗系统资源,线程池可以复用已创建的线程
- 控制并发的数量。并发数量过多,可能会导致资源消耗过多,从而造成服务器崩溃。(主要原因)
- 可以对线程做统一管理、监控、调优。
线程池的应用非常丰富,随便翻开一个开源项目都能看到线程池的身影。
Java线程池快速开始
public static void main(String[] args) {
// 创建线程池
ExecutorService executor = Executors.newFixedThreadPool(5);
for (int i = 0; i < 10; i++) {
// 向线程池提交任务
executor.execute(()->{
System.out.println(1);
});
}
// 关闭线程池
executor.shutdown();
// 等待所有任务完成
while (!executor.isTerminated()) {
}
System.out.println("线程池所有任务完成");
}
线程池的创建方式
- new ThreadPoolExecutor
- 通过Executors工具类创建线程池,这种方法本质是对第一种的封装
阿里规约禁止工程项目通过Executors工具类创建线程池,但规矩是死的人是活的,实际上像dubbo源码中也用到了Executors创建的线程池,但足以见得ThreadPoolExecutor的重要性。
那现在的问题是:Java如何做到池化线程资源?我们知道一个线程只能被调用一次start方法,那么线程在执行完一个线程后,如何执行下一个任务?没有任务时线程就一直消耗资源吗?我们来扒一扒ThreadPoolExecutor源码
Java线程池体系结构
Java中的线程池顶层接口是
Executor
接口,ThreadPoolExecutor
是这个接口的核心实现类, 我们从上往下慢慢看。
Executor接口
public interface Executor {
// 执行任务 Runnable是我们封装的线程体,是一段可执行的方法
void execute(Runnable command);
}
Executor往下走是ExecutorService
ExecutorService
public interface ExecutorService extends Executor {
// 关闭线程池
void shutdown();
// 返回未完成任务的列表
List<Runnable> shutdownNow();
boolean isShutdown();
boolean isTerminated();
// 提供了3种submit,返回值都是Future
<T> Future<T> submit(Callable<T> task);
// 共4种invoke 支持超时时间
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks);
<T> T invokeAny(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit);
}
ExecutorService是对Executor的增强,从提供执行任务的基础功能扩展:
- 增强了对线程池本身的管理功能,可以关闭线程池
- 额外提供了3种有返回值的执行任务的方法,以及4种invoke的提交方法
关于invoke提交有啥用我们后面再聊,最核心的提交任务的方式是:execute > submit
在往下是AbstractExecutorService,看到是抽象类,那基本上就是对公共逻辑的实现封装了。
AbstractExecutorService
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
AbstractExecutorService主要是对上面的3种submit和4种invoke的实现,提供了两个newTaskFor方法。
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return new FutureTask<T>(callable);
}
FutureTask不陌生,我们知道要获取异步执行线程的结果,光有callable是不够的,需要用FutureTask做一层封装
而submit方法最终还是调用了execute方法,只是在调用前将Callable封装成FutureTask而已。
所以到这我们可以基本确定:执行任务的核心就是execute方法。
那最后就是ThreadPoolExecutor了。
ThreadPoolExecutor源码也并不多,只有2000行,作为比较,ConcurrentHashMap有6000行。
那么线程复用的核心就藏在这2000行代码里了。
ThreadPoolExecutor
核心字段
ctl和Worker是至关重要的两个字段
ctl:控制核心
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// 32 - 3 = 29
private static final int COUNT_BITS = Integer.SIZE - 3;
// 1 << 29 - 1 ,即CAPACITY为 后29位全1
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits(前3位)
// Accept new tasks and process queued tasks
private static final int RUNNING = -1 << COUNT_BITS;
// Don't accept new tasks, but process queued tasks
private static final int SHUTDOWN = 0 << COUNT_BITS;
// Don't accept new tasks, don't process queued tasks, and interrupt in-progress tasks
private static final int STOP = 1 << COUNT_BITS;
// All tasks have terminated, workerCount is zero, the thread transitioning to state TIDYING will run the terminated() hook method
private static final int TIDYING = 2 << COUNT_BITS;
// terminated() has completed
private static final int TERMINATED = 3 << COUNT_BITS;
源码上的注解非常长,也很详细,大致意思是这样的:
ctl用一个字段封装了两个含义:
- workerCount:the effective number of threads that have been permitted to start and not permitted to stop.
- runState:线程池的状态,共有5种,具体的含义已经在上方源码中注释
因为ctl有32位,因此:用3位表示runState(5种状态 > 4),剩下的表示workerCount
并且提供了取出这两个数据的方法:
private static int runStateOf(int c) { return c & ~CAPACITY; }
private static int workerCountOf(int c) { return c & CAPACITY; }
主要字段
// 1.2. 核心/最大线程数
private volatile int corePoolSize;
private volatile int maximumPoolSize;
// 3.4. 超时时间
private volatile long keepAliveTime;
private volatile boolean allowCoreThreadTimeOut;
// 5. 阻塞队列
private final BlockingQueue<Runnable> workQueue;
// 6. 线程工厂
private volatile ThreadFactory threadFactory;
// 7. 拒绝策略
private volatile RejectedExecutionHandler handler;
// 锁 以及等待通知机制 线程池会添加/删除线程,就依赖mainLock来保证线程安全
private final ReentrantLock mainLock = new ReentrantLock();
private final Condition termination = mainLock.newCondition();
// 工作线程的集合
private final HashSet<Worker> workers = new HashSet<Worker>();
给它们进行了编号,实际上这些参数是可以通过构造方法自定义的。
线程池工作原理概述
现在有必要对这些字段有一些稍微的了解:
线程池的线程分为两种:核心线程,和非核心线程。用了两个字段维护它们的最大值,但需要注意:maximumPoolSize是所有线程数量的上限,而不单单是非核心线程的。
超时时间:线程是会占用资源的,瞬间的大量请求可能会导致线程池线程数量很多,但又回归平静没啥请求了,就会导致大量线程白白占用资源,尽管可以阻塞但仍然不够优雅。因此我们需要在空闲时能够释放它们。并且用allowCoreThreadTimeOut来控制是否核心线程允许超时过期,默认不允许。
阻塞队列:线程池在极端情况下会被提交很多任务,靠线程来不及一下子执行完,当然需要一个数据结构来存储这些线程,同时是个多线程场景,因此用了阻塞队列。
主锁mainLock:线程池在管理(创建和删除)线程时,也是会面临线程安全问题的,因此用这把锁来保证。
对于一个新来的任务:
- 如果核心线程数量都还没满,就创建一个线程来执行;
- 如果核心线程数满了,就放入阻塞队列中存储任务
- 阻塞队列也满了,就尝试创建非核心线程来执行任务
- 非核心线程数量也满了,只能触发拒绝策略
现在上面的字段会让你陌生的可能有ThreadFactory,RejectedExecutionHandler,Worker。我们依次来看:
ThreadFactory:线程工厂
ThreadFactory比较好猜:就是工厂模式嘛
源码也非常短,ThreadFactory接口只规定了一个方法:newThread
static class DefaultThreadFactory implements ThreadFactory {
private static final AtomicInteger poolNumber = new AtomicInteger(1);
private final ThreadGroup group;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;
DefaultThreadFactory() {
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
namePrefix = "pool-" +
poolNumber.getAndIncrement() +
"-thread-";
}
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r,
namePrefix + threadNumber.getAndIncrement(),
0);
if (t.isDaemon())
t.setDaemon(false);
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
}
这就解释了线程池中线程名字的由来,并且:非守护线程,优先级为5。当然ThreadFactory我们是可以自定义修改的,实际上在项目中也大都需要自定义,修改线程名字便于定位和排查问题。
Rejected...Handler:拒绝策略
RejectedExecutionHandler是拒绝策略,即线程池不再允许放入任务时的行为。RejectedExecutionHandler是个接口,就一个方法。
ThreadPoolExecutor提供四种拒绝处理的策略(RejectedExecutionHandler的实现类)为:
- ThreadPoolExecutor.AbortPolicy:默认拒绝处理策略,丢弃任务并抛出RejectedExecutionException
- ThreadPoolExecutor.DiscardPolicy:丢弃新来的任务,但是不抛出异常。
- ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列头部(最旧的)的任务,然后重新尝试执行程序(如果再次失败,重复此过程)。
- ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务。
来看一下AbortPolicy的源码:
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
下面再看一下Worker
在线程池中真正执行任务的Worker
Worker是个内部类。
private final class Worker extends AbstractQueuedSynchronizer
implements Runnable{
final Thread thread;
Runnable firstTask;
public void run() {
runWorker(this);
}
}
发现Worker居然继承了AQS,是个独占的锁
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
return true;
}
return false;
}
protected boolean tryRelease(int unused) {
setState(0);
return true;
}
源码注释说到:这个锁是为了说明线程工作状态,无锁代表空闲,有锁代表正在工作,当然细节远不止这么一些,但我们主线任务是线程池的原理,这个稍后说。
构造方法
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
// 线程用ThreadFactory生成
this.thread = getThreadFactory().newThread(this);
}
发现构造方法仅在一处被调用,那就是addWorker。
addWorker:新增Worker
private boolean addWorker(Runnable firstTask, boolean core) {
// 1. CAS自旋尝试将Ctl的workerCount + 1
int wc = workerCountOf(c);
// 如果超过线程数直接add失败
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// 因为判断比较多 写的比较长 省略部分
try {
// 创建Worker
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
// 获取锁
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck 线程池状态,线程是否存活等
// 插入set集合
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
// 添加成功 启动线程
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
正常情况:就是new一个Worker,维护一下HashSet,然后启动线程。
线程复用核心原理:start
其实要做到线程复用,因为start方法确定只能调用一次,那么问题肯定是处在start方法上,也就是Worker的run方法,而run直接调用runWorker方法。我们不妨来看一下:
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
// 先执行firstTask,getTask方法就是从阻塞队列里取任务
while (task != null || (task = getTask()) != null) {
w.lock();
// 检查是否需要中断自己,比如线程池状态变了
try {
// 这是一个空方法,是一个扩展点,源码的注解是:
// may be used to re-initialize ThreadLocals, or to perform logging
beforeExecute(wt, task);
Throwable thrown = null;
try {
// 执行任务
task.run();
} finally {
// 和beforeExecute一样是个空方法
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
// 这个方法就是线程出现异常如何处理的关键,我们稍后单独分析
processWorkerExit(w, completedAbruptly);
}
}
总之,先执行Worker创建时绑定的方法,然后死循环从阻塞队列中取任务并执行。
Worker的start并不是单单执行一个任务,而是不断获取任务来执行,这就是线程复用的核心。
Worker就差不多讲完了,当然还有很多细节我们后面再讲,线程池更多的就是对Worker的管理和调度。现在把目光重新聚焦于ThreadPoolExecutor本身:
重新审视线程池状态
虽然源码上有注解,但不了解线程池大致工作原理容易看不明白,现在了解了大致工作原理了,我们可以重新看线程池状态会有一个更清晰的认知。我们再回顾一下源码的注释:
// runState is stored in the high-order bits(前3位)
// Accept new tasks and process queued tasks
private static final int RUNNING = -1 << COUNT_BITS;
// Don't accept new tasks, but process queued tasks
private static final int SHUTDOWN = 0 << COUNT_BITS;
// Don't accept new tasks, don't process queued tasks, and interrupt in-progress tasks
private static final int STOP = 1 << COUNT_BITS;
// All tasks have terminated, workerCount is zero, the thread transitioning to state TIDYING will run the terminated() hook method
private static final int TIDYING = 2 << COUNT_BITS;
// terminated() has completed
private static final int TERMINATED = 3 << COUNT_BITS;
我列了一张表格,会更加直观:
线程池状态 | 新来的任务 | 阻塞队列中的任务 | 执行任务中的线程 | 空闲的线程 |
---|---|---|---|---|
RUNNING | 接受 | 正常执行 | 正常执行 | 阻塞 |
SHUTDOWN | 拒绝 | 正常执行 | 正常执行 | 中断 |
STOP | 拒绝 | 丢弃 | 中断 | 中断 |
TIDYING | 拒绝 | 没有任务 | 没有线程 | 没有线程 |
TERMINATED | 拒绝 | 没有任务 | 没有线程 | 没有线程 |
TIDYING和TERMINATED有啥区别?源码注释说的很清楚了:the thread transitioning to state TIDYING will run the terminated() hook method,大概就是:一旦到了TIDYING状态,就会调用terminated()方法,这个方法调用完,线程池就变为TERMINATED状态。
线程池状态的转换
我们new出来的线程池初始当然是RUNNING状态
- 一旦手动调用shutdown,就会转化为SHUTDOWN;
- 一旦手动调用shutdownNow,就会转化为STOP;
等到SHUTDOWN或是STOP忙完了自己的事,线程池会自己陷入TIDYING,最后是TERMINATED。
shutdown:转化为SHUTDOWN
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
// CAS 修改状态为shutdown
advanceRunState(SHUTDOWN);
// 中断空闲线程
interruptIdleWorkers();
onShutdown(); // hook for ScheduledThreadPoolExecutor
// 在ThreadPoolExecutor 这是个空方法
} finally {
mainLock.unlock();
}
tryTerminate();
}
shutdownNow:转化为STOP
大部分源码都和shutdown差不多,下面列出不同点:
// CAS的目标为 STOP 状态
advanceRunState(STOP);
// 会中断所有线程
interruptWorkers();
// 将阻塞队列所有未完成的任务返回
tasks = drainQueue();
返回的类型是List<Runnable>
,实际类型是ArrayList
。
这两个方法的共同点是:最后都调用了tryTerminate();
方法
tryTerminate:可能的终止
tryTerminate();
方法被调用的地方很多,源码的注释是这样说的:
This method must be called following any action that might make termination possible -- reducing worker count or removing tasks from the queue during shutdown.
tryTerminate();
方法必须在任何可能导致线程池termination的地方被调用,比如:减少Worker数,在shutdown时删除阻塞队列中的任务。
final void tryTerminate() {
for (;;) {
int c = ctl.get();
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
return;
// 中断一个空闲的线程
if (workerCountOf(c) != 0) { // Eligible to terminate
interruptIdleWorkers(ONLY_ONE);
return;
}
// Worker数为0
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 先转化为TIDYING
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
// 空方法
terminated();
} finally {
// 最后转化为TERMINATED
ctl.set(ctlOf(TERMINATED, 0));
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
// else retry on failed CAS
}
}
线程池的构造方法
以下是ThreadPoolExecutor
构造函数需要的参数,共7个:
- int corePoolSize:该线程池中核心线程数最大值
- int maximumPoolSize:该线程池中线程总数最大值
- long keepAliveTime:非核心线程闲置超时时长
- TimeUnit unit:keepAliveTime的单位
- BlockingQueue workQueue:阻塞队列,维护着等待执行的Runnable任务对象
以上五个是必选项,剩下两个是可选项
- ThreadFactory threadFactory创建线程的工厂 ,用于批量创建线程,统一在创建线程时设置一些参数,如是否守护线程、线程的优先级等。如果不指定,会新建一个默认的线程工厂。
- RejectedExecutionHandler handler拒绝处理策略,线程数量大于最大线程数就会采用拒绝处理策略。
可选项默认的构造方法如下:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
核心方法:execute(主要工作原理)
线程池最重要的功能当然是执行任务,我们直接看execute方法
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
// 1.当前活跃的线程数小于corePoolSize,则调用addWorker创建核心线程执行任务
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
// 2.如果不小于corePoolSize,则将任务添加到workQueue队列(阻塞队列)
// 这里调用offer方法而非put,因为阻塞队列满了需要特殊处理,而不是像get阻塞等待消费者唤醒
// offer方法,要是竞争锁失败,不会等待直接返回false
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// 2.1 如果线程池状态以及不是Running了,则remove这个任务,然后执行拒绝策略。
if (! isRunning(recheck) && remove(command))
reject(command);
// 2.2 核心线程数为0的特殊情况
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 3.如果放入workQueue失败(workQueue满了),则创建非核心线程执行任务,
else if (!addWorker(command, false))
// 如果这时创建非核心线程失败(当前线程总数不小于maximumPoolSize时),就会执行拒绝策略。
reject(command);
}
// addWorker(command, true) 第二个参数true表示核心线程,false表示非核心
文字小结一下:
- 线程总数量 < corePoolSize,无论是否有线程空闲,都会新建一个核心线程执行任务(让核心线程数量快速达到corePoolSize,在核心线程数量 < corePoolSize时)。
- 线程总数量 >= corePoolSize时,新来的线程任务会进入任务队列中等待,然后空闲的核心线程会依次去阻塞队列中取任务来执行(体现了线程复用)。
- 当阻塞队列满了,说明这个时候任务已经很多了,会创建非核心线程去执行这个任务。
- 阻塞队列满了, 且总线程数达到了maximumPoolSize,则会采取上面提到的拒绝策略进行处理
整个execute方法还是很清晰的,如果忘记了addWorker方法可以回顾一下上文。
好,那么线程池最核心的部分就讲完了,是不是有点突然,才只看了一个execute方法,但那也是因为前文我们铺垫的足够久。当然从剖析源码的角度,我们还需要关注一下别的细节:
线程池的其他细节
核心线程预热
- prestartCoreThread 创建一个核心线程
- prestartAllCoreThreads 创建核心线程直到满
其实就是调用了addWorker方法
public boolean prestartCoreThread() {
return workerCountOf(ctl.get()) < corePoolSize &&
addWorker(null, true);
}
线程池异常处理/线程回收
在Worker线程的runWorker方法中,退出「不断从阻塞队列取任务并执行的循环」后,会执行processWorkerExit,要不是线程空闲太久了退出,要不是线程异常了退出,我们来看一下这个方法:
private void processWorkerExit(Worker w, boolean completedAbruptly) {
// 线程异常退出时 completedAbruptly为true
// CAS WorkerCount - 1
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
decrementWorkerCount();
// 从HashSet中移除这个Worker对象
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
completedTaskCount += w.completedTasks;
// 删除掉 这个Worker对象
workers.remove(w);
} finally {
mainLock.unlock();
}
// 可能终止线程池
tryTerminate();
int c = ctl.get();
if (runStateLessThan(c, STOP)) {
if (!completedAbruptly) {
// 涉及极端情况的细节考虑 但如果是正常的线程过期 就直接return
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
return; // replacement not needed
}
// 重新添加一个Worker,非核心线程
addWorker(null, false);
}
}
因此,如果线程池中的线程抛出异常没有处理,会把它移除掉,再添加一个新的线程进去。
如何区分核心/非核心线程
这里我们要提一个细节:
private volatile int corePoolSize;
private volatile int maximumPoolSize;
字段中有两个值,分别是核心线程数,和最大线程数,但只用了一个WorkerCount(ctl维护)统计线程数量,线程池是如何区分核心线程和非核心线程的?
实际上线程池并不严格区分核心线程和非核心线程,因为从Worker的角度来说它们没有什么区别,因此线程池只在乎数量。下面马上来看一下回收线程时的判断:
线程过期回收的实现
我们之前在分析线程复用核心原理start的方法时,getTask方法是个黑盒,并且getTask返回null时Worker线程就会退出循环并执行processWorkerExit(w, completedAbruptly);
,现在我们来看一下getTask这个方法:
getTask()方法返回null的时机如下:
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
allowsCoreThreadTimeOut:是否允许核心线程过期被回收
因此,要么允许核心线程过期,要么线程数超过了核心线程数,此时才会执行poll方法,可能返回null,因为:
- take阻塞直到获取到任务
- poll(time,unit) :阻塞一段时间,还没获取到任务,返回null
用了allowCoreThreadTimeOut || wc > corePoolSize;
这个判断,因此线程池不需要严格区分核心线程和非核心线程。
那一旦返回了null,就可以用上文提到的processWorkerExit来处理。
单线程线程池有什么用
单线程线程池,再加上阻塞队列的FIFO特性,可以保证提交到该线程池的任务的有序性。
自定义线程池实战
熟悉了线程池工作原理,就可以自定义线程池并应用了,那么问题在于,线程池的构造函数复杂,该如何配置线程池的参数呢?
线程池的参数配置问题
线程池的参数并不好配置。最核心的三个:corePoolSize、maximumPoolSize,workQueue
IO密集型和CPU密集型运行起来的情况差异非常大。
CPU密集型:线程数等于CPU核数+1
IO密集型:线程数可以多一些,来充分利用CPU资源
像这个方案是太笼统了。
参数配置解决方案
比较知名的有:disruptor、动态线程池等,也可以自己根据业务不断改善和调整。想了解动态线程池的可以参考:Java线程池实现原理及其在美团业务中的实践
打破常规逻辑:Tomcat线程池
JAVA JDK线程池策略,比较适合处理 CPU 密集型任务,但是对于 I/O 密集型任务,如数据库查询,rpc 请求调用等,不是很友好,所以Tomcat在其基础上进行了扩展。
它里面的线程池的运行过程是先把最大线程数用完,然后再提交任务到队列里面去的。
tomcat的阻塞队列workqueue重写了offer
(排队任务的方法),(还记得怎么回收非核心线程吗,getTask方法如果超过核心线程数,会调用offer而不是task,因此可能被回收)该方法中,当当前线程数大于核心线程数,小于最大线程数时,返回false
,导致workQueue.offer(command)
为false
,进而导致该分支代码不被执行,执行addWorker(command, false)
方法,创建新线程。
核心源码offer方法如下:
// 对offer方法的包装
public boolean force(Runnable o) {
if (parent == null || parent.isShutdown()) {
throw new RejectedExecutionException(sm.getString("taskQueue.notRunning"));
}
return super.offer(o);
//forces the item onto the queue, to be used if the task is rejected
}
// 重写offer方法
@Override
public boolean offer(Runnable o) {
//we can't do any checks
if (parent==null) {
return super.offer(o);
}
//getPoolSize 方法是获取线程池中当前运行的线程数量
//也就是所有线程都创建完了,放入阻塞队列
if (parent.getPoolSize() == parent.getMaximumPoolSize()) {
return super.offer(o);
}
// getSubmittedCount 获取的是当前已经提交但是还未完成的任务的数量
// 其值是队列中的数量加上正在运行的任务的数量。
// 这块逻辑可能的情况就是,有些线程执行完了,空闲,那么我把任务放入队列里,就能立刻执行
// 没有必要再创建线程了
if (parent.getSubmittedCount()<=(parent.getPoolSize())) {
return super.offer(o);
}
//if we have less threads than maximum force creation of a new thread
// 这里就是实现创建线程的关键,return false就会去创建新线程
if (parent.getPoolSize()<parent.getMaximumPoolSize()) {
return false;
}
//if we reached here, we need to add it to the queue
return super.offer(o);
}
上文源码的parent是什么?
taskqueue.setParent(executor);
构造线程池处有一行代码,设置了阻塞队列的parent为线程池
常见面试题
execute和submit的区别
submit本质只是将线程体(可能是callable,可能是runnable)封装成FutureTask再去执行execute方法
- execute只能提交Runnable类型的任务,没有返回值;
- submit既能提交Runnable类型任务也能提交Callable类型任务,返回Future类型。
- execute方法提交的任务异常是直接抛出的;
- submit方法是是捕获了异常的,当调用FutureTask的get方法时,才会抛出异常。
这种处理异常的方式不是被线程池所决定的,而是被FutureTask在try块捕获并封装到结果中,从而在get时才抛出。
为什么Worker要继承AQS实现锁
这个锁是为了说明线程工作状态,无锁代表空闲,有锁代表正在执行任务。并且Worker是不可重入的。目的是不希望正在工作中的线程被中断。
提交任务的方式有哪些
有4种invoke,3种submit,和一个execute。
submit是对execute的增强,主要是用FutureTask对线程体做了封装从而能够获取异步执行的结果。
invoke是对功能的扩展,支持提交多个任务,并且所有(All)/某个(Any)任务执行完毕才返回,它们也都用FutureTask做了封装,支持获取执行结果。
invokeAll:执行所有任务
线程池会把 list 里面的任务一个个的去执行,执行完成后返回一个 Futures 列表。
invokeAll方法:
- 把传入进来的任务封装为一个 FutureTask 对象,先放到一个 ArrayList 里面,然后调用 execute 方法,也就是扔到线程池里面去执行。
- 遍历每个Future 对象,调用get方法阻塞直到得到Future的结果
invokeAll与shutdownNow潜在的bug
- 在invokeAll期间执行shutdownNow方法,会导致有的任务阻塞在Future.get。因此程序无法退出。
shutdownNow返回未完成任务的列表,我们对list内的每个FutureTask执行cancel方法,从而使程序正常退出。
但采用了DiscardPolicy,你没有任何地方让这个被静默处理的 Future 抛出异常,也没用任何地方能调用它的 cancel 方法,所以这里就会一直阻塞。
说一下线程池里面的几把锁
除了mainlock和Worker,阻塞队列里也有两把锁
线程池如何处理异常
转载自:https://juejin.cn/post/7282692887665836086