【Java】ThreadPoolExecutor 线程池任务队列满了再添加任务就会执行拒绝策略?这简直是危言耸听!
问题
大家平时应该都有自己定义过线程池
下面是线程池ThreadPoolExecutor
的构造器
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
//省略
}
自定义线程池的时候需要指定一些参数
比较重要的就是corePoolSize
,maximumPoolSize
和workQueue
这几个参数
corePoolSize
核心线程数是指要保持活动状态的最小工作线程数(除非设置了allowCoreThreadTimeOut
)
maximumPoolSize
线程池最大工作线程数
workQueue
用于容纳任务并将任务移交给工作线程的队列
那么大家有没有想过这样一个问题
当线程数量等于核心线程数
corePoolSize
之后这时调用
execute
添加一个新的任务是会放到任务队列还是在最大线程数maximumPoolSize
的范围内新增一个线程来执行
用代码说话
为了得出问题的答案,我写了一个测试方法
public class ThreadPoolExecutorTest {
@Test
public void test() {
int corePoolSize = 1;//核心线程数
int maximumPoolSize = 2;//最大线程数
int workQueueSize = 1;//任务队列大小
long keepAliveTime = 0;
ThreadPoolExecutor executor = new ThreadPoolExecutor(
corePoolSize, maximumPoolSize,
keepAliveTime, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(workQueueSize),
Executors.defaultThreadFactory(),
new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
Task task = (Task) r;
System.out.println("第" + task.index + "个任务被拒绝执行");
System.out.println("线程池中的线程数量:" + executor.getPoolSize());
System.out.println("队列中等待的任务数量:" + executor.getQueue().size());
Task peek = (Task) executor.getQueue().peek();
System.out.println("队列中等待的任务是第" + peek.index + "个任务");
System.out.println("------------------------------------\n");
}
});
//第一个任务
executor.execute(new Task(1, executor));
sleep(100);
//第二个任务
executor.execute(new Task(2, executor));
sleep(100);
//第三个任务
executor.execute(new Task(3, executor));
sleep(100);
//第四个任务
executor.execute(new Task(4, executor));
sleep(100);
}
void sleep(long time) {
try {
Thread.sleep(time);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
//任务
class Task implements Runnable {
int index;//表示这是第几个任务
ThreadPoolExecutor executor;
Task(int index, ThreadPoolExecutor executor) {
this.index = index;
this.executor = executor;
}
@Override
public void run() {
System.out.println("这是第" + index + "个任务");
System.out.println("线程池中的线程数量:" + executor.getPoolSize());
System.out.println("队列中等待的任务数量:" + executor.getQueue().size());
System.out.println("------------------------------------\n");
sleep(100000);//使任务不结束,保证线程占用
}
}
}
指定核心线程数为1,最大线程数为2,任务队列大小为1
然后依次执行四个任务,看看最后结果会是什么样
在运行测试代码之前,不妨让我们先来做个假设
因为第一个任务肯定是会执行的,而第四个任务肯定是会被拒绝的(前三个任务会消耗掉最大线程数和任务队列,2 + 1 = 3)
所以不确定的就是第二个任务和第三个任务
如果任务放到队列中就不会被执行(这里我用sleep
来保证任务不会结束)
那么我们只需要看任务二和任务三哪个不会被执行就能知道是先放队列还是先加线程
假设一:先放队列
任务二会被放到任务队列,任务三会因为队列满了而新加线程执行
我们可以根据程序预测控制台的输出是
这是第1个任务
线程池中的线程数量:1
队列中等待的任务数量:0
------------------------------------
这是第3个任务
线程池中的线程数量:2
队列中等待的任务数量:1
------------------------------------
第4个任务被拒绝执行
线程池中的线程数量:2
队列中等待的任务数量:1
队列中等待的任务是第2个任务
------------------------------------
第一个任务会占用核心线程数,所以线程数量为1,队列为0
第二个任务因为核心线程数已经满了,会被放到队列中,所以不会打印任何信息
第三个任务因为核心线程数和任务队列都满了,会新增一个线程执行,所以线程数为1 + 1 = 2,队列中是任务二,数量为1
第四个任务因为任务一和任务三已经让线程数达到了最大线程数并且任务二让队列满了,所以会被拒绝执行
假设二:先加线程
任务二会新加线程执行,任务三会因为到达最大线程数而被放到任务队列
我们可以根据程序预测控制台的输出是
这是第1个任务
线程池中的线程数量:1
队列中等待的任务数量:0
------------------------------------
这是第2个任务
线程池中的线程数量:2
队列中等待的任务数量:0
------------------------------------
第4个任务被拒绝执行
线程池中的线程数量:2
队列中等待的任务数量:1
队列中等待的任务是第3个任务
------------------------------------
第一个任务会占用核心线程数,所以线程数量为1,队列为0
第二个任务因为核心线程数已经满了,会新增一个线程执行,所以线程数为1 + 1 = 2,队列为0
第三个任务因为核心线程数和任务队列都满了,会被放到队列中,所以不会打印任何信息
第四个任务因为任务一和任务二已经让线程数达到了最大线程数并且任务三让队列满了,所以会被拒绝执行
验证
接下来就让我们来运行测试代码
新机子哇伊自摸一刀子
这是第1个任务
线程池中的线程数量:1
队列中等待的任务数量:0
------------------------------------
这是第3个任务
线程池中的线程数量:2
队列中等待的任务数量:1
------------------------------------
第4个任务被拒绝执行
线程池中的线程数量:2
队列中等待的任务数量:1
队列中等待的任务是第2个任务
------------------------------------
最终的结果是第二个任务会被放到任务队列
所以也就是说会先放队列再加线程
不知道是不是和大家原来的理解一样呢
源码
最后一步,当然是要窥探一下线程池的源码了
首先需要提前说明
在线程池中有一个概念叫Worker
一个Worker
就代表一条线程,Worker
中持有Thread
对象
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable {
/** Thread this worker is running in. Null if factory fails. */
final Thread thread;
}
话不多说,直接来看execute
方法
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
//这里有一大段注释被我删了
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
我们先来看workerCountOf
这个方法,根据命名就能很好的理解,表示获得当前线程池中有多少线程
接着看if (workerCountOf(c) < corePoolSize)
这个条件语句
这个if
判断当线程数量如果小于核心线程数,那么就调用addWorker
方法
addWorker
方法也很好理解,就是添加一个线程,它的定义是这样的
boolean addWorker(Runnable firstTask, boolean core) {
}
返回值是一个布尔值,返回true
表示添加成功,false
表示添加失败(比如在这个过程中调用了shutdown
或是另一个线程已经抢先添加了一个线程等情况)
第一个参数firstTask
是当Worker
运行之后需要执行的任务
第二个参数core
表示是否占用核心线程数
如果core
为true
那么会用当前线程数和核心线程数进行比较,如果核心线程数已经到上限就会直接返回false
如果core
为false
那么会用当前线程数和最大线程数进行比较,如果最大线程数已经到上限就会直接返回false
这里需要注意,核心线程数和最大线程数是数量,线程本身不存在是核心线程或非核心线程的概念
我们回来继续看这段代码(省略不重要的代码)
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
}
也就说当线程数量小于核心线程数时,添加一个占用核心线程数的任务
如果成功就返回,如果失败就说明可能核心线程数满了或是已经shutdown
了
因为我们的测试代码不存在shutdown
的情况
所以只可能是核心线程数到达上限了
代码会走到if (isRunning(c) && workQueue.offer(command))
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
//这里有一大段注释被我删了
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
//核心线程数满了
if (addWorker(command, true))
return;
c = ctl.get();
}
//会执行这个 if
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
isRunning
判断当前线程池的状态,我们的测试代码中肯定是返回true
这样我们的任务就通过workQueue.offer(command)
被放到了任务队列workQueue
中
如果这个时候workQueue.offer(command)
返回false
说明队列也满了
那么就会到else if
分支调用addWorker(command, false)
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
//这里有一大段注释被我删了
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
//核心线程数满了
if (addWorker(command, true))
return;
c = ctl.get();
}
//队列也满了
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
//会执行这个 else if
else if (!addWorker(command, false))
reject(command);
}
相当于添加一个占用非核心线程数的任务
如果这个任务也添加失败了,就说明已经到了最大线程数,直接执行拒绝策略
我们可以尝试还原测试代码的执行逻辑
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
//这里有一大段注释被我删了
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
//任务一在这里添加成功并执行
if (addWorker(command, true))
return;
c = ctl.get();
}
//任务二在这里被放到了队列中没有执行
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
//任务三在这里添加成功并执行
else if (!addWorker(command, false))
//任务四在这里被拒绝
reject(command);
}
结束
其实在我看源码之前,我一直以为会先加线程再放队列
因为在我看来,我们添加的任务一定是想要尽快执行
如果我们设置的是无界队列,那不是基本用不到最大线程
没想到实际的逻辑竟然有点反直觉
转载自:https://juejin.cn/post/7264201112873500709