likes
comments
collection
share

线程池详解

作者站长头像
站长
· 阅读数 16

一、线程池的作用

  1. 提升性能:线程池能独立负责线程的创建、维护和分配。主要体现在线程的复用。
  2. 线程管理:每个Java线程池会保持一些基本的线程统计信息,如完成的任务数量、空闲时间等。

二、线程池核心类

线程池详解

1、Executor <<接口>>

Executor是执行者接口,它的目标是执行目标任务,使任务提交和任务执行解藕。他只包含一个函数式方法:

void execute(Runnable command);

2、ExecutorService <<接口>>

继承自Executor。它的目标是对外提供异步任务接收服务并转交给执行者。

// 提交单个任务
<T> Future<T> submit(Callable<T> task);
// 提交批量任务
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException;

3、AbstractExecutorService

实现了ExecutorService接口。他的主要目标是为ExecutorService提供默认实现。

4、ThreadPoolExecutor

继承自AbstractExecutorService。是线程池的核心实现类。

5、ScheduledExecutorService <<接口>>

继承自ExecutorService。它是一个可以完成“延时”和“周期性”任务的调度线程池接口。

6、ScheduledThreadPoolExecutor

继承自ThreadPoolExecutor,提供了ScheduledExecutorService中“延迟”和“周期性”的具体实现。

7、Executors

它是一个静态工厂类,能通过静态方法快捷创建线程池。

三、线程池核心参数

public ThreadPoolExecutor(
    int corePoolSize, // 核心线程数
    int maximumPoolSize, // 最大线程数
    long keepAliveTime, // 空闲线程存活时间
    TimeUnit unit, // 空闲线程存活时间单位
    BlockingQueue<Runnable> workQueue, // 任务的阻塞队列
    ThreadFactory threadFactory, // 新线程的产生方式
    RejectedExecutionHandler handler // 拒绝策略
)

四、线程池的任务调度流程

线程池详解

  1. 如果当前的工作线程数小于核心线程数,执行器总是优先创建一个任务线程。
  2. 如果总任务数大于核心线程数,新接收的任务将加入阻塞队列中,一直到阻塞队列满。
  3. 当一个任务执行完毕时,执行器会从阻塞队列中获取下一个任务执行。
  4. 在核心线程数用完、阻塞队列也满了的情况下,线程池还接收到新任务,执行器为为任务创建一个非核心线程去执行新任务。
  5. 当核心线程数用完、阻塞队列也满了,非核心线程也用完了(到达maximumPoolSize),如果还有新任务,线程池将执行拒绝策略。

五、任务阻塞队列

Java线程池使用BlockingQueue存放接收到的异步任务,BlockingQueue常用的实现类有:

  1. ArrayBlockingQueue

是一个数组实现的有界阻塞队列,队列中的元素按FIFO排序。ArrayBlockingQueue在创建时必须设置大小。

  1. LinkedBlockingQueue

是一个基于链表实现的阻塞队列,按FIFO排序,可以设置容量,不设置容量默认按Integer.MAX_VALUE作为容量(无界队列)。该队列的吞吐量高于ArrayBlockingQueue。

  1. PriorityBlockingQueue

是一个具有优先级的队列。

  1. DelayQueue

是一个无界阻塞延迟队列,底层基于PriorityBlockingQueue实现,队列中每个元素都有过期时间,当从队列中获取元素时,只有过期的元素才会出队。

  1. SynchronousQueue

同步队列,是一个不存储元素的阻塞队列。SynchronousQueue的每一次put操作,必须等待其他线程的take操作。而每一个take操作也必须等待其他线程的put操作。吞吐量大于LinkedBlockingQueue。

六、ThreadFactory(线程工厂)

ThreadFactory是一个接口,用于创建线程,它只有一个方法:

public interface ThreadFactory {
    Thread newThread(Runnable r);
}

构造一个新的Thread 。实现还可以初始化优先级、名称、守护进程状态、 ThreadGroup等。

看一下默认的线程工厂:

static class DefaultThreadFactory implements ThreadFactory {
    private static final AtomicInteger poolNumber = new AtomicInteger(1);
    private final ThreadGroup group;
    private final AtomicInteger threadNumber = new AtomicInteger(1);
    private final String namePrefix;

    DefaultThreadFactory() {
        SecurityManager s = System.getSecurityManager();
        group = (s != null) ? s.getThreadGroup() :
                              Thread.currentThread().getThreadGroup();
        namePrefix = "pool-" +
                      poolNumber.getAndIncrement() +
                     "-thread-";
    }

    public Thread newThread(Runnable r) {
        Thread t = new Thread(group, r,
                              namePrefix + threadNumber.getAndIncrement(),
                              0);
        if (t.isDaemon())
            t.setDaemon(false);
        if (t.getPriority() != Thread.NORM_PRIORITY)
            t.setPriority(Thread.NORM_PRIORITY);
        return t;
    }
}

这也解释了为什么线程池默认的线程名为pool-n-thread-n。

七、拒绝策略

被拒绝有两种情况:1、线程池已关闭;2、阻塞队列已满且maximumPoolSize已满。

RejectedExecutionHandler是拒绝策略接口,有以下几种常见的拒绝策略:

  1. AbortPolicy

线程池默认的拒绝策略。直接拒绝并抛出RejectedExecutionException异常。

  1. DiscardPolicy

该策略是AbortPolicy的安静版本,直接拒绝,但是并不会抛出异常。

  1. DiscardOldestPolicy

抛弃最老任务策略。就是当阻塞队列满了,将最早进入(最老)队列的任务抛弃,腾出空间给新任务。因为队列是FIFO的,所以每次都是移除队头元素。

  1. CallerRunsPolicy

调用者执行策略。如果任务添加失败,不再麻烦线程池去执行,哪个线程提交的任务,哪个线程就去执行。

  1. 自定义拒绝策略

如果以上拒绝策略均不满足要求,自定义拒绝策略实现RejectedExecutionHandler接口中的rejectedExecution方法即可。

public class SimplePolicy implements RejectedExecutionHandler {

    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        // 记录日志
        // 报警
    }
}

八、Executors快捷创建线程池的潜在问题

  1. newFixedThreadPool

用于创建固定数量的线程池。

方法如下:

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}

主要原因在于使用了LinkedBlockingQueue(无界阻塞队列)作为阻塞队列。当任务提交速度大于任务执行速度,就会造成大量任务在阻塞队列中等待,如果任务很大,可能使得内存资源耗尽,引发OOM异常。

  1. newSingleThreadExecutor

单线程化线程池,线程池中只有一个线程。该线程池使用了FinalizableDelegatedExecutorService包装,保证了corePoolSize始终唯一不可修改,当试图修改corePoolSice属性时将抛出ClassCastException,表示FinalizableDelegatedExecutorService无法被转型为ThreadPoolExecutor。如果该单个线程由于关闭前执行过程中的失败而终止,则如果需要执行后续任务,一个新线程将取代它。

方法如下:

public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}

潜在问题主要也是使用了LinkedBlockingQueue(无界阻塞队列)作为阻塞队列。

  1. newCachedThreadPool

可缓存线程池,可以无限的创建新线程,没有任务等待,使用SynchronousQueue作为阻塞队列。当有新任务到来,会被插入SynchronousQueue,在线程池中寻找可用线程执行,如果没有则创建一个新的线程执行该任务。

方法如下:

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

潜在问题在于最大线程数不设上限,如果任务过多,会造成大量线程被启动,可能造成OOM异常,甚至导致CPU线程资源耗尽。

  1. newScheduledThreadPool

可调度线程池。可以在给定的延迟后运行、或定期执行异步任务。

方法如下:

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
    return new ScheduledThreadPoolExecutor(corePoolSize);
}

public ScheduledThreadPoolExecutor(int corePoolSize) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue());
}

潜在问题主要在于最大线程数不受限制,使用了DelayedWorkQueue无界工作队列。可能引发OOM甚至资源耗尽问题。

九、确定线程池线程数

  1. 为IO密集型任务确认线程池线程数

IO密集型任务主要任务是执行IO操作,由于执行IO操作时间较长,导致CPU的利用率不高,线程空余时间很多。

线程数设定为CPU核数的两倍(参考Netty)。

// 获取CPU核数
Runtime.getRuntime().availableProcessors();
  1. 为计算密集任务确认线程池线程数

计算密集任务主要是要进行大量计算而需要消耗CPU资源,如数学计算、视频解码等。这类任务应该充分利用CPU,线程数设定为CPU核数。

  1. 为混合型任务确认线程池线程数

混合型任务既要进行大量逻辑计算,又要进行大量非CPU耗时操作。业界有一个比较成熟的估算公式:

最佳线程数 = (线程等待时间 + 线程CPU时间)/ 线程CPU时间 * CPU核数

最佳线程数 = 任务总耗时 / 线程CPU时间 * CPU核数

十、钩子方法

ThreadPoolExecutor为每个任务的执行前后都提供了钩子方法,一般由子类重写,具体如下:

protected void beforeExecute(Thread t, Runnable r) { }
protected void afterExecute(Runnable r, Throwable t) { }
protected void terminated() { }
  1. beforeExecute

在给定线程执行任务之前调用的方法。此方法由将执行任务r的线程t调用,并且可用于重新初始化 ThreadLocals 或执行日志记录。

该实现不执行任何操作,但可以在子类中进行自定义。注意:为了正确嵌套多个重写,子类通常应在此方法末尾调用super.beforeExecute 。

  1. afterExecute

在给定线程执行任务之后调用的方法。此方法由将执行任务r的线程t调用,并且可用于清除ThreadLocals 或执行日志记录。

该实现不执行任何操作,但可以在子类中进行自定义。注意:为了正确嵌套多个重写,子类通常应在此方法末尾调用super.afterExecute 。

  1. terminated

线程池终止时调用。注意:为了正确嵌套多个重写,子类通常应在此方法中调用super.terminated 。

演示:

public static void main(String[] args) throws InterruptedException {
    ThreadLocal<Long> longThreadLocal = new ThreadLocal<>();
    ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 60, TimeUnit.SECONDS, new LinkedBlockingDeque<>(2)) {
        @Override
        protected void beforeExecute(Thread t, Runnable r) {
            super.beforeExecute(t, r);
            System.out.println("线程" + r + "前钩子执行");
            longThreadLocal.set(System.currentTimeMillis());
        }

        @Override
        protected void afterExecute(Runnable r, Throwable t) {
            super.afterExecute(r, t);
            long time = System.currentTimeMillis() - longThreadLocal.get();
            System.out.println("线程" + r + "后钩子执行,任务花费" + time + "(ms)");
        }

        @Override
        protected void terminated() {
            System.out.println("线程池关闭");
            super.terminated();
        }
    };
    pool.execute(() -> System.out.println("任务执行"));
    Thread.sleep(1000);
    pool.shutdown();
}

线程池详解

十一、线程池的状态

// runState is stored in the high-order bits
private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;
  1. RUNNING:线程创建后的初始状态,这种状态下可以执行任务。
  2. SHUTDOWN:该状态下不再接收新任务,但是会将工作队列中的任务执行完毕。
  3. STOP:该状态下线程池不再接收新任务,也不会处理工作队列中的任务,会中断所有工作线程。
  4. TIDYING:该状态下所有任务都已终止或者处理完成,将会调用terminated()钩子方法。
  5. TERMINATED:执行完terminated()钩子方法之后的状态。

线程池状态转换规则

线程池详解

  1. 线程池创建后状态为RUNNING。
  2. 执行线程池的shutdown()方法,线程池从RUNNING转换为SHUTDOWN。
  3. 执行线程池的shutdownNow()方法,线程池从SHUTDOWN转换为STOP。
  4. 线程池的所有工作线程停止,工作队列清空之后,线程池从STOP转变为TIDYING。
  5. 执行完terminated()钩子方法之后,线程池从TIDYING转换为TERMINATED。

十二、如何优雅的关闭线程池

优雅关闭线程池主要涉及的方法:

  1. shutdown

该方法会将线程池的状态设置为SHUTDOWN,中断空闲线程,不再接收新任务,但是会将工作队列中的任务执行完毕。

public void shutdown() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess(); // 检查权限
        advanceRunState(SHUTDOWN); // 设置状态
        interruptIdleWorkers(); // 中断空闲线程
        onShutdown(); // 清理资源
    } finally {
        mainLock.unlock();
    }
    tryTerminate();
}
  1. shutdownNow

该方法会将线程池的状态设置为STOP,中断所有线程,不再接收新任务,清空阻塞队列,将未完成的任务返回给调用者。

public List<Runnable> shutdownNow() {
    List<Runnable> tasks;
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        advanceRunState(STOP);
        interruptWorkers();
        tasks = drainQueue();
    } finally {
        mainLock.unlock();
    }
    tryTerminate();
    return tasks;
}
  1. awaitTermination

线程池的关闭需要时间,需要用户程序调用awaitTermination()主动等待。

如果等待的时间超过指定的时间,但是线程池中的线程运行完毕,那么awaitTermination()返回true。

如果等待的时间超过指定的时间,但是线程池中的线程未运行完毕,那么awaitTermination()返回false。

如果等待时间没有超过指定时间,等待!

简单调用方法如下:

threadPool.shutdown();
try{
    // 等待线程池关闭
    while(!threadPool.awaitTermination(60, TimeUnit.SECONDS)){
        System.out.println("线程池还未关闭");
    }
} catch (InterruptedException e) {
    e.printStackTrace();
}

如果线程池关闭,awaitTermination()方法会返回true;如果超过了指定时间未结束会返回false。我们需要设定重试次数,但是不要永久等待。参考Dubbo中关闭线程池方法:

threadPool.shutdown();
if (!threadPool.isTerminated()) {
    try {
        for (int i = 0; i < 1000; i++) {
            if (threadPool.awaitTermination(10, TimeUnit.MILLISECONDS)) {
                break;
            }
            threadPool.shutdownNow();
        }
    } catch (Exception e) {
        e.printStackTrace();
    }
}

优雅关闭线程池

public static void shutdownThreadPoolGracefully(ExecutorService threadPool) {
    // 拒绝新任务
    threadPool.shutdown();
    try {
        // 等待60s,等待线程池任务结束
        if (!threadPool.awaitTermination(60, TimeUnit.SECONDS)) {
            // 取消正在执行的任务
            threadPool.shutdownNow();
            // 再次等待60s
            if (!threadPool.awaitTermination(60, TimeUnit.SECONDS)) {
                System.out.println("线程池任务未正常执行结束");
            }
        }
    } catch (InterruptedException e) {
        // 捕获异常,再次shutdownNow
        threadPool.shutdownNow();
    }
    // 仍然没有关闭
    if (!threadPool.isTerminated()) {
        try {
            // 循环关闭1000次,每次等待10ms
            for (int i = 0; i < 1000; i++) {
                if (threadPool.awaitTermination(10, TimeUnit.MILLISECONDS)) {
                    break;
                }
                threadPool.shutdownNow();
            }
        } catch (Exception e) {
            System.out.println(e.getMessage());
        }
    }
}