线程池究竟是怎么发挥作用的
线程池这个概念在整个计算机体系中都起着至关重要的作用。
介绍
在计算机编程中,线程池是一种用于程序并发执行的软件设计模式。在线程池中会维护多个线程(等待被分配),以此来避免频繁地创建、销毁线程(执行短暂任务)而导致的延迟增加和性能下降。而在Java
中也存在着一个标准的线程池工具类,即ThreadPoolExecutor
。下面,我们将看一下官方文档是如何描述ThreadPoolExecutor
的(JDK 1.8
)。
-
ThreadPoolExecutor
是JDK
中继承自ExecutorService
(接口)的一种线程池工具类,对于每次提交的任务它都会使用线程池中的线程来执行。 -
ThreadPoolExecutor
(泛指线程池)主要能帮我们解决如下2
个问题:- 提升性能,在执行大量异步任务时减少对每个任务调用的负载(创建、销毁等)。
- 提供限制和管理资源(包括线程)的方法,例如在执行一组任务时限制和管理其中最大执行任务数。
-
ThreadPoolExecutor
提供了一系列的配置参数让我们可以对线程池进行灵活地调整:-
Core and maximum pool sizes
:ThreadPoolExecutor
将会根据corePoolSize
参数(通过getCorePoolSize
方法查看)和maximumPoolSize
参数(通过getMaximumPoolSize
方法查看)对其线程池大小(通过getPoolSize
方法查看)进行自动调整。在一个新任务提交到
execute
方法时:- 如果正在运行的线程数小于
corePoolSize
,则会创建一个新的线程去处理这个请求(即使存在其他工作线程处于空闲状态)。 - 如果正在运行的线程数大于
corePoolSize
但小于maximumPoolSize
,并且任务队列未饱和(未满),则会将任务添加到队列中进行排队(如无空闲线程)。 - 如果正在运行的线程数大于
corePoolSize
但小于maximumPoolSize
,并且任务队列已饱和(已满),则会创建一个新线程去处理这个请求(如无空闲线程)。 - 如果正在运行的线程数大于等于
maximumPoolSize
,并且任务队列已饱和(已满),则会执行拒绝策略(如无空闲线程)。
而通过对于
corePoolSize
参数和maximumPoolSize
参数的设置,我们可以实现不同策略的线程池:- 将
corePoolSize
参数和maximumPoolSize
参数设置为相同值,则可以创建固定大小的线程池。 - 将
maximumPoolSize
参数设置为无边界的值(例如:Integer.MAX_VALUE
),则可以创建容纳任意数量线程(并发执行任意数量的任务)的线程池。
另外,对于
core
参数和maximum
参数一般只会在构造方法中进行设置,但也可以通过setCorePoolSize
方法和setMaximumPoolSize
方法进行动态变更。 - 如果正在运行的线程数小于
-
On-demand construction
:默认情况下,core
线程只有在新任务到达时才会被创建和启动。我们可以使用prestartCoreThread
方法或prestartAllCoreThreads
方法对这种行为进行动态修改(覆盖)。例如,当需要使用非空队列来构造线程池时,则可能用到预启动线程策略。 -
Creating new threads
:ThreadPoolExecutor
是通过使用ThreadFactory
来创建一个新的线程,默认情况下(如无特别指定)它将会使用Executors#defaultThreadFactory
(默认线程工厂)。其中,通过使用默认线程工厂创建出的线程具有相同的线程组ThreadGroup
和相同的优先级NORM_PRIORITY
,并且都是非后台状态。也就是说,我们可以通过使用不同的线程工厂ThreadFactory
来修改线程的名字、线程组、优先级及其后台状态等。 -
Keep-alive times
:如果线程池当前具有多于corePoolSize
个线程,对于这些超出corePoolSize
的线程则会在其空闲时间超过keepAliveTime
时将其终止,这也提供了一种方式让线程池在没有活跃被使用的情况下减少资源的消耗(如果之后线程池再次变得活跃则将重新创建线程)。其中,keepAliveTime
参数可以通过setKeepAliveTime(long, TimeUnit)
方法进行动态变更,如果使用setKeepAliveTime(LONG.MAX_VALUE,TimeUnit#NANOSECONDS)
的参数值调用可以有效地禁止空闲线程在关闭之前被终止。默认情况下,
keep-alive
策略只应用于非core
线程(线程数大于corePoolSize
的线程),如果想让core
线程也应用这个超时策略则可以通过调用allowCoreThreadTimeOut(boolean)
方法来实现。 -
Queuing
:当正在运行的线程大于corePoolSize
但小于maximumPoolSize
时就会将任务添加到队列中进行排队,其中我们可以使用任意的BlockingQueue
队列来进行(被提交)任务的传输和保存(即,任何BlockingQueue
都可以用来传输和保存提交的任务)。关于此队列用法与线程池大小的交互规则:
- 如果小于
corePoolSize
个线程正在运行,执行器总是偏向新建线程而不是去排队。 - 如果多于
corePoolSize
个线程正在运行,执行器总是偏向排队请求而不是新建线程。 - 如果多于
corePoolSize
个线程正在运行,并且请求不能被排队(队列已满),则会再次创建新线程执行任务。但,如果此时线程数已超过maximumPoolSize
,则会执行拒绝策略(任务被拒绝提交)。
一般存在三种排队的策略:
Direct handoffs
:Direct handoffs
策略,即直接交接策略,一般可通过SynchronousQueue
队列来实现。Direct handoffs
策略只会将任务传递给没有持有其他任务的线程。如果没有线程可以立即运行任务,后续把任务提交到队列中排队的操作将会失败,此时将会构造一个新的线程进行处理。通过这种策略,在处理一组可能存在内部依赖的请求时可以避免被锁定。一般情况下,Direct handoffs
策略需要与无界线程池(无边界的maximumPoolSizes
)联合使用以避免新提交的任务被拒绝,但这也可能会因为请求(不断地)以平均快于其处理速度的速度到达而导致线程无限增长。Unbounded queues
:Unbounded queues
策略,即无界策略,一般可通过LinkedBlockingQueue
无界队列来实现。在Unbounded queues
策略下,创建的线程数是不会超过corePoolSize
的,因为只有在core
线程都处于忙碌状态并且队列已满(无界队列并不符合)的情况下才会再次创建线程(换句话说,任何maximumPoolSize
值对此都不会有任何影响)。虽然这种排队方式有助于消除瞬时的请求突发,但这也可能会因为请求(不断地)以平均快于其处理速度的速度到达而导致线程无限增长。Bounded queues
:Bounded queues
策略,即有界策略,一般可通过ArrayBlockingQueue
有界队列来实现。Bounded queues
策略在maximumPoolSizes
有限的情况下可以帮助线程池阻止其资源耗尽,但这也意味着需要对其进行一定程度地调节和控制,例如需要对线程池最大线程数大小和队列大小进行互相权衡:- 使用大队列和小线程池可以最小化
CPU
的使用、OS
资源和上下文切换的负载,但会(人工的)导致吞吐量地下降。 - 使用小队列通常需要大线程池,它可以保持
CPU
忙碌,但如果遇到比较大的调度开销,这也会导致吞吐量下降。
- 使用大队列和小线程池可以最小化
- 如果小于
-
Rejected tasks
:当执行器被关闭后通过execute
方法提交的新任务将会被拒绝。另外,当执行器使用有限的maximum
线程数和有限的工作队列,并都处于饱和状态时,那么也会拒绝新任务的提交。在这些情况下,方法execute
将会调用RejectedExecutionHandler#rejectedExecution
方法进行处理。JDK
提供了四种预定义的处理器策略:拒绝策略 描述 AbortPolicy
(默认)在拒绝时,处理器抛出运行时异常 RejectedExecutionException
。CallerRunsPolicy
在拒绝时,把任务交由调用 execute
的线程自己执行(这提供一个简单的反馈控制机制,同时也将降低新任务被提交的速率)。另外,如果线程池被关闭则此策略不生效,直接将提交的任务丢弃。DiscardPolicy
在拒绝时,将不能被执行的任务直接丢弃。 DiscardOldestPolicy
在拒绝时,将队头的任务丢弃,然后再次执行提交的任务(重试),如再次失败则再次执行重试。另外,如果线程池被关闭则此策略不生效,直接将提交的任务丢弃。 如果这些策略都不符合需求的话,也可以通过实现
RejectedExecutionHandler
来自定义拒绝策略。 -
Hook methods
:ThreadPoolExecutor
提供了一些可覆盖的protected
方法,一般可以使用它们来控制执行环境(例如,重新初始化ThreadLocal
)、收集数据统计或添加访问日志等。例如,beforeExecute
方法和afterExecute
方法在执行每个任务前后被调用;terminated
方法在执行器完全终止时被调用。 -
Queue maintenance
:ThreadPoolExecutor
提供了getQueue()
方法让我们可以对工作队列进行访问,其主要是用于监控和调试(强烈不推荐用作其他目的)。另外,当队列中存在大量的任务被取消时,我们可以通过ThreadPoolExecutor#remove
方法和ThreadPoolExecutor#purge
方法进行协助存储回收。 -
Finalization
:一个不再被引用并且没有剩余线程的线程池将会被自动关闭。如果想确保即使用户忘记调用shutdown
也能回收未被引用的线程池,那么必须通过设置适当的keep-alive
让没有被使用的线程最终被销毁。另外,我们也可以使用0
作为core
线程的下限和/或
设置allowCoreThreadTimeOut
来使得core
线程也能最终被销毁。
除此之外,
JDK
官方为了让我们可以更轻易地使用线程池,在Executors
工厂类中预配置了一些常见使用场景的线程池,我们可以通过Executors
中相应的工厂方法进行获取 (JDK
推荐):Executors#newCachedThreadPool
:无边界的线程池(具有自动回收线程功能)Executors#newFixedThreadPool
: 固定大小的线程池Executors#newSingleThreadExecutor
:只有一个线程的线程池
-
用法
通过上一小节我们对ThreadPoolExecutor
的概念和用法应该都有一定的了解了,下面我们将从方法调用的角度对其用法进一步分析。
构造
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {...}
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory) {...}
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler) {...}
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {...}
首先,在使用ThreadPoolExecutor
前需要对其实例进行构造,在ThreadPoolExecutor
中提供了4
种具有不同参数的构造方法,如存在空缺的构造参数则会使用默认参数来代替。下面将总结出其中涉及的构造参数(结合上文参数说明):
参数 | 说明 |
---|---|
int corePoolSize | 核心线程数。在提交任务时,如果线程数小于corePoolSize 则会创建新的线程进行处理(即使存在其他线程处于空闲状态);如果线程数大于corePoolSize 则把任务存储在队列中(如果队列满了会再去创建线程进行处理)。 |
int maximumPoolSize | 最大线程数。在提交任务时,如果线程数大于maximumPoolSize 且队列存储已满,则提交失败并执行拒绝策略。 |
long keepAliveTime | 空闲线程的最长保持时间。如果线程空闲时间超过了keepAliveTime 则会进行回收,默认情况下core 线程是不会进行被回收的,但我们可通过allowCoreThreadTimeOut 方法进行设置。 |
TimeUnit unit | keepAliveTime 的时间单位。 |
BlockingQueue<Runnable> workQueue | 任务存储队列。当线程数大于corePoolSize 时则会把任务存储在队列中,而不是创建新线程。 |
ThreadFactory threadFactory | 创建线程的工厂。通过工厂方法可指定线程的名字、线程组、优先级和后台状态等。 |
RejectedExecutionHandler handler | 任务拒绝策略。在任务提交时,如果线程池已关闭,或者线程数已经到达maximumPoolSize 且队列已满,则会执行拒绝策略。 |
除了ThreadPoolExecutor
的4
个构造方法,JDK
也在Executors
工厂类中提供了最常用的线程池配置的工厂方法。
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
在
Executors
工厂类中,还存在与上述方法相对应且带ThreadFactory
参数的工厂方法(具有相同作用的多态工厂方法),作用是替换其默认的ThreadFactory
,在这里就不继续展开讲了。
方法 | 说明 |
---|---|
newFixedThreadPool | 创建一个基于无界队列且具有固定线程数量的线程池。 |
newCachedThreadPool | 创建一个可无限创建线程的线程池。每个线程在空闲状态下只保留60 秒,超过则会自动回收(即60 秒内可复用)。 |
newSingleThreadExecutor | 创建一个基于无界队列且只有一个线程的线程池。与此同时,它保证了被提交的任务的顺序性(顺序执行)和任何时候不会多于一个任务处于激活状态。另外,与其等价的线程池相比(例如,newFixedThreadPool(1) ),newSingleThreadExecutor 是不能重新配置线程数量的。 |
注意,
Executors
并不只有这些构造工厂方法,只是这些是比较经典的,如有兴趣可以再深入研究。
参数控制
在实例构造后,我们可以通过对应参数的getter
/setter
方法对它们进行访问和修改。
public void setThreadFactory(ThreadFactory threadFactory) {...}
public ThreadFactory getThreadFactory() {...}
public void setRejectedExecutionHandler(RejectedExecutionHandler handler) {...}
public RejectedExecutionHandler getRejectedExecutionHandler() {...}
public void setCorePoolSize(int corePoolSize) {...}
public int getCorePoolSize() {...}
public boolean prestartCoreThread() {...}
public int prestartAllCoreThreads() {...}
public boolean allowsCoreThreadTimeOut() {...}
public void allowCoreThreadTimeOut(boolean value) {...}
public void setMaximumPoolSize(int maximumPoolSize) {...}
public int getMaximumPoolSize() {...}
public void setKeepAliveTime(long time, TimeUnit unit) {...}
public long getKeepAliveTime(TimeUnit unit) {...}
方法 | 说明 |
---|---|
getThreadFactory setThreadFactory | 获取/设置用于创建线程的线程工厂。 |
getRejectedExecutionHandler setRejectedExecutionHandler | 获取/设置用于执行拒绝策略的处理器。 |
getCorePoolSize setCorePoolSize | 获取/设置core 线程数(可覆盖构造时的设置)。如果设置的新值小于当前值,则超过的线程会在下一次空闲时被终止;如果设置的新值大于当前值,则将会开始执行等待队列中的任务(如有)。 |
prestartCoreThread prestartAllCoreThreads | 预启动一个/所有core 线程,默认情况下只有当新任务执行时才会启动core 线程。 |
allowsCoreThreadTimeOut | 获取/设置在keep-alive 时间内core 线程是否允许超时终止。 |
getMaximumPoolSize setMaximumPoolSize | 获取/设置线程池最大线程数(可覆盖构造时的设置)。如果设置的新值小于当前值,则超出的线程会在空闲状态时被终止。 |
getKeepAliveTime setKeepAliveTime | 获取/设置线程池中线程的最大空闲时间,默认情况下只作用于非core 线程。 |
执行与终止
public void execute(Runnable command) {...}
public void shutdown() {...}
public List<Runnable> shutdownNow() {...}
public boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException {...}
在构造完ThreadPoolExecutor
后,我们就可以进行任务地执行和终止了。
方法 | 说明 |
---|---|
execute | 用于执行传入的任务。该任务可能在新的线程上执行,也有可能在(已存在)线程池中的线程上执行。如果线程池处于关闭状态或饱和状态,则不能被提交成功(执行拒绝策略)。 |
shutdown | 用于有序关闭线程池。在执行方法后将不会再接收新的任务,而对于在关闭之前已提交的任务可以继续执行,但并不会等待其执行完成。对于需要等待执行完成的情况,可以使用awaitTermination 方法。 |
shutdownNow | 用于立即(暴力)关闭线程池。在执行方法后,它会试图关闭所有正在执行的任务,停止对正在等待的任务(等待队列中的任务)的处理并将它们作为结果返回。同样地,此方法并不会等待正在执行的任务去执行终止操作,而对于需要等待执行终止的情况,可以使用awaitTermination 方法。另外需要注意,此方法并不保证一定能停止正在执行的任务,因为在ThreadPoolExecutor 中是通过Thread#interrupt 来实现任务的取消,所以如果任务响应中断失败则永远不会被终止。 |
awaitTermination | 用于等待线程池被终止。调用此方法会使调用者进入阻塞状态,直到所有任务都已经执行完成,或出现超时,或当前线程发生中断。如果所有任务都执行完成,线程池可以被终止,则返回true ;而如果在线程池可以被终止前发生等待超时,则返回false (一般会在发起线程池关闭请求后调用此方法)。 |
除此之外,ThreadPoolExecutor
还从其父类中继承几个更加灵活的任务执行方法,如下所示:
public Future<?> submit(Runnable task) {...}
public <T> Future<T> submit(Runnable task, T result) {...}
public <T> Future<T> submit(Callable<T> task) {...}
public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException {...}
public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {...}
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException {...}
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException {...}
方法 | 说明 |
---|---|
submit(Runnable task) | 用于执行传入的任务,并返回一个表示任务执行结果的Future 。如果执行成功(已完成),则Future#get 将返回null 。 |
submit(Runnable task, T result) | 用于执行传入的任务,并返回一个表示任务执行结果的Future 。如果执行成功(已完成),则Future#get 将返回(参数传入的)result 。 |
submit(Callable<T> task) | 用于执行传入的任务,并返回一个表示任务执行结果的Future 。如果执行成功(已完成),则Future#get 将返回任务执行的结果。 |
invokeAny(Collection<? extends Callable<T>> tasks) | 用于执行传入的任务集合,并返回任意一个已经成功完成的结果(没有抛出异常的情况)。而在正常或异常返回后,其他还没完成的任务将被取消。 |
invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) | 用于执行传入的任务集合,并在给定时间内返回任意一个已经成功完成的结果(没有抛出异常的情况)。而在正常或异常返回后,其他还没完成的任务将被取消。 |
invokeAll(Collection<? extends Callable<T>> tasks) | 用于执行传入的任务集合,并在所有任务完成后返回一组Future 列表(表示任务执行结果的)。其中,任务“完成”可以是正常执行完成或者抛出异常。 |
invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) | 用于执行传入的任务集合,并在所有任务完成或者等待超时后返回一组Future 列表(表示任务执行结果的)。其中,任务“完成”可以是正常执行完成或者抛出异常。 |
监控与调试
/* User-level queue utilities */
public BlockingQueue<Runnable> getQueue() {...}
public boolean remove(Runnable task) {...}
public void purge() {...}
在对ThreadPoolExecutor
进行构造和动态配置后,我们可以通过它的一些方法进行监控和调试。
方法 | 说明 |
---|---|
getQueue | 获取执行器所使用的任务队列。 |
remove | 从工作队列中移除任务,如果添加到任务队列之前已被转换为其他形式的任务则会导致remove 执行失败,例如使用submit 方法将任务形式转换为Future (可通过purge 方法进行处理)。 |
purge | 从工作队列中移除所有已被取消的Future 任务,如果Future 任务存在其他线程引用则会导致purge 执行失败。 |
数据统计
/* Statistics */
public int getPoolSize() {...}
public int getActiveCount() {...}
public int getLargestPoolSize() {...}
public long getTaskCount() {...}
public long getCompletedTaskCount() {...}
最后,ThreadPoolExecutor
还提供了一些有关线程池数据的统计方法。
方法 | 说明 |
---|---|
getPoolSize | 获取当前线程池的线程数量。 |
getActiveCount | 获取当前线程池正在执行任务的线程数量(大约)。 |
getLargestPoolSize | 获取当前线程池曾经出现过的最大线程数。 |
getTaskCount | 获取当前线程池已经被调度执行的任务总数量(大约)。 |
getCompletedTaskCount | 获取当前线程池已经被执行完成的任务总数量(大约)。 |
扩展
如何计算线程池的合理大小?
通过执行的任务可以把线程池分为两种类型,分别是CPU
密集型和IO
密集型。对于不同类型的线程池有不同的分析和计算方式:
-
对于
CPU
密集型的任务,在N
个处理器的系统上,当线程池的大小设置为N+1
时,通常能实现最优的利用率(对于额外的线程可以在线程池中某个线程由于某种原因被暂停时发挥作用,以至于CPU
时钟周期不会被浪费)。 -
对于
IO
密集型的任务,由于线程不会一直在执行,因此线程池相比于CPU
密集型规模应该更大。而对于这种类型的线程池可以通过以下公式进行计算:线程池大小 = N * U * (1`+ W/C) 其中: N = CPU核数, U = CPU利用率, 0 <= U <= 1 ,W/C = 等待时间 / 计算时间
从上述公式可得出如果任务等待时间越长,线程池应该设置的越大,以至于更好的利用CPU
,因为等待时间越长CPU
的空闲时间就越长。其中,对于当前设备的CPU
核数可通过Runtime.getRuntime().availableProcessors()
方法获得。
此处线程池大小的计算方法是源自《Java并发编程实践》的,其中
IO
密集型的计算方式由于过于理论化存在一定的争议。对此,在美团技术团队的博客《Java线程池实现原理及其在美团业务中的实践》中也提出了质疑,并提供了最终的解决方案,有兴趣的读者可以进一步研究。
如何选择线程池的阻塞队列?
对于具有优先级的任务可以选择PriorityBlockingQueue
队列来进行处理,不过需要注意如果一直有优先级高的任务提交到队列里,那么优先级低的任务可能永远不能执行。而对没有优先级要求的任务建议使用有界队列,因为有界队列能增加系统的稳定性和预警能力,可以根据需求设置相对大一点,比如几千。而如果设置成无界队列,那么线程池的队列就会越来越多,有可能会撑满内存,导致整个系统不可用。
对于常见的阻塞队列可以有以下几种:
ArrayBlockingQueue
: 基于数组的有界阻塞队列,队列按照FIFO
(先进先出)进行排序。LinkedBlockingQueue
: 基于链表的阻塞队列(有界/无界),队列按照FIFO
(先进先出)进行排序,吞吐量通常要高于ArrayBlockingQueue
。SynchronousQueue
: 无存储元素的阻塞队列,每次只能传递一个元素,吞吐量通常要高于LinkedBlockingQueue
。PriorityBlockingQueue
: 具有优先级的无界阻塞队列。
实现原理
通过上文对ThreadPoolExecutor
概念和用法的阐述,我们应该已经对ThreadPoolExecutor
的配置和使用都十分熟悉了。这里我们将从源码的角度对ThreadPoolExecutor
的实现原理进一步分析。
ThreadPoolExecutor
的实现中存在一条从上至下的继承链,在继承链的最顶层是最核心最抽象的Executor
,其仅负责做一件事,即执行任务(execute
)。而随着继承链的向下延伸,不断地会有一些功能特性添加进来直至形成最终的ThreadPoolExecutor
。下文笔者将以同样的顺序从最核心最抽象的Executor
开始顺着继承链路往下一步步进行分析,直至ThreadPoolExecutor
。
在
ThreadPoolExecutor
的实现中存在一条从上至下的继承链,即Executor <= ExecutorService <= AbstractExecutorService <= ThreadPoolExecutor
Executor
/**
* An object that executes submitted {@link Runnable} tasks. This
* interface provides a way of decoupling task submission from the
* mechanics of how each task will be run, including details of thread
* use, scheduling, etc.
*
* ...
*
* The {@code Executor} implementations provided in this package
* implement {@link ExecutorService}, which is a more extensive
* interface. The {@link ThreadPoolExecutor} class provides an
* extensible thread pool implementation. The {@link Executors} class
* provides convenient factory methods for these Executors.
*
* ...
*
* @since 1.5
* @author Doug Lea
*/
public interface Executor {
/**
* Executes the given command at some time in the future. The command
* may execute in a new thread, in a pooled thread, or in the calling
* thread, at the discretion of the {@code Executor} implementation.
* ...
*/
void execute(Runnable command);
}
Executor
接口是线程池执行任务的抽象,它仅仅定义了一个方法,即execute(Runnable command)
,在其实现中可以通过新的线程执行command
任务、可以通过线程池中的线程执行command
任务、也可以通过调用者线程执行command
任务,这取决于其具体的实现(Executor
接口并不是严格需要是异步执行的)。即:
-
通过调用者线程执行
command
任务public class DirectExecutor implements Executor { public void execute(Runnable command) { command.run(); } }
-
通过新的线程执行
command
任务public class ThreadPerTaskExecutor implements Executor { public void execute(Runnable command) { new Thread(command).start(); } }
-
通过自定义有序地执行
command
任务(具有顺序限制)public class SerialExecutor implements Executor { final Queue<Runnable> tasks = new ArrayDeque<Runnable>(); final Executor executor; Runnable active; SerialExecutor(Executor executor) { this.executor = executor; } public synchronized void execute(final Runnable command) { tasks.offer(new Runnable() { public void run() { try { command.run(); } finally { scheduleNext(); } } }); if (active == null) { scheduleNext(); } } protected synchronized void scheduleNext() { if ((active = tasks.poll()) != null) { executor.execute(active); } } }
在完成Executor
的定义后,我们就可以直接使用execute
方法执行任务,而无需每个任务都通过new Thread(new RunnableTask()).start()
的方式进行创建并执行线程(通常被用来代替显式地创建线程),即:
Executor executor = ...;
executor.execute(new RunnableTask1());
executor.execute(new RunnableTask2());
Executor
作为线程池最顶级的父类它已经定义了关于线程池最核心最基础的方法execute(Runnable command)
,通过这种方式可以让任务提交从任务执行中解耦出来,包括线程使用细节、调度等。
ExecutorService
/**
* An {@link Executor} that provides methods to manage termination and
* methods that can produce a {@link Future} for tracking progress of
* one or more asynchronous tasks.
*
* <p>An {@code ExecutorService} can be shut down, which will cause
* it to reject new tasks. Two different methods are provided for
* shutting down an {@code ExecutorService}. The {@link #shutdown}
* method will allow previously submitted tasks to execute before
* terminating, while the {@link #shutdownNow} method prevents waiting
* tasks from starting and attempts to stop currently executing tasks.
* Upon termination, an executor has no tasks actively executing, no
* tasks awaiting execution, and no new tasks can be submitted. An
* unused {@code ExecutorService} should be shut down to allow
* reclamation of its resources.
*
* <p>Method {@code submit} extends base method {@link
* Executor#execute(Runnable)} by creating and returning a {@link Future}
* that can be used to cancel execution and/or wait for completion.
* Methods {@code invokeAny} and {@code invokeAll} perform the most
* commonly useful forms of bulk execution, executing a collection of
* tasks and then waiting for at least one, or all, to
* complete. (Class {@link ExecutorCompletionService} can be used to
* write customized variants of these methods.)
* ...
*/
public interface ExecutorService extends Executor {
/** 相关注释可阅读上文 **/
void shutdown();
/** 相关注释可阅读上文 **/
List<Runnable> shutdownNow();
/**
* Returns {@code true} if this executor has been shut down.
*/
boolean isShutdown();
/**
* Returns {@code true} if all tasks have completed following shut down.
* Note that {@code isTerminated} is never {@code true} unless
* either {@code shutdown} or {@code shutdownNow} was called first.
*/
boolean isTerminated();
/** 相关注释可阅读上文 **/
boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException;
/** 相关注释可阅读上文 **/
<T> Future<T> submit(Callable<T> task);
/** 相关注释可阅读上文 **/
<T> Future<T> submit(Runnable task, T result);
/** 相关注释可阅读上文 **/
Future<?> submit(Runnable task);
/** 相关注释可阅读上文 **/
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException;
/** 相关注释可阅读上文 **/
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
throws InterruptedException;
/** 相关注释可阅读上文 **/
<T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException;
/** 相关注释可阅读上文 **/
<T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
ExecutorService
继承自Executor
,它在Executor
的基础上添加对线程池终止和对异步任务进度追踪(通过Future
实现)的能力。即:
ExecutorService
定义了shutdown
方法和shutdownNow
方法来执行线程池的终止(线程池被终止后,将会拒绝新任务的提交)。ExecutorService
使用了Future
来实现异步任务的进度追踪,在任务提交后方法返回Future
以此来进一步决定取消任务还是等待任务。
下面,笔者基于官方文档的描述总结出ExecutorService
中定义的一些方法(这部分可能与上文部分内容有重叠):
方法 | 说明 |
---|---|
shutdown | 用于有序关闭线程池。在执行方法后将不会再接收新的任务,而对于在关闭之前已提交的任务可以继续执行,但并不会等待其执行完成。对于需要等待执行完成的情况,可以使用awaitTermination 方法。 |
shutdownNow | 用于立即(暴力)关闭线程池。在执行方法后,它会试图关闭所有正在执行的任务,停止对正在等待的任务(等待队列中的任务)的处理并将它们作为结果返回。同样地,此方法并不会等待正在执行的任务去执行终止操作,而对于需要等待执行终止的情况,可以使用awaitTermination 方法。另外需要注意,此方法并不保证一定能停止正在执行的任务,因为在ThreadPoolExecutor 中是通过Thread#interrupt 来实现任务的取消,所以如果任务响应中断失败则永远不会被终止。 |
awaitTermination | 用于等待线程池被终止。调用此方法会使调用者进入阻塞状态,直到所有任务都已经执行完成,或出现超时,或当前线程发生中断。如果所有任务都执行完成,线程池可以被终止,则返回true ;而如果在线程池可以被终止前发生等待超时,则返回false (一般会在发起线程池关闭请求后调用此方法)。 |
isShutdown() | 用于判断线程池是否处于关闭状态。如果线程池已经关闭了则返回true ,否则返回false 。 |
isTerminated() | 用于判断线程池是否处于TERMINATED 终止状态。在线程池关闭后如果所有任务已经执行完成则返回true ,否则返回false 。 |
submit(Runnable task) | 用于执行传入的任务,并返回一个表示任务执行结果的Future 。如果执行成功(已完成),则Future#get 将返回null 。 |
submit(Runnable task, T result) | 用于执行传入的任务,并返回一个表示任务执行结果的Future 。如果执行成功(已完成),则Future#get 将返回(参数传入的)result 。 |
submit(Callable<T> task) | 用于执行传入的任务,并返回一个表示任务执行结果的Future 。如果执行成功(已完成),则Future#get 将返回任务执行的结果。 |
invokeAny(Collection<? extends Callable<T>> tasks) | 用于执行传入的任务集合,并返回任意一个已经成功完成的结果(没有抛出异常的情况)。而在正常或异常返回后,其他还没完成的任务将被取消。 |
invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) | 用于执行传入的任务集合,并在给定时间内返回任意一个已经成功完成的结果(没有抛出异常的情况)。而在正常或异常返回后,其他还没完成的任务将被取消。 |
invokeAll(Collection<? extends Callable<T>> tasks) | 用于执行传入的任务集合,并在所有任务完成后返回一组Future 列表(表示任务执行结果的)。其中,任务“完成”可以是正常执行完成或者抛出异常。 |
invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) | 用于执行传入的任务集合,并在所有任务完成或者等待超时后返回一组Future 列表(表示任务执行结果的)。其中,任务“完成”可以是正常执行完成或者抛出异常。 |
最后,结合ExecutorService
提供的线程池终止方法,我们可以实现一个线程池两阶段关闭策略,即:
- 首先通过调用
shutdown
方法来拒绝新任务的传入。 - 然后通过调用
awaitTermination
方法阻塞等待线程池被终止(作用于阻塞等待任务执行完成)。- 如果
awaitTermination
方法等待超时或者发生中断,则兜底执行shutdownNow
方法进行暴力终止。 - 再次通过调用
awaitTermination
方法阻塞等待线程池被终止(作用于阻塞等待任务执行终止)。
- 如果
void shutdownAndAwaitTermination(ExecutorService pool) {
pool.shutdown(); // Disable new tasks from being submitted
try {
// Wait a while for existing tasks to terminate
if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
pool.shutdownNow(); // Cancel currently executing tasks
// Wait a while for tasks to respond to being cancelled
if (!pool.awaitTermination(60, TimeUnit.SECONDS))
System.err.println("Pool did not terminate");
}
} catch (InterruptedException ie) {
// (Re-)Cancel if current thread also interrupted
pool.shutdownNow();
// Preserve interrupt status
Thread.currentThread().interrupt();
}
}
AbstractExecutorService
/**
* Provides default implementations of {@link ExecutorService}
* execution methods. This class implements the {@code submit},
* {@code invokeAny} and {@code invokeAll} methods using a
* {@link RunnableFuture} returned by {@code newTaskFor}, which defaults
* to the {@link FutureTask} class provided in this package. For example,
* the implementation of {@code submit(Runnable)} creates an
* associated {@code RunnableFuture} that is executed and
* returned. Subclasses may override the {@code newTaskFor} methods
* to return {@code RunnableFuture} implementations other than
* {@code FutureTask}.
* ...
*/
public abstract class AbstractExecutorService implements ExecutorService {
}
AbstractExecutorService
继承自ExecutorService
,它对ExecutorService
部分抽象方法进行了实现,其中包括submit
方法、invokeAny
方法和invokeAll
方法。下面,我们分别来看看AbstractExecutorService
是如何对它们进行实现的。
-
submit
方法关于
submit
方法提供的对异步任务进度追踪的功能是基于RunnableFuture
(默认实现为FutureTask
)实现的,它首先会通过newTaskFor
方法构建出RunnableFuture
实例,然后再交由执行任务的execute
方法进行处理,最后将RunnableFuture
实例返回给调用者,让其可以对任务进度进行追踪。/** * @throws RejectedExecutionException {@inheritDoc} * @throws NullPointerException {@inheritDoc} */ public Future<?> submit(Runnable task) { if (task == null) throw new NullPointerException(); RunnableFuture<Void> ftask = newTaskFor(task, null); execute(ftask); return ftask; } /** * @throws RejectedExecutionException {@inheritDoc} * @throws NullPointerException {@inheritDoc} */ public <T> Future<T> submit(Runnable task, T result) { if (task == null) throw new NullPointerException(); RunnableFuture<T> ftask = newTaskFor(task, result); execute(ftask); return ftask; } /** * @throws RejectedExecutionException {@inheritDoc} * @throws NullPointerException {@inheritDoc} */ public <T> Future<T> submit(Callable<T> task) { if (task == null) throw new NullPointerException(); RunnableFuture<T> ftask = newTaskFor(task); execute(ftask); return ftask; } /** * Returns a {@code RunnableFuture} for the given runnable and default * value. * * @param runnable the runnable task being wrapped * @param value the default value for the returned future * @param <T> the type of the given value * @return a {@code RunnableFuture} which, when run, will run the * underlying runnable and which, as a {@code Future}, will yield * the given value as its result and provide for cancellation of * the underlying task * @since 1.6 */ protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) { return new FutureTask<T>(runnable, value); } /** * Returns a {@code RunnableFuture} for the given callable task. * * @param callable the callable task being wrapped * @param <T> the type of the callable's result * @return a {@code RunnableFuture} which, when run, will call the * underlying callable and which, as a {@code Future}, will yield * the callable's result as its result and provide for * cancellation of the underlying task * @since 1.6 */ protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { return new FutureTask<T>(callable); }
-
invokeAny
方法在
invokeAny
方法中会执行传入的一组任务,只要有一个任务执行完成就立刻返回其结果,而其他任务则全部变为取消状态。public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException { try { return doInvokeAny(tasks, false, 0); } catch (TimeoutException cannotHappen) { assert false; return null; } } public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { return doInvokeAny(tasks, true, unit.toNanos(timeout)); }
其中上述两个
invokeAny
方法之间区别就在于是否存在超时时间,本质上它们都是委托给同一个方法进行处理的,即doInvokeAny
。/** * the main mechanics of invokeAny. */ private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks, boolean timed, long nanos) throws InterruptedException, ExecutionException, TimeoutException { if (tasks == null) throw new NullPointerException(); int ntasks = tasks.size(); if (ntasks == 0) throw new IllegalArgumentException(); // 每个提交的任务都会把future保存在这里,目的是最后用于取消操作 ArrayList<Future<T>> futures = new ArrayList<Future<T>>(ntasks); // 对于ExecutorCompletionService的效果与ExecuteService差不多 // 区别就在于ExecutorCompletionService会把执行完成的任务添加到内部的一个队列里 // 所以我们可以通过执行它的take()方法或poll()方法获取所有任务中已经完成的任务 ExecutorCompletionService<T> ecs = new ExecutorCompletionService<T>(this); // For efficiency, especially in executors with limited // parallelism, check to see if previously submitted tasks are // done before submitting more of them. This interleaving // plus the exception mechanics account for messiness of main // loop. try { // Record exceptions so that if we fail to obtain any // result, we can throw the last exception we got. ExecutionException ee = null; final long deadline = timed ? System.nanoTime() + nanos : 0L; Iterator<? extends Callable<T>> it = tasks.iterator(); // Start one task for sure; the rest incrementally futures.add(ecs.submit(it.next())); // 未执行的任务数量-1 --ntasks; // 初始化已执行的任务数量 int active = 1; for (;;) { // 此处快速判断是否存在任务已经执行完成了,如果是则直接返回 Future<T> f = ecs.poll(); if (f == null) { /** 在此下面的判断会在所有任务都未提交前一直触发 **/ if (ntasks > 0) { // 未执行的任务数量-1 --ntasks; // 不断地提交任务并存储返回的Future futures.add(ecs.submit(it.next())); // 已执行的任务数量+1 ++active; } /** 在此下面的判断会在所有任务都已经提交后才会触发的 **/ else if (active == 0) { // 如果已执行的任务仍然为0,则跳出循环并抛出异常 break; } else if (timed) { // 如果存在超时策略,则执行如下获取逻辑 // 通过poll在规定时间内获取已完成的线程任务 f = ecs.poll(nanos, TimeUnit.NANOSECONDS); // 如果在超时策略下获取失败,则抛出异常 if (f == null) throw new TimeoutException(); nanos = deadline - System.nanoTime(); } else { // 如果不存在超时策略,则执行如下获取逻辑 // 通过take方法以阻塞等待的方式获取已完成的任务 f = ecs.take(); } } if (f != null) { --active; try { // 返回已经执行完成的任务 return f.get(); } catch (ExecutionException eex) { ee = eex; } catch (RuntimeException rex) { ee = new ExecutionException(rex); } } } if (ee == null) ee = new ExecutionException(); throw ee; } finally { // 取消未执行完成的任务 for (int i = 0, size = futures.size(); i < size; i++) futures.get(i).cancel(true); } }
-
将提交的任务集合添加到
ExecutorCompletionService
中执行,并把返回的Future
对象存储到futures
列表中,执行2
步。 -
对
ExecutorCompletionService
执行poll()
/take()
操作(如果存在超时策略则执行poll()
方法)。- 如果获取结果不为空,则调用
get()
方法返回结果,执行第3
步。 - 如果执行结果为空,则继续执行第
1
步。
- 如果获取结果不为空,则调用
-
最后对
futures
列表上剩下的Future
对象执行取消操作。
ExecutorCompletionService
内部维护了一个用于存储已完成任务的列表,通过poll()
方法或take()
方法都可以获取其中已完成的任务。 -
-
invokeAll
方法与
invokeAny
不同的是,在invokeAll
方法中会对传入的任务集合操作完成后再返回一组表示任务执行结果的Future
列表(注意,此处任务“完成”可以是正常执行完成或者抛出异常),而不是任意一个已完成的任务结果。public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException { if (tasks == null) throw new NullPointerException(); // 每个提交的任务都会把future保存在这里 ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size()); boolean done = false; try { // 执行传入的所有任务 for (Callable<T> t : tasks) { RunnableFuture<T> f = newTaskFor(t); futures.add(f); execute(f); } // 遍历等待所有任务执行完成 for (int i = 0, size = futures.size(); i < size; i++) { Future<T> f = futures.get(i); // 如果任务没有完成,则进行阻塞等待 if (!f.isDone()) { try { f.get(); } catch (CancellationException ignore) { // 将异常抛出当作“完成”状态(忽略异常) } catch (ExecutionException ignore) { } } } done = true; // 返回futures列表 return futures; } finally { if (!done) // 如果传入的任务执行发生异常中断,则将全部任务取消 for (int i = 0, size = futures.size(); i < size; i++) futures.get(i).cancel(true); } } public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException { if (tasks == null) throw new NullPointerException(); long nanos = unit.toNanos(timeout); // 每个提交的任务都会把future保存在这里 ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size()); boolean done = false; try { // 将任务构造为Future,并添加到futures列表中 for (Callable<T> t : tasks) futures.add(newTaskFor(t)); // 计算最大等待时间 final long deadline = System.nanoTime() + nanos; final int size = futures.size(); // 逐个执行futures列表中的任务 // Interleave time checks and calls to execute in case // executor doesn't have any/much parallelism. for (int i = 0; i < size; i++) { execute((Runnable)futures.get(i)); // 判断是否超时等待 nanos = deadline - System.nanoTime(); if (nanos <= 0L) return futures; } for (int i = 0; i < size; i++) { Future<T> f = futures.get(i); // 如果任务没有完成,则进行阻塞等待 if (!f.isDone()) { // 判断是否超时等待 if (nanos <= 0L) return futures; try { f.get(nanos, TimeUnit.NANOSECONDS); } catch (CancellationException ignore) { // 将异常抛出当作“完成”状态(忽略异常) } catch (ExecutionException ignore) { } catch (TimeoutException toe) { // 超时等待,直接阻塞返回futures return futures; } nanos = deadline - System.nanoTime(); } } done = true; // 返回futures列表 return futures; } finally { if (!done) // 如果传入的任务执行发生异常中断,则将全部任务取消 for (int i = 0, size = futures.size(); i < size; i++) futures.get(i).cancel(true); } }
- 将传入的任务集合构造成
Future
对象,并将每个Future
对象添加到futures
列表中。 - 将
futures
列表中的Future
对象逐个传入execute
方法中执行(此处,如果存在超时时间则会进一步判断是否超时,如超时则直接返回futures
列表)。 - 在所有任务都操作完成后返回
futures
列表(如抛出未知异常则会将所有任务取消)。- 如果执行完成则继续遍历下一个任务(通过
Future#isDone
方法和Future#get
方法进行判断)。 - 如果抛出异常则继续遍历下一个任务(抛出异常也属于执行“完成”)。
- 如果存在超时时间则进一步判断是否超时,如超时则直接返回
futures
列表。
- 如果执行完成则继续遍历下一个任务(通过
- 将传入的任务集合构造成
从上述方法实现中,我们可以看到无论是submit
还是invokeAny
或invokeAll
,其本质还是通过调用execute
方法来实现。但是,对于execute
方法的实现在AbstractExecutorService
中并没有提供,这需要交由其子类来完成,比如ThreadPoolExecutor
。
对于
Future
的实现原理可以阅读笔者之前文章:Future为什么能作为异步运算结果,简单来说就是将任务传入Future
中,然后在执行过程中通过状态机和线程等待原理进行实现。
ThreadPoolExecutor
至此,我们已经从最抽象的Executor
沿着继承链一直往下探索到当前分析的ThreadPoolExecutor
。ThreadPoolExecutor
继承自AbstractExecutorService
,它集结了上文所述的Executor
、ExecutorService
和AbstractExecutorService
所有的精华,并对其他尚未定义的方法给出了具体的实现。
状态机
为了便于对线程池的管理与控制,ThreadPoolExecutor
定义了一系列的状态机(分别用数字-1
到3
来表示),并基于此实现出线程池的生命周期。下面,我们来看看ThreadPoolExecutor
是如何对它们进行定义的。
public class ThreadPoolExecutor extends AbstractExecutorService {
/**
* The main pool control state, ctl, is an atomic integer packing
* two conceptual fields
* workerCount, indicating the effective number of threads
* runState, indicating whether running, shutting down etc
*
* In order to pack them into one int, we limit workerCount to
* (2^29)-1 (about 500 million) threads rather than (2^31)-1 (2
* billion) otherwise representable. If this is ever an issue in
* the future, the variable can be changed to be an AtomicLong,
* and the shift/mask constants below adjusted. But until the need
* arises, this code is a bit faster and simpler using an int.
*
* The workerCount is the number of workers that have been
* permitted to start and not permitted to stop. The value may be
* transiently different from the actual number of live threads,
* for example when a ThreadFactory fails to create a thread when
* asked, and when exiting threads are still performing
* bookkeeping before terminating. The user-visible pool size is
* reported as the current size of the workers set.
*
* The runState provides the main lifecycle control, taking on values:
*
* RUNNING: Accept new tasks and process queued tasks
* SHUTDOWN: Don't accept new tasks, but process queued tasks
* STOP: Don't accept new tasks, don't process queued tasks,
* and interrupt in-progress tasks
* TIDYING: All tasks have terminated, workerCount is zero,
* the thread transitioning to state TIDYING
* will run the terminated() hook method
* TERMINATED: terminated() has completed
*
* The numerical order among these values matters, to allow
* ordered comparisons. The runState monotonically increases over
* time, but need not hit each state. The transitions are:
*
* RUNNING -> SHUTDOWN
* On invocation of shutdown(), perhaps implicitly in finalize()
* (RUNNING or SHUTDOWN) -> STOP
* On invocation of shutdownNow()
* SHUTDOWN -> TIDYING
* When both queue and pool are empty
* STOP -> TIDYING
* When pool is empty
* TIDYING -> TERMINATED
* When the terminated() hook method has completed
*
* Threads waiting in awaitTermination() will return when the
* state reaches TERMINATED.
*
* Detecting the transition from SHUTDOWN to TIDYING is less
* straightforward than you'd like because the queue may become
* empty after non-empty and vice versa during SHUTDOWN state, but
* we can only terminate if, after seeing that it is empty, we see
* that workerCount is 0 (which sometimes entails a recheck -- see
* below).
*/
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// 表示runState的偏移量
private static final int COUNT_BITS = Integer.SIZE - 3;
// 表示(2^29)-1,即workerCount的掩码
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
// Packing and unpacking ctl
private static int runStateOf(int c) { return c & ~CAPACITY; }
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
}
在ThreadPoolExecutor
中,通过一个ctl
原子(32
位整型)变量来存储线程池的状态及其有效线程数量,即:
字段位 | 说明 |
---|---|
ctl#runState | 表示线程池的状态,运行、关闭等。 |
ctl#workerCount | 表示线程池中线程的有效数量。 |
在ctl
原子(32
位整型)变量中线程池状态使用了3
位存储,线程池有效数量则使用了29
位存储,即:
runState(3位) | workerCount(29位) |
---|---|
000 | 00000 00000000 00000000 00000000 |
- 对于
ctl#runState
,其第1
位是符号位,后2
位则表示状态值。- 对于
ctl#workerCount
,其表示已被许可开始执行但还未执行结束的线程数量。需要注意,此值可以短暂地与真实存活的线程有所不同,例如在请求时ThreadFactory
创建线程失败等。
因此,我们可以看到在ThreadPoolExecutor
中对线程池状态及其线程有效数量的访问和设置方式都是通过位运算实现的,即:
- 通过
runStateOf
方法与高3
位进行与运算(&
)来获得线程池状态。 - 通过
workerCountOf
方法与低29
位进行与运算(&
)来获得线程池中有效线程数量。 - 通过
ctlOf
方法将高3
位的线程池状态与低29
位的线程池线程数量进行或运算(|
)来设置ctl
变量。
对于
ctl#workerCount
,其被限制为(2^29)-1
,即最大有效线程数大约5亿
个(并不是(2^31)-1
,约20亿
)。如果在未来因为这个产生了问题,则可以把变量改变为AtomicLong
,并将相应的shift/mask
也进行调整。但在这之前使用AtomicInteger
可以让代码更快、更简单。
其中,根据线程池的生命周期ThreadPoolExecutor
对ctl#runState
定义了5
种状态,并在不同的状态下限制不同的行为,即:
生命周期 | 说明 |
---|---|
RUNNING | 线程池处于运行状态,接收新任务和工作队列中的任务。 |
SHUTDOWN | 线程池处于关闭状态,不接受新任务,但执行队列中的任务。 |
STOP | 线程池处于停止状态,不接收新任务,不执行队列中的任务,并中断正在执行的任务。 |
TIDYING | 线程池处于终止状态前的过度状态,所有任务已经终止,workerCount 为0 ,转变到TIDYING 状态的线程将执行terminated() 方法。 |
TERMINATED | 线程池处于终止状态,terminated() 已经完成。 |
对不同状态间的转换ThreadPoolExecutor
也给出了明确的指示,即:
RUNNING
->SHUTDOWN
- 在调用
shutdown()
时触发,可能发生在finalize()
- 在调用
(RUNNING or SHUTDOWN)
->STOP
- 在调用
shutdownNow()
时触发
- 在调用
SHUTDOWN
->TIDYING
- 当队列和线程池都为空时触发
STOP
->TIDYING
- 当线程池为空时触发
TIDYING
->TERMINATED
- 当执行
terminated()
完成后触发
- 当执行
最后,基于上文对状态机的分析,这里笔者描绘出如下关于线程池生命周期的流程图:
both queue and pool are empty
++
+---------+ shutdown() +----------+ | +---------+ terminated() +------------+
| RUNNING +----------->+ SHUTDOWN +------+---->+ TIDYING +------------->+ TERMINATED |
+----+----+ +-----+----+ +----+----+ +------------+
| | ^
| shutdownNow()| |
| v |
| +-----+----+ |
+---------------->+ STOP +-----------------+
shutdownNow() +----------+ pool is empty
至此,我们应该已经充分地认识到ThreadPoolExecutor
的状态机,下面我们将开始从构造
->执行
->关闭
的思路去阅读ThreadPoolExecutor
的源码。
构造
在ThreadPoolExecutor
实例创建时,它会将通过构造方法传入的线程池核心参数保存到成员变量中。
/**
* The queue used for holding tasks and handing off to worker
* threads. We do not require that workQueue.poll() returning
* null necessarily means that workQueue.isEmpty(), so rely
* solely on isEmpty to see if the queue is empty (which we must
* do for example when deciding whether to transition from
* SHUTDOWN to TIDYING). This accommodates special-purpose
* queues such as DelayQueues for which poll() is allowed to
* return null even if it may later return non-null when delays
* expire.
*/
private final BlockingQueue<Runnable> workQueue;
/*
* All user control parameters are declared as volatiles so that
* ongoing actions are based on freshest values, but without need
* for locking, since no internal invariants depend on them
* changing synchronously with respect to other actions.
*/
/**
* Factory for new threads. All threads are created using this
* factory (via method addWorker). All callers must be prepared
* for addWorker to fail, which may reflect a system or user's
* policy limiting the number of threads. Even though it is not
* treated as an error, failure to create threads may result in
* new tasks being rejected or existing ones remaining stuck in
* the queue.
*
* We go further and preserve pool invariants even in the face of
* errors such as OutOfMemoryError, that might be thrown while
* trying to create threads. Such errors are rather common due to
* the need to allocate a native stack in Thread.start, and users
* will want to perform clean pool shutdown to clean up. There
* will likely be enough memory available for the cleanup code to
* complete without encountering yet another OutOfMemoryError.
*/
private volatile ThreadFactory threadFactory;
/**
* Handler called when saturated or shutdown in execute.
*/
private volatile RejectedExecutionHandler handler;
/**
* Timeout in nanoseconds for idle threads waiting for work.
* Threads use this timeout when there are more than corePoolSize
* present or if allowCoreThreadTimeOut. Otherwise they wait
* forever for new work.
*/
private volatile long keepAliveTime;
/**
* If false (default), core threads stay alive even when idle.
* If true, core threads use keepAliveTime to time out waiting
* for work.
*/
private volatile boolean allowCoreThreadTimeOut;
/**
* Core pool size is the minimum number of workers to keep alive
* (and not allow to time out etc) unless allowCoreThreadTimeOut
* is set, in which case the minimum is zero.
*/
private volatile int corePoolSize;
/**
* Maximum pool size. Note that the actual maximum is internally
* bounded by CAPACITY.
*/
private volatile int maximumPoolSize;
/**
* The default rejected execution handler
*/
private static final RejectedExecutionHandler defaultHandler =
new AbortPolicy();
/* The context to be used when executing the finalizer, or null. */
private final AccessControlContext acc;
/**
* Creates a new {@code ThreadPoolExecutor} with the given initial
* parameters.
*
* @param corePoolSize the number of threads to keep in the pool, even
* if they are idle, unless {@code allowCoreThreadTimeOut} is set
* @param maximumPoolSize the maximum number of threads to allow in the
* pool
* @param keepAliveTime when the number of threads is greater than
* the core, this is the maximum time that excess idle threads
* will wait for new tasks before terminating.
* @param unit the time unit for the {@code keepAliveTime} argument
* @param workQueue the queue to use for holding tasks before they are
* executed. This queue will hold only the {@code Runnable}
* tasks submitted by the {@code execute} method.
* @param threadFactory the factory to use when the executor
* creates a new thread
* @param handler the handler to use when execution is blocked
* because the thread bounds and queue capacities are reached
* @throws IllegalArgumentException if one of the following holds:<br>
* {@code corePoolSize < 0}<br>
* {@code keepAliveTime < 0}<br>
* {@code maximumPoolSize <= 0}<br>
* {@code maximumPoolSize < corePoolSize}
* @throws NullPointerException if {@code workQueue}
* or {@code threadFactory} or {@code handler} is null
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
其中,大部分参数在上文已经有所提及,如有疑问可以回顾上文内容。下面,笔者贴出其中相应参数的说明与作用:
参数 | 说明 |
---|---|
workQueue | 表示任务队列,用于持有任务和传递任务。 |
threadFactory | 表示线程工厂,用于创建所有线程。 |
handler | 表示拒绝处理器,用于处理线程池饱和或被关闭时拒绝任务的提交。 |
keepAliveTime | 表示空闲线程超时时间,在线程空闲时间超过此限制时会被销毁。默认情况下只会作用于非core 线程,可通过修改allowCoreThreadTimeOut 属性作用于所有线程。 |
allowCoreThreadTimeOut | 表示空闲线程超时策略是否作用于core 线程,如果不允许(false )则会使得core 线程一直存活;反之(true )则会让core 线程在空闲时间超过keepAliveTime 时被销毁。 |
corePoolSize | 表示core 线程最大数量,如果core 线程不存在超时策略,它则代表着保持存活的最小工作线程数(如存在超时策略则此值为0 )。 |
maximumPoolSize | 表示线程池最大线程数,需要注意它同时也受内部边界CAPACITY 的限制。 |
defaultHandler | 表示默认拒绝处理器,其默认值为AbortPolicy 策略,即抛出异常。 |
acc | 当执行finalizer 或null 时需要用到的上下文。 |
另外,ThreadPoolExecutor
还存在一些用于统计数据的变量,即:
workers
:用于存储线程池中所有工作线程的集合。largestPoolSize
: 用于表示线程池所获得的最大线程大小(包含历史值)。completedTaskCount
:用于表示已完成的任务数量(只有在工作线程被终止时才会更新)。
除了上述的构造方法外,
ThreadPoolExecutor
还重载了其他的参数更少的构造方法,而对于其中缺失的参数则是通过默认参数来代替,即:public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler); } public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, > ThreadFactory threadFactory) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, defaultHandler); } public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue,> RejectedExecutionHandler handler) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), handler); }
此处可以看到主要是包含两个默认的构造参数参数,分别是:
ThreadFactory
参数的默认值,Executors.defaultThreadFactory()
RejectedExecutionHandler
参数的默认值,defaultHandler=new AbortPolicy()
执行
在对ThreadPoolExecutor
构造完后,我们就可以调用execute
方法执行提交的任务了。
/**
* Executes the given task sometime in the future. The task
* may execute in a new thread or in an existing pooled thread.
*
* If the task cannot be submitted for execution, either because this
* executor has been shutdown or because its capacity has been reached,
* the task is handled by the current {@code RejectedExecutionHandler}.
*
* @param command the task to execute
* @throws RejectedExecutionException at discretion of
* {@code RejectedExecutionHandler}, if the task
* cannot be accepted for execution
* @throws NullPointerException if {@code command} is null
*/
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
int c = ctl.get();
// 1.线程数小于corePoolSize,直接添加线程执行任务
if (workerCountOf(c) < corePoolSize) {
// 添加线程并执行任务
if (addWorker(command, true))
return;
// 添加线程失败,获取最新的ctl
c = ctl.get();
}
// 2.如果线程池在运行状态,并入队成功
if (isRunning(c) && workQueue.offer(command)) {
// double-check
int recheck = ctl.get();
// 如果线程池关闭了,则进行回滚操作,并拒绝任务
if (! isRunning(recheck) && remove(command))
reject(command);
// 如果线程池没有线程了,则开启一个新的线程
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 3.试图在添加一遍新任务,如再次失败则进行拒绝操作
else if (!addWorker(command, false))
reject(command);
}
从
Executor
、ExecutorSerivce
到AbstractExecutorService
一直都没被定义的execute
方法在此处进行了实现。
ThreadPoolExecutor
对于execute
方法的定义是,对于被提交的任务会在将来某个时间执行,其中对于任务的执行可以通过新的线程或者线程池中的线程完成。而如果任务不能被提交(线程池容量饱和或者被关闭)则会将其交由RejectedExecutionHandler
进行处理(拒绝策略)。具体地,我们可以将execute
方法的执行步骤分为3
步(详情可阅读上述源码):
- 如果正在运行的线程数小于
corePoolSize
,则试图去启动一个新的线程并将传入的command
任务作为它的第一个任务。如果成功,则执行结束;否则执行第2
步。 - 如果当前线程池处于
RUNNABLE
状态,则将传入的command
任务添加到工作队列中;如果当前线程不是RUNNABLE
状态或者添加到工作队列失败,则执行第3
步。- 检查当前线程池是否处于
RUNNABLE
状态(double-check
),如果不是则将第2
步添加到工作队列的任务移除(回滚),并执行拒绝策略,最后结束方法执行。 - 检查线程池的线程数是否等于
0
,如果是则启动一个新的线程(无分配任务的线程),然后结束方法执行。
- 检查当前线程池是否处于
- 如果任务添加到工作队列失败,则再次试图去启动一个新的线程。如果成功,则执行结束;否则执行拒绝策略(可能线程池队列已饱和或者被关闭)。
关于execute
方法的执行步骤,因为需要考虑到一些异常的情况,所以看上去可能还是有点复杂,这里笔者将其进行了简化,即:
- 当线程池中线程数小于
corePoolSize
时,无论是否存在空闲线程都新建一个线程执行任务(第1
步)。 - 当线程池中线程数大于
corePoolSize
时,执行任务优先进入工作队列等待,如果入队失败则表示工作队列已饱和(第2
步)。 - 当线程池中线程数大于
corePoolSize
且工作队列已饱和,则通过新建线程来执行任务,直至线程数大于maximumPoolSize
(第3
步)。 - 当线程池中线程数大于等于
maximumPoolSize
且工作队列已饱和,则执行执行拒绝策略(第3
步)。
以上即是execute
方法大致的执行流程,下面我们再细致到每个关键步骤进行分析。
添加线程
在execute
方法的执行流程中,它是通过addWorker
方法来实现添加线程的。
/**
* Checks if a new worker can be added with respect to current
* pool state and the given bound (either core or maximum). If so,
* the worker count is adjusted accordingly, and, if possible, a
* new worker is created and started, running firstTask as its
* first task. This method returns false if the pool is stopped or
* eligible to shut down. It also returns false if the thread
* factory fails to create a thread when asked. If the thread
* creation fails, either due to the thread factory returning
* null, or due to an exception (typically OutOfMemoryError in
* Thread.start()), we roll back cleanly.
*
* @param firstTask the task the new thread should run first (or
* null if none). Workers are created with an initial first task
* (in method execute()) to bypass queuing when there are fewer
* than corePoolSize threads (in which case we always start one),
* or when the queue is full (in which case we must bypass queue).
* Initially idle threads are usually created via
* prestartCoreThread or to replace other dying workers.
*
* @param core if true use corePoolSize as bound, else
* maximumPoolSize. (A boolean indicator is used here rather than a
* value to ensure reads of fresh values after checking other pool
* state).
* @return true if successful
*/
private boolean addWorker(Runnable firstTask, boolean core) {
/** 判断是否允许新增工作线程 **/
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// 判断当前线程池是否允许添加线程
// 1. runState < SHUTDOWN,则表示运行状态,正常继续往下执行
// 2. runState >= SHUTDOWN时,即SHUTDOWN、STOP、TIDYING、TERMINATED,
// - 在STOP、TIDYING、TERMINATED的状态下都不能添加线程的(即,runState != SHUTDOWN),所以直接返回false
// - 在SHUTDOWN状态时是只能执行工作队列中的任务,而不能接收新的任务。对于线程池不存在工作线程的情况下是允许创建一个空线程来执行工作队列中的任务,即在 rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty() 这种情况下是可以添加空线程的。反之,其他情况直接返回false
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
// 判断线程数量是否超过边界
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize)) // 以传入参数控制边界的值为corePoolSize或者maximumPoolSize
return false;
// CAS自增线程数量workerCount,若成功则退出判断逻辑
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
// 线程池状态发生变更,重试判断是否允许新增工作线程操作
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
/** 执行添加工作线程 **/
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
// 创建Worker
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
// 判断当前线程池是否允许添加线程
// 1. runState < SHUTDOWN,则表示运行状态,即可以添加工作线程
// 2. rs == SHUTDOWN && firstTask == null,则表示线程池处于SHUTDOWN状态并且无分配任务的情况。在SHUTDOWN状态时是只能执行工作队列中的任务,而不能接收新的任务,即它是允许创建一个空线程来执行工作队列中的任务的。
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
// 将创建的工作线程添加到workers中
workers.add(w);
// 更新largestPoolSize的值
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
// 启动线程
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
// 没启动成功的,进行回滚操作
addWorkerFailed(w);
}
return workerStarted;
}
关于addWorker
方法,它主要分为两个步骤,即:
- 判断是否允许新增工作线程。
- 执行添加线程池工作线程(
double-check
,包含是否允许新增工作线程的判断)。
其中,对于第1
点和第2
点中关于“是否允许新增工作线程”的逻辑,笔者在代码相应部分都已经给了很详细的注释说明了,所以在这里就不再展开了。下面,我们将重点聚焦在第2
点“执行添加工作线程”中的“添加”部分,即:
- 通过传入的
firstTask
任务创建Worker
实例对象。 - 将
Worker
实例对象添加到workers
线程列表中。 - 通过
Thread#start
启动第1
步所创建Worker
实例对象(继承自Runnable
接口)。
关于
firstTask
,它表示新线程执行的第一个任务(如有)。如果将firstTask
设置null
则表示不接受新任务,而是执行队列中的任务。
在
addWorker
中如果发生线程启动失败(可能是线程工厂返回null
,或者抛出异常等,典型的是在Thread#start
时发生OOM
),则会通过调用addWorkerFailed
方法进行回滚操作,即/** * Rolls back the worker thread creation. * - removes worker from workers, if present * - decrements worker count * - rechecks for termination, in case the existence of this * worker was holding up termination */ private void addWorkerFailed(Worker w) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // 将当前创建的`worker`从`workers`列表中移除。 if (w != null) workers.remove(w); // 通过调用`decrementWorkerCount`对`workerCount`数量进行递减 decrementWorkerCount(); // 通过调用`tryTerminate()`尝试终止线程池 tryTerminate(); } finally { mainLock.unlock(); } }
不难看出,在
addWorkerFailed
方法仅仅做了简单的清理操作:
- 将当前创建的
worker
从workers
列表中移除。- 通过调用
decrementWorkerCount
对workerCount
数量进行递减- 通过调用
tryTerminate()
尝试终止线程池
执行任务
在上一小节里我们通过addWorker
方法添加线程,并在其中调用Thread#start
方法启动了线程。下面我们来看看线程中是如何执行任务的:
/**
* Class Worker mainly maintains interrupt control state for
* threads running tasks, along with other minor bookkeeping.
* This class opportunistically extends AbstractQueuedSynchronizer
* to simplify acquiring and releasing a lock surrounding each
* task execution. This protects against interrupts that are
* intended to wake up a worker thread waiting for a task from
* instead interrupting a task being run. We implement a simple
* non-reentrant mutual exclusion lock rather than use
* ReentrantLock because we do not want worker tasks to be able to
* reacquire the lock when they invoke pool control methods like
* setCorePoolSize. Additionally, to suppress interrupts until
* the thread actually starts running tasks, we initialize lock
* state to a negative value, and clear it upon start (in
* runWorker).
*/
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
/**
* This class will never be serialized, but we provide a
* serialVersionUID to suppress a javac warning.
*/
private static final long serialVersionUID = 6138294804551838833L;
/** Thread this worker is running in. Null if factory fails. */
final Thread thread;
/** Initial task to run. Possibly null. */
Runnable firstTask;
/** Per-thread task counter */
volatile long completedTasks;
/**
* Creates with given first task and thread from ThreadFactory.
* @param firstTask the first task (null if none)
*/
Worker(Runnable firstTask) {
// 将AQS中的state设置为-1时为了抑制中断,详情可阅读interruptIfStarted方法
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
/** Delegates main run loop to outer runWorker */
public void run() {
runWorker(this);
}
// Lock methods
//
// The value 0 represents the unlocked state.
// The value 1 represents the locked state.
protected boolean isHeldExclusively() {
return getState() != 0;
}
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
public boolean isLocked() { return isHeldExclusively(); }
void interruptIfStarted() {
Thread t;
// 此处通过判断state来抑制中断
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}
首先,在构造Worker
实例时会进行初始化,即:
- 设置
Worker
实例第一个需要执行的任务firstTask
,如果firstTask
为null
则表示没有需要执行的任务,后续会从工作队列中获取任务来执行。 - 设置
Worker
实例的执行线程,此处会将当前对象实例(Worker
实例,继承自Runnable
)传入线程工厂ThreadFactory
来实例化线程。
对于
Worker
类,我们可以看到它分别继承AbstractQueuedSynchronizer
类和Runnable
接口,这也意味着:
- 在线程启动时会执行其中的
Runnable#run
方法(继承自Runnable
接口)。- 在线程启动后可以通过
AQS
的同步策略来实现互斥(继承自AbstractQueuedSynchronizer
类)。关于
Worker
类选择继承AbstractQueuedSynchronizer
来实现资源的互斥而没有使用现有的ReetrantLock
,是因为它不想在调用线程池控制方法(例如setCorePoolSize
)时,Worker
任务(即,Worker#run
)可以重新获取锁。也就是说,这里需要的是一个不可重入的互斥锁,而ReetrantLock
是一个可重入的互斥锁,不符合需要。
对于
AbstractQueuedSynchronizer
的实现原理在这里就不继续展开了,有兴趣的读者可以阅读笔者之前的文章:什么是AQS。
在Worker
实例化后会将对象传到外层(addWorker
方法),然后在外层(addWorker
方法)取出其成员变量Thread
并执行Thread#start()
方法启动执行,即执行Worker
实例的Runnable#run
方法。
接着,在Worker#run
的方法中我们可以看到它立刻就调用了runWorker(this)
方法,即:
/**
* Main worker run loop. Repeatedly gets tasks from queue and
* executes them, while coping with a number of issues:
*
* 1. We may start out with an initial task, in which case we
* don't need to get the first one. Otherwise, as long as pool is
* running, we get tasks from getTask. If it returns null then the
* worker exits due to changed pool state or configuration
* parameters. Other exits result from exception throws in
* external code, in which case completedAbruptly holds, which
* usually leads processWorkerExit to replace this thread.
*
* 2. Before running any task, the lock is acquired to prevent
* other pool interrupts while the task is executing, and then we
* ensure that unless pool is stopping, this thread does not have
* its interrupt set.
*
* 3. Each task run is preceded by a call to beforeExecute, which
* might throw an exception, in which case we cause thread to die
* (breaking loop with completedAbruptly true) without processing
* the task.
*
* 4. Assuming beforeExecute completes normally, we run the task,
* gathering any of its thrown exceptions to send to afterExecute.
* We separately handle RuntimeException, Error (both of which the
* specs guarantee that we trap) and arbitrary Throwables.
* Because we cannot rethrow Throwables within Runnable.run, we
* wrap them within Errors on the way out (to the thread's
* UncaughtExceptionHandler). Any thrown exception also
* conservatively causes thread to die.
*
* 5. After task.run completes, we call afterExecute, which may
* also throw an exception, which will also cause thread to
* die. According to JLS Sec 14.20, this exception is the one that
* will be in effect even if task.run throws.
*
* The net effect of the exception mechanics is that afterExecute
* and the thread's UncaughtExceptionHandler have as accurate
* information as we can provide about any problems encountered by
* user code.
*
* @param w the worker
*/
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
// 允许中断
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
// 获取任务
while (task != null || (task = getTask()) != null) {
// 此处会通过lock方法对资源进行锁定,防止在执行过程中被中断了
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
// 扩展方法钩子
beforeExecute(wt, task);
Throwable thrown = null;
try {
// 任务执行
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
// 扩展方法钩子
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
// 处理工作线程退出操作
processWorkerExit(w, completedAbruptly);
}
}
对于runWorker
方法的执行流程,在它的方法注释上就已经给出很详细的说明,即:
- 如果初始任务存在(若没执行),则将它作为第一个执行任务;如不存在或者已执行,则通过
getTask
方法从工作队列中获取任务(若线程池正在运行)。 - 执行
Worker#lock
方法来持有它的互斥锁,以阻止线程池在任务正在执行时将其中断(除非线程池正在停止,否则不会对线程执行中断)。 - 调用执行任务前的钩子方法,即
beforeExecute
方法。如果beforeExecute
抛出异常会使得线程在没有处理任务就被销毁了(在completedAbruptly
为true
时跳出循环)。 - 执行第
1
步中获取的任务,即执行Runnable#run
方法。如果执行任务过程中抛出异常则会将异常传输到afterExecute
进行处理,并销毁当前执行线程(此处采用保守处理,即任何异常都会使得执行线程被销毁)。 - 调用执行任务后的钩子方法,即
afterExecute
方法。如果afterExecute
方法抛出异常会使得执行线程被销毁(根据JLS Sec 14.20
,即使task.run
抛出异常,在afterExecute
方法中抛出的异常也将生效)。
关于获取任务的
getTask()
方法,它是有可能因为线程池状态变更、配置参数的变化或者抛出异常等原因导致其返回null
,进而使得当前工作线程(Worker
实例)被销毁(Runnable#run
方法执行结束)。而对于当前工作线程(Worker
实例)销毁后是否再创建新的工作线程(Worker
实例)则取决于processWorkerExit
方法的实现逻辑了,详情可阅读下文。
简单来说,runWorker
方法就是不断的调用getTask()
方法获取任务(如存在初始任务,则优先执行初始任务),然后执行获取任务(执行Runnable#run
方法),最后在被销毁前调用processWorkerExit
方法进行结束处理(由于异常/超时等原因)。
下面我们来看看其中两个比较关键的步骤,即getTask()
方法和processWorkerExit
方法。
-
getTask()
方法/** * Performs blocking or timed wait for a task, depending on * current configuration settings, or returns null if this worker * must exit because of any of: * 1. There are more than maximumPoolSize workers (due to * a call to setMaximumPoolSize). * 2. The pool is stopped. * 3. The pool is shutdown and the queue is empty. * 4. This worker timed out waiting for a task, and timed-out * workers are subject to termination (that is, * {@code allowCoreThreadTimeOut || workerCount > corePoolSize}) * both before and after the timed wait, and if the queue is * non-empty, this worker is not the last thread in the pool. * * @return task, or null if the worker must exit, in which case * workerCount is decremented */ private Runnable getTask() { boolean timedOut = false; // Did the last poll() time out? for (;;) { int c = ctl.get(); int rs = runStateOf(c); // 判断线程池是否能够获取任务 // 1. 如果线程池处于SHUTDOWN状态,并且工作队列为空,则无法获取任务,即返回null // 2. 如果线程池处于STOP、TIDYING、TERMINATED状态,则无法获取任务,即返回null // Check if queue empty only if necessary. if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { // 当前工作线程即将销毁,workCount数量递减 decrementWorkerCount(); return null; } int wc = workerCountOf(c); // 判断当前线程是否应用keepAliveTime策略,即超时策略 // 1. 如果线程数大于corePoolSize,则此工作线程应用keepAliveTime策略 // 2. 如果线程池设置了allowCoreThreadTimeOut,则此工作线程应用keepAliveTime策略(全部线程都应用keepAliveTime策略) // Are workers subject to culling? boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; // 此处为会判断大于maximumPoolSize,可能期间会通过setMaximumPoolSize进行调整 // 在正常情况会判断超时情况, // 判断当前工作线程是否有必要继续运行 // 1. 判断当前工作线程数量是否超过了maximumPoolSize(在执行期间通过setMaximumPoolSize进行了调整)。 // 2. 判断在有超时策略的前提下上一次阻塞等待是否超时。 // 如果2种情况任意一种发生了,都可以进一步去让当前工作线程被销毁,即返回null // 但是,因为在工作队列不为空时证明线程池还没有处于空闲状态,和工作线程不能全部被销毁,所以又加上wc > 1 || workQueue.isEmpty()的条件判断 if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { // 当前工作线程即将销毁,workCount数量递减 if (compareAndDecrementWorkerCount(c)) return null; continue; } try { // 从工作队列中获取任务 Runnable r = null; if(timed){ // 如果存在keepAliveTime策略,则使用poll获取任务(存在超时策略阻塞策略) workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) }else{ // 如果不存在keepAliveTime策略,则使用take获取任务(一直阻塞) workQueue.take(); } // 如果获取任务成功,则返回 if (r != null){ return r; } // 执行至此,证明是阻塞超时而返回,即r==null(留到下一次循环中会进行处理) timedOut = true; } catch (InterruptedException retry) { timedOut = false; } } }
对于
getTask()
方法的执行步骤,笔者根据代码逻辑整理了下来,即:- 判断线程池能够获取任务,如果不能则递减
workerCount
,并返回null
。- 如果线程池处于
SHUTDOWN
状态,并且工作队列为空,则无法获取任务,即返回null
(即将销毁)。 - 如果线程池处于
STOP
、TIDYING
、TERMINATED
状态,则无法获取任务,即返回null
(即将销毁)。
- 如果线程池处于
- 判断当前工作线程是否有必要继续运行,如果不能则递减
workerCount
,并返回null
。- 如果线程数超过
maximumPoolSize
(若存在其他工作线程),并且工作队列处于空闲状态,则当前线程不必继续运行,即返回null
(即将销毁)。 - 如果在有超时策略的前提下上一次阻塞发生了超时等待(若存在其他工作线程),并且工作队列处于空闲状态,则当前线程不必继续运行,即返回
null
(即将销毁)。
- 如果线程数超过
- 从工作队列中获取任务(阻塞),如果获取成功则返回,否则跳回第
1
步继续执行。-
如果设置了超时策略,则通过调用
poll
方法获取任务。 -
如果没有设置超时策略,则通过调用
take
方法获取任务。
-
不难看出,对于线程池中线程的超时策略是通过队列
Queue
的poll(time,TimeUnit)
方法来实现的,即如果在规定时间内如果无法获取到任务(超时),则返回null
并在getTask()
方法外层对相应的工作线程进行销毁;而对于线程池的复用原理,则是在一个线程中不停地通过getTask()
方法从工作队列中获取任务来执行,以此来实现线程的复用。从这个角度来看线程池的复用原理的确跟工作队列密切相关,因此在工作队列的选择上需要十分讲究。 - 判断线程池能够获取任务,如果不能则递减
-
processWorkerExit
方法/** * Performs cleanup and bookkeeping for a dying worker. Called * only from worker threads. Unless completedAbruptly is set, * assumes that workerCount has already been adjusted to account * for exit. This method removes thread from worker set, and * possibly terminates the pool or replaces the worker if either * it exited due to user task exception or if fewer than * corePoolSize workers are running or queue is non-empty but * there are no workers. * * @param w the worker * @param completedAbruptly if the worker died due to user exception */ private void processWorkerExit(Worker w, boolean completedAbruptly) { // completedAbruptly=true表示是通过异常终止的,对此线程池采用了保守处理,即将销毁当前线程(递减workerCount) if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted decrementWorkerCount(); final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // 统计被销毁线程的完成任务数 completedTaskCount += w.completedTasks; // 移除工作线程 workers.remove(w); } finally { mainLock.unlock(); } // 尝试终止线程池 tryTerminate(); int c = ctl.get(); // 判断是否替换worker if (runStateLessThan(c, STOP)) { // 如果工作线程是正常终止的 if (!completedAbruptly) { // 计算当前工作线程数量 int min = allowCoreThreadTimeOut ? 0 : corePoolSize; // 如果工作队列不为空,工作线程数量不能为0 if (min == 0 && ! workQueue.isEmpty()) min = 1; // 判断工作线程数量是否大于最小值,如果是则终止方法执行,否则继续往下执行(通过addWorker添加工作线程) if (workerCountOf(c) >= min) return; // replacement not needed } // 添加无初始任务的工作线程 addWorker(null, false); } }
从上述代码中可以看到,
processWorkerExit
方法主要就是对线程池统计数据进行计算、工作线程进行清理回收和替换等,下面笔者将相关的执行流程整理了下来:- 对线程池工作线程数进行统计更新(如异常退出,则对
workerCount
进行递减)。 - 对线程池完成任务数量
completedTaskCount
进行统计和更新。 - 将当前工作线程从线程队列
workers
中移除。 - 调用
tryTerminate
方法尝试终止线程池(详情可阅读下文)。 - 计算当前工作线程数,如果大于最小线程数则执行结束。
- 如果当前工作线程是正常退出(替换),或者工作线程数小于最小线程数(补充),则调用
addWorker
方法添加工作线程。
- 对线程池工作线程数进行统计更新(如异常退出,则对
拒绝策略
在execute
方法中多次添加任务失败后,则会执行拒绝策略,即ThreadPoolExecutor#reject
。
/**
* Handler called when saturated or shutdown in execute.
*/
private volatile RejectedExecutionHandler handler;
/**
* Invokes the rejected execution handler for the given command.
* Package-protected for use by ScheduledThreadPoolExecutor.
*/
final void reject(Runnable command) {
handler.rejectedExecution(command, this);
}
在ThreadPoolExecutor#reject
方法中会委托给拒绝处理器RejectedExecutionHandler
进行处理,默认情况下RejectedExecutionHandler
采用了AbortPolicy
策略。而除了AbortPolicy
策略外,ThreadPoolExecutor
还预设了其他几种策略让我们使用,分别是CallerRunsPolicy
策略、DiscardPolicy
策略和DiscardOldestPolicy
策略,即:
拒绝策略 | 描述 |
---|---|
AbortPolicy (默认) | 在拒绝时,处理器抛出运行时异常RejectedExecutionException 。 |
CallerRunsPolicy | 在拒绝时,把任务交由调用execute 的线程自己执行。另外,如果执行器被关闭则此策略不生效,直接将提交的任务丢弃。 |
DiscardPolicy | 在拒绝时,将不能被执行的任务直接丢弃。 |
DiscardOldestPolicy | 在拒绝时,将队头的任务丢弃,然后再次执行提交的任务(重试),如再次失败则再次执行重试。另外,如果执行器被关闭则此策略不生效,直接将提交的任务丢弃。 |
即便上述策略都不适用,我们还可以自定义符合自己需要的策略。为了更好地了解拒绝策略,下面我们先来看看RejectedExecutionHandler
是如何定义的。
/**
* A handler for tasks that cannot be executed by a {@link ThreadPoolExecutor}.
*/
public interface RejectedExecutionHandler {
/**
* Method that may be invoked by a {@link ThreadPoolExecutor} when
* {@link ThreadPoolExecutor#execute execute} cannot accept a
* task. This may occur when no more threads or queue slots are
* available because their bounds would be exceeded, or upon
* shutdown of the Executor.
*
* <p>In the absence of other alternatives, the method may throw
* an unchecked {@link RejectedExecutionException}, which will be
* propagated to the caller of {@code execute}.
*
* @param r the runnable task requested to be executed
* @param executor the executor attempting to execute this task
* @throws RejectedExecutionException if there is no remedy
*/
void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}
在RejectedExecutionHandler
中只定义了一个rejectedExecution
方法,它会在ThreadPoolExecutor#execute
不能接收任务时被调用(线程池已关闭,或者线程数已经到达maximumPoolSize
且队列已满的情况)。在预设或者自定义的不同策略中,我们只需要实现它的rejectedExecution
方法即可,下面笔者把相应的实现贴了出来:
-
AbortPolicy
/** * A handler for rejected tasks that throws a * {@code RejectedExecutionException}. */ public static class AbortPolicy implements RejectedExecutionHandler { /** * Creates an {@code AbortPolicy}. */ public AbortPolicy() { } /** * Always throws RejectedExecutionException. * * @param r the runnable task requested to be executed * @param e the executor attempting to execute this task * @throws RejectedExecutionException always */ public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { // 抛出异常 throw new RejectedExecutionException("Task " + r.toString() + " rejected from " + e.toString()); } }
AbortPolicy
拒绝策略是直接抛出RejectedExecutionException
异常(默认策略)。 -
CallerRunsPolicy
/** * A handler for rejected tasks that runs the rejected task * directly in the calling thread of the {@code execute} method, * unless the executor has been shut down, in which case the task * is discarded. */ public static class CallerRunsPolicy implements RejectedExecutionHandler { /** * Creates a {@code CallerRunsPolicy}. */ public CallerRunsPolicy() { } /** * Executes task r in the caller's thread, unless the executor * has been shut down, in which case the task is discarded. * * @param r the runnable task requested to be executed * @param e the executor attempting to execute this task */ public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { // 通过调用线程执行 r.run(); } } }
CallerRunsPolicy
拒绝策略是直接将要执行的任务交给调用线程执行。需要注意,如果线程池已关闭,则直接丢弃任务。 -
DiscardPolicy
/** * A handler for rejected tasks that silently discards the * rejected task. */ public static class DiscardPolicy implements RejectedExecutionHandler { /** * Creates a {@code DiscardPolicy}. */ public DiscardPolicy() { } /** * Does nothing, which has the effect of discarding task r. * * @param r the runnable task requested to be executed * @param e the executor attempting to execute this task */ public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { // 静默丢弃,不做处理 } }
DiscardPolicy
拒绝策略是直接丢弃当前提交的任务(静默)。 -
DiscardOldestPolicy
/** * A handler for rejected tasks that discards the oldest unhandled * request and then retries {@code execute}, unless the executor * is shut down, in which case the task is discarded. */ public static class DiscardOldestPolicy implements RejectedExecutionHandler { /** * Creates a {@code DiscardOldestPolicy} for the given executor. */ public DiscardOldestPolicy() { } /** * Obtains and ignores the next task that the executor * would otherwise execute, if one is immediately available, * and then retries execution of task r, unless the executor * is shut down, in which case task r is instead discarded. * * @param r the runnable task requested to be executed * @param e the executor attempting to execute this task */ public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { // 丢弃队头任务,再次将当前提交任务传入execute方法中执行 e.getQueue().poll(); e.execute(r); } } }
DiscardPolicy
拒绝策略是直接丢弃等待时间最长的任务,并将当前提交的任务再次传入execute
方法中执行。需要注意,如果线程池已关闭,则直接丢弃任务。 -
自定义
RejectedExecutionHandler
如果上述策略都不符合我们需要,则可以实现
RejectedExecutionHandler
,自定义自己的拒绝策略。public class CustomRejectedExecutionHandler implements RejectedExecutionHandler{ public CustomRejectedExecutionHandler(){} public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { // 自定义 } }
至此,对于ThreadPoolExecutor
的拒绝策略已经分析完毕。
流程总结
最后,笔者将上述整个执行流程描绘成一张图,如下所示。
task: XXX +-----+-----+-----+-----+
XXX 2.submit to q | | | XXX | XXX |
+-------------->+ | | XXX | XXX +----+
| +-----+-----+-----+-----+ |
| |
| | poll()/take()
| |
| v
| +-----------+-----------+
| | |
| | +----------------+ |
+----------+ +----+----+ 1.submit to corepool | | | |
| main +----->+ execute +-----+---------------------------> XXX XXX +--------------------+
+----------+ +----+----+ | | | XXX XXX | | |
| | | | | | |
| | | | | | v
| | | | | | +-----+------+
| | | | corePool | | | |
| | | +----------------+ | | exit |
| | | | | |
| +---------------------------> XXX XXX | +-----+------+
| 3.submit to maxpool | XXX XXX | ^
| | | |
| | XXX XXX | |
| | XXX XXX +----------------+
| | |
| | maximumPool |
| +-----------------------+
| +--------------+
| | |
+------------->+ reject |
4.reject task | |
+--------------+
关闭
最终,在使用完ThreadPoolExecutor
后需要调用shutdown
方法或者shutdownNow
方法对其进行关闭操作。
/**
* Initiates an orderly shutdown in which previously submitted
* tasks are executed, but no new tasks will be accepted.
* Invocation has no additional effect if already shut down.
*
* <p>This method does not wait for previously submitted tasks to
* complete execution. Use {@link #awaitTermination awaitTermination}
* to do that.
*
* @throws SecurityException {@inheritDoc}
*/
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(SHUTDOWN);
interruptIdleWorkers();
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
tryTerminate();
}
/**
* Performs any further cleanup following run state transition on
* invocation of shutdown. A no-op here, but used by
* ScheduledThreadPoolExecutor to cancel delayed tasks.
*/
void onShutdown() {
}
/**
* Attempts to stop all actively executing tasks, halts the
* processing of waiting tasks, and returns a list of the tasks
* that were awaiting execution. These tasks are drained (removed)
* from the task queue upon return from this method.
*
* <p>This method does not wait for actively executing tasks to
* terminate. Use {@link #awaitTermination awaitTermination} to
* do that.
*
* <p>There are no guarantees beyond best-effort attempts to stop
* processing actively executing tasks. This implementation
* cancels tasks via {@link Thread#interrupt}, so any task that
* fails to respond to interrupts may never terminate.
*
* @throws SecurityException {@inheritDoc}
*/
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(STOP);
interruptWorkers();
tasks = drainQueue();
} finally {
mainLock.unlock();
}
tryTerminate();
return tasks;
}
其中shutdown
方法用于有序关闭线程池,在执行方法后将不会再接收新的任务,而对于在关闭之前已提交的任务可以继续执行,但并不会等待其执行完成;shutdownNow
方法则用于立即(暴力)关闭线程池,在执行方法后,它会试图关闭所有正在执行的任务,停止对正在等待的任务(等待队列中的任务)的处理并将它们作为结果返回。同样地,此方法并不会等待正在执行的任务去执行终止操作。
虽然,shutdown
方法与shutdownNow
方法在关闭策略上有所差异,但是其实现步骤上却是大体相同,即:
- 通过调用
checkShutdownAccess
方法检查是否存在关闭权限。 - 通过调用
advanceRunState
方法变更线程池运行状态。- 如果是通过
shutdown
方法关闭,则会变更为SHUTDOWN
。 - 如果是通过
shutdownNow
方法关闭,则会变更为STOP
。
- 如果是通过
- 中断线程池工作线程。
- 如果是通过
shutdown
方法关闭,则通过interruptIdleWorkers
方法进行中断。 - 如果是通过
shutdownNow
方法关闭,则通过interruptWorkers
方法进行中断。
- 如果是通过
- 执行关闭钩子方法
onShutdown
(若通过shutdown
方法关闭)。 - 执行
tryTerminate
方法尝试关闭线程池。 - 通过调用
drainQueue
方法获得工作队列中的任务并返回(若通过shutdownNow
方法关闭)
以上即是线程关闭的大致执行流程,下面我们对一些关键步骤进行分析。
首先,关于线程池关闭的中断操作它们分别用到了interruptIdleWorkers
方法和interruptWorkers
方法,就如同它们的方法名一样,interruptIdleWorkers
方法会中断空闲的工作线程,而interruptWorkers
方法则会中断所有工作线程,下面我们来看看它们是如何实现的。
-
interruptIdleWorkers
方法private void interruptIdleWorkers(boolean onlyOne) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { for (Worker w : workers) { Thread t = w.thread; if (!t.isInterrupted() && w.tryLock()) { try { t.interrupt(); } catch (SecurityException ignore) { } finally { w.unlock(); } } if (onlyOne) break; } } finally { mainLock.unlock(); } }
在
interruptIdleWorkers
方法中会遍历工作线程列表workers
,并通过Thread#interrupt
方法实现对其中工作线程的中断。但在调用Thread#interrupt
方法之前,它会通过调用Worker
实例的tryLock
方法来判断当前工作线程是否处于空闲状态,只有处于空闲状态下才能执行Thread#interrupt
方法。在执行
runWorker
方法时会执行lock
方法将资源锁定,导致在其他地方无法获取到资源。此时在interruptIdleWorkers
方法中调用Worker#tryLock
方法将会失败,即它不会对处理中的工作线程执行中断。 -
interruptWorkers
方法/** * Interrupts all threads, even if active. Ignores SecurityExceptions * (in which case some threads may remain uninterrupted). */ private void interruptWorkers() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { for (Worker w : workers) w.interruptIfStarted(); } finally { mainLock.unlock(); } } private final class Worker extends AbstractQueuedSynchronizer implements Runnable { void interruptIfStarted() { Thread t; if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) { try { t.interrupt(); } catch (SecurityException ignore) { } } } }
在
interruptWorkers
方法中也会遍历工作线程列表workers
,但它不会对工作线程是否处于空闲状态进行判断,而是直接调用Worker
实例的interruptIfStarted
方法执行中断。需要注意,在
interruptIfStarted
方法中加入了getState() >= 0
的判断是因为在Worker
构造到执行runWorker
方法的间隔是不允许被中断的(此时线程可能尚未初始化或者尚未启动,执行中断无意义),所以在此期间会将state
设置为-1
。
最后,在完成线程池的一些数据统计和基本清理操作之后,它会调用tryTerminate()
方法尝试进行线程池的终止。
/**
* Transitions to TERMINATED state if either (SHUTDOWN and pool
* and queue empty) or (STOP and pool empty). If otherwise
* eligible to terminate but workerCount is nonzero, interrupts an
* idle worker to ensure that shutdown signals propagate. This
* method must be called following any action that might make
* termination possible -- reducing worker count or removing tasks
* from the queue during shutdown. The method is non-private to
* allow access from ScheduledThreadPoolExecutor.
*/
final void tryTerminate() {
for (;;) {
int c = ctl.get();
// 判断是否可以终止1,不能则直接return
// 1. 如果当前线程池处于RUNNING状态,则不能终止,结束执行
// 2. 如果当前线程处于TIDYING状态、TERMINATED状态,则无需再终止,结束执行
// 3. 如果当前线程处于SHUTDOWN状态且工作队列不为空,则不能终止,结束执行
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
return;
// 判断是否可以终止2,如果工作线程数量不为0,则不能终止。对空闲线程执行中断操作,并结束执行
if (workerCountOf(c) != 0) { // Eligible to terminate
interruptIdleWorkers(ONLY_ONE);
return;
}
// 进行终止
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 修改线程池状态,设置为TIDYING
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
// 扩展钩子方法
terminated();
} finally {
// 修改线程池状态,设置为TERMINATED
ctl.set(ctlOf(TERMINATED, 0));
// 唤醒其他等待终止线程(通过`awaitTermination`方法进入阻塞)
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
// else retry on failed CAS
}
}
// 等待线程池终止
public boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (;;) {
// 判断当前线程池是否处于TERMINATED状态,如果是则直接返回true
if (runStateAtLeast(ctl.get(), TERMINATED))
return true;
if (nanos <= 0)
return false;
// 阻塞等待(有超时策略)
nanos = termination.awaitNanos(nanos);
}
} finally {
mainLock.unlock();
}
}
/**
* Method invoked when the Executor has terminated. Default
* implementation does nothing. Note: To properly nest multiple
* overridings, subclasses should generally invoke
* {@code super.terminated} within this method.
*/
protected void terminated() {}
在tryTerminate
方法中主要做的是对线程池状态的变更,并在一定条件下会执行interruptIdleWorkers
方法对空闲线程进行中断,具体执行步骤如下所示:
- 判断是否可以终止线程池,不能则直接结束执行。
- 如果当前线程池处于
RUNNING
状态,则不能终止,结束执行。 - 如果当前线程处于
TIDYING
状态、TERMINATED
状态,则无需再终止,结束执行。 - 如果当前线程处于
SHUTDOWN
状态且工作队列不为空,则不能终止,结束执行。
- 如果当前线程池处于
- 判断是否可以终止线程池,如果当前工作线程数量不为
0
,则不能终止。另外,对空闲线程执行中断操作,并结束执行。 - 执行线程池终止操作。
- 变更线程池状态为
TIDYING
。 - 执行线程池终止的钩子方法
terminated
。 - 变更线程池状态为
TERMINATED
。 - 唤醒其他等待线程池终止线程(通过
awaitTermination
方法进入阻塞)。
- 变更线程池状态为
至此,对于ThreadPoolExecutor
的关闭已经分析完毕了。简单来说,无论是shutdown
还是shutdownNow
都是先执行对线程池状态的变更(SHUTDOWN
或者STOP
),让其无法接受新任务和无法执行任务(若是shutdownNow
);然后中断工作线程(空闲的或者所有的),让工作线程数量逐渐下降到0
;最后将线程池状态的变更为TERMINATED
。
其中,对于第一次状态变更(SHUTDOWN
或者STOP
)和中断(空闲的或者所有的)都是为了让工作线程逐步地被销毁,这样做的原理则是在添加线程和执行任务步骤中都会对线程池状态进行判断,如果不符合则会结束执行,并最后销毁线程(若对此还存在疑惑,建议结合上下文再阅读一遍)。
关于权限判断的
checkShutdownAccess()
方法,它会判断调用者是否有关闭每个工作线程的权限,具体逻辑如下所示:/** * If there is a security manager, makes sure caller has * permission to shut down threads in general (see shutdownPerm). * If this passes, additionally makes sure the caller is allowed * to interrupt each worker thread. This might not be true even if * first check passed, if the SecurityManager treats some threads * specially. */ private void checkShutdownAccess() { SecurityManager security = System.getSecurityManager(); if (security != null) { security.checkPermission(shutdownPerm); final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { for (Worker w : workers) security.checkAccess(w.thread); } finally { mainLock.unlock(); } } }
对于上述
SecurityManager
的权限管理,可阅读《JDK官方文档关于SecurityManager的描述》。其中,此处所需要的modifyThread
权限是JDK
默认自带的,即在默认下就是有权限对工作线程执行关闭操作。
总结
本文从“概述”、“用法”和“实现原理”的角度对线程池ThreadPoolExecutor
进行了全面的分析,通过对本文的学习应该对ThreadPoolExecutor
有了一个全面的认识。如果读者对ThreadPoolExecutor
有更多的兴趣或者更多的疑问可以自行阅读相关材料或者相关源码。
参考
- 《Java并发编程实践》
- 《Java并发编程的艺术》
- Wiki《Thread Pool》
- 美团《Java线程池实现原理及其在美团业务中的实践》
未经本人许可,禁止转载
转载自:https://juejin.cn/post/7143494647954800654