【多线程03】谈谈Java中的线程池
本文主要有以下内容
- 介绍Java中内置的线程池
- 线程池的七大参数
- 线程池的生命周期
- 线程池的工作原理
- 自定义线程池
Java内置的线程池
线程池:为了节约系统资源,减少创建线程的开销以及更好的管理线程,Java
提供了一套Executor
框架,封装了对多线程的控制,其体系结构如下图所示:
Executor
本身是一个接口,其代码如下:
public interface Executor {
void execute(Runnable command);
}
ExecutorService
接口对该接口进行了扩展,增加很多方法
- shutdown()
- shutdowmNow()
- isShutdown()
- isTerminated()
- awaitTermination()
- submit(Callable<T>)
- submit(Runnable,T)
- submit(Runnable)
- invokeAll()等重载方法
重点关注前五个方法:
shutdown()
: 调用此方法通知线程池shutdown
,调用此方法后,线程池不再接受新的任务,已经提交的任务不会受到影响,会按照顺序执行完毕。不会阻塞调用此方法的线程。shutdowmNow()
,立即尝试停止所有正在运行的任务,返回一个待执行的任务列表。不会阻塞调用此方法的线程。该方法除了尽力去尝试停止线程外,没有任何保证,任何响应中断失败的线程可能永远不会停止(如:通过thread.interrupted()中断线程时)。isShutdown()
:返回一个boolean值,如果已经shutdown
返回true,反之false。awaitTermination(timeout,timeUnit)
:阻塞直到所有任务全部完成,或者等待 timeout ,或者在等待timeout期间当前线程抛出InterruptedException
isTerminated()
: 返回true
如果所有的任务已经完成且关闭,否则返回false
除非在先前已经调用过shutdown()/shutdownNow()
AbstractExecutorService
是一个抽象类,实现了ExecutorService
,其子类ThreadPoolExecutor
进一步扩展了相关功能,在Java中,贴心的Doug Lea
提供了一个工具类供我们去使用ThreadPoolExecutor
,在Executors
中提供了如下几种线程池
方法名 | 描述 |
---|---|
newCachedThreadPool() | 必要时创建新的线程,空闲线程保留60s |
newFixedThreadPool() | 创建固定数目的线程;空闲线程会一直保留 |
newWorkStealThreadExecutor() | 一种适合fork-join 任务的线程池,复杂任务拆分为简单的任务,空闲线程会来帮忙 |
newSingleThreadExecutor() | 只有一个线程的线程池,按顺序执行所提交的任务 |
newScheduledThreadPool() | 用于调度执行的固定线程池 |
newSingleThreadScheduledExecutor() | 用于调度执行的单线程池 |
虽然有这么多的线程池,但都是给ThreadPoolExecutor
的构造函数传递不同的参数罢了!
上面所提到的线程池中,需要注意的一个线程池为newScheduledThreadPool()
,他的源码如下
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
返回的是一个ScheduledThreadPoolExecutor
对象,在这个类中我们需要注意这三个方法的使用
- public ScheduledFuture<?> schedule(Runnable command,long delay, TimeUnit unit);
- public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit);
- public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,long initialDelay,long delay, TimeUnit unit);
schedule()
会在给定的时间进行一次调度,后面的两方法会周期性的对任务进行调用,但是还有些许差异,scheduleWithFixedDelay()
会在上一次任务执行完毕后等待给定的delay时间后再执行,但是如果代码运行的时长大于delay,则会在运行结束后立即运行。scheduleAtFixedRate()
则是在上次任务执行的开始时间之后的period后就执行。
scheduleAtFixedRate()
的使用:
代码示例:
ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);
AtomicInteger i = new AtomicInteger(1);
System.out.println("当前时间是:"+LocalDateTime.now());
executorService.scheduleAtFixedRate(() -> {
System.out.println(LocalDateTime.now());
System.out.println("得到" + i + "次执行拉");
i.getAndIncrement();
// try {
// Thread.sleep(3000);
// } catch (InterruptedException e) {
// e.printStackTrace();
// }
}, 3, 2, TimeUnit.SECONDS);
运行结果如下图:
从上面的运行结果可以知道,基本上每一次调度都是在上一次开始之后的 2s
之后,
注意:如果代码运行的时间超过了等待时间,则上一次调度结束后,立马执行。
打开注释的代码,得到的运行结果如下:
可以看到,每一次运行的结果时间间隔并不是之前的 2s
,而是 3s!!
小结: scheduleAtFixedRate的调度流程
- 先等待 delay 时间后运行
- 此时如果代码运行的时间 < period,则下次运行的时间是上一次开始调度的时间的period时间后。
- 如果代码运行时间 > period,则下次运行的时间是在上一次结束之后立马运行。
scheduleWithFixedDelay()
的用法:
ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);
AtomicInteger i = new AtomicInteger(1);
System.out.println("当前时间是:"+LocalDateTime.now());
executorService.scheduleWithFixedDelay(()->{
System.out.println("运行开始: "+LocalDateTime.now());
System.out.println("得到"+i+"执行拉");
i.getAndIncrement();
// try {
// Thread.sleep(4000);
// } catch (InterruptedException e) {
// e.printStackTrace();
// }
System.out.println("运行结束: "+ LocalDateTime.now());
},3,2,TimeUnit.SECONDS);
打开注释,得到如下运行结果:
从上面两次运行的结果可以看到 scheduleWithFixedDelay()
的调度间隔和其代码的运行时间没有关系,相邻的间隔时间固定。
线程池的七大参数
在ThreadPoolExecutor
提供了如下的构造方法:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
-
corePoolSize:核心线程数,即一直存活的线程数量,
-
maximumPoolSize:线程池允许的最大线程数量
-
keepAliveTime:当线程池中的线程数量超过核心线程数量时,空闲线程的最大的存活时间
-
unit:keepAliveTime的时间单位
-
workQueue:工作队列,用于存放未执行的任务的队列
-
threadFactory:线程工厂,创建新线程的地方
-
Handler:拒绝策略,当线程池不接受任务时采取的策略
- DiscardPolicy:直接丢弃
- DiscardOldestPolicy:丢弃等待最长时间的任务u
- AbortPolicy:默认的拒绝策略,抛出
RejectedExecutionException
异常 - CallerRunsPolicy:只要线程池不处有shutdown,则将任务交给调用者线程执行,即调用
execute()
方法的线程,如果处于shutdown,则会被丢弃执行。
在上面的表中,列出了Executors工具类中所提供的创建线程的方法,本质上就是7大参数的不同值。
如newFixedThreadPool
,核心线程等于最大线程则为固定线程
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
在比如newCachedThreadPool()
,如果当前线程池有空闲线程可用,则立即执行,如果没有空闲线程,则立即创建新的线程执行,不把他放入工作队列中去。
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),threadFactory);
}
SynchronousQueue
是BlockingQueue
的一种实现,他没有任何的内部容量,往这个队列中进行插入操作必须等待另一个线程的remove
操作,在下一篇并发容器的相关文章中做详细介绍。
线程池的生命周期
在ThreadPoolExecutor
类中定义了如下几种线程池的状态
- RUNNING:接收新任务和处理队列中的任务
- SHUTDOWN:不在接收新的任务,但是会处理等待队列中的任务
- STOP:不在接收新任务且不会处理等待队列中的任务,还要中断正在运行的任务
- TIDYING:所有任务完成且工作线程数为0,调用terminate()会进入到此状态
- TERMINATED:terminate()运行完毕之后会进入这种状态
线程池的工作原理
在ThreadPoolExecutor
这个类中,其execute()
方法,展示了线程池的工作原理
源码如下:
public void execute(Runnable command) {
if (command == null) // 1
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) { // 2
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) { //3
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false)) // 4
reject(command);
}
第一:检查传进来的对象是否为null,如果是则抛出空指针异常。
第二:如果工作线程数小于核心线程数则通过addWoker()
将任务交给线程池处理,即立马执行当前的任务
第三:如果工作线程数大于等于核心线程数,则将任务加入到工作队列
第四:如果工作队列已满,则交给线程池处理,如果当前线程数小于最大线程数,则创建新的线程运行任务,反之拒绝该任务
自定义线程池
在思考如何自定义线程池之前,需要首先回顾下线程池的七大参数:
- 核心线程数:常驻线程池的线程
- 最大线程数:线程池的最多的线程容量
- 存活时间和时间单位:空闲线程的最大存活时间
- 工作队列:存储还没来得及处理的任务的容器
- 线程工厂:创建新线程的地方
- 拒绝策略:说 "No" 的方式
在这7大参数中:核心线程数,最大线程数,以及存活时间和时间单位在我个人看来不是那么重要!毕竟在生产中这些参数在具体场景下都会得到确定,不会有什么特别可以定制的地方,(他好像没那么重要.jpg)
而工作队列的选择和拒绝策略则可以有较多的选择只要是实现了BlockedQueue接口的容器都可以当作工作队列,换句话说就是只要实现了该接口,都可以充当工作队列。同样的,在 jdk 中默认实现的4种拒绝策略,他们都实现了RejectedExecutionHandler
接口!而这个接口的作用就是定义哪些不能够被线程池处理的任务,这个接口里面只有一个方法rejectedExecution(Runnable r, ThreadPoolExecutor executor);
这个方法的调用时机就是线程池无法接收新的任务时!
线程工厂:ThreadFactory
接口中,定义了newThread(Runnable r);
方法进行创建新线程。因此只要实现了改接口,也能够根据自己的意愿去创建新线程!
因此在创建自定义线程池的时候,我们可以进行如下选择
- 实现ThreadFactory:定义创建线程的方式,在Executors中有默认实现,但也可以自己去实现!
- 实现RejectedExecutionHandler:自定义拒绝策略
- 实现BlockingQueue:就可以自定义工作队列
除此之外,在Executors
工具类中,其创建线程的方式如以下几种:
// Executors
public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
return new DelegatedScheduledExecutorService
(new ScheduledThreadPoolExecutor(1));
}
// ThreadPoolExecutor
public ScheduledThreadPoolExecutor(int corePoolSize,
ThreadFactory threadFactory) {
super(corePoolSize, Integer.MAX_VALUE,
DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
new DelayedWorkQueue(), threadFactory);
}
// Executors
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory);
}
大多都是根据不同的工作队列,创建了不同特性的线程池!
创建自定义拒绝策略的线程池
参照 netty 的代码,创建如下的拒绝策略:
public class MyRejectHandler implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
try {
System.out.println("给你机会,你不中用啊");
final Thread t = new Thread(r, "new thread execute new task");
t.start();
} catch (Throwable e) {
throw new RejectedExecutionException(
"Failed to start a new thread", e);
}
}
}
这里创建的线程池是借助了ThreadPoolExecutor
, 如果对自己的能力自信,可以自己去实现一个自己的"ThreadPoolExecutor"
,来给人们一点小小的震撼!
ThreadPoolExecutor executor = new ThreadPoolExecutor(2,
2,
0,
TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(1),
new MyRejectHandler());
for (int i = 0; i < 10; i++) {
executor.execute(() -> {
System.out.println("new Task submit " + LocalDateTime.now());
});
// try {
// Thread.sleep(1000);
// } catch (InterruptedException e) {
// e.printStackTrace();
// }
}
先打开注释,放慢任务提交,此时没有触发拒绝策略
注释掉这段线程延时代码,可以得到如下输出,打印了七次给你机会,你不中用啊
,这是因为常驻线程为2,可以处理2个任务,工作队列的容量为1,可以保存一个,因此,可以处理3个任务,而通过下面打印的时间,就可以发现在同一时刻,提交了9个任务,因此处理不过来所以打印了 10- 3 = 7次
参考资料:
- Java核心技术卷1
- Java高并发程序设计
- Executors,ThreadPoolExecutor等源码
转载自:https://juejin.cn/post/7240111396870012988