线程池使用不当导致的hang死问题
背景
最近开发了一个小功能里面用到了线程池来提高处理速度,但是发现点击按钮之后程序并没有出现本应该打印的日志。 计算机没有玄学,一切问题都有其原因,找到它记录下来才能一点点提升。 于是就查下这个问题吧。 代码大致路基如下,创建了一个核心线程数=1,最大线程=10的线程池。点击按钮之后会将一个任务放到现成池中,然后这个任务又会for循环的去创建子任务放到同一个线程池中。父任务等待子任务的运行结果。
AtomicInteger atomicInteger = new AtomicInteger(0);
// 线程池
ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 10,
10, TimeUnit.SECONDS, new LinkedBlockingQueue<>(), new ThreadFactory() {
final AtomicInteger threadId = new AtomicInteger(0);
@Override
public Thread newThread(@NotNull Runnable r) {
return new Thread(r, "custom-" + threadId.incrementAndGet());
}
}, new ThreadPoolExecutor.CallerRunsPolicy());
// 模拟点击事件将任务放到线程池中
CompletableFuture.runAsync(() -> {
List<CompletableFuture<Integer>> futures = new ArrayList<>();
// 这个任务for循环创建子任务并放到同一个线程池中
for (int i = 0; i < 100; i++) {
futures.add(CompletableFuture.supplyAsync(() -> {
int i1 = atomicInteger.incrementAndGet();
System.out.println(i1);
return i1;
}, executor));
}
// 这里需要依赖子任务的处理结果,所以等待子任务完成
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
for (CompletableFuture<Integer> future : futures) {
System.out.println(future.getNow(-1));
}
}, executor);
// 等待
try {
TimeUnit.SECONDS.sleep(60 * 10);
} catch (InterruptedException e) {
e.printStackTrace();
}
找问题
遇到这个问题一时半会的也很闷逼,于是先将核心线程数调大点看看效果,将 coreSize = 2 再次运行发现这个代码可以正常运行了。 于是猜想应该是父任务 join 等待导致的这个问题!!!
验证猜想
使用arthas来看下线程的运行状态
arthas 官网:arthas.aliyun.com/doc/command…
使用 thread -all | grep 'custom'
看下自定义的线程池的线程状态
86 customer-1 Idea Thread Group 4 WAITING 0.0 0.000 0:2.090 false false
发现这个线程1一直处于 WAITING
状态,基本可以验证猜想正确。
根源
但是最大线程数是10啊,为什么不会去创建线程去处理子任务呢???
于是看了下线程池的源码,线程池在什么时候创建线程、在什么时候销毁线程?
调用线程池都会使用 execute
方法,先看这个方法。
- 获取正在运行的线程数
- 如果数量小于核心线程数调用
addWorker
方法,并return
- 如果当前的线程数量大核心线程那么尝试将
command(task)
放到阻塞队列中。 - 如果队列满了,那么直接调用
addWorker
方法,如果这个方法也没有成功,这时候使用拒绝策略。
我们都知道线程池中的 Worker
对象就是和线程一一对应的工作者。
addWorker
方法会就调用线程池工厂创建线程,具体代码如下
addWorker
方法调用了 Worker
构造函数,这里会调用工厂创建线程。
这里就是线程的创建时刻,**addWorker**
方法第二个参数boolean core控制是创建建比核心线程数多比最大线程数小的线程,
这里为false就创建非核心线程(线程本身没有状态标识时候是否核心),调用的地方在第一张图,队列满了之后调用的。
这里有个注意的地方,Worker
对象是实现了 Runnable
的,并且调用工厂创建线程的的时候将自己作为
待运行的task
传了过去,也就是说创建的这个新线程运行 start
方法的时候会运行Worker
对象的run
方法,调用 runWorker
方法。
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
private static final long serialVersionUID = 6138294804551838833L;
final Thread thread;
Runnable firstTask;
volatile long completedTasks;
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
// 创建线程并将自己当做task传了过去
this.thread = getThreadFactory().newThread(this);
}
// this.thread.start() 方法开始执行的时候会走到这里
public void run() {
runWorker(this);
}
}
runWorker
的代码如下,将 Worker
对象构造的时候的task当做第一个运行的任务,并且会一直while循环的从队列中去拿任务,如果队列空了没有拿到任务,那么这个 Worker
的run
方法也就运行完了,其生命周期也就结束了。
这里就是线程的回收时刻
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
// 将 Worker 对象构造的时候的task当做第一个运行的任务
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
// 这里是关键,处理一开始拿到的task之外,还会调用getTask方法拿任务。
// 如果任务队列不空这个while 循环是不会停止的,也就是说这个线程会一直运行。
while (task != null || (task = getTask()) != null) {
w.lock();
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
try {
task.run();
afterExecute(task, null);
} catch (Throwable ex) {
afterExecute(task, ex);
throw ex;
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
结论:
由于创建线程池使用的阻塞队列没有长度限制,导致所有的子任务都会放到队列中不会创建新的 Worker
线程导致 hang死。
所以这里有两种处理方式,
- 将核心线程数设置成大于1的
- 将阻塞队列设置成有界的,且小于子任务数量的
在代码中还是要避免这种父子任务的写法,因为就算同时修改了上面两点如果出现并发大量的父任务一下打满了最大线程数,子任务在队列无无法处理也会出现线程池hang死问题。
转载自:https://juejin.cn/post/7252521137820598332