速度优化:线程池优化
线程是执行任务的基本单元,他的重要性不言而喻,通过合理的使用线程,我们可以更充分的发挥 CPU 的性能,极大的提升程序的体验。如何才能更合理的使用线程呢?这需要我们做很多事情。
比如需要将程序中线程的数量控制在合适的范围,既不能太多也不能太少,线程太多会浪费过多 CPU 资源用于线程调度上,并且会导致程序的主线程因无法获得足够 CPU 资源而发生卡顿等性能问题。而线程太少又没法充分发挥出 CPU 的性能,同样会导致程序的性能体验不佳;比如需要尽量减少线程自身导致的性能损耗,线程频繁的创建销毁,频繁发生状态的切换,都会导致过多的 CPU 损耗。
想要合理的使用线程并不是一件容易的事,但是我们可以通过线程池来做到更合理的使用线程,因此线程池的重要性也是不可忽视的,它是每一个开发者都需要掌握的知识。在这一章,我们就一起来看看如何正确的使用和优化线程池,以此来充分提升线程池的调度效率,让程序有更好的速度与流畅性。
一、默认线程池创建方式
我们先来看看如何创建线程池。线程池作为一个最基本的能力,Java 库给我们默认提供 Executors 工具类来创建线程池,这个工具类里面提供了十几个 “newThreadPool” 的静态方法用来创建线程池,如图 5-16 所示,如果是对线程池的知识了解的并不太多的开发新人,肯定会因为要选择哪一个创建方法而困扰。所以我们看一看 Java 库的 Executors 对象中提供的这些方法是如何创建线程池的。
这些线程池创建方法可以分为三类,第一类是 newSingleThreadExecutor、newFixedThreadPool、newCacheThreadPool 方法,代码实现如下,可以看到这些方法实际都是创建了不同入参的 ThreadPoolExecutor 对象。
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
第二类是 newSingleThreadScheduledExecutor、ScheduledThreadPoolExecutor 这两类方法是用来创建调度线程池的,可以用来执行执行延时任务或者周期性任务,通过源码可以看到他们都是创建的 ScheduledThreadPoolExecutor 对象。而 ScheduledThreadPoolExecutor 实际也是继承自 ThreadPoolExecutor 这个对象。
public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
return new DelegatedScheduledExecutorService
(new ScheduledThreadPoolExecutor(1));
}
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
剩下的 newWorkStealingPool 等方法用了 ForkJoinPool 这个线程池,代码如下,它其实是在 Java 8 才出现的一种线程池,专门用来处理并发类算法,由于使用场景较少,所以很少用到。
public static ExecutorService newWorkStealingPool(int parallelism) {
return new ForkJoinPool
(parallelism,
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, true);
}
二、线程池配置解析
分析 Executors 对象创建线程池的方法实现,可以发现这些线程池都是 ThreadPoolExecutor 不同入参的实现类。如果我们能将 ThreadPoolExecutor 构造函数中的入参全部熟悉了,那么也就掌握了线程池的用法,所以我们看一下该对象的构造函数中的入参有哪些。ThreadPoolExecutor 对象的构造函数如下。
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler rejectedExecutionHandler)
该构造函数中每个入参的详细解释如下表:
入参 | 解释 |
---|---|
corePoolSize(核心线程数量) | 在线程池被创建时,会预先创建一定数量的核心线程,并将它们保持活动状态,以便能够立即执行任务。除非手动调用了 allowCoreThreadTimeOut 方法用来申明核心线程需要退出,否则核心线程启动后便一直是存活不退出的状态,即使当前没有任务可执行,也不会被销毁。通过设置适当的核心线程数,可以平衡线程池的性能和资源消耗。如果核心线程数设置得太小,可能无法及时处理到达的任务,导致性能下降。而设置得太大则可能会浪费系统资源。 |
maximumPoolSize(最大线程数量) | 当有新任务到来,而核心线程又全在执行任务没法响应这些新的任务时,这些新任务会放在缓存队列中,如果缓存队列也满了,线程池就会启动新的线程来执行这些任务,这些线程被称为非核心线程,非核心线程的数量加上核心线程的数量就是线程池最大线程数量。 |
keepAliveTime(非核心线程的空闲时间) | keepAliveTime 定义了非核心线程在空闲状态下的存活时间。如果一个非核心线程的空闲时间达到了keepAliveTime 所设定的值,那么它就会被线程池回收销毁以减少资源消耗。 |
unit(时间单位) | keepAliveTime 的的时间单位,如秒,分等 |
workQueue(任务队列) | 线程池中用于存储待执行任务的缓存队列,常见的缓存队列有 LinkedBlockingDeque 和 SynchronousQueue 这两种:LinkedBlockingDeque 是一个双向的并发队列,主要用于 CPU 线程池;SynchronousQueue 虽然也是一个队列,但它并不能存储任务,所以该队列会将添加进来的任务直接交给新的线程去处理,而不会存储这些任务,主要用于 IO 线程池中。任务队列在线程池中起到重要的作用,它可以帮助控制并发任务的数量,平衡任务的生产和消费速度,以及提供任务的排队机制。 |
threadFactory(线程工厂) | 用于创建线程的工厂对象。可用于自定义线程的创建方式和属性,包括线程的名称、优先级、线程组等。在虚拟内存优化时,也提到过可以使用自定义的线程工厂,来创建栈空间只有 512 KB 的线程。 |
rejectedExecutionHandler(拒绝策略) | 当线程池已经饱和,无法再接受新的任务时,拒绝策略定义了对这些新任务的处理方式。默认的策略会抛出RejectedExecutionException 异常,并阻止任务的提交。 |
这里需要特别注意的是,只有当 workQueue 缓存队列容量满了,才会开始创建非核心线程用于新任务的执行,我们可以通过 ThreadPoolExecutor 的 execute 方法实现证实这一点,实现代码如下。
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
//当线程数小于核心线程,则直接创建线程执行任务
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
//当核心线程已满并且都在运行状态,则将task添加到workQueue缓存队列中
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
//如果线程数为 0,则调用addWorker创建线程
addWorker(null, false);
}
//当task往队列添加失败时,才会调用addWorker启动新的线程
else if (!addWorker(command, false))
reject(command);
}
通过对线程池入参的了解,我们知道了一个线程池的配置项是多样性的,需要结合设备的性能和业务的特性去进行合理的配置,以此来提升线程池的调度效率,但是通过 Executors 对象的提供的线程池的创建方法,却没法灵活的去配置这些参数,所以这会导致通过默认方式创建的线程池无法充分的提升调度效率。因此我们接着来继续看看如何自定义创建线程池。
三、线程池类型及创建
想要创建更合理的线程池,我们还需要进一步了解线程池类型有哪些,它们又各有什么特性。这样才能针对业务的场景,去自定义更合适的线程池。在业务中使用最频繁的,主要有调度线程池, CPU 线程池,IO 线程池和这三类线程池。不同类型的线程池有不同的职责,专门用来处理对应类型的任务,如调度线程池用来处理周期或延时任务,如性能指标的采集等;CPU 线程池用来处理 CPU 类型任务,如计算,逻辑操作,UI 渲染等;IO 线程池用来处理 IO 类型任务,如拉取网络数据,往磁盘读写数据等。
我们先看看调度线程池,它继承自 ThreadPoolExecutor,是对 ThreadPoolExecutor 的封装和扩展,调度线程池的构造函数如下所示。
public ScheduledThreadPoolExecutor(int corePoolSize,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
super(corePoolSize, Integer.MAX_VALUE,
DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
new DelayedWorkQueue(), threadFactory, handler);
}
可以看到构造函数已经定义好了大部分的入参,我们能够设置的入参只有核心线程数量,线程工厂和拒绝策略,因此对于调度线程池来说,并不需要考虑如何定义入参,用默认提供的方法创建即可。所以我们需要重点掌握的是 CPU 线程池和 IO 线程池。
-
CPU 线程池
CPU 线程池的主要作用是有效地管理和执行 CPU 密集型任务,以达到充分发挥 CPU 资源,提升应用的性能的目的。带着这个目的,我们看一下 CPU 线程池的入参要如何设置。
-
corePoolSize 核心线程数
- 首先是 corePoolSize 核心线程数。CPU 线程池是用来执行 CPU 类型任务的,所以它的核心线程数量一般为 CPU 的核数,理想情况下每一个 CPU 核运行一个线程,这种情况下既能充分发挥 CPU 的性能,还减少了频繁调度导致的 CPU 损耗。虽然程序在实际运行过程中无法达到理想情况,但是将核心线程数设置为 CPU 核数个依然是最稳妥的配置。
-
maximumPoolSize 最大线程数
- 对于 CPU 线程池来说,每个 CPU 核心对应一个线程,就能将 CPU 充分发挥出来,如果线程数量超过了 CPU 核数,只会带来不必要的 CPU 切换和调度导致的性能损耗。因此 CPU 线程池的最大线程数就是核心线程数,当 CPU 线程池中的线程已处于忙碌状态而无法处理新任务时,新来的任务放入到任务缓存容器中。
-
keepAliveTime 线程存活时间
- 既然 CPU 线程池没有非核心线程,所以 keepAliveTime 这个表示非核心线程数的存活时间的值设置为 0 即可。
-
workQueue 任务缓存队列
- CPU 线程池中一般使用 LinkedBlockingDeque,这是一个可以设置容量并支持并发的队列。由于 CPU 线程池的线程数量较少,如果较多任务来到且没有空闲的核心线程可以执行任务时,这些任务就需要放在缓存队列中。缓存队列的容量默认情况下是无限大的,但是这样的容量设置并不是一个好的配置。如果程序有些异常的死循环逻辑不断地往队列添加任务,而这个队列却能一直缓存任务,那么就很难发现异常。但是当我们将这个队列设置成有限的,比如 512 个,那么异常的死循环就会将队列打满,接下来的任务进入到拒绝策略的逻辑中,这样我们就可以在拒绝策略添加监控,就能及时发现这个异常了。
-
拒绝策略
- 当缓存队列中存储的任务达到上限,并且也没有可用的非核心线程来处理这些无法放在缓存队列中的任务,那么这些任务就会进入到一个异常的兜底函数 rejectedExecution 中,Executors 对象创建的线程池使用的是默认的兜底策略,其代码实现如下,可以看到此时会直接抛出异常。
-
public static class AbortPolicy implements RejectedExecutionHandler { public AbortPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { throw new RejectedExecutionException("Task " + r.toString() + " rejected from " + e.toString()); } }
- 但是直接抛出异常会导致程序崩溃,是会影响用户体验的。为了更好的体验,我们需要自定义拒绝策略,对异常的任务和线程池进行上报,以便后续用于问题的排查和修复,同时还可以将这些任务添加到一个可以执行并发任务的 Hander 中,让 Handler 来兜底执行这些任务,将程序的影响降低到最低,代码实现如下。
-
class CoreRejectedExecutionHandler implements RejectedExecutionHandler { @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { String taskName = r.getClass() //异常上报 report(taskName, executor); if (rejectHandlerThread == null) { HandlerThread rejectHandlerThread = new HandlerThread("core-reject"); rejectHandlerThread.start(); sRejectThreadHandler = new LarkHandler(rejectHandlerThread.getLooper()); } //通过handler兜底执行任务 sRejectThreadHandler.post(task); } }
-
ThreadFactory 线程工厂
- 线程工厂可以用来设置线程的属性,因此我们可以通过线程工厂将线程进行统一的命名,统一命名后的线程在分析和排查异常或性能问题时会有很大的帮助,我们还可以统一的提升 CPU 线程池中线程的优先级,以此来提升 CPU 线程池执行任务的效率,自定义线程工厂的方案实现代码如下。在方案中实现中,我们可以通过构造函数传入线程的前缀名以及线程的优先级,为了保障线程优先级设置的成功率,可以将 Runnable 进行包装,然后在线程真正运行的时候,也就是 Runnable 的 run 方法中,通过 Process.setThreadPriority 方法来进行优先级的设置。
-
public class CoreThreadFactory implements ThreadFactory { private static final String TAG = "CoreThreadFactory"; private final AtomicInteger mThreadNum = new AtomicInteger(1); private final String mPrefix; private final int priority; public CoreThreadFactory(String prefix, int priority) { this.mPrefix = prefix; this.priority = priority; } @Override public Thread newThread(Runnable runnable) { String name = mPrefix + "-" + mThreadNum.getAndIncrement(); Thread ret = new Thread(new AdjustThreadPriority(priority, runnable), name); return ret; } public static class AdjustThreadPriority implements Runnable { private final int priority; private final Runnable task; public AdjustThreadPriority(int priority, Runnable runnable) { this.priority = priority; task = runnable; } @Override public void run() { try { //在线程真正运行时在设置优先级 Process.setThreadPriority(priority); } catch (Exception e) { Log.e(TAG, "AdjustThreadPriority run: ", e); } task.run(); } } }
了解了这些参数该如何配置,我们就来看一下要如何创建一个好用的 CPU 线程池。基于架构的扩展性考虑,我们可以创建一个 CoreThreadPoolExecutor 类来继承 ThreadPoolExecutor,后 CPU 线程池和 IO 线程池都可以继承该 CoreThreadPoolExecutor 对象,而不是直接继承 ThreadPoolExecutor,其 UML 图如下图所示
在该 CoreThreadPoolExecutor 中设置一些基础的配置,如拒绝策略等配置,方案实现代码如下。
public class CoreThreadPoolExecutor extends ThreadPoolExecutor {
public CoreThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
BlockingQueue<Runnable> queue,
CoreThreadFactory threadFactory) {
super(corePoolSize, maximumPoolSize, keepAliveTime, TimeUnit.SECONDS,
queue, threadFactory);
setRejectedExecutionHandler(sRejectedExecutionHandler);
}
public void execute(Runnable command) {
super.execute(command);
}
public Future<?> submit(@NotNull Runnable task) {
super.submit(task);
}
……
}
我们接着来实现 CPU 线程池, CPU 线程池可以命名为 CpuThreadPoolExecutor,也可以命名为 FixedThreadPoolExecutor,这两个类名都能反应出其线程池的特性,笔者这里使用 CpuThreadPoolExecutor 作为名称,该对象中需要提供静态方法 getThreadPool 用于创建或获取 CPU 线程池,对于 CPU 线程池,我们可以将优先级设置的高一些,比如 Process.THREAD_PRIORITY_DISPLAY 级别,方案实现代码如下。
class CPUThreadPoolExecutor extends CoreThreadPoolExecutor {
private static final int CPU_COUNT = Runtime.getRuntime().availableProcessors();
protected static final int CORE_POOL_SIZE = CPU_COUNT;
protected static final int MAX_POOL_SIZE = CPU_COUNT;
private static final int BLOCK_QUEUE_CAPACITY = 512;
private static ThreadPoolExecutor coreCPUThreadPoolExecutor;
private CoreCPUThreadPoolExecutor(BlockingQueue<Runnable> blockingQueue,
CoreThreadFactory threadFactory) {
super(CORE_POOL_SIZE,
MAX_POOL_SIZE,
0,
blockingQueue,
threadFactory);
}
public static ThreadPoolExecutor getThreadPool() {
if(coreCPUThreadPoolExecutor == null){
synchronized (CPUThreadPoolExecutor.class) {
if (coreCPUThreadPoolExecutor == null) {
coreCPUThreadPoolExecutor = new CPUThreadPoolExecutor(
new LinkedBlockingDeque<Runnable>(BLOCK_QUEUE_CAPACITY),
new CoreThreadFactory("CPU",
Process.THREAD_PRIORITY_DISPLAY));
}
}
}
return coreCPUThreadPoolExecutor;
}
}
-
IO 线程池
当系统在进行 IO 操作时,会交给 DMA(Direct Memory Access,直接存储器访问)硬件来处理,因此不需要通过 CPU 便能进行数据的传输,所以 IO 任务对 CPU 的资源消耗是很少的。对于 IO 线程池来说,由于 IO 任务对 CPU 资源消耗不高,所以每来一个 IO 任务便可以直接交由一个独立的线程去执行,并不需要放入缓存队列中,这样可以保证每个 IO 任务都能及时响应,如果多个 IO 任务复用同一个线程,那么当某个 IO 任务阻塞线程时,会导致其他的 IO 任务也无法执行。了解了这一特性,我们来看看 IO 线程池的入参要如何设置。
-
corePoolSize 核心线程数
- IO 线程池的 corePoolSize 核心线程数没有定性规定,它和我们应用程序的业务场景有关。如果 IO 任务比较多,就得设置得多一些,因为太少了就会因为 IO 线程频率创建和销毁而产生性能损耗。如果业务场景中的 IO 任务不多,直接设置为 0 个也没问题,通过 Exectors 创建出来的 IO 线程池的核心线程数量就是为 0 个。
-
maximumPoolSize 最大线程数
- IO 任务实际上消耗的 CPU 资源是非常少的,当需要读写数据的时候,系统会交给 DMA 芯片去进行操作,此时调度器就会让当前线程进行休眠,并且把 CPU 资源切换给其他的线程去使用。所以对于 IO 线程池中 maximumPoolSize 最大线程数,可以多设置一些,确保每个 IO 任务都能有一个对应的线程来执行,这样可以保障 IO 任务能尽快得到执行。一般来说,中小型应用设置几十个线程数就足够了,即使是大型应用,也不建议将数量设置得特别大,比如通过 Exectors 创建出来的 IO 线程池的最大线程数就是无限大的,这样会导致当出现死循环等异常时,IO 线程池中的任务无法进入到拒绝策略。
-
缓存队列
- 对于 IO 线程池来说,是不需要缓存任务的,因为每来一个任务,线程池都会启用一个独立的线程去执行这个任务,所以对于 IO 线程池来说,一般都是传入 SynchronousQueue 这个容量为 0 的队列。
-
keepAliveTime 线程存活时间
- 非核心线程的存活时间也需要根据业务场景来进行决定,如果业务时很频繁的出现大量 IO 的场景,我们可以将存活时间设置的长一些,如果是低频的大量 IO 场景,可以将存活时间设置的短一些,这样可以减少无用线程对内存资源的消耗。
-
异常兜底策略
- IO 线程池的异常兜底的策略可以和 CPU 线程池的一样,将异常上报,然后将进入到兜底的任务用一个兜底的线程去执行即可。
根据上面的入参配置,IO线程池的创建代码如下。
class IOThreadPoolExecutor extends CoreThreadPoolExecutor {
private static final int CPU_COUNT = Runtime.getRuntime().availableProcessors();
private static final int CORE_POOL_SIZE = 1;
private static final int MAX_POOL_SIZE = 64;
private static final int KEEP_ALIVE_TIME = 30;
private static ThreadPoolExecutor coreIOThreadPoolExecutor;
private IOThreadPoolExecutor(BlockingQueue<Runnable> blockingQueue,
CoreThreadFactory threadFactory) {
super(CORE_POOL_SIZE,
MAX_POOL_SIZE,
KEEP_ALIVE_TIME,
blockingQueue,
threadFactory);
}
public static CoreThreadPoolExecutor getThreadPool() {
if(coreIOThreadPoolExecutor == null){
synchronized (IOThreadPoolExecutor.class) {
if (coreIOThreadPoolExecutor == null) {
coreIOThreadPoolExecutor = new CoreIOThreadPoolExecutor(
new SynchronousQueue<Runnable>(),
new CoreThreadFactory("IO",
Process.THREAD_PRIORITY_LESS_FAVORABLE));
}
}
}
return coreIOThreadPoolExecutor;
}
}
四、线程池监控
当我们创建完合适的线程池后,还能进一步的完善线程池的功能,比如可以监控运行在线程池中的任务的耗时,如果耗时超过设置的阈值,则可以通过日志输出或者当做异常上报,这样的能力对于合理的使用线程池有很大的帮助。
-
任务耗时监控
要监控任务的耗时,我们只需要对 Runnable 进行封装即可,通过在前面我们定义的 CoreThreadPoolExecutor 的公共积累的,可以对 execute,submit 等方法里面的 Runnable 进行封装,代码如下所示。
public class CoreThreadPoolExecutor extends ThreadPoolExecutor{
……
public void execute(Runnable command) {
Runnable newTask = new CoreTask(command, this);
super.execute(newTask);
}
}
在封装对象 CoreTask 中,我们可以进一步根据线程池的类型,设置耗时阈值,对于超过耗时阈值的任务进程日志打印和上报,方案代码实现如下所示,在展示的代码中,笔者将 CPU 线程池的超时阈值设置成了 1 秒, IO 线程池的超时阈值设置成了 8 秒,在实际开发者,我们需要根据业务场景的特性去设置线程池任务的超时阈值。
public class CoreTask implements Runnable{
private final CoreThreadPoolExecutor mExecutor;
protected final Runnable mCommand;
public CoreTask(@NonNull Runnable r, @Nullable ICoreThreadPool executor) {
mExecutor = executor;
mCommand = r;
}
@Override
public void run() {
long mTaskBeginExecTime = SystemClock.uptimeMillis();
try {
mCommand.run();
} finally {
runTime = SystemClock.uptimeMillis() - mTaskBeginExecTime;
boolean bTaskOverLimit = false;
if (mExecutor != null) {
//根据不同的线程池类型,设置超时阈值。
if (mExecutor instanceof CPUThreadPoolExecutor) {
if (1000 < runtime) {
bTaskOverLimit = true;
}
} else if (mExecutor instanceof IOThreadPoolExecutor) {
if (8000 < runtime) {
bTaskOverLimit = true;
}
}
}
if (bTaskOverLimit) {
Log.w(TAG, poolName + ", taskname: " + orgTaskName +
", dispatchtime & runtime is(ms) " +
dispatchTime + ", " + runtime +
"maxQueueWaitTime & MaxRunTime is " +
maxQueueWaitTimeMS + ", " + maxRunTimeMS);
//根据采样率判断是否需要上报
ifNeedReport(……)
}
}
}
}
-
任务死锁监控
我们已经监控了线程池中的高耗时任务,除了高耗时任务外,死锁任务也是对线程池性能影响很大的一个因素,当任务发生死锁时,会导致任务长时间无法退出,此时会导致该线程不可用,而且死锁发生时,往往不会是一个线程在获取锁,而是会有多个线程因为无法获取锁而处于不可用状态,对于线程数量有限的线程池来说,特别是只有只有数个线程的 CPU 线程池线程,没有足够的线程来进行任务执行必然会对性能有较大的影响。
任务发生死锁时,这个任务并不会退出,所以没法这个任务的耗时来判断该任务是否发生死锁了,此时可以换个思路来进行死锁的判断,我们可以在任务开始前将任务名放入一个容器中,然后在任务结束时从容器中移除该任务,对于容器中长时间没有被移除的任务名,便可以判断该任务发生了死锁。基于这个思路,笔者下面详细介绍一下方案实现:
1)锁死监控的关键步骤是在任务开始执行前,把任务名放入到容器中,这里可以以 Map 做为容器,键(key) 为任务名,值(value)为时间戳,基于设计规范考虑,该 Map 容器可以放入到线程池的 CoreThreadPoolExecutor 基类中,并且 CoreThreadPoolExecutor 去继承定义有 addTaskRecord 和 removeTaskRecord 方法的抽象接口。在这两个接口实现中,将任务名和时间戳进行放入和移除操作。代码实现如下。
public interface ICoreThreadPool {
String getThreadPoolName();
void addTaskRecord(String taskName,int taskHash);
void removeTaskRecord(String taskName,int taskHash);
}
public class CoreThreadPoolExecutor extends ThreadPoolExecutor
implements ICoreThreadPool {
……
private HashMap<String, Long> mTaskMap = new HashMap<>();
@Override
public void addTaskRecord(String taskName, int taskHash) {
if (CoreThreadPool.getCoreThreadPoolServiceSwitch()) {
synchronized (mTaskMap) {
mTaskMap.put(taskName + "#" + taskHash, System.currentTimeMillis());
}
}
}
@Override
public void removeTaskRecord(String taskName, int taskHash) {
long taskBeginTime = 0;
synchronized (mTaskMap) {
if(mTaskMap.get(taskName + "#" + taskHash) == null){
return;
}
taskBeginTime = mTaskMap.remove(taskName + "#" + taskHash);
}
}
}
2)在我们自定义的 CoreTask 中,已经将线程池对象传入进入了,由于线程池实现了 ICoreThreadPool 接口,因此就可以在任务执行前后直接使用 addTaskRecord 和 removeTaskRecord 用于任务名的记录和异常操作,代码实现如下。
public class CoreTask implements Runnable{
private final CoreThreadPoolExecutor mExecutor;
protected final Runnable mCommand;
public CoreTask(@NonNull Runnable r, @Nullable ICoreThreadPool executor) {
mExecutor = executor;
mCommand = r;
}
@Override
public void run() {
long mTaskBeginExecTime = SystemClock.uptimeMillis();
try {
……
//将任务添加到集合中
mExecutor.addTaskRecord(mCommand.getClass().toString(), hashCode());
mCommand.run();
} finally {
……
//将任务从集合中异常
mExecutor.removeTaskRecord(mCommand.getClass().toString(), hashCode());
}
}
}
3)最后,我们还需要一个线程专门去监控 Map 容器中是否有长时间没有移除的任务,因此可以通过周期任务线程池来进行监控,频率可以根据业务来调整,这里设置的是 10 秒钟一次,在检测容器中的任务时,如果发现有超过 30 秒未执行完的任务,便认为该任务发生了死锁。代码实现如下。
//启动周期性任务
Executors.newSingleThreadScheduledExecutor()
.scheduleWithFixedDelay.
scheduleWithFixedDelay(new TestTask(), 10, 10, TimeUnit.SECONDS);
public static class CheckLockedTask implements Runnable {
@Override
public void run() {
ICoreThreadPool cpuThreadPool = CoreCPUThreadPoolExecutor.getThreadPool();
synchronized (cpuThreadPool.getRunningTaskMap()) {
checkLongRunTask(cpuThreadPool.getRunningTaskMap(),
cpuThreadPool.getThreadPoolName(), 30*1000);
}
ICoreThreadPool ioThreadPool = CoreIOThreadPoolExecutor.getThreadPool()
synchronized (ioThreadPool.getRunningTaskMap()) {
checkLongRunTask(ioThreadPool.getRunningTaskMap(),
ioThreadPool.getThreadPoolName(), 30*1000);
}
}
}
private static void checkLongRunTask(HashMap<String, Long> taskMap,
String threadPoolName, int maxTime) {
for (Map.Entry<String, Long> entry : taskMap.entrySet()) {
if (System.currentTimeMillis() - entry.getValue() > maxTime) {
//key由 taskname#hashcode 拼接
String taskName = entry.getKey().split("#")[0];
long runningTime = System.currentTimeMillis() - entry.getValue();
//打印异常或进行上报
Log.w(TAG, threadPoolName + ", taskname: " +
taskName + "run time over " + runningTime);
}
}
}
转载自:https://juejin.cn/post/7368690450898173952