likes
comments
collection
share

Java线程池详解二:线程池的使用

作者站长头像
站长
· 阅读数 16

我们可以为每一个异步任务都直接创建一个线程去执行则,但这样会其实存在巨大的风险,尤其是在任务比较多的情况下。线程是要消耗资源的,当线程过多时大量线程对资源的消耗反而会减低系统的性能和稳定性。线程主要会占用以下资源:

  • 内存资源:如果可运行的线程数量远大于处理器的数量,那么大量的线程将因得不到CPU资源而被闲置,这些闲置的线程所申请的内存仍然被占用着,尤其对 Java 来说线程栈未被被释放的话线程栈内引用的堆内的对象是不会被 GC 所会回收的。

  • 计算资源: 当存在大量现场时,线程竞争 CPU 资源所产生的性能开销也是不可忽略的,若果处理器已经足够忙碌了,那么创建更多的线程反而会减低系统的性能。

  • 系统调用: 线程的创建和销毁需要操作系统和 JVM 提供支持。如果线程创建的速率非常高而任务又都非常简单,那么可能线程创建和销毁的开销反而大于任务自身执行的开销。

  • 依赖资源: 线程中往往还会使用到其它资源,例如DB链接,文件句柄等,若这些资源的数量数小于线程数,那么没有竞争到资源的线程将阻塞。这时若再创建更多的线程只会加剧竞争而并不会提升系统的吞吐量。

同时操作系统对可创建线程的数量是存在一定限制的。

当线程数过多线程占用的资源过高时,系统的性能可能将逐步减低,但严重的情形下可能会发生内存不足、外部资源占用过高而导致外部系统崩溃或外部请求返回错误,这些情况系统很可能抛出 OutofMemoryError 或其它系统错误,从而可能导致整个系统崩溃。

要避免这些风险,就需要对该对程序可以创建的线程数量进行限制,并且全面的测试程序,从而确保在线程数量达到限制时,程序也不会耗尽资源。一个好的方案是使用线程池,线程池不仅可以大大减低因线程数量过多而耗尽资源的问题,也可以大大提升线程资源的利用率,并且能更好的管理任务的提交和执行。

1.  线程池概述

线程池,是一种基于池化思想的线程管理技术。将若干数量的线程放入到一个线程池中,当有任务需要执行时从池中分配一个空闲线程来执行任务,当任务执行完成后再将线程放回到池中。

通过使用线程池可以限制线程的数量,避免线程过多而将系统资源消耗殆尽,同时可以避免频繁地创建和销毁线程,从而提高程序的效率和性能。使用线程池主要有以下优势:

  • 减少资源消耗:线程池中线程数一般都是确定的,可以避免创建过多的线程导致资源耗尽。同时线程池中线程是可以复用的,新的任务可以使用空闲线程,任务执行完成后线程作为空闲线程回到池中,减少了线程创建和销毁的资源消耗。
  • 提高响应速度:当新的任务到达时,可以直接使用线程池中空闲的线程,也就没有了等待线程创建的延迟,从而提高了响应性。
  • 灵活性:线程池将任务的提交和执行解耦开来,在线程池中可以按实际需求灵活要定制一些策略来控制任务的提交和线程的执行。 

线程池将任务的提交与执行解耦开来,在线程池可以灵活的定制为任务的执行策略,在定制执行策略时主要需要考虑以下几项:

  • 同时能有多少个任务并发执行?
  • 选择哪一个线程来执行任务?
  • 提交的任务按照什么顺序执行 ,FIFO、 LIFO、自定义优先级?
  • 任务队列的大小,无界的还是有界的队列,有界队列的大小又应该设置为多少?
  • 如果系统由于过载而需要拒绝一个任务,那么应该拒绝哪一个任务?另外,如何通知应用程序任务被拒绝?
  • 在执行一个任务之前或之后,应该进行哪些动作?

1.1.  Executor 框架

我们使用线程池时主要涉及到类的类图如下:

Java线程池详解二:线程池的使用

这三个类都位于 java.util.concurrent 包下。

Executor 接口只简单的定义了一个提交任务的 execute 方法,但它却表达了线程池的一个最基本的思想:将任务的执行和提交解耦开来。Executor 基于生产者-消费者模式,提交任务的操作相当于生产者(生成待完成的工作单元),工作线程从任务队列中获取任务并执行则相当于消费者。

Executor 接口过于简单了,一个线程池应该还要包含返回任务的 Future、提交 Callable 任务、关闭线程池、批量提交任务等。所以在在 Executor 上扩展了 ExecutorService 接口,它继承自 Executor,完善了线程池一些基础操作的定义。

ThreadPoolExecutor 是 Java 提供的通用的基础的线程池实现。Java 中已经提供了一些常用的执行策略的线程池,它们位于 java.util.concurrent.Executors 中,这些线程池都是通过配置不同的策略的 ThreadPoolExecutor 实现的。

1.2.  任务队列

线程池还与任务队列有着密切的关系,线程池中一般都包含一个任务队列,队列中保存了所有等待执行的任务。有了队列后工作者线程的任务就很简单:工作线程只需要从任务队列中获取任务执行,执行完成后再继续从任务队列中获取任务执行。

Java线程池详解二:线程池的使用

Java 提供的线程池中一般都要求使用阻塞队列 BlockingQueue。使用阻塞队列当队列没有任务时可以将获取任务的线程阻塞,从而使线程释放 CPU 资源提供资源的利用率,当队列中有任务时再唤醒线程。

队列的策略也将直接影响到线程池中任务的执行,例如队列的排序规则、支持公平锁还是非公平锁、队列的容量界线等。

1.3.  Callable 与 Future

Executor 接口提供线程池最基础的框架,但如果要使用 Executor,必须将任务表述为一个Runnable。但 Runnable 是一种有很大局限的抽象,虽然在 run 方法中能将结果写入到日志文件或者放入某个共享的数据结构中,但它不能返回一个值或抛出一个受检查的异常。

Java 5开始引入的 Callable 则表示有返回结果或可能抛出受检查的异常的异步任务。

public interface Callable {
    V call() throws Exception;
}

但并没有直接在 Executor 接口上扩展提交 Callable 的相关的方法。而是在 ExecutorService 上新增了提交 Callable 任务的方法。

public interface ExecutorService extends Executor {
    <T> Future<T> submit(Callable<T> task);
}

Callable 的任务的执行结果通过 Future 来获取。Callable 任务被提交到线程池中时会返回一个表示任务生命周期的 Future 实例,通过该实例可以等待任务执行完成并获取执行结果、也可以在需要时通过该实例够取消任务的执行。

ExecutorService 接口中定义的用于提交任务的方法都会返回提交的任务的 Future。

使用举例:

ExecutorService es = Executors.newFixedThreadPool(10);
Future f = es.submit(new Callable() {
    @Override
    public Date call() throws Exception {
        return new Date();
    }
});
Date result = f.get();
System.out.println(result);

Future 的使用和实现详细见:juejin.cn/post/724844…

2.  ThreadPoolExecutor

ThreadPoolExecutor 是 Java 提供的一个基础的通用的线程线程池实现。可以直接创建一个ThreadPoolExecutor 对象来创建一个线程池,它提供了丰富的配置参数,通过指定不同参数值可以灵活的定制出符合我们需求的线程池出来。

ThreadPoolExecutor 实现了线程池一些复杂的操作和管理,使用时我们只需想好我们需要的线程池的执行策略,通过相关参数配置出这些策略即可。同时它还维护一些基本统计信息,通过相关的方法可以获取到。

ThreadPoolExecutor 提供了多个构造方法,其中最基础的构成方法如下:

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler)

通过指定不同参数值我们可以定制出我们需要的线程池。

2.1.  线程池的大小

线程中线程的数量由核心线程数(corePoolSize)、最大线程数(maximumPoolSize)以及空闲存活时间(keepAliveTime)三个参数共同决定,同时这三个参数也控制了责线程的创建与销毁。

  • corePoolSize:核心线程数,线程池内常驻的线程数量。即使在没有任务执行时线程池内线程数也是这个大小,但当任务队列满了以后线程池是可以创建超出这个数量的线程的。
  • maximumPoolSize:线程池中线程数的最大大小,当任务队列满了以后并且没有空闲线程时可以创建超过核心线程数量的线程,但也不能无限制的新建,maximumPoolSize 控制了线程池内线程数的上限。
  • keepAliveTime:当线程池创建出超过核心线程数的线程后,多出的线程执行完任务后是要回收的,如果某个线程的空时间超过了空闲存活时间,那么这个线程将被终止。

Executors.newFixedThreadPool 指定的基本大小和最大大小为相同的值,这样活动的线程数量不会超过核心线程数的大小,同时因为线程数不会超过核心线程数的大小也就没有线程会被回收, 如此实现了数量恒定的线程池。

public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue());
    }

Executors.newCachedThreadPool 指定基本大小为0而最大大小为 Integer.MAX_VALUE,同时指定线程空闲60秒后回收。这样配置相当于没有限制线程数,同时因为核心线程数为0,所有线程在空闲超过60秒后都会被回收。 实现了每次按需提交任务时若没有空闲线程则创建一个,若有则复用。线程池可以无限扩展,并且当需求降低时会自动收缩。

public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue());
    }

2.2.  任务队列

使用固定大小的线程池来避免无限的创建线程导致的资源耗尽,但如果任务到底的速率超过了线程池处理的速率,新的任务到达时线程池内没有空闲线程了怎么处理这些没有分配到线程的任务那?

在 Executor 线程池框架中一般都包含一个任务等待队列。没有分配到线程的任务会进入到任务队列中去等待,而不会去竞争 CPU 资源。但有线程空闲下来时可以去任务队列中获取待执行的任务来运行。通过任务队列即可避免任务无限制的创建线程和任务直接丢失,也可以缓解任务突增的情况。

但如果任务提交的速率一直大于线程池处理的速率,线程池建一直处于忙碌状态,任务队列也可能被填满,资源可能会耗尽资源。

通过 ThreadPoolExecutor 构造方法中的 workQueue 参数可以指定一个阻 BlockingQueue 来保存等待执行的任务。在 ThreadPoolExecutor 中若当前线程数已经超过了核心线程数时提交的任务都将会被放到任务队列中去,当线程执行当前的任务后会主动去任务队列中获取任务并执行。

任务队大致可以分为三类:无界队列、有界队列和同步移交 。可以根据实际场景的需求选择合适的队列。 

2.2.1.  无界队列

无界队列的容量大小没有限制,可以一直添加元素直到内存资源耗尽。

Java 中提供的无界阻塞队列有不指定容量的 LinkedBlockingQueue。

newFixedThreadPool 和 newSingleThreadExecutor 默认情况下使用的就是一个无界的LinkedBlockingQueue。

Java线程池详解二:线程池的使用

但没有空闲线程来任务时任务将被放在队列中等待。使用无界队列时,如任务提交的速率一直大于线程池处理的速率那么队列将无限的增长,虽然无界队列没有限制队列的元素数量,但内存资源有限的,队列一直在增长最终建耗尽内存资源导致任务提交失败,严重时将导致系统崩溃。 

2.2.2.  有界队列

有界队列的容量有一个固定的值,其最大存放的元素数量是有限制的。

Java 提供的有界队列有 ArrayBlockingQueue、有界的 LinkedBlockingQueue 和 PriorityBlockingQueue。

使用无界队列存存在资源耗尽的风险,更稳妥的策略是使用有界队列。

使用有界队列时,队列的大小与线程池的大小需要一起调整。如果线程池较小而队列较大,那么有助于降低 CPU 的占用用率,同时还可以减少上下文切换,付出的代价是可能会限制吞吐量。

需要注意只有任务是互相独立的时才能使用有界队列,如果任务直接存在依赖,那么线程池使用有界队列可能会出现“饥饿”死锁问题。列如任务 A 提交任务 B 并等待 B 执行完成,若此时有界的队列满了,那么 B 任务可能被直接丢弃而 A 将一直等待不到 B 的执行结果,同时 A 也一直占用的线程资源。

有界队列虽然避免了资源耗尽问题,但又带来了新的问题:当任务队列填满后,新的任务该怎么办?这时就需要根据饱和策略来处理。

2.2.3.  同步移交队列

在线程数足够或没有大小限制的线程池中,因为线程足够所以任务并没有必要放置到队列中等待,这时就可以使用同步移交队列。同步移交队列是一个不存储元素的阻塞队列,每一个 put 操作都会等待 take 操作取走 put 的元素后才会返回。

Java 提供的同步移交队列有 SynchronousQueue。

使用 SynchronousQueue 作为任务队列,提交任务时会同步等待到有线程执行任务后才会返回。但若没有空闲线程并且线程数已经达到最多大不能创建新的线程了,那么这个任务将被拒绝。

只有线程池是无界的或者可以拒绝任务时,SynchronousQueue 才有实际价值。

在 newCachedThreadPool 工厂方法中就使用了 SynchronousQueue。

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue());
}

2.2.4.  有序的队列

提交到 ThreadPoolExecutor 中的任务大多都会被先放入到任务队列中去,所以任务队列的顺序在一定程度上代表了任务的执行顺序。

当使用像 LinkedBlockingQueue 或 ArrayBlockingQueug(先进先出)队列时,任务的执行顺序与它们的到达顺序相同。如果想进一步控制任务执行顺序 ,还可以使用PriorityBlockingQueue, 来安排任务的优先级。

2.3.  饱和策略

当使用有界队列并且队列被填满后,或向一个已经关闭的 Executor 提交任务时,饱和策略就会发挥作用。饱和策略通过 ThreadPoolExecutor 的 setRejectedExecutionHandler 方法进行设置。

饱和策略 RejectedExecutionHandler 是一个接口,只定义了一个 rejectedExecution 方法用于接收被拒绝的任务。

public interface RejectedExecutionHandler {
    void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}

JDK 提供了几种常用的包含策略,这些策略定义在 ThreadPoolExecutor 中的静态子类:

  • AbortPolicy:中止(Abort)策略,当任务被拒绝时改策略将抛出 RejectedExecutionException。AbortPolicy 是 ThreadPoolExecutor 默认的饱和策略。可在提交任务的地方捕获该异常,然后根据需求来等待后重新提交或者抛弃任务。
  • DiscardPolicy:丢弃策略,被拒绝的任务会被直接悄悄的丢弃。
  • DiscardOldestPolicy:“抛弃最旧的”策略,当有任务将被拒绝时将抛弃下一个将被执行的任务,然后尝试重新提交新的任务。如果是一个优先队列,那么该策略将导致优先级最高任务被抛弃,因此最好不要将该饱和策略和优先级队列放在一起使用。
  • CallerRunsPolicy:“调用者执行”策略,该策略既不会抛出异常也不会抛弃任务,而是在提交任务的线程中执行任务,即在调用 execute 的线程中执行。在队列满了之后使用提交任务的线程来执行任务可以降低任务提交的速率。

定制一个策略在任务被拒绝时在日志输出里出错误:

executor.setRejectedExecutionHandler(new RejectedExecutionHandler() {
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        LOG.error("rejected task" + ((FutureTask) r).toString());
    }
});

2.4.  线程工厂

每当线程池需要创建一个线程时,都是通过线程工厂来创建的。线程工厂 ThreadFactory 接口只定义了一个方法:

public interface ThreadFactory {
    Thread newThread(Runnable r);
}

在 ThreadPoolExecutor 构造方法中的 threadFactory 参数可以指定线程工厂,每当线程池需要创建一个新线程时都会调用线程工厂的 newThread 方法。通过指定一个线程工厂,可以定制线程池中的线程。

默认的线程工厂方法将创建一个新的、非守护的,并且不包含特殊的配置信息的线程。然而,在许多情况下都需要定制线程。例如,希望为线程池中的线程指定一个 UncaughtExceptionHandler,或者实例化一个定制的 Thread 类用于执行调试信息的记录、或只是希望给线程取一个更有意义的名称,用来解释线程的转储信息和错误日志。

例如为每个线程生成一个唯一的线程id:

ThreadFactory factory = new ThreadFactory() {
    private volatile int count = 1;
    @Override
    public Thread newThread(Runnable r) {
        return new Thread(r, "worker-" + (count++));
    }
};
ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 10,
        0, TimeUnit.SECONDS,
        new LinkedBlockingQueue<>(1),
        factory);

2.5.  修改ThreadPoolExecutor配置

在调用完 ThreadPoolExecutor 的构造函数后,仍然可以通过设置函数(Setter)来修改大多数构造函数指定的参数,例如线程池的基本大小、最大大小、存活时间、线程工厂以及拒绝执行处理器 (Rejected Execution Handler)。

如果 Executor 是通过 Executors 中的某个 (newSingleThreadExecutor 除外)工厂方法创建的,那么可以将结果的类型转换为ThreadPoolExecutor 以访问设置器,如下程序将不限大小的 newCachedThreadPool线程池转为大小固定为 10 的线程池,并配置拒绝策略为丢弃:

 ExecutorService es = Executors.newCachedThreadPool();
ThreadPoolExecutor executor = (ThreadPoolExecutor) es;
// 修改核心线程数
executor.setCorePoolSize(10);
// 修改最大线程数
executor.setMaximumPoolSize(10);
// 修改线程空闲超时时间
executor.setKeepAliveTime(0, TimeUnit.SECONDS);
// 修改拒绝策略为丢弃策略
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());

若希望 ThreadPoolExecutor 创建后在其它地方不被修改,可以使用 Executors 中提供的 unconfigurableExecutorService 工厂方法,该方法可以将一个现有的 ExecutorService 包装为一个 DelegatedExecutorService, DelegatedExecutorService 只包含了 ExecutorService 中提供的方法,所以包装后返回的 ExecutorService 也就没有了修改的 Setter 方法。

例如 newSingleThreadExecutor 方法返回的单线程的线程池其配置就不应该再被修改,因为如果后续修改了线程池的大小,那么就破坏了单线程执行的语义,可能产生线程安全问题。newSingleThreadExecutor 就是将基本大小和最大大小都设置为 1 的 ThreadPoolExecutor 封装为 FinalizableDelegatedExecutorService 返回。FinalizableDelegatedExecutorService 只在 DelegatedExecutorService上扩展了一个 finalize 方法,该方法只是再调用了 shutdown 方法。

2.6.  扩展ThreadPoolExecutor

ThreadPoolExecutor 提供了几个可以在子类化中改写的方法:beforeExecute、afterExecute和 terminated,通过在子类中覆盖这些这些方法可以扩展 ThreadPoolExecutor 的行为。

public class ThreadPoolExecutor extends AbstractExecutorService {
    // 任务执行之前调用
    protected void beforeExecute(Thread t, Runnable r) { }
    // 任务执行之后调用
    protected void afterExecute(Runnable r, Throwable t) { }
    // 当线程池被关闭时调用
    protected void terminated() { }
}

在执行任务之前建调用 beforeExecute,如果 beforeExecute 抛出一个 RuntimeException,那么任务将不被执行,此时因为任务没有被执行所以 afterExecute 也不会被调用。

任务执行完成后建调用 afterExecute,无论任务是正常执行结束还是抛出异常意外终止,afterExecute 都会被调用。

在线程池完关闭之前将调用 terminated 方法,也就是在所有任务都已经完成并且所有工作者线程也已经关闭后。terminated 可以用来释放 Executor 在其生命周期里分配的各种资源。

在子类中覆盖 beforeExecute 、afterExecut 和 terminated 可以在这些方法中还可以添加日志、计时、监视或统计信息收集的功能。

3.  常用线程池

Java 在 Executors 中提供了一些常用的线程池的静态工厂方法,通过调用对应的静态工厂方法来创建对应的线程池。这线程池都是指定不同的参数的 ThreadPoolExecutor,主要有以下几个线程池。

3.1.  newFixedThreadPool

创建一个指定线程数的线程池。但线程池内线程数达到指定数量后将不会再创建新的线程,而是建任务提交到队列中。任务队列为无界有序的 LinkedBlockingQueue。如果某个线程由于意外情况而终止那么线程池会补充一个新的线程。

优点:

  1. 有效的控制线程数,避免资源耗尽。
  2. 线程池内线程可以被重复利用避免每次创建线程和销毁消除的销毁

缺点:

  1. 线程池内线程创建后建一直存在直到线程池被关闭,若任务到达速率数量过低那么建导致一些线程闲置,白白占用资源。
  2. 无法应对请求突增的情况,但任务数突增时即使系统还有资源处理突增的请求,但线程池的大小已经确定从而使任务只能等待,资源利用率低。

3.2.  newCachedThreadPool

创建一个线程数没有限制的线程池。线程在执行完任务后空闲一分钟后将会被回收。使用的同步移交队列,在没有空闲线程时会创建新的线程接收任务。newCachedThreadPool 适用于任务处理时间短但任务量又比较大的场景。

优点:

  1. 线程数可以同任务数一起动态的增加和减少。
  2. 可以应对任务数徒增的情况。

** 缺点:**

  1. 线程数没有限制,可能造成资源耗尽。
  2. 在空闲很短时间后线程就会被回收,可能某段时间内任务数较多时又要重新创建大量的线程,导致线程的利用率不高。

3.3.  newSingleThreadExecutor

创建只有一个常驻线程的线程池,同时最大线程数也是一。如果这个线程因意外而终止,会创建一个新的线程来替代。使用的任务队列为无界有序的 LinkedBlockingQueue。任务会按照提交的顺序来串行执行。

由于其始终只有一个线程在执行所以没有线程安全的问题。为了保证线程池线程安全的语义不会被破坏,需要始终是单线程的,所以 newSingleThreadExecutor 返回的是创建后不可修改的 DelegatedExecutorService 类型的 ExecutorService。

newSingleThreadExecutor 适用与需要顺序处理其耗时不长的任务。

优点:

  1. 由于始终只有一个线程,所以可以避免一些线程安全问题,在编码时也可以不考虑同步等线程安全简化了编码难度。

缺点:

  1. 只有一个线程,所以不适合大量任务或耗时较长的任务。
  2. 只有一个线程,无法重复利用CPU的多核性能。

3.4.  newScheduledThreadPool

创建一个 ScheduledExecutorService 线程池,ScheduledExecutorService 表示一个可以延迟执行和定时执行任务的线程池。

延迟执行

schedule 方法用于提交一个延迟执行的任务。

 public  ScheduledFuture schedule(Callable callable, long delay, TimeUnit unit);

定时执行

scheduleAtFixedRate 提交一个延迟执行的任务。

public ScheduledFuture scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit);

任务第一次执行在 initialDelay 指定的时间之后开始,之后任务执行完成后等待 delay 后才会发起新的执行。若执行中发生了异常,则后面将不会在执行。提交的任务可以通过返回的Future来取消。

周期执行

scheduleWithFixedDelay  提交一个周期执行的任务。

public ScheduledFuture scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit);

任务第一次执行在initialDelay指定的验收之后开始,之后每隔delay执行一次。若执行中发生了一次,则后面将不会再执行。提交的任务可以通过返回的Future来取消。

4.  线程池的使用

4.1.  设置线程池大小

线程池的大小取决于提交的任务的类型以及所部署的环境。要想正确地设置线程池的大小,须要分析所在环境的资源预算和任务的特性。例如在部署的环境中有多少个 CPU?多大的内存?任务是计算密集型、IO密集型还是二者皆可?它们是否还需要像 JDBC 连接这样的稀缺资源?

幸运的是,要设置线程池的大小也并不困难,只需要避免“过大”和“过小”这两种极端情识。如果线程池过大,那么大量的线程将在相对很少的 CPU 和内存资源上发生竞争,这不仅会导致更高的内存使用量,而且还可能耗尽资源。如果线程池过小,那么将导致许多空闲的处理器资源无法执行工作,从而降低吞吐率。

对于计算密集型的任务,在拥有 N个处理器的系统上,当线程池的大小为 N+1 时,通常能实现最优的利用率。即使当计算密集型的线程偶尔由于页缺失故障或者其他原因而暂停时,这个“额外”的线程也能确保 CPU 的时钟周期不会被浪费。对于包含 IO 操作或者其他阻塞操作的任务,由于线程并不会一直执行,因此线程池的规模应该更大。

当然 CPU 并不是唯一影响线程池大小的资源,其它还包括内存、文件句柄、套接字句柄和数据库连接等。计算这些资源对线程池的约束条件是更容易的:计算每个任务对该资源的需求量,然后用该资源的可用总量除以每个任务的需求量,所得到结果就是线程池大小的上限。

当任务需要某种通过资源池来管理的资源时,例如数据库连接,那么线程池和资源池的大小将会相互影响。如果每个任务都需要一个数据库连接,那么连接池的大小就限制了线程池的大小。同样,当线程池中的任务是数据库连接的唯一使用者时,那么线程池的大小又将限制连接池的大小。

如果需要执行不同类别的任务,并且它们之间的行为相差很大,那么应该考虑使用多个线程池,从而使每个线程池可以根据各自的工作负载来调整。

4.2  任务与执行策略之间的隐形耦合

虽然 Executor 框架将任务的提交与任务的执行策略解耦开来,为制定和修改执行策略都提供了相当大的灵活性,但有的任务的执行依赖于指定的执行策略,这就使得任务与执行策略之间形成了耦合,包括:

  • 使用线程封闭机制的任务:与线程池相比,单线程的 Executor 能够对线程安全性做出更强的承诺。它们能确保任务不会并发地执行,使代码能够放宽对线程安全的要求,对象可以封闭在任务线程中,在该线程中执行的任务在访问该对象时不需要同步,即使这些资源不是线程安全的也没有问题。这种情形将在任务与执行策略之间形成隐式的耦合,任务要求其执行所在的 Executor 是单线程的。如果将 Executor 从单线程环境改为线程池环境,那么将会失去线程安全性。
  • 依赖性任务:大多数行为正确的任务都是独立的:它们不依赖于其他任务的执行时序、执行结果或其他效果。当在线程池中执行独立的任务时,可以随意地改变线程池的大小和配置,这些修改只会对执行性能产生影响。然而,如果提交给线程池的任务需要依赖其他的任务,那么就隐含地给执行策略带来了约束,此时必须小心地维持这些执行策略以避免产生线程锁的死锁问题。
  • 对响应时间敏感的任务:GUI 应用程序对于响应时间是敏感的:如果用户在点击按钮后需要很长延迟才能得到可见的反馈,那么他们会感到不满。如果将一个运行时间较长的任务提交到单线程的 Executor 中,或者将多个运行时间较长的任务提交到一个只包含少量线程的线程中,那么将降低由该 Executor 管理的服务的响应性。
  • 使用 ThreadLocal 的任务:ThreadLocal 使每个线程都可以拥有某个变量的一个私有“版本”。然而,只要条件允许,Executor 可以自由地重用这些线程。在标准的 Executor 实现中,当执行需求较低时将回收空闲线程,而当需求增加时将添加新的线程,并且如果从任务中抛出了一个未检查异常,那么将用一个新的工作者线程来替代抛出当线程。只有当线程本地值的生命周期受限于任务的生命周期时,在线程池的线程中使用 ThreadLocal 才意义,而在线程池的线程中不应该使用 ThreadLocal 在任务之间传递值。

只有当任务都是同类型的并且相互独立时,线程池的性能才能达到最佳。如果将运行时间较长的与运行时间较短的任务混合在一起,那么除非线程池很大,否则将可能造成“拥塞”。如果提交的任务依赖于其他任务,那么除非线程池无限大,否则将可能造成死锁。幸运的是,在典型服务器应用程序中它们的请求通常都是同类型的并且相互独立的。

在一些任务中,需要拥有或排除某种特定的执行第略。例如某些任务依赖于其他的任务,那么会要求线程池足够大,从而确保它们依赖任务不会被放入等待队列中或被拒绝,而采用线程封闭机制的任务需要串行执行。通过将这些需求写入文档,将来的代码维护人员就不会由于使用了某种不合适的执行第略而破坏安全性或活跃性。

4.3  线程饥饿死锁

在线程池中,如果任务依赖于其他任务,那么可能产生死锁。在单线程的 Executor 中,如果一个任务将另一个任务提交到同一个 Executor,并且等待这个被提交任务的结果,那么通常会引发死锁。第二个任务停留在工作队列中,等待第一个任务完成后获得线程的执行资源,而第一个任务又无法完成,因为它在等待第二个任务的完成。

在更大的线程池中,如果所有正在执行任务的线程都由于等待其他仍处于工作队列中的任务而阻塞,那么会发生同样的问题。这种现象被称为线程饥饿死锁(Thread Starvation Deadlock),只要线程池中的任务需要无限期地等待一些必须由池中其他任务才能提供的资源或条件,例如某个任务等待另一个任务的返回值或执行结果,那么除非线程池足够大,否则将发生线程饥饿死锁。

每当提交了一个有依赖性的 Executor 任务时,要清楚地知道可能会出现线程‘饥饿’死锁,因此需要在代码或配置 Executor 的配置文件中记录线程池的大小限制或配置限制。

4.4.  运行时间较长的任务

如果任务阻塞的时间过长,那么即使不出现死锁,线程池的响应性也会变得糟糕。执行时间较长的任务不仅会造成线程池堵塞,甚至还会增加执行时间较短任务的服务时间。如果线程池中线程的数量远小于在稳定状态下执行时间较长任务的数量,那么到最后可能所有的线程都会运行这些执行时间较长的任务,从而影响整体的响应性。

有一项技术可以缓解执行时间较长任务造成的影响,即限定任务等待资源的时间,而不要无限制地等待。大多数可阻塞方法中,都同时定义了限时版本和无限时版本,例如 Thread.join、 BlockingQueue.put、 CountDownLatch.await 以及 Selector.select 等。如果等待超时,那么可以把任务标识为失败,然后中止任务或者将任务重新放回队列以便随后执行。这样,无论任务的最终结果是否成功,这种办法都能确保任务总能继续执行下去,并将线程释放出来以执行一些能更快完成的任务。如果在线程池中总是充满了被阻塞的任务,那么也可能表明线程池的规模过小。

5.  关闭线程池

Executor 的实现通常会创建线程来执行任务,但 JVM 只有在所有(非守护)线程全部终止后才会退出。因此,如果无法正确地关闭 Executor,那么JVM将无法结束。同时若 Executor 已经无用了,应该及时回收线程池中的线程和其它线程池可能占用的资源。所以线程池也需要是可关闭的(无论采用平缓的方式还是粗暴的方式),并在关闭操作中将受影响的任务的状态反馈给应用程序。

在Executor上扩展出的 ExecutorService 接口中提供了关闭相关的方法:

public interface ExecutorService extends Executor {
    void shutdown();
    List shutdownNow()
    boolean isShutdown()
    boolean isTerminated ();
    boolean awaitTrermination (long timeout, Timeunit unit) throws InterruptedException;
}

shutdown 方法启动有序的关闭线程池,先前提交但还未任务还会被执行,但不会再接受新任务。如果已经关闭,则重复调用没有其它影响。

shutdownNow 方法尝试停止所有任务,包括提交了但还未开始执行的任务,并返回等待的执行的任务列表。所有未执行的任务将被取消并返回,执行中的任务将会发送中断请求尝试终止。该方法并不会等待所有的任务都终止,若过需要可以使用 awaitTermination 来完成这个操作。

在 ExecutorService 关闭后提交的任务将由“拒绝执行处理器 (Rejected Execution Handler)”来处理。等所有任务都完成后,可以调用 awaitTermination 来等待 ExeeutorService 到达终止状态,或者通过调用 isTerminated 来轮询 ExecutorService 是否已经终止。

6.  CompletionService

如果向 Executor 提交了一组计算任务,并且希望在计算完成后获得结果,那么可以保留与每个任务关联的 Future,然后反复使用 get 方法,同时将 timeout 参数指定为0,从而通过轮询来判断任务是否完成。这种方法虽然可行,但却有些繁琐。幸运的是,还有一种更好的方法:完成服务(CompletionService)。

CompletionService 将 Executor 和 BlockingQueue 的功能融合在一起。可以将 Callable 或 Runnable 任务提交给它来执行,然后使用类似于队列操作的 take 和 poll 等方法来获得已完成的结果。

public interface CompletionService {
    // 提交一个Callable任务,并返回代表该任务的Future  
    Future submit(Callable task);
    // 提交一个Runnable任务,执行成功后返回指定的task作为结果 可运行的任务以执行,并返回该任务的Future。在完成后,可以获取或轮询此任务。
    Future submit(Runnable task, V result);
}

ExecutorCompletionService 实现了 CompletionService,并将计算部分委托给一个Executor。

ExecutorCompletionService的实现非常简单。在构造函数中创建一个BlockingQueue来保存计算完成的结果。当提交某个任务时,该任务将首先包装为一个QueueingFuture,这是 FutureTask 的一个子类,然后再改写子类的done方法,并将结果放入BlockingQueue 中。当计算完成时,调用FutureTask中的done方法。如下程序:

public class ExecutorCompletionService implements CompletionService {
    private final BlockingQueue> completionQueue;
    
    private class QueueingFuture extends FutureTask {
        private final Future task;
        QueueingFuture (Callable c) {
            super(task, null);
            this.task = task;
        }
                
        protected void done () {
            completionQueue.add(task);
        }
    }
}

take和poll方法委托给了BlockingQueue,这些方法会在得出结果之前阻塞。

多个 ExecutorCompletionService 可以共享一个Executor,因此可以创建一个对于特定计算私有,又能共享一个公共Executor的ExecutorCompletionService。因此,CompletionService 的作用就相当于一组计算的句柄,这与Future作为单个计算的句柄是非常类似的。通过记录提交给CompletionService 的任务数量,并计算出已经获得的已完成结果的数量,即使使用一个共享的 Executor,也能知道已经获得了所有任务结果的时间。