滥用线程,导致线上线程池被撑爆的一次意外
事故的背景
简单的描述一下业务场景,项目里面有一个分布式定时job ,定期去扒取数据,那么有两层循环,第一层是大概1000条数据 ,然后第二层循环每个数据下面大概20个子数据,然后通过多线程的方式去扒取数据入库。这就是一个简单的业务背景。这个对刚入行的小白写代码得注意了!
作为程序员的我也是第一次遇到这个问题,虽然这个问题解决很简单,但是造成的影响很大,我觉得还是有必要做一个小总结,小伙伴们以后写类似代码的时候就会有注意,这个问题的造成是我一个同事没有理解透线程池,导致的一个很大的问题。那么先说问题之前先扒拉扒拉线程池。

线程池
首先我先说明一点在企业中一般都是自定义线程池,很少使用jdk 给我们提供的几种线程池方法,这样是为了做到一个可控制。
这里帮大家只是一次简单的回顾吧,具体线程池网上已经一大片一大片的文章了。
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler)
private static ThreadPoolExecutor poolExecutor = new
ThreadPoolExecutor(3,
30,
1,
TimeUnit.MINUTES,
new ArrayBlockingQueue(1000),
new MyRejectedExecutionHandler(MAX_QUEUE_SIZE));
那么说到线程池无非就理解这几个参数。

流程
- 假如我们定义的corePoolSize 是5个 maximunPoolSize 是 30, keepAliveTime为一分钟 ,队列大概1000个吧
- 第1个Task来了放入到队列里面,线程池一直从队列里面获取,获取到第一个Task,发现当前线程池里面线程个数小于corePoolSize ,欧克欧克,兄弟我马上给你创建一个线程。
- 此时第2个Task 又来了,线程池又从队列里面获取,发现现在的线程还是小于corePoolSize,马上又开启新的线程继续执行
- 此时第5个Task 来了,那么和corePoolSize 相等了,ok ,那么继续创建一个线程。对于创建的这5个线程是不会销毁的,这也是为什么线程池可以避免重复创建线程代来的消耗。
- 此时第6个Task 来了,线程池发现corePoolSize 里面有刚刚执行完了的线程,那么此时就不需要创建新的线程了,直接拿来使用,达到了一个复用的效果。
- 假如我们的Task 很多很多,又比较耗时,corePoolSize 已经没有空闲的线程了,那么接下来就是maximumPoolSize出场了,发现线程池里面此时corePoolSize < num < maximumPoolSize 那么继续创建线程执行Task
- 那么当线程池数量num > maximumPoolSize,就不会创建新的线程,来的新的Task直接往队列里面仍进去
- 当队列里面都已经超过了最大数量(我们这里定义的1000) ,相当我们线程池已经爆满了没法接收新的Task了,那么不好意思兄弟,我只能拒绝你了,然后就走我们的MyRejectedExecutionHandler
- 上面的参数基本已经提到了,还剩一个keepAliveTime 那么这个时间又什么时候用呢,这种场景下corePoolSize < num < maximumPoolSize 超过corePoolSize 空闲的线程的存活时间,比如核心是5个,最大30个,那么多余的25个线程就是超出的线程,简单的总结就是:超出核心线程的线程 存活的时间。
那么上面也是整个线程池的核心流程做了一个描述。
自定义线程池
上面已经描述了线程池的流程和原理,下面自定义线程池直接贴代码了,就不做过多的阐述了。
// 定义一个线程池类
public class MyWorkerThreadPool {
private static final int MAX_QUEUE_SIZE = 1000;
private static ThreadPoolExecutor poolExecutor = new
ThreadPoolExecutor(3,
30,
1,
TimeUnit.MINUTES,
new ArrayBlockingQueue(MAX_QUEUE_SIZE),
new MyRejectedExecutionHandler(MAX_QUEUE_SIZE));
public static void submitTak(Runnable run) {
poolExecutor.submit(run);
}
public static void shutdown() {
poolExecutor.shutdown();
}
}
// 定义一个拒绝策略
public class MyRejectedExecutionHandler implements RejectedExecutionHandler{
private final Log logger = LogFactory.getLog(this.getClass());
private int maxQueueSize;
public MyRejectedExecutionHandler(int maxQueueSize){
this.maxQueueSize=maxQueueSize;
}
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
System.out.println("提交任务失败了" +
"当前队列最大长度" +
"maxQueueSize="+maxQueueSize+
",maximumPoolSize="+executor.getMaximumPoolSize());
if(executor.getQueue().size()<maxQueueSize){
executor.submit(r);
}else{
try {
Thread.sleep(3000);
executor.submit(r);
}catch (Exception e){
//此异常忽略
executor.submit(r);
}
}
}
}
事故代码(伪代码)
那么接下来就是重点了,这里只贴一部分伪代码,前面已经说了这是一个分布式定时job,这里是我精简了同事的代码提炼出来的。
//模拟场景
for(int i= 0;i< 1000;i++){
for(int j = 0;j<20;j++){
MyWorkerThreadPool.submitTak(()->{
// 真实业务场景这里非常耗时,
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
}
执行结果:
提交任务失败了当前队列最大长度maxQueueSize=1000,maximumPoolSize=30
提交任务失败了当前队列最大长度maxQueueSize=1000,maximumPoolSize=30
提交任务失败了当前队列最大长度maxQueueSize=1000,maximumPoolSize=30
提交任务失败了当前队列最大长度maxQueueSize=1000,maximumPoolSize=30
提交任务失败了当前队列最大长度maxQueueSize=1000,maximumPoolSize=30
这里 第一层模拟了1000条数据,第二层循环30条数据,同事的代码,导致同一时间我们自定义的线程队列爆满,2000*30 这个是要开多少个线程啊。细细想下是不是很恐怖,这么使用会导致什么后果呢,当前业务又很多都被拒绝掉没有执行,另外当线程池爆满后,我们项目的其它功能执行异步方法也会被拒绝掉,结果可想而知。
那么如何解决了,其实很简单,我们跑这个比较耗时的任务我们可以指定10个线程去跑就是了,这样就不会影响到其它的功能业务。 我这里简单的使用了一个计数器,比如超过了10个线程则阻塞等待一会,跑完一个线程则减一,直接上代码
public class TestMain {
public static void main(String[] args) throws Exception{
//模拟场景,这是一个分布式定时任务,所以不会存在并发同时执行的问题
AtomicInteger threadNum = new AtomicInteger(0);
// 模拟当前第一层有1000数据
for(int i= 0;i< 1000;i++){
// 模拟每条线路有20条子数据
for(int j = 0;j<20;j++){
// 多线程拉去爬取网上的数据汇总到数据库
// 一次最多开启10个线程取执行耗时操作,
while (threadNum.get() > 10){
// 可以小睡一会再看是否有资格执行
Thread.sleep(500);
}
// 小增加1
threadNum.incrementAndGet();
int tempI = i;
int tempJ = j;
MyWorkerThreadPool.submitTak(()->{
// 真实业务场景这里非常耗时,
try {
Thread.sleep(5000);
System.out.println(Thread.currentThread().getName()+"执行完第一层数据"+ tempI +"第二层:"+tempJ);
// 执行完减少一个1
threadNum.decrementAndGet();
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
}
}
执行结果:
pool-2-thread-2执行完第一层数据0第二层:1
pool-2-thread-1执行完第一层数据0第二层:0
pool-2-thread-3执行完第一层数据0第二层:2
pool-2-thread-2执行完第一层数据0第二层:3
pool-2-thread-3执行完第一层数据0第二层:5
pool-2-thread-1执行完第一层数据0第二层:4
pool-2-thread-3执行完第一层数据0第二层:7
pool-2-thread-1执行完第一层数据0第二层:8
pool-2-thread-2执行完第一层数据0第二层:6
pool-2-thread-1执行完第一层数据0第二层:10
pool-2-thread-3执行完第一层数据0第二层:9
pool-2-thread-2执行完第一层数据0第二层:11
pool-2-thread-3执行完第一层数据0第二层:13
pool-2-thread-2执行完第一层数据0第二层:14
pool-2-thread-1执行完第一层数据0第二层:12
总结
其实我们开发中又很多小细节,只是我们有时候没有注意或对其原理不清楚,有时候写出来的代码就会带来比较糟糕的后果。那么今天这篇就到这里!
转载自:https://juejin.cn/post/6844903998273617933