likes
comments
collection
share

详解Java当中的线程池

作者站长头像
站长
· 阅读数 13

前言

学习Java也有一年半的时间了,前段时间做的项目,基本都用到了线程池技术,甚至是动态线程池技术。所以线程池作为一个如此重要的工具,也应当好好的去总结一下,这也是面试几乎必问的一个知识点。回头过来想,线程池的设计真的是精妙,它巧妙的解决了创建与销毁线程消耗过大、自创建线程难以管理的一些痛点,将多核CPU的优势展现出来。接下来,我会从简单的创建线程方式开始,一直到复杂的线程池列举自己的一些笔记以及思考。

线程的创建方式

第一种:继承Thread类

说到线程的创建方式,我想大家脑子里蹦出的一定是new Thread 创建一个线程类,简单的就直接使用lambda表达式。

new Thread(() -> System.out.println(Thread.currentThread().getName())).start();

}

还有就是自定义一个类,继承Thread,重写其run方法。在new 出来之后,调用其start方法。

public class ExtendsThread extends Thread {
    @Override
    public void run() {
        System.out.println("用Thread类实现线程");
    }
}

第二种:实现Runnable接口,实现其run方法,使用时也和Thread相似。

public class RunnableThread implements Runnable {
    @Override
    public void run() {
        System.out.println('用实现Runnable接口实现线程');
    }
}

还有么?对。线程池的时候,你也会创建线程,不过这个线程的创建方式其实和第一种一样。这是线程池中的一个默认线程创建工厂源码。

static class DefaultThreadFactory implements ThreadFactory {
    DefaultThreadFactory() {
        SecurityManager s = System.getSecurityManager();
        group = (s != null) ? s.getThreadGroup() :
            Thread.currentThread().getThreadGroup();
        namePrefix = "pool-" +
            poolNumber.getAndIncrement() +
            "-thread-";
    }
    public Thread newThread(Runnable r) {
        Thread t = new Thread(group, r,
                    namePrefix + threadNumber.getAndIncrement(),0);
        if (t.isDaemon())
            t.setDaemon(false);
        if (t.getPriority() != Thread.NORM_PRIORITY)
            t.setPriority(Thread.NORM_PRIORITY);
        return t;
    }
}

注:这也就是为啥,我们如果使用了默认线程池,在控制台中就会打印pool-1-thread-1等字样。这个我们其实也可以自己重写,为线程修改成我们自己的业务名字 + 线程号

所以对于线程池而言,本质上是通过线程工厂创建线程的,默认采用 DefaultThreadFactory ,它会给线程池创建的线程设置一些默认值,比如:线程的名字、是否是守护线程,以及线程的优先级等。但是无论怎么设置这些属性,最终它还是通过 new Thread() 创建线程的 ,只不过这里的构造函数传入的参数要多一些,由此可以看出通过线程池创建线程并没有脱离最开始的那两种基本的创建方式,因为本质上还是通过 new Thread() 实现的。

还有么?应该还有吧,像Callable、FutrueTask等等一些包装类,也都是会创建新的线程去执行任务的。

不过,我列举了这么多创建线程的方式,我反而想说其实真正创建线程的还是第一种new Thread。其他的方式也只是对这条语句进行了更加进一步的封装。例如上述的线程工厂类就有展示。以及我们在实现lambda、Callable、FutureTask也都是先创建出来,然后扔进new Thread 在调用start方法。

在Thread的源码中的一段就体现了这一点,target就是在Thread构造器中传入的任务对象。

@Override

public void run() {

    if (target != null) {

        target.run();

    }

}

所以,个人认为创建线程的方式只有 new Thread 一种。

多线程下的性能问题

线程的存在,让我们可以在同一时间执行多个任务。但是,对于多线程而言,线程的之间并不是孤立的,而是相互合作、调度的也许你会奇怪,我们使用多线程的最大目的不就是为了提高性能吗?让多个线程同时工作,加快程序运行速度,为什么反而会带来性能问题呢?这是因为单线程程序是独立工作的,不需要与其他线程进行交互,但多线程之间则需要调度以及合作,调度与合作就会带来性能开销从而产生性能问题。

为什么多线程会带来性能问题

那么什么情况下多线程编程会带来性能问题呢? 主要有两个方面,一方面是线程调度,另一个方面是线程协作。

调度开销

上下文切换 首先,我们看一下线程调度,在实际开发中,线程数往往是大于 CPU 核心数的,比如 CPU 核心数可能是 8 核、16 核,等等,但线程数可能达到成百上千个。这种情况下,操作系统就会按照一定的调度算法,给每个线程分配时间片,让每个线程都有机会得到运行。而在进行调度时就会引起上下文切换,上下文切换会挂起当前正在执行的线程并保存当前的状态,然后寻找下一处即将恢复执行的代码,唤醒下一个线程,以此类推,反复执行。但上下文切换带来的开销是比较大的,假设我们的任务内容非常短,比如只进行简单的计算,那么就有可能发生我们上下文切换带来的性能开销比执行线程本身内容带来的开销还要大的情况。

缓存失效

不仅上下文切换会带来性能问题,缓存失效也有可能带来性能问题。由于程序有很大概率会再次访问刚才访问过的数据,所以为了加速整个程序的运行,会使用缓存,这样我们在使用相同数据时就可以很快地获取数据。可一旦进行了线程调度,切换到其他线程,CPU就会去执行不同的代码,原有的缓存就很可能失效了,需要重新缓存新的数据,这也会造成一定的开销,所以线程调度器为了避免频繁地发生上下文切换,通常会给被调度到的线程设置最小的执行时间,也就是只有执行完这段时间之后,才可能进行下一次的调度,由此减少上下文切换的次数。

那么什么情况会导致密集的上下文切换呢?如果程序频繁地竞争锁,或者由于 IO 读写等原因导致频繁阻塞,那么这个程序就可能需要更多的上下文切换,这也就导致了更大的开销,我们应该尽量避免这种情况的发生。

协作开销

除了线程调度之外,线程协作同样也有可能带来性能问题。因为线程之间如果有共享数据,为了避免数据错乱,为了保证线程安全,就有可能禁止编译器和 CPU 对其进行重排序等优化,也可能出于同步的目的,反复把线程工作内存的数据 flush 到主存中,然后再从主内存 refresh 到其他线程的工作内存中,等等。这些问题在单线程中并不存在,但在多线程中为了确保数据的正确性,就不得不采取上述方法,因为线程安全的优先级要比性能优先级更高,这也间接降低了我们的性能。

为什么要使用线程池?

在最初的时候,是没有线程池这个技术的,我们只能手动的去创建线程,去管理线程。

public class Test { 

    public static void main(String[] args) { 

        Thread thread = new Thread(new Task());

        thread.start();

    } 

    static class Task implements Runnable { 

        public void run() { 

           System.out.println("Thread Name: " + Thread.currentThread().getName());

        } 

    } 

}
//输出:Thread Name: Thread-0

主线程调用 start() 方法,启动了一个子线程。这是在一个任务的场景下,随着我们的任务增多,比如现在有 10 个任务了,那么我们就可以使用 for 循环新建 10 个子线程,如代码所示。

public class Test { 

    public static void main(String[] args) { 

        for (int i = 0; i < 10; i++) { 

            Thread thread = new Thread(new Task());

            thread.start();

        } 

    } 

    static class Task implements Runnable { 

        public void run() { 

            System.out.println("Thread Name: " + Thread.currentThread().getName());

        } 

    } 

}
/**
执行结果:

Thread Name: Thread-1

Thread Name: Thread-4

Thread Name: Thread-3

Thread Name: Thread-2

Thread Name: Thread-0

Thread Name: Thread-5

Thread Name: Thread-6

Thread Name: Thread-7

Thread Name: Thread-8

Thread Name: Thread-9
**/

这里你会发现,打印出来的顺序是错乱的,比如 Thread-4 打印在了 Thread-3 之前,这是因为,虽然 Thread-3 比 Thread-4 先执行 start 方法,但是这并不代表 Thread-3 就会先运行,运行的顺序取决于线程调度器,有很大的随机性,这是需要我们注意的地方。

我们再看,主线程通过 for 循环创建了 t0~t9 这 10 个子线程,它们都可以正常的执行任务,但如果此时我们的任务量突然飙升到 10000 会怎么样?我们先来看看依然用 for 循环的实现方式:

for (int i = 0; i < 10000; i++) { 

    Thread thread = new Thread(new Task());

    thread.start();

}

我们创建了 10000 个子线程,而 Java 程序中的线程与操作系统中的线程是一一对应的,此时假设线程中的任务需要一定的耗时才能够完成,便会产生很大的系统开销与资源浪费。

创建线程时会产生系统开销,并且每个线程还会占用一定的内存等资源,更重要的是我们创建如此多的线程也会给稳定性带来危害,因为每个系统中,可创建线程的数量是有一个上限的,不可能无限的创建。线程执行完需要被回收,大量的线程又会给垃圾回收带来压力。但我们的任务确实非常多,如果都在主线程串行执行,那效率也太低了,那应该怎么办呢?于是便诞生了线程池来平衡线程与系统资源之间的关系

总结

  • 反复创建线程系统开销比较大,每个线程创建和销毁都需要时间,如果任务比较简单,那么就有可能导致创建和销毁线程消耗的资源比线程执行任务本身消耗的资源还要大。
  • 过多的线程会占用过多的内存等资源,还会带来过多的上下文切换,同时还会导致系统不稳定。

线程池如何解决上述两个问题

针对上面的两点问题,线程池有两个解决思路。

首先,针对反复创建线程开销大的问题,线程池用一些固定的线程一直保持工作状态并反复执行任务。

其次,针对过多线程占用太多内存资源的问题,解决思路更直接,线程池会根据需要创建线程,控制线程的总数量,避免占用过多内存资源。

如何使用线程池

线程池就好比一个池塘,池塘里的水是有限且可控的,比如我们选择线程数固定数量的线程池,假设线程池有 5 个线程,但此时的任务大于 5 个,线程池会让余下的任务进行排队,而不是无限制的扩张线程数量,保障资源不会被过度消耗。如代码所示,我们往 5 个线程的线程池中放入 10000 个任务并打印当前线程名字,结果会是怎么样呢?

public class Test { 

    public static void main(String[] args) { 

        ExecutorService service = Executors.newFixedThreadPool(5);

        for (int i = 0; i < 10000; i++) { 

            service.execute(new Task());

        } 

    System.out.println(Thread.currentThread().getName());

    } 

    static class Task implements Runnable { 

        public void run() { 

            System.out.println("Thread Name: " + Thread.currentThread().getName());

        } 

    } 

}
/**
Thread Name: pool-1-thread-1

Thread Name: pool-1-thread-2

Thread Name: pool-1-thread-3

Thread Name: pool-1-thread-4

Thread Name: pool-1-thread-5

Thread Name: pool-1-thread-5

Thread Name: pool-1-thread-5

Thread Name: pool-1-thread-5

Thread Name: pool-1-thread-5

Thread Name: pool-1-thread-2

Thread Name: pool-1-thread-1

......
**/

如打印结果所示,打印的线程名始终在 Thread Name: pool-1-thread-1~5 之间变化,并没有超过这个范围,也就证明了线程池不会无限制地扩张线程的数量,始终是这5个线程在工作。

使用线程池的好处

  • 线程池可以解决线程生命周期的系统开销问题,同时还可以加快响应速度。因为线程池中的线程是可以复用的,我们只用少量的线程去执行大量的任务,这就大大减小了线程生命周期的开销。而且线程通常不是等接到任务后再临时创建,而是已经创建好时刻准备执行任务,这样就消除了线程创建所带来的延迟,提升了响应速度,增强了用户体验。
  • 线程池可以统筹内存和 CPU 的使用,避免资源使用不当。线程池会根据配置和任务数量灵活地控制线程数量,不够的时候就创建,太多的时候就回收,避免线程过多导致内存溢出,或线程太少导致 CPU 资源浪费,达到了一个完美的平衡。
  • 线程池可以统一管理资源。比如线程池可以统一管理任务队列和线程,可以统一开始或结束任务,比单个线程逐一处理任务要更方便、更易于管理,同时也有利于数据统计,比如我们可以很方便地统计出已经执行过的任务的数量。

线程池的各个参数详解

下图展示了线程池中的主要参数

详解Java当中的线程池

接下来,看看两个参数所代表的的具体含义以及线程池中创建线程的时机,如下图所示。

详解Java当中的线程池

当提交任务后,线程池首先会检查当前线程数,如果此时线程数小于核心线程数,比如最开始线程数量为 0,则新建线程并执行任务,随着任务的不断增加,线程数会逐渐增加并达到核心线程数,此时如果仍有任务被不断提交,就会被放入 workQueue 任务队列中,等待核心线程执行完当前任务后重新从 workQueue 中提取正在等待被执行的任务。

此时,假设我们的任务特别的多,已经达到了 workQueue 的容量上限,这时线程池就会启动后备力量,也就是 maximumPoolSize 最大线程数,线程池会在 corePoolSize 核心线程数的基础上继续创建线程来执行任务,假设任务被不断提交,线程池会持续创建线程直到线程数达到 maximumPoolSize 最大线程数,如果依然有任务被提交,这就超过了线程池的最大处理能力,这个时候线程池就会拒绝这些任务,我们可以看到实际上任务进来之后,线程池会逐一判断 corePoolSize、workQueue、maximumPoolSize,如果依然不能满足需求,则会拒绝任务。

corePoolSize 与 maximumPoolSize

通过上面的流程图,我们了解了 corePoolSize 和 maximumPoolSize 的具体含义,corePoolSize 指的是核心线程数,线程池初始化时线程数默认为 0,当有新的任务提交后,会创建新线程执行任务,如果不做特殊设置,此后线程数通常不会再小于 corePoolSize ,因为它们是核心线程,即便未来可能没有可执行的任务也不会被销毁。随着任务量的增加,在任务队列满了之后,线程池会进一步创建新线程,最多可以达到 maximumPoolSize 来应对任务多的场景,如果未来线程有空闲,大于 corePoolSize 的线程会被合理回收。所以正常情况下,线程池中的线程数量会处在 corePoolSize 与 maximumPoolSize 的闭区间内。

正式员工(corePoolSize)与零时工(maxPoolSize - corePoolSize)

简单形容一下,核心线程,就相当与公司的正式员工,离不开公司,平常情况下事情没那么多,正式员工就可以搞定了。但是,总有业务繁忙的时候,这个时候就会急招一些临时工去干一些活,减轻业务压力,到了业务不繁忙的时候,临时工也也就改拿工资走人了。

keepAliveTime+时间单位

当线程池中线程数量多于核心线程数时,而此时又没有任务可做,线程池就会检测线程的 keepAliveTime,如果超过规定的时间,无事可做的线程就会被销毁,以便减少内存的占用和资源消耗。如果后期任务又多了起来,线程池也会根据规则重新创建线程,所以这是一个可伸缩的过程,比较灵活,我们也可以用 setKeepAliveTime 方法动态改变 keepAliveTime 的参数值。

ThreadFactory 线程工厂

ThreadFactory 实际上是一个线程工厂,它的作用是生产线程以便执行任务。我们可以选择使用默认的线程工厂,创建的线程都会在同一个线程组,并拥有一样的优先级,且都不是守护线程,我们也可以选择自己定制线程工厂,以方便给线程自定义命名,不同的线程池内的线程通常会根据具体业务来定制不同的线程名。

WorkQueue 工作队列

  • ArrayBlockingQueue:是一个基于数组结构的有界阻塞队列,此队列按FIFO(先进先出)原则对元素进行排序。

  • LinkedBlockingQueue:一个基于链表结构的阻塞队列,此队列按FIFO排序元素,吞吐量通常要高于ArrayBlockingQueue。静态工厂方法Executors.newFixedThreadPool()使用了这个队列。

  • SynchronousQueue:一个不存储元素的阻塞队列。每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于Linked-BlockingQueue,静态工厂方法Executors.newCachedThreadPool使用了这个队列。

  • PriorityBlockingQueue:一个具有优先级的无限阻塞队列。

  • DelayedWorkQueue:延迟队列,其是基于“堆”这个数据结构实现的。可以想想为什么延迟队列使用堆作为数据结构?(堆顶始终是下一个需要执行的任务呀!)

Handler 拒绝策略

拒绝时机

一般有两种情况,会对提交的任务进行拒绝

  • 使用shotdown方法销毁线程池的时候
  • 当线程数到达最大线程数并且任务队列中积压的任务到达最大值的时候
拒绝策略

新建线程池时可以指定它的任务拒绝策略,例如

newThreadPoolExecutor(5, 10, 5, TimeUnit.SECONDS, new LinkedBlockingQueue<>(),

   new ThreadPoolExecutor.DiscardOldestPolicy());

详解Java当中的线程池 而JDK为我们提供了四种不同的拒绝策略

  • AbortPolicy,这种拒绝策略在拒绝任务时,会直接抛出一个类型为 RejectedExecutionException 的 RuntimeException,让你感知到任务被拒绝了,于是你便可以根据业务逻辑选择重试或者放弃提交等策略。

  • DiscardPolicy,这种拒绝策略正如它的名字所描述的一样,当新任务被提交后直接被丢弃掉,也不会给你任何的通知,相对而言存在一定的风险,因为我们提交的时候根本不知道这个任务会被丢弃,可能造成数据丢失。

  • DiscardOldestPolicy,如果线程池没被关闭且没有能力执行,则会丢弃任务队列中的头结点,通常是存活时间最长的任务,这种策略与第二种不同之处在于它丢弃的不是最新提交的,而是队列中存活时间最长的,这样就可以腾出空间给新提交的任务,但同理它也存在一定的数据丢失风险。

  • CallerRunsPolicy,相对而言它就比较完善了,当有新任务提交后,如果线程池没被关闭且没有能力执行,则把这个任务交于提交任务的线程执行,也就是谁提交任务,谁就负责执行任务。这样做主要有两点好处:

    • 新提交的任务不会被丢弃,这样也就不会造成业务损失。
    • 由于谁提交任务谁就要负责执行任务,这样提交任务的线程就得负责执行任务,而执行任务又是比较耗时的,在这段期间,提交任务的线程被占用,也就不会再提交新的任务,减缓了任务提交的速度,相当于是一个负反馈。在此期间,线程池中的线程也可以充分利用这段时间来执行掉一部分任务,腾出一定的空间,相当于是给了线程池一定的缓冲期。

JDK提供的几个常用线程池

FixedThreadPool

第一种线程池叫作 FixedThreadPool,它的核心线程数和最大线程数是一样的,所以可以把它看作是固定线程数的线程池,它的特点是线程池中的线程数除了初始阶段需要从 0 开始增加外,之后的线程数量就是固定的,就算任务数超过线程数,线程池也不会再创建更多的线程来处理任务,而是会把超出线程处理能力的任务放到任务队列中进行等待。而且就算任务队列满了,到了本该继续增加线程数的时候,由于它的最大线程数和核心线程数是一样的,所以也无法再增加新的线程了。 它采用的任务队列是LinkedBlockingQueue,因为其最大线程数目与核心线程数目相同,且固定,因此需要使用一个队列大小为Integer.MAX_VALUE的队列(无界队列)来存放任务。

CachedThreadPool

第二种线程池是 CachedThreadPool,可以称作可缓存线程池,它的特点在于线程数是几乎可以无限增加的(实际最大可以达到 Integer.MAX_VALUE,为 2^31-1,这个数非常大,所以基本不可能达到),而当线程闲置时还可以对线程进行回收。也就是说该线程池的线程数量不是固定不变的,当然它也有一个用于存储提交任务的队列,但这个队列是 SynchronousQueue,队列的容量为0,实际不存储任何任务,它只负责对任务进行中转和传递,所以效率比较高。

当我们提交一个任务后,线程池会判断已创建的线程中是否有空闲线程,如果有空闲线程则将任务直接指派给空闲线程,如果没有空闲线程,则新建线程去执行任务,这样就做到了动态地新增线程。让我们举个例子,如下方代码所示。

ScheduledThreadPool

第三个线程池是 ScheduledThreadPool,它支持定时或周期性执行任务。比如每隔 10 秒钟执行一次任务,而实现这种功能的方法主要有 3 种,如代码所示:

ScheduledExecutorService service = Executors.newScheduledThreadPool(10);

//延迟指定时间后执行一次任务就结束
service.schedule(new Task(), 10, TimeUnit.SECONDS);
//固定频率执行任务,第二个参数 initialDelay 表示第一次延时时间,第三个参数 period 表示周期,也就是第一次延时后每次延时多长时间执行一次任务
service.scheduleAtFixedRate(new Task(), 10, 10, TimeUnit.SECONDS);
//以任务结束的时间为下一次循环的时间起点开始计时
service.scheduleWithFixedDelay(new Task(), 10, 10, TimeUnit.SECONDS);

SingleThreadExecutor

第四种线程池是 SingleThreadExecutor,它会使用唯一的线程去执行任务,原理和 FixedThreadPool 是一样的,只不过这里线程只有一个,如果线程在执行任务的过程中发生异常,线程池也会重新创建一个线程来执行后续的任务。这种线程池由于只有一个线程,所以非常适合用于所有任务都需要按被提交的顺序依次执行的场景,而前几种线程池不一定能够保障任务的执行顺序等于被提交的顺序,因为它们是多线程并行执行的。 其队列的选择依旧是LinkedBlockingQueue,因为是固定的线程数,依旧选择无界队列。

SingleThreadScheduledExecutor

第五个线程池是 SingleThreadScheduledExecutor,它实际和第三种 ScheduledThreadPool 线程池非常相似,它只是 ScheduledThreadPool 的一个特例,内部只有一个线程,如源码所示:

new ScheduledThreadPoolExecutor(1)

它只是将 ScheduledThreadPool 的核心线程数设置为了 1。

总结上述的五种线程池,我们以核心线程数、最大线程数,以及线程存活时间三个维度进行对比,如表格所示。

线程池采用的任务队列
FixedThreadPoolLinkedBlockingQueue
SingleThreadExecutorLinkedBlockingQueue
CacheThreadPoolSynchronousQueue
ScheduledThreadPoolDelayedWorkQueue
SingleThreadScheduledExecutorDelayedWorkQueue

第一个线程池 FixedThreadPool,它的核心线程数和最大线程数都是由构造函数直接传参的,而且它们的值是相等的,所以最大线程数不会超过核心线程数,也就不需要考虑线程回收的问题,如果没有任务可执行,线程仍会在线程池中存活并等待任务。

第二个线程池 CachedThreadPool 的核心线程数是 0,而它的最大线程数是 Integer.MAX_VALUE,线程数一般是达不到这么多的,所以如果任务特别多且耗时的话,CachedThreadPool 就会创建非常多的线程来应对。

线程池的核心线程数 与 最大线程数该如何进行选择

CPU 密集型任务

首先,我们来看 CPU 密集型任务,比如加密、解密、压缩、计算等一系列需要大量耗费 CPU 资源的任务。对于这样的任务最佳的线程数为 CPU 核心数的 1~2 倍,如果设置过多的线程数,实际上并不会起到很好的效果。此时假设我们设置的线程数量是 CPU 核心数的 2 倍以上,因为计算任务非常重,会占用大量的 CPU 资源,所以这时 CPU 的每个核心工作基本都是满负荷的,而我们又设置了过多的线程,每个线程都想去利用 CPU 资源来执行自己的任务,这就会造成不必要的上下文切换,此时线程数的增多并没有让性能提升,反而由于线程数量过多会导致性能下降。

针对这种情况,我们最好还要同时考虑在同一台机器上还有哪些其他会占用过多 CPU 资源的程序在运行,然后对资源使用做整体的平衡。

IO密集型任务

第二种任务是耗时 IO 型,比如数据库、文件的读写,网络通信等任务,这种任务的特点是并不会特别消耗 CPU 资源,但是 IO 操作很耗时,总体会占用比较多的时间。对于这种任务最大线程数一般会大于 CPU 核心数很多倍,因为 IO 读写速度相比于 CPU 的速度而言是比较慢的,如果我们设置过少的线程数,就可能导致 CPU 资源的浪费。而如果我们设置更多的线程数,那么当一部分线程正在等待 IO 的时候,它们此时并不需要 CPU 来计算,那么另外的线程便可以利用 CPU 去执行其他的任务,互不影响,这样的话在任务队列中等待的任务就会减少,可以更好地利用资源。

《Java并发编程实战》的作者 Brain Goetz 推荐的计算方法:

线程数 = CPU 核心数 * (1+平均等待时间/平均工作时间)

通过这个公式,我们可以计算出一个合理的线程数量,如果任务的平均等待时间长,线程数就随之增加,而如果平均工作时间长,也就是对于我们上面的 CPU 密集型任务,线程数就随之减少。

太少的线程数会使得程序整体性能降低,而过多的线程也会消耗内存等其他资源,所以如果想要更准确的话,可以进行压测,监控 JVM 的线程情况以及 CPU 的负载情况,根据实际情况衡量应该创建的线程数,合理并充分利用资源。

综上所述我们就可以得出一个结论:

  • 线程的平均工作时间所占比例越高,就需要越少的线程;
  • 线程的平均等待时间所占比例越高,就需要越多的线程;
  • 针对不同的程序,进行对应的实际测试就可以得到最合适的选择。

如何根据实际需要,定制自己的线程池?

核心线程数

第一个需要设置的参数往往是 corePoolSize 核心线程数,在上一课时我们讲过,合理的线程数量和任务类型,以及 CPU 核心数都有关系,基本结论是线程的平均工作时间所占比例越高,就需要越少的线程;线程的平均等待时间所占比例越高,就需要越多的线程。而对于最大线程数而言,如果我们执行的任务类型不是固定的,比如可能一段时间是 CPU 密集型,另一段时间是 IO 密集型,或是同时有两种任务相互混搭。那么在这种情况下,我们可以把最大线程数设置成核心线程数的几倍,以便应对任务突发情况。当然更好的办法是用不同的线程池执行不同类型的任务,让任务按照类型区分开,而不是混杂在一起,这样就可以按照上一课时估算的线程数或经过压测得到的结果来设置合理的线程数了,达到更好的性能。

阻塞队列

对于阻塞队列这个参数而言,我们可以选择之前介绍过的 LinkedBlockingQueue 或者 SynchronousQueue 或者 DelayedWorkQueue,不过还有一种常用的阻塞队列叫 ArrayBlockingQueue,它也经常被用于线程池中,这种阻塞队列内部是用数组实现的,在新建对象的时候要求传入容量值,且后期不能扩容,所以 ArrayBlockingQueue 的最大的特点就是容量是有限的。这样一来,如果任务队列放满了任务,而且线程数也已经达到了最大值,线程池根据规则就会拒绝新提交的任务,这样一来就可能会产生一定的数据丢失。

但相比于无限增加任务或者线程数导致内存不足,进而导致程序崩溃,数据丢失还是要更好一些的,如果我们使用了 ArrayBlockingQueue 这种阻塞队列,再加上我们限制了最大线程数量,就可以非常有效地防止资源耗尽的情况发生。此时的队列容量大小和 maxPoolSize 是一个 trade-off,如果我们使用容量更大的队列和更小的最大线程数,就可以减少上下文切换带来的开销,但也可能因此降低整体的吞吐量;如果我们的任务是 IO 密集型,则可以选择稍小容量的队列和更大的最大线程数,这样整体的效率就会更高,不过也会带来更多的上下文切换。

线程工厂

对于线程工厂 threadFactory 这个参数,我们可以使用默认的 defaultThreadFactory,也可以传入自定义的有额外能力的线程工厂,因为我们可能有多个线程池,而不同的线程池之间有必要通过不同的名字来进行区分,所以可以传入能根据业务信息进行命名的线程工厂,以便后续可以根据线程名区分不同的业务进而快速定位问题代码。 例如下面这个简单的重写

ThreadFactory threadFactory = new ThreadFactory() {
	private int cnt = 1;
	@Override
	public Thread newThread(@NotNull Runnable r) {
		Thread thread = new Thread(r);
		thread.setName("线程" + cnt++);
		return thread;
	}
};

拒绝策略

最后一个参数是拒绝策略,我们可以根据业务需要,选择四种拒绝策略之一来使用:AbortPolicy,DiscardPolicy,DiscardOldestPolicy 或者 CallerRunsPolicy。除此之外,我们还可以通过实现 RejectedExecutionHandler 接口来实现自己的拒绝策略,在接口中我们需要实现 rejectedExecution 方法,在 rejectedExecution 方法中,执行例如打印日志、暂存任务、重新执行等自定义的拒绝策略,以便满足业务需求。

private static class CustomRejectionHandler implements RejectedExecutionHandler { 
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { 
        //打印日志、暂存任务、重新执行等拒绝策略
    } 
}

总结

所以定制自己的线程池和我们的业务是强相关的,首先我们需要掌握每个参数的含义,以及常见的选项,然后根据实际需要,比如说并发量、内存大小、是否接受任务被拒绝等一系列因素去定制一个非常适合自己业务的线程池,这样既不会导致内存不足,同时又可以用合适数量的线程来保障任务执行的效率,并在拒绝任务时有所记录方便日后进行追溯。

如何正确关闭线程池?以及 shutdown() 与 shutdownNow() 方法的区别?

首先,我们创建一个线程数固定为 10 的线程池,并且往线程池中提交 100 个任务,如代码所示。

ExecutorService service = Executors.newFixedThreadPool(10);

 for (int i = 0; i < 100; i++) { 

    service.execute(new Task());

 }

那么如果现在我们想关闭该线程池该如何做呢?本课时主要介绍 5 种在 ThreadPoolExecutor 中涉及关闭线程池的方法,如下所示。

void shutdown;
boolean isShutdown;
boolean isTerminated;
boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;
List shutdownNow;

shutdown()

第一种方法叫作 shutdown(),它可以安全地关闭一个线程池,调用 shutdown() 方法之后线程池并不是立刻就被关闭,因为这时线程池中可能还有很多任务正在被执行,或是任务队列中有大量正在等待被执行的任务,调用 shutdown() 方法后线程池会在执行完正在执行的任务和队列中等待的任务后才彻底关闭。但这并不代表 shutdown() 操作是没有任何效果的,调用 shutdown() 方法后如果还有新的任务被提交,线程池则会根据拒绝策略直接拒绝后续新提交的任务。

isShutdown()

第二个方法叫作 isShutdown(),它可以返回 true 或者 false 来判断线程池是否已经开始了关闭工作,也就是是否执行了 shutdown 或者 shutdownNow 方法。这里需要注意,如果调用 isShutdown() 方法的返回的结果为 true 并不代表线程池此时已经彻底关闭了,这仅仅代表线程池开始了关闭的流程,也就是说,此时可能线程池中依然有线程在执行任务,队列里也可能有等待被执行的任务。

isTerminated()

第三种方法叫作 isTerminated(),这个方法可以检测线程池是否真正“终结”了,这不仅代表线程池已关闭,同时代表线程池中的所有任务都已经都执行完毕了,因为我们刚才说过,调用 shutdown 方法之后,线程池会继续执行里面未完成的任务,不仅包括线程正在执行的任务,还包括正在任务队列中等待的任务。比如此时已经调用了 shutdown 方法,但是有一个线程依然在执行任务,那么此时调用 isShutdown 方法返回的是 true ,而调用 isTerminated 方法返回的便是 false ,因为线程池中还有任务正在在被执行,线程池并没有真正“终结”。直到所有任务都执行完毕了,调用 isTerminated() 方法才会返回 true,这表示线程池已关闭并且线程池内部是空的,所有剩余的任务都执行完毕了。

awaitTermination()

第四个方法叫作 awaitTermination(),它本身并不是用来关闭线程池的,而是主要用来判断线程池状态的。比如我们给 awaitTermination 方法传入的参数是 10 秒,那么它就会陷入 10 秒钟的等待,直到发生以下三种情况之一:

等待期间(包括进入等待状态之前)线程池已关闭并且所有已提交的任务(包括正在执行的和队列中等待的)都执行完毕,相当于线程池已经“终结”了,方法便会返回 true; 等待超时时间到后,第一种线程池“终结”的情况始终未发生,方法返回 false; 等待期间线程被中断,方法会抛出 InterruptedException 异常。 也就是说,调用 awaitTermination 方法后当前线程会尝试等待一段指定的时间,如果在等待时间内,线程池已关闭并且内部的任务都执行完毕了,也就是说线程池真正“终结”了,那么方法就返回 true,否则超时返回 fasle。

我们则可以根据 awaitTermination() 返回的布尔值来判断下一步应该执行的操作。

shutdownNow()

最后一个方法是 shutdownNow(),也是 5 种方法里功能最强大的,它与第一种 shutdown 方法不同之处在于名字中多了一个单词 Now,也就是表示立刻关闭的意思。在执行 shutdownNow 方法之后,首先会给所有线程池中的线程发送 interrupt 中断信号,尝试中断这些任务的执行,然后会将任务队列中正在等待的所有任务转移到一个 List 中并返回,我们可以根据返回的任务 List 来进行一些补救的操作,例如记录在案并在后期重试。shutdownNow() 的源码如下所示。

public List<Runnable> shutdownNow() { 

    List<Runnable> tasks;

    final ReentrantLock mainLock = this.mainLock;

    mainLock.lock();

    try { 

        checkShutdownAccess();

        advanceRunState(STOP);

        interruptWorkers();

        tasks = drainQueue();

    } finally { 

        mainLock.unlock();

    } 

    tryTerminate();

    return tasks;

 }

你可以看到源码中有一行 interruptWorkers() 代码,这行代码会让每一个已经启动的线程都中断,这样线程就可以在执行任务期间检测到中断信号并进行相应的处理,提前结束任务。这里需要注意的是,由于 Java 中不推荐强行停止线程的机制的限制,即便我们调用了 shutdownNow 方法,如果被中断的线程对于中断信号不理不睬,那么依然有可能导致任务不会停止。可见我们在开发中落地最佳实践是很重要的。

在掌握了这 5 种关闭线程池相关的方法之后,我们就可以根据自己的业务需要,选择合适的方法来停止线程池,比如通常我们可以用 shutdown() 方法来关闭,这样可以让已提交的任务都执行完毕,但是如果情况紧急,那我们就可以用 shutdownNow 方法来加快线程池“终结”的速度。

为什么不应该自动创建线程池?

所谓的自动创建线程池就是直接调用 Executors 的各种方法来生成前面学过的常见的线程池,例如 Executors.newCachedThreadPool()。但这样做是有一定风险的,接下来我们就来逐一分析自动创建线程池可能带来哪些问题。其实就是采用了无界队列而导致的OOM问题。

FixedThreadPool

首先我们来看第一种线程池 FixedThreadPool, 它是线程数量固定的线程池,如源码所示,newFixedThreadPool 内部实际还是调用了 ThreadPoolExecutor 构造函数。

public static ExecutorService newFixedThreadPool(int nThreads) { 

    return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());

}

通过往构造函数中传参,创建了一个核心线程数和最大线程数相等的线程池,它们的数量也就是我们传入的参数,这里的重点是使用的队列是容量没有上限的 LinkedBlockingQueue,如果我们对任务的处理速度比较慢,那么随着请求的增多,队列中堆积的任务也会越来越多,最终大量堆积的任务会占用大量内存,并发生 OOM ,也就是OutOfMemoryError,这几乎会影响到整个程序,会造成很严重的后果。

SingleThreadExecutor

第二种线程池是 SingleThreadExecutor,我们来分析下创建它的源码。

public static ExecutorService newSingleThreadExecutor() { 

    return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()));

}

你可以看出,newSingleThreadExecutor 和 newFixedThreadPool 的原理是一样的,只不过把核心线程数和最大线程数都直接设置成了 1,但是任务队列仍是无界的 LinkedBlockingQueue,所以也会导致同样的问题,也就是当任务堆积时,可能会占用大量的内存并导致 OOM。

CachedThreadPool

第三种线程池是 CachedThreadPool,创建它的源码下所示。

public static ExecutorService newCachedThreadPool() { 

    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>());

}

这里的 CachedThreadPool 和前面两种线程池不一样的地方在于任务队列使用的是 SynchronousQueue,SynchronousQueue 本身并不存储任务,而是对任务直接进行转发,这本身是没有问题的,但你会发现构造函数的第二个参数被设置成了 Integer.MAX_VALUE,这个参数的含义是最大线程数,所以由于 CachedThreadPool 并不限制线程的数量,当任务数量特别多的时候,就可能会导致创建非常多的线程,最终超过了操作系统的上限而无法创建新线程,或者导致内存不足。

ScheduledThreadPool 和 SingleThreadScheduledExecutor

第四种线程池 ScheduledThreadPool 和第五种线程池 SingleThreadScheduledExecutor 的原理是一样的,创建 ScheduledThreadPool 的源码如下所示。

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { 

    return new ScheduledThreadPoolExecutor(corePoolSize);

}

而这里的 ScheduledThreadPoolExecutor 是 ThreadPoolExecutor 的子类,调用的它的构造方法如下所示。

public ScheduledThreadPoolExecutor(int corePoolSize) { 

    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,new DelayedWorkQueue());

}

我们通过源码可以看出,它采用的任务队列是 DelayedWorkQueue,这是一个延迟队列,同时也是一个无界队列,所以和 LinkedBlockingQueue 一样,如果队列中存放过多的任务,就可能导致 OOM。

你可以看到,这几种自动创建的线程池都存在风险,相比较而言,我们自己手动创建会更好,因为我们可以更加明确线程池的运行规则,不仅可以选择适合自己的线程数量,更可以在必要的时候拒绝新任务的提交,避免资源耗尽的风险。

总结

  • 创建线程我认为只有一种,那就是new Thread(),其他的方式都是对该写法的进一步封装。
  • 线程池的诞生就是为了更好的利用系统资源,避免无效大量创建线程。线程直接与操作系统的线程挂钩,因此创建与销毁都需要调用底层实现,是会消耗系统资源。
  • 对于线程池创建,需要了解各个参数的意思,其中最重要的则是关于核心线程与最大线程的选择,理论为辅,符合实际需要最重要。
  • 我们可以自己根据业务需求,自己定义线程的名字、拒绝策略等等一些参数。
  • 线程池的销毁其实也是一种好的策略,线程不能说销毁就销毁,因为他可能还在执行自己的任务,突然停止会导致数据不一致,所以我们需要正确选择销毁时机。
  • JDK给我们提供了多种线程池,这些线程池其实都是对线程池的参数的选择不同而诞生。因此他们在使用的时候也可能会出现问题,其中最主要的原因还是因为无界队列问题,会导致程序OOM。

了解了这些,我觉得才是刚刚对线程池入了个门。对于创建后了的线程池,我们就无法在修改了吗?其实并不是的。目前已经存在了动态线程池等的开源项目,并且美团技术团队也提供了动态线程池实现思路,而将最大线程数与核心线程数进行动态化,这就解决了一旦线程池参数调的不够好,就需要重新打包项目部署等问题。这里给出文章链接。

动态线程池的实现思路

学习资料

  • 《极客时间. Java并发编程75讲》
  • 《Java并发编程的艺术》
  • 《Java并发编程实战》
转载自:https://juejin.cn/post/7282666367240241152
评论
请登录