JUC(2)线程池一览
前言
本篇主要介绍线程池相关的知识点和面试题知识。
一、什么是线程池
说到线程池,我们可以类比 JDBC 连接池。在之前我们操作数据库连接的时候,有一个创建连接和销毁连接的过程,这个资源开销是比较大的。而真的业务即拿到连接后执行的语句消耗的资源通常是较小的,为了减少在创建连接和销毁连接的资源开销,提高系统的性能,我们引入了 JDBC 连接池,系统启动的时候,先放入很多连接放入池子里面,使用的时候,直接从连接池中获取一个,完毕之后返回到池子里面,继续给后续调用者使用,这就整体提高了系统的性能。
线程池和数据库连接池的原理也差不多,创建线程去处理业务,可能创建线程的时间比处理业务的时间还长一些,所以我们就可以使用线程池。创建线程变成了从线程池中获取一个空闲的线程,然后使用,关闭线程变成了将线程归还到线程池中。
二、为什么使用线程池
线程池做的工作主要是控制运行的线程数量,处理过程中将任务放入队列,然后在线程创建后启动这些任务,如果线程数量超过了最大数量,超出数量的线程进行阻塞队列排队等待,执行其他线程执行完毕,再依次执行。
主要特点:线程复用、控制最大并发数、管理线程。
- 1、降低资源消耗,减少资源创建和销毁的性能开销;
- 2、提高响应速度,当任务到达时,任务可以不需要等待线程创建就立即执行;
- 3、提高线程的可管理性,线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。
\
备注:就记住这三个好处即可。来自《java 并发编程艺术》
三、线程池的使用
常见的线程池有 FixedThreadPool,SingleThreadExecutor、CachedThreadPool 和 ScheduledThreadPool。这几个都是 ExecutorService(线程池)实例。
3.1 Executors.newFixedThreadPool(int)
newFixedThreadPool创建的线程池 corePoolSize 和 maximumPoolSize 值是相等的,它使用的是LinkedBlockingQueue 执行长期任务性能好,创建一个线程池,一池有N个固定的线程,有固定线程数的线程.
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
- 适用场景:适合处理 CPU 密集型的任务,在长期被工作线程使用的情况下,核心线程不会被回收。但在任务比较多的时候会导致 OOM,不会拒绝任务。
3.2 Executors.newSingleThreadExecutor()
newSingleThreadExecutor 创建的线程池corePoolSize和maximumPoolSize值都是1,它使用的是LinkedBlockingQueue一个任务一个任务的执行,一池一线程.
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
- 适用场景:适合串行执行任务的场景。
3.3 Executors.newCachedThreadPool()
newCachedThreadPool创建的线程池将corePoolSize设置为0,将maximumPoolSize设置为Integer.MAX_VALUE,它使用的是SynchronousQueue,也就是说来了任务就创建线程运行,当线程空闲超过60秒,就销毁线程。
执行很多短期异步任务,线程池根据需要创建新线程,但在先前构建的线程可用时将重用它们。可扩容,遇强则强
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* 线程池
* Arrays
* Collections
* Executors
*/
public class MyThreadPoolDemo {
public static void main(String[] args) {
//List list = new ArrayList();
//List list = Arrays.asList("a","b");
//固定数的线程池,一池五线程
// ExecutorService threadPool = Executors.newFixedThreadPool(5); //一个银行网点,5个受理业务的窗口
// ExecutorService threadPool = Executors.newSingleThreadExecutor(); //一个银行网点,1个受理业务的窗口
ExecutorService threadPool = Executors.newCachedThreadPool(); //一个银行网点,可扩展受理业务的窗口
//10个顾客请求
try {
for (int i = 1; i <=10; i++) {
threadPool.execute(()->{
System.out.println(Thread.currentThread().getName()+"\t 办理业务");
});
}
} catch (Exception e) {
e.printStackTrace();
} finally {
threadPool.shutdown();
}
}
}
四、ThreadPoolExecutor 底层原理
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
- corePoolSize :核心线程大小。当提交一个任务到线程池时,线程池会创建一个线程来执行任务,即使有其他空闲线程可以处理任务也会创建新线程,等到工作的线程数大于核心线程数时就不会在创建了,如果调用了线程池的 prestartAllCoreThreads 方法,线程池会提前把核心线程都创造好,并启动。
- maximumPoolSize:线程池允许创建的最大线程数,此值必须大于等于1。如果队列满了,并且已创建的线程数小于最大线程数,则线程池会再创建新的线程执行任务。如果我们使用了无界队列,那么所有的任务会假如队列,这个参数就没有什么效果了。
- keepAliveTime :多余的空闲线程的存活时间,当前线程池中线程数量超过 corePoolSize 时,当空闲时间,达到 KeepAliveTime 时,多余线程会被销毁直到只剩下 corePoolSize 个线程为止,如果任务很多,并且每个任务的执行时间比较短,避免线程重复创建和回收,可以调大这个时间,提高线程的利用率。
- unit : keepAliveTime 的时间单位,可以选择的单位有 天、小时、分钟、毫秒、微妙、千分之一毫秒和纳秒。
- workQueue : 任务队列,被提交但尚未被执行的任务,用于缓存待处理任务的阻塞队列
- threadFactory:表示生成线程池中工作线程的线程工厂,用于创建线程,一般默认的即可。也可以通过线程工厂给每个创建出来的线程设置更有意义的名字
- handler:拒绝策略,表示当队列满了,并且工作线程大于等于线程池的最大线程数(maximumPoolSize)时如何拒绝请求执行的 runnable 的策略。
线程池的执行原理是根据线程池的参数设置有关,比如核心线程数,最大线程数,阻塞队列等。每当有新的任务来临时,它的任务是这样的。
调用线程池的 execute 方法处理任务,执行 execute 方法的过程:
- 1、判断线程池中运行的线程个数是否小于核心线程数,如果是,则创建线程来处理任务,如果否,则执行下步;
- 2、尝试将任务放入 workQueue 指定的队列中,如果队列满了,则执行下步;
- 3、判断线程池中运行的线程数是否小于 maximumPoolSize,如果是:则新增线程处理当前传入的任务,否:将任务传递给 handler 对象的拒绝策略方法处理。
- 4、当一个线程完成任务时,它会从队列中取下一个任务来执行;
- 5、当一个线程无事可做超过一定时间,线程会判断:如果当前运行的线程数大于 核心线程数,那么这个线程就会停掉;所以线程池的所有任务完成后,它最终会收缩到 核心线程数的大小。
五、拒绝策略
5.1 线程池的拒绝策略
当阻塞队列排满了,且线程池中的线程数达到了最大线程数,这个时候就需要拒绝策略机制来处理新来的请求任务。
JDK 内置的四种拒绝策略:
- AbortPolicy(默认):直接抛出RejectedExecutionException异常阻止系统正常运行,拒绝任务,并抛出异常。
- CallerRunsPolicy:“调用者运行”一种调节机制,该策略既不会抛弃任务,也不会抛出异常,而是调用任务线程执行当前任务。
- DiscardOldestPolicy:抛弃队列中等待最久的任务,然后把当前任务加入队列中,尝试再次提交任务
- DiscardPolicy:该策略默默地丢弃无法处理的任务,不予任何处理也不抛出异常。如果允许任务丢失,这是最好的一种策略
以上四种策略均实现了 RejectedExecutionHandle 接口
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
public class Demo5 {
static class Task implements Runnable {
String name;
public Task(String name) {
this.name = name;
}
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + "处理" + this.name);
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
@Override
public String toString() {
return "Task{" +
"name='" + name + ''' +
'}';
}
}
public static void main(String[] args) {
ThreadPoolExecutor executor = new ThreadPoolExecutor(1,
1,
60L,
TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(1),
Executors.defaultThreadFactory(),
(r, executors) -> {
//自定义饱和策略
//记录一下无法处理的任务
System.out.println("无法处理的任务:" + r.toString());
});
for (int i = 0; i < 5; i++) {
executor.execute(new Task("任务-" + i));
}
executor.shutdown();
}
}
无法处理的任务:Task{name='任务-2'}
无法处理的任务:Task{name='任务-3'}
pool-1-thread-1处理任务-0
无法处理的任务:Task{name='任务-4'}
pool-1-thread-1处理任务-1
输出结果中可以看到有3个任务进入了饱和策略中,记录了任务的日志,对于无法处理多任务,我们最好能够记录一下,让开发人员能够知道。任务进入了饱和策略,说明线程池的配置可能不是太合理,或者机器的性能有限,需要做一些优化调整。
5.2 生产中合理的设置参数
要想合理的配置线程池,要分析任务的特性,可以从以下几个角度分析:
- 任务的性质:CPU 密集型任务、IO 密集型任务和混合型任务
- 任务的优先级:高、中、低;
- 任务的执行时间:长、中、短
- 任务的依赖性:是否依赖其他的系统资源,如数据库连接
性质不同任务可以用不同规模的线程池分开处理。
- CPU密集型任务应该尽可能小的线程,如配置cpu数量+1个线程的线程池。
- 由于IO密集型任务并不是一直在执行任务,不能让cpu闲着,则应配置尽可能多的线程,如:cup数量*2。
- 混合型的任务,如果可以拆分,将其拆分成一个CPU密集型任务和一个IO密集型任务,只要这2个任务执行的时间相差不是太大,那么分解后执行的吞吐量将高于串行执行的吞吐量。可以通过Runtime.getRuntime().availableProcessors()方法获取cpu数量。优先级不同任务可以对线程池采用优先级队列来处理,让优先级高的先执行。
使用队列的时候建议使用有界队列,有界队列增加了系统的稳定性,如果采用无解队列,任务太多的时候可能导致系统OOM,直接让系统宕机。
线程池汇总线程大小对系统的性能有一定的影响,我们的目标是希望系统能够发挥最好的性能,过多或者过小的线程数量无法有消息的使用机器的性能。咋Java Concurrency inPractice书中给出了估算线程池大小的公式:
Ncpu = CUP的数量
Ucpu = 目标CPU的使用率,0<=Ucpu<=1
W/C = 等待时间与计算时间的比例
为保存处理器达到期望的使用率,最有的线程池的大小等于:
Nthreads = Ncpu × Ucpu × (1+W/C)
1、CPU 密集型
// 查看CPU核数
System. out .println(Runtime. getRuntime ().availableProcessors());
2、IO 密集型
由于 IO 密集型任务线程并不是一直执行任务,则赢配置尽可能多的线程,如 CPU 核数 * 2
六、BlockingQueue 阻塞队列
线程1往阻塞队列里添加元素,线程2从阻塞队列里移除元素
当队列是空的,从队列中获取元素的操作将会被阻塞
当队列是满的,从队列中添加元素的操作将会被阻塞
试图从空的队列中获取元素的线程将会被阻塞,直到其他线程往空的队列插入新的元素
试图向已满的队列中添加新元素的线程将会被阻塞,直到其他线程从队列中移除一个或多个元素或者完全清空,使队列变得空闲起来并后续新增
种类分析:
- ArrayBlockingQueue:是一个基于数组结构的有界阻塞队列,此队列按照先进先出原则对元素进行排序
- LinkedBlockingQueue:由链表结构组成的有界(但大小默认值为integer.MAX_VALUE)阻塞队列,此队列按照先进先出排序元素,吞吐量通常要高于ArrayBlockingQueue。静态工厂方法Executors.newFixedThreadPool使用了这个队列。
- PriorityBlockingQueue:支持优先级排序的无界阻塞队列。
- DelayQueue:使用优先级队列实现的延迟无界阻塞队列。
- SynchronousQueue:不存储元素的阻塞队列,也即单个元素的队列,每个插入操作必须等到另外一个线程调用移除操作,否则插入操作一直处理阻塞状态,吞吐量通常要高于
- LinkedBlockingQueue,静态工厂方法Executors.newCachedThreadPool使用这个队列
- LinkedTransferQueue:由链表组成的无界阻塞队列。
- LinkedBlockingDeque:由链表组成的双向阻塞队列。
注意:建议使用有界队列。有界队列能增加系统的稳定性和预警能力,可以根据需要设大一点。
七、自定义线程池
/**
* @author xiao lei
* @Date 2020/11/6
* @module
*/
public class CustomThreadPoolExecutor {
private ThreadPoolExecutor poolExecutor = null;
/**
* 线程池初始化方法
* <p>
* corePoolSize 核心线程池大小----1
* maximumPoolSize 最大线程池大小----3
* keepAliveTime 线程池中超过corePoolSize数目的空闲线程最大存活时间----30+单位TimeUnit
* TimeUnit keepAliveTime时间单位----TimeUnit.MINUTES
* workQueue 阻塞队列----new ArrayBlockingQueue<Runnable>(5)====5容量的阻塞队列
* threadFactory 新建线程工厂----new CustomThreadFactory()====定制的线程工厂
* rejectedExecutionHandler 当提交任务数超过maxmumPoolSize+workQueue之和时,
* 即当提交第41个任务时(前面线程都没有执行完,此测试方法中用sleep(100)),
* 任务会交给RejectedExecutionHandler来处理
*/
public void init() {
poolExecutor = new ThreadPoolExecutor(
7,
7,
30,
TimeUnit.MINUTES,
new ArrayBlockingQueue<Runnable>(300),
new CustomThreadFactory(),
new CustomRejectedExecutionHandler());
}
public void destory() {
if (poolExecutor != null) {
poolExecutor.shutdownNow();
}
}
public ExecutorService getCustomThreadPoolExecutor() {
return this.poolExecutor;
}
private class CustomThreadFactory implements ThreadFactory {
private AtomicInteger count = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
String threadName = CustomThreadPoolExecutor.class.getSimpleName() + count.addAndGet(1);
System.out.println(threadName);
t.setName(threadName);
return t;
}
}
private class CustomRejectedExecutionHandler implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
try {
// 核心改造点,由blockingqueue的offer改成put阻塞方法
executor.getQueue().put(r);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
spring中使用,可动态配置
@EnableConfigurationProperties(ThreadPoolConfigProperties.class)
@Configuration
public class MyThreadConfig {
@Bean
public ThreadPoolExecutor threadPoolExecutor(ThreadPoolConfigProperties pool) {
return new ThreadPoolExecutor(
pool.getCoreSize(),
pool.getMaxSize(),
pool.getKeepAliveTime(),
TimeUnit.SECONDS,
new LinkedBlockingDeque<>(100000),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy()
);
}
}
@ConfigurationProperties(prefix = "xiaoleimall.thread")
@Component
@Data
@Primary
public class ThreadPoolConfigProperties {
private Integer coreSize;
private Integer maxSize;
private Integer keepAliveTime;
}
#配置线程池
xiaoleimall.thread.coreSize=20
xiaoleimall.thread.maxSize=200
xiaoleimall.thread.keepAliveTime=10
八、阿里开发规范
1、【强制】在创建线程或线程池时,请指定有意义的线程名称,方便出错时回溯。
public class TimeTaskThread extends Thread{
public TimeTaskThread(){
super.setName("TimeTaskThread");
}
}
2、【强制】线程资源必须通过线程池提供,不允许在应用中自行显示创建线程
【说明】:使用线程池的好处是减少资源开销。如果不使用线程池,有可能造成系统创建大量同类线程而导致消耗完内存或者cpu切换太频繁的问题。
3、【强制】线程池不允许使用 Executors 创建,而是通过 ThreadPoolExecutor的方式创建,这样的处理方式能让编写代码的工程师更加明确线程池的运行规则,规避资源耗尽的风险。
说明:
- Executors 返回的线程池对象的弊端如下
-
- 1)FixedThreadPool 和 SingleThreadPool:允许的请求队列长度为 Integer.MAX_VALUE,可能会堆积大量的请求,从而导致 OOM
- 2)CachedThreadPoll 和 ScheduledThreadPool:允许的创建线程数量为 Integer.MAX_VALUE,可能会创建大量的线程,从而导致 OOM
转载自:https://juejin.cn/post/7129013968038789156