java并发库提供的线程池总结
通常开发者都是利用 Executors 提供的通用线程池创建方法,去创建不同配置的线程池,主要区别在于不同的 ExecutorService 类型或者不同的初始参数。
Executor的基本组成
Executor
它是一个基础的接口 设计初衷是将任务提交和任务执行解耦
void execute(Runnable command);
ExecutorService
它是Executor的完善版本 提供service的管理功能(shutdown等方法)、更加全面的任务提交机制(如返回Future而不是void的submit方法)
<T> Future<T> submit(Callable<T> task);
这里输入是Callable,它解决了Runnable无法返回结果的困扰
ThreadPoolExecutor、ScheduledThreadPoolExecutor、ForkJoinPool
这些线程池的设计特点在于其高度的可调节性和灵活性,以尽量满足复杂多变的实际应用场景
Executors
它从简化的使用角度,提供了各种方便的静态工厂方法
Executors目前提供了五种不同的线程池创建配置
newCachedThreadPool()
用于处理大量短时间工作任务的线程池,它会是缓存线程并重用,当无缓存线程可用时,会创建新的工作线程;如果线程闲置时间超过60秒,就会被终止并移出缓存;长时间闲置时,这种线程不会消耗什么资源;其内部是使用SynchronousQueue作为工作队列
适用场景:
任务数量动态变化:当任务数量随着时间变化较大,且无法预测时,newCachedThreadPool
可以自动调整线程数量来处理这些任务。
短生命周期的任务:如果任务执行时间短,那么创建和销毁线程的开销相对较小,此时使用 newCachedThreadPool
可以减少线程的创建和销毁次数,提高系统性能。
资源利用率:由于线程池中的线程会在60秒(默认值)内回收,所以它适合于需要高资源利用率的场景
newFixedThreadPool(int nThreads)
指定数目的线程,使用的是无界队列,任何时候最多有nThreads个线程是活动的;任务多余线程数,则会等待空闲线程,如果有工作线程退出或终止了,将会创建新的线程,补足nThreads数量;
适用场景:
固定数量的任务:当你预先知道需要处理固定数量的任务时,使用newFixedThreadPool
可以创建一个固定大小的线程池,从而避免资源的浪费。
资源限制:由于newFixedThreadPool
使用的是固定大小的线程数组,它可以避免因为创建过多线程而导致的资源耗尽问题。
顺序执行:如果任务需要顺序执行或者需要一定的执行顺序,固定大小的线程池可以保证这一点。
简化同步:使用固定大小的线程池可以简化同步逻辑,因为你可以确信在任何给定时间只有固定数量的线程在执行。
长时间任务:对于一些可能需要较长时间才能完成的任务,使用固定线程池可以在任务执行期间保持线程数量稳定。
newSingleThreadExecutor()
它的特点是工作线程的数目限制为1,操作一个无界工作队列;所有任务都是顺序执行,最多只会有一个任务处于活动状态,并且不允许使用者修改线程实例,因此可以避免改变其线程数目
这里创建线程出来 corePoolSize和maximumPoolSize都是1,通过FinalizableDelegatedExecutorService包装保证线程池无法被修改。这个工作模式下ThreadFactory会被使用一次 用于创建线程池中的唯一线程,如果线程因为异常终止,ThreadFactory可能会被再次调用创建一个新的线程
newSingleThreadExecutor
的应用场景主要适合于以下情况:
顺序执行任务:由于newSingleThreadExecutor
限制了工作线程数为1,因此它能保证所有提交的任务都按照提交顺序串行执行,这对于需要顺序处理任务的场景非常有用。
简化同步:在单线程的上下文中,不需要考虑线程同步的问题,因为不会有多个线程同时访问共享资源。
资源限制:当你需要限制某个操作或任务的最大并发数量时,使用newSingleThreadExecutor
可以确保同一时刻只有一个任务在执行。
定时任务:尽管newSingleThreadExecutor
本身不直接提供定时任务的功能,但它可以结合其他工具来实现简单的定时任务处理。
package com.chinairi.redisson.redisson.util.lock;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class SingleThreadExecutorExample {
public static void main(String[] args) {
// 创建一个单线程线程池
ExecutorService executorService = Executors.newSingleThreadExecutor();
// 提交多个任务
for(int index = 0; index < 5; index++) {
final int taskId = index;
executorService.submit(() -> {
System.out.println("任务ID:" + taskId + ";线程名称:" + Thread.currentThread().getName());
});
}
// 关闭线程池
executorService.shutdown();
}
}
newSingleThreadScheduledExecutor()和newScheduledThreadPool(int corePoolSize)
创建的是个ScheduledExecutorService,可以进行定时或周期性的工作调度,区别在于单一工作线程还是多个工作线程
适用场景:
需要定时执行的任务:当你有一些需要按计划在将来的某个时间点执行的任务时,例如定时发送消息、定时更新状态等。
周期性执行的任务:当你需要周期性地执行某个任务,例如每隔一段时间进行数据统计、检查服务等。
需要顺序执行的任务:由于它只使用一个工作线程,因此它可以保证所有提交的任务都会按照提交顺序串行执行。
package com.chinairi.redisson.redisson.util.threads;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
public class SingleThreadScheduledExample {
public static void main(String[] args) {
ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
// 延迟2秒后执行一次任务
executor.schedule(() -> System.out.println("执行任务了!"), 2, TimeUnit.SECONDS);
// 延迟2秒后,每隔3秒执行一次任务
ScheduledFuture<?> future = executor.scheduleAtFixedRate(() -> {
// 任务内容
System.out.println("这里是延迟两秒,每隔三秒执行一次!");
}, 2, 3, TimeUnit.SECONDS);
}
}
newWorkStealingPool(int parallelism)
java8新建的方法 内部会构建ForkJoinPool 利用Work-Stealing算法 并行处理任务 不保证处理顺序
适用的场景:
并行计算:当你有一组任务可以并行处理,并且任务之间没有依赖关系时,可以使用 newWorkStealingPool
。这种线程池能够动态地分配工作线程,提高 CPU 利用率。
大量短生命周期的任务:对于那些生成很多短生命周期的子任务的工作负载,工作窃取算法可以帮助重用线程,减少线程创建和销毁的开销。
减少线程上下文切换:由于工作窃取算法倾向于在有空闲线程的处理器上执行任务,这样可以减少跨多个处理器的线程上下文切换。
无界队列任务处理:由于 ForkJoinPool
默认使用无界队列,适合于处理大量任务而不会因为队列满而阻塞。
package com.chinairi.redisson.redisson.util.threads;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class WorkStealingPoolDemo {
public static void main(String[] args) {
// 创建具有特定并行度的线程池
int parallelism = Runtime.getRuntime().availableProcessors();
ExecutorService executor = Executors.newWorkStealingPool(parallelism);
// 提交任务
executor.submit(() -> {
// 这里执行任务代码
System.out.println("任务执行了!");
});
executor.shutdown();
}
}
分析下线程池的设计与实现
工作队列
工作队列的任务是存储用户提交的任务,这个队列可以是容量为0的SyschronousQueue(使用newCachedThreadPool), 也可以是固定大小的LinkedBlockingQueue(使用newFixedThreadPool)。
private final BlockingQueue<Runnable> wordQueue;
内部线程池-工作线程管理
线程池中管理线程的创建、销毁。例如对于带缓存的线程池,当任务压力大的时候会创建新的线程,当任务压力退去的时候又会闲置一些线程(闲置线程60秒后会结束)
private final HashSet<Work> workers = new HashSet<>();
线程池的工作线程被抽象为静态内部类Worker 基于AQS算法实现
ThreadFactory
提供工作线程管理中的所需要的创建线程的逻辑 如果从应用逻辑提交任务被拒绝 比如线程池已经处于shutdown状态,需要为其提供处理逻辑,java标准库提供了类似ThreadPoolExecutor.AbortPolicy(终止策略)等默认实现
从上面可以看到线程池的几个基本组成部分,都能在线程池的构造函数中提现,具体的构造函数参数有:
corePoolSize
核心线程数,也就是长期驻留的线程数目 不同的线程池 值的差距很大 newFixedThreadPool的核心线程数就是nThreads,newCacheThreadPool则是0
maximumPoolSize
线程不够的时候能够创建的最大线程数。newFixedThreadPool的最大线程数依然是nThreads;newCacheThreadPool这是Integer.MAX_VALUE。
keepAliveTime和TimeUnit
指定额外的线程能够闲置多久 一个是时间一个是时间单位
workQueue
指定工作队列 必须是BlockingQueue
ThreadFactory
用于创建新线程的工厂
RejectedExecutionHandler
当任务无法被线程池执行的时候的拒绝策略
例如:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit timeUnit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler
)
那么 线程的生命周期又是如何的呢?
execute的工作方式
public void execute(Runnable command) {
…
int c = ctl.get();
// 检查工作线程数目,低于corePoolSize则添加Worker
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
// isRunning就是检查线程池是否被shutdown
// 工作队列可能是有界的,offer是比较友好的入队方式
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// 再次进行防御性检查
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 尝试添加一个worker,如果失败意味着已经饱和或者被shutdown了
else if (!addWorker(command, false))
reject(command);
}
使用线程池 应该注意:
避免任务堆积
如使用newFixedThreadPool,其创建线程的数量是指定的,但是其默认工作队列是无界的,如果工作线程太少,但是又有大量的任务入队,导致处理速度跟不上入队的速度则可能会占用大量的系统内存,甚至内存溢出(当然 也可以修改线程池的工作队列为有界的ArrayBlockingQueue)。诊断时,使用jmap之类的工具查看是否有大量对象入队
避免过度扩展线程
如果可以预料到任务压力,可以适当的指定线程数量,但是在实际运用中,很难预料任务压力,通常都是用缓存线程池(newCacheThreadPool);在最新的HTTP/2 client API中,目前的默认实现就是是用的缓存线程池
线程数目不断增长问题
线程数目不断增长 可以使用jstack等工具来检查,也有可能是线程泄露,这种往往是任务逻辑问题导致工作线程不能被释放
避免死锁问题
注意线程不要相互调用 当A持有锁1 B持有锁2的时候A去获取2 B去获取1 这种场景要避免出现
线程池使用中不操作ThreadLocal 工作线程的生命周期通常会超过任务的生命周期
线程池大小的选择策略
任务主要是计算
当任务主要是用于计算逻辑 那么会比较占用CPU的资源 不能使用太多的线程 线程太多会增加上下文切换的开销 这种通常建议按照cpu的核数N或者N+1
较多等待的任务
I/O操作较多 可以参考Brain Goetz推荐的计算方法
线程数 = CPU核数 * 目标CPU利用率 * (1 + 平均等待时间/平均工作时间)
转载自:https://juejin.cn/post/7381372390317735947