likes
comments
collection
share

深入理解Java线程池,ThreadPoolExecutor源码剖析

作者站长头像
站长
· 阅读数 46

Java线程池

什么是线程池

线程池是一种线程资源,我们通过把线程池化,可以带来许多好处

线程池的好处

  • 创建/销毁线程需要消耗系统资源,线程池可以复用已创建的线程
  • 控制并发的数量。并发数量过多,可能会导致资源消耗过多,从而造成服务器崩溃。(主要原因)
  • 可以对线程做统一管理、监控、调优

线程池的应用非常丰富,随便翻开一个开源项目都能看到线程池的身影。

Java线程池快速开始

    public static void main(String[] args) {
        // 创建线程池
        ExecutorService executor = Executors.newFixedThreadPool(5);
        for (int i = 0; i < 10; i++) {
            // 向线程池提交任务
            executor.execute(()->{
                System.out.println(1);
            });
        }
        // 关闭线程池
        executor.shutdown();
        // 等待所有任务完成
        while (!executor.isTerminated()) {
        }
        System.out.println("线程池所有任务完成");
    }

线程池的创建方式

  1. new ThreadPoolExecutor
  2. 通过Executors工具类创建线程池,这种方法本质是对第一种的封装

阿里规约禁止工程项目通过Executors工具类创建线程池,但规矩是死的人是活的,实际上像dubbo源码中也用到了Executors创建的线程池,但足以见得ThreadPoolExecutor的重要性。

那现在的问题是:Java如何做到池化线程资源?我们知道一个线程只能被调用一次start方法,那么线程在执行完一个线程后,如何执行下一个任务?没有任务时线程就一直消耗资源吗?我们来扒一扒ThreadPoolExecutor源码

Java线程池体系结构

深入理解Java线程池,ThreadPoolExecutor源码剖析 Java中的线程池顶层接口是Executor接口,ThreadPoolExecutor是这个接口的核心实现类, 我们从上往下慢慢看。

Executor接口
public interface Executor {
    // 执行任务 Runnable是我们封装的线程体,是一段可执行的方法
    void execute(Runnable command);
}

Executor往下走是ExecutorService

ExecutorService
public interface ExecutorService extends Executor {
    // 关闭线程池
    void shutdown();
    // 返回未完成任务的列表
    List<Runnable> shutdownNow();
    boolean isShutdown();
    boolean isTerminated();
    // 提供了3种submit,返回值都是Future
    <T> Future<T> submit(Callable<T> task);
    // 共4种invoke 支持超时时间
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks);
    <T> T invokeAny(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit);
}

ExecutorService是对Executor的增强,从提供执行任务的基础功能扩展:

  • 增强了对线程池本身的管理功能,可以关闭线程池
  • 额外提供了3种有返回值的执行任务的方法,以及4种invoke的提交方法

关于invoke提交有啥用我们后面再聊,最核心的提交任务的方式是:execute > submit

在往下是AbstractExecutorService,看到是抽象类,那基本上就是对公共逻辑的实现封装了。

AbstractExecutorService
public <T> Future<T> submit(Callable<T> task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<T> ftask = newTaskFor(task);
    execute(ftask);
    return ftask;
}

AbstractExecutorService主要是对上面的3种submit和4种invoke的实现,提供了两个newTaskFor方法。

protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
    return new FutureTask<T>(callable);
}

FutureTask不陌生,我们知道要获取异步执行线程的结果,光有callable是不够的,需要用FutureTask做一层封装

而submit方法最终还是调用了execute方法,只是在调用前将Callable封装成FutureTask而已。

所以到这我们可以基本确定:执行任务的核心就是execute方法

那最后就是ThreadPoolExecutor了。

ThreadPoolExecutor源码也并不多,只有2000行,作为比较,ConcurrentHashMap有6000行。

那么线程复用的核心就藏在这2000行代码里了。

ThreadPoolExecutor

核心字段

ctl和Worker是至关重要的两个字段

ctl:控制核心

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// 32 - 3 = 29
private static final int COUNT_BITS = Integer.SIZE - 3;
// 1 << 29 - 1 ,即CAPACITY为 后29位全1
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits(前3位)
// Accept new tasks and process queued tasks
private static final int RUNNING    = -1 << COUNT_BITS;
// Don't accept new tasks, but process queued tasks
private static final int SHUTDOWN   =  0 << COUNT_BITS;
// Don't accept new tasks, don't process queued tasks, and interrupt in-progress tasks
private static final int STOP       =  1 << COUNT_BITS;
// All tasks have terminated, workerCount is zero, the thread transitioning to state TIDYING will run the terminated() hook method
private static final int TIDYING    =  2 << COUNT_BITS;
// terminated() has completed
private static final int TERMINATED =  3 << COUNT_BITS;

源码上的注解非常长,也很详细,大致意思是这样的:

ctl用一个字段封装了两个含义:

  • workerCount:the effective number of threads that have been permitted to start and not permitted to stop.
  • runState:线程池的状态,共有5种,具体的含义已经在上方源码中注释

因为ctl有32位,因此:用3位表示runState(5种状态 > 4),剩下的表示workerCount

并且提供了取出这两个数据的方法:

private static int runStateOf(int c)     { return c & ~CAPACITY; }
private static int workerCountOf(int c)  { return c & CAPACITY; }

主要字段

// 1.2. 核心/最大线程数
private volatile int corePoolSize;
private volatile int maximumPoolSize;
// 3.4. 超时时间
private volatile long keepAliveTime;
private volatile boolean allowCoreThreadTimeOut;
// 5. 阻塞队列
private final BlockingQueue<Runnable> workQueue;
// 6. 线程工厂
private volatile ThreadFactory threadFactory;
// 7. 拒绝策略
private volatile RejectedExecutionHandler handler;
// 锁 以及等待通知机制 线程池会添加/删除线程,就依赖mainLock来保证线程安全
private final ReentrantLock mainLock = new ReentrantLock();
private final Condition termination = mainLock.newCondition();
// 工作线程的集合
private final HashSet<Worker> workers = new HashSet<Worker>();

给它们进行了编号,实际上这些参数是可以通过构造方法自定义的。

线程池工作原理概述

现在有必要对这些字段有一些稍微的了解:

线程池的线程分为两种:核心线程,和非核心线程。用了两个字段维护它们的最大值,但需要注意:maximumPoolSize是所有线程数量的上限,而不单单是非核心线程的。

超时时间:线程是会占用资源的,瞬间的大量请求可能会导致线程池线程数量很多,但又回归平静没啥请求了,就会导致大量线程白白占用资源,尽管可以阻塞但仍然不够优雅。因此我们需要在空闲时能够释放它们。并且用allowCoreThreadTimeOut来控制是否核心线程允许超时过期,默认不允许。

阻塞队列:线程池在极端情况下会被提交很多任务,靠线程来不及一下子执行完,当然需要一个数据结构来存储这些线程,同时是个多线程场景,因此用了阻塞队列。

主锁mainLock:线程池在管理(创建和删除)线程时,也是会面临线程安全问题的,因此用这把锁来保证。

对于一个新来的任务:

  • 如果核心线程数量都还没满,就创建一个线程来执行;
  • 如果核心线程数满了,就放入阻塞队列中存储任务
  • 阻塞队列也满了,就尝试创建非核心线程来执行任务
  • 非核心线程数量也满了,只能触发拒绝策略

现在上面的字段会让你陌生的可能有ThreadFactory,RejectedExecutionHandler,Worker。我们依次来看:

ThreadFactory:线程工厂

ThreadFactory比较好猜:就是工厂模式嘛

源码也非常短,ThreadFactory接口只规定了一个方法:newThread

static class DefaultThreadFactory implements ThreadFactory {
    private static final AtomicInteger poolNumber = new AtomicInteger(1);
    private final ThreadGroup group;
    private final AtomicInteger threadNumber = new AtomicInteger(1);
    private final String namePrefix;

    DefaultThreadFactory() {
        SecurityManager s = System.getSecurityManager();
        group = (s != null) ? s.getThreadGroup() :
                              Thread.currentThread().getThreadGroup();
        namePrefix = "pool-" +
                      poolNumber.getAndIncrement() +
                     "-thread-";
    }

    public Thread newThread(Runnable r) {
        Thread t = new Thread(group, r,
                              namePrefix + threadNumber.getAndIncrement(),
                              0);
        if (t.isDaemon())
            t.setDaemon(false);
        if (t.getPriority() != Thread.NORM_PRIORITY)
            t.setPriority(Thread.NORM_PRIORITY);
        return t;
    }
}

这就解释了线程池中线程名字的由来,并且:非守护线程,优先级为5。当然ThreadFactory我们是可以自定义修改的,实际上在项目中也大都需要自定义,修改线程名字便于定位和排查问题。

Rejected...Handler:拒绝策略

RejectedExecutionHandler是拒绝策略,即线程池不再允许放入任务时的行为。RejectedExecutionHandler是个接口,就一个方法。

ThreadPoolExecutor提供四种拒绝处理的策略(RejectedExecutionHandler的实现类)为:

  • ThreadPoolExecutor.AbortPolicy默认拒绝处理策略,丢弃任务并抛出RejectedExecutionException
  • ThreadPoolExecutor.DiscardPolicy:丢弃新来的任务,但是不抛出异常。
  • ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列头部(最旧的)的任务,然后重新尝试执行程序(如果再次失败,重复此过程)。
  • ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务。

来看一下AbortPolicy的源码:

public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    throw new RejectedExecutionException("Task " + r.toString() +
                                         " rejected from " +
                                         e.toString());
}

下面再看一下Worker

在线程池中真正执行任务的Worker

Worker是个内部类。

private final class Worker extends AbstractQueuedSynchronizer 
    implements Runnable{
    final Thread thread;
    Runnable firstTask;
    public void run() {
            runWorker(this);
        }
}

发现Worker居然继承了AQS,是个独占的锁

protected boolean tryAcquire(int unused) {
    if (compareAndSetState(0, 1)) {
        return true;
    }
    return false;
}

protected boolean tryRelease(int unused) {
    setState(0);
    return true;
}

源码注释说到:这个锁是为了说明线程工作状态,无锁代表空闲,有锁代表正在工作,当然细节远不止这么一些,但我们主线任务是线程池的原理,这个稍后说。

构造方法

Worker(Runnable firstTask) {
    setState(-1); // inhibit interrupts until runWorker
    this.firstTask = firstTask;
    // 线程用ThreadFactory生成
    this.thread = getThreadFactory().newThread(this);
}

发现构造方法仅在一处被调用,那就是addWorker。

addWorker:新增Worker

private boolean addWorker(Runnable firstTask, boolean core) {
    // 1. CAS自旋尝试将Ctl的workerCount + 1  
         int wc = workerCountOf(c);
    // 如果超过线程数直接add失败
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
   // 因为判断比较多 写的比较长 省略部分
    try {
        // 创建Worker
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            // 获取锁
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // Recheck 线程池状态,线程是否存活等
                // 插入set集合
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            // 添加成功 启动线程
            if (workerAdded) {
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

正常情况:就是new一个Worker,维护一下HashSet,然后启动线程。

线程复用核心原理:start

其实要做到线程复用,因为start方法确定只能调用一次,那么问题肯定是处在start方法上,也就是Worker的run方法,而run直接调用runWorker方法。我们不妨来看一下:

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        // 先执行firstTask,getTask方法就是从阻塞队列里取任务
        while (task != null || (task = getTask()) != null) {
            w.lock();
            // 检查是否需要中断自己,比如线程池状态变了
            try {
                // 这是一个空方法,是一个扩展点,源码的注解是:
                // may be used to re-initialize ThreadLocals, or to perform logging
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    // 执行任务
                    task.run();
                }  finally {
                    // 和beforeExecute一样是个空方法
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        // 这个方法就是线程出现异常如何处理的关键,我们稍后单独分析
        processWorkerExit(w, completedAbruptly);
    }
}

总之,先执行Worker创建时绑定的方法,然后死循环从阻塞队列中取任务并执行。

Worker的start并不是单单执行一个任务,而是不断获取任务来执行,这就是线程复用的核心。

Worker就差不多讲完了,当然还有很多细节我们后面再讲,线程池更多的就是对Worker的管理和调度。现在把目光重新聚焦于ThreadPoolExecutor本身:

重新审视线程池状态

虽然源码上有注解,但不了解线程池大致工作原理容易看不明白,现在了解了大致工作原理了,我们可以重新看线程池状态会有一个更清晰的认知。我们再回顾一下源码的注释:

// runState is stored in the high-order bits(前3位)
// Accept new tasks and process queued tasks
private static final int RUNNING    = -1 << COUNT_BITS;
// Don't accept new tasks, but process queued tasks
private static final int SHUTDOWN   =  0 << COUNT_BITS;
// Don't accept new tasks, don't process queued tasks, and interrupt in-progress tasks
private static final int STOP       =  1 << COUNT_BITS;
// All tasks have terminated, workerCount is zero, the thread transitioning to state TIDYING will run the terminated() hook method
private static final int TIDYING    =  2 << COUNT_BITS;
// terminated() has completed
private static final int TERMINATED =  3 << COUNT_BITS;

我列了一张表格,会更加直观:

线程池状态新来的任务阻塞队列中的任务执行任务中的线程空闲的线程
RUNNING接受正常执行正常执行阻塞
SHUTDOWN拒绝正常执行正常执行中断
STOP拒绝丢弃中断中断
TIDYING拒绝没有任务没有线程没有线程
TERMINATED拒绝没有任务没有线程没有线程

TIDYING和TERMINATED有啥区别?源码注释说的很清楚了:the thread transitioning to state TIDYING will run the terminated() hook method,大概就是:一旦到了TIDYING状态,就会调用terminated()方法,这个方法调用完,线程池就变为TERMINATED状态。

线程池状态的转换

我们new出来的线程池初始当然是RUNNING状态

  • 一旦手动调用shutdown,就会转化为SHUTDOWN;
  • 一旦手动调用shutdownNow,就会转化为STOP;

等到SHUTDOWN或是STOP忙完了自己的事,线程池会自己陷入TIDYING,最后是TERMINATED。

shutdown:转化为SHUTDOWN

public void shutdown() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        // CAS 修改状态为shutdown
        advanceRunState(SHUTDOWN);
        // 中断空闲线程
        interruptIdleWorkers();
        onShutdown(); // hook for ScheduledThreadPoolExecutor 
        // 在ThreadPoolExecutor 这是个空方法
    } finally {
        mainLock.unlock();
    }
    tryTerminate();
}

shutdownNow:转化为STOP

大部分源码都和shutdown差不多,下面列出不同点:

// CAS的目标为 STOP 状态
advanceRunState(STOP);
// 会中断所有线程
interruptWorkers();
// 将阻塞队列所有未完成的任务返回
tasks = drainQueue();

返回的类型是List<Runnable>,实际类型是ArrayList

这两个方法的共同点是:最后都调用了tryTerminate();方法

tryTerminate:可能的终止

tryTerminate();方法被调用的地方很多,源码的注释是这样说的:

This method must be called following any action that might make termination possible -- reducing worker count or removing tasks from the queue during shutdown.

tryTerminate();方法必须在任何可能导致线程池termination的地方被调用,比如:减少Worker数,在shutdown时删除阻塞队列中的任务。

final void tryTerminate() {
    for (;;) {
        int c = ctl.get();
        if (isRunning(c) ||
            runStateAtLeast(c, TIDYING) ||
            (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
            return;
        // 中断一个空闲的线程
        if (workerCountOf(c) != 0) { // Eligible to terminate
            interruptIdleWorkers(ONLY_ONE);
            return;
        }
        // Worker数为0
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            // 先转化为TIDYING
            if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                try {
                    // 空方法 
                    terminated();
                } finally {
                    // 最后转化为TERMINATED
                    ctl.set(ctlOf(TERMINATED, 0));
                    termination.signalAll();
                }
                return;
            }
        } finally {
            mainLock.unlock();
        }
        // else retry on failed CAS
    }
}

线程池的构造方法

以下是ThreadPoolExecutor构造函数需要的参数,共7个:

  • int corePoolSize:该线程池中核心线程数最大值
  • int maximumPoolSize:该线程池中线程总数最大值
  • long keepAliveTime非核心线程闲置超时时长
  • TimeUnit unit:keepAliveTime的单位
  • BlockingQueue workQueue:阻塞队列,维护着等待执行的Runnable任务对象

以上五个是必选项,剩下两个是可选项

  • ThreadFactory threadFactory创建线程的工厂 ,用于批量创建线程,统一在创建线程时设置一些参数,如是否守护线程、线程的优先级等。如果不指定,会新建一个默认的线程工厂。
  • RejectedExecutionHandler handler拒绝处理策略,线程数量大于最大线程数就会采用拒绝处理策略。

可选项默认的构造方法如下:

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
         Executors.defaultThreadFactory(), defaultHandler);
}

核心方法:execute(主要工作原理)

线程池最重要的功能当然是执行任务,我们直接看execute方法

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();   
    int c = ctl.get();
    // 1.当前活跃的线程数小于corePoolSize,则调用addWorker创建核心线程执行任务
    if (workerCountOf(c) < corePoolSize) {
       if (addWorker(command, true))
           return;
       c = ctl.get();
    }
    // 2.如果不小于corePoolSize,则将任务添加到workQueue队列(阻塞队列)
    // 这里调用offer方法而非put,因为阻塞队列满了需要特殊处理,而不是像get阻塞等待消费者唤醒
    // offer方法,要是竞争锁失败,不会等待直接返回false
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        // 2.1 如果线程池状态以及不是Running了,则remove这个任务,然后执行拒绝策略。
        if (! isRunning(recheck) && remove(command))
            reject(command);
        // 2.2 核心线程数为0的特殊情况
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    // 3.如果放入workQueue失败(workQueue满了),则创建非核心线程执行任务,
    else if (!addWorker(command, false))
         // 如果这时创建非核心线程失败(当前线程总数不小于maximumPoolSize时),就会执行拒绝策略。
         reject(command);
}
// addWorker(command, true) 第二个参数true表示核心线程,false表示非核心 

文字小结一下:

  1. 线程总数量 < corePoolSize,无论是否有线程空闲,都会新建一个核心线程执行任务(让核心线程数量快速达到corePoolSize,在核心线程数量 < corePoolSize时)。
  2. 线程总数量 >= corePoolSize时,新来的线程任务会进入任务队列中等待,然后空闲的核心线程会依次去阻塞队列中取任务来执行(体现了线程复用)。
  3. 当阻塞队列满了,说明这个时候任务已经很多了,会创建非核心线程去执行这个任务。
  4. 阻塞队列满了, 且总线程数达到了maximumPoolSize,则会采取上面提到的拒绝策略进行处理

整个execute方法还是很清晰的,如果忘记了addWorker方法可以回顾一下上文。

好,那么线程池最核心的部分就讲完了,是不是有点突然,才只看了一个execute方法,但那也是因为前文我们铺垫的足够久。当然从剖析源码的角度,我们还需要关注一下别的细节:

线程池的其他细节

核心线程预热

  • prestartCoreThread 创建一个核心线程
  • prestartAllCoreThreads 创建核心线程直到满

其实就是调用了addWorker方法

public boolean prestartCoreThread() {
    return workerCountOf(ctl.get()) < corePoolSize &&
        addWorker(null, true);
}

线程池异常处理/线程回收

在Worker线程的runWorker方法中,退出「不断从阻塞队列取任务并执行的循环」后,会执行processWorkerExit,要不是线程空闲太久了退出,要不是线程异常了退出,我们来看一下这个方法:

private void processWorkerExit(Worker w, boolean completedAbruptly) {
    // 线程异常退出时 completedAbruptly为true
    // CAS WorkerCount - 1
    if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
        decrementWorkerCount();
    // 从HashSet中移除这个Worker对象
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        completedTaskCount += w.completedTasks;
        // 删除掉 这个Worker对象
        workers.remove(w);
    } finally {
        mainLock.unlock();
    }
    // 可能终止线程池
    tryTerminate();

    int c = ctl.get();
    if (runStateLessThan(c, STOP)) {
        if (!completedAbruptly) {
            // 涉及极端情况的细节考虑 但如果是正常的线程过期 就直接return
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            if (min == 0 && ! workQueue.isEmpty())
                min = 1;
            if (workerCountOf(c) >= min)
                return; // replacement not needed
        }
        // 重新添加一个Worker,非核心线程
        addWorker(null, false);
    }
}

因此,如果线程池中的线程抛出异常没有处理,会把它移除掉,再添加一个新的线程进去。

如何区分核心/非核心线程

这里我们要提一个细节:

private volatile int corePoolSize;
private volatile int maximumPoolSize;

字段中有两个值,分别是核心线程数,和最大线程数,但只用了一个WorkerCount(ctl维护)统计线程数量,线程池是如何区分核心线程和非核心线程的?

实际上线程池并不严格区分核心线程和非核心线程,因为从Worker的角度来说它们没有什么区别,因此线程池只在乎数量。下面马上来看一下回收线程时的判断:

线程过期回收的实现

我们之前在分析线程复用核心原理start的方法时,getTask方法是个黑盒,并且getTask返回null时Worker线程就会退出循环并执行processWorkerExit(w, completedAbruptly);,现在我们来看一下getTask这个方法:

getTask()方法返回null的时机如下:

boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
            try {
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }

allowsCoreThreadTimeOut:是否允许核心线程过期被回收

因此,要么允许核心线程过期,要么线程数超过了核心线程数,此时才会执行poll方法,可能返回null,因为:

  • take阻塞直到获取到任务
  • poll(time,unit) :阻塞一段时间,还没获取到任务,返回null

用了allowCoreThreadTimeOut || wc > corePoolSize;这个判断,因此线程池不需要严格区分核心线程和非核心线程。

那一旦返回了null,就可以用上文提到的processWorkerExit来处理。

单线程线程池有什么用

单线程线程池,再加上阻塞队列的FIFO特性,可以保证提交到该线程池的任务的有序性。

自定义线程池实战

熟悉了线程池工作原理,就可以自定义线程池并应用了,那么问题在于,线程池的构造函数复杂,该如何配置线程池的参数呢?

线程池的参数配置问题

线程池的参数并不好配置。最核心的三个:corePoolSize、maximumPoolSize,workQueue

IO密集型和CPU密集型运行起来的情况差异非常大。

CPU密集型:线程数等于CPU核数+1

IO密集型:线程数可以多一些,来充分利用CPU资源

像这个方案是太笼统了。

参数配置解决方案

比较知名的有:disruptor、动态线程池等,也可以自己根据业务不断改善和调整。想了解动态线程池的可以参考:Java线程池实现原理及其在美团业务中的实践

打破常规逻辑:Tomcat线程池

JAVA JDK线程池策略,比较适合处理 CPU 密集型任务,但是对于 I/O 密集型任务,如数据库查询,rpc 请求调用等,不是很友好,所以Tomcat在其基础上进行了扩展。

它里面的线程池的运行过程是先把最大线程数用完,然后再提交任务到队列里面去的。

tomcat的阻塞队列workqueue重写了offer(排队任务的方法),(还记得怎么回收非核心线程吗,getTask方法如果超过核心线程数,会调用offer而不是task,因此可能被回收)该方法中,当当前线程数大于核心线程数,小于最大线程数时,返回false,导致workQueue.offer(command)false,进而导致该分支代码不被执行,执行addWorker(command, false)方法,创建新线程。

核心源码offer方法如下:

// 对offer方法的包装  
public boolean force(Runnable o) {
        if (parent == null || parent.isShutdown()) {
            throw new RejectedExecutionException(sm.getString("taskQueue.notRunning"));
        }
        return super.offer(o); 
        //forces the item onto the queue, to be used if the task is rejected
    }
// 重写offer方法
    @Override
    public boolean offer(Runnable o) {
      //we can't do any checks
        if (parent==null) {
            return super.offer(o);
        }
        //getPoolSize 方法是获取线程池中当前运行的线程数量
        //也就是所有线程都创建完了,放入阻塞队列
        if (parent.getPoolSize() == parent.getMaximumPoolSize()) {
            return super.offer(o);
        }
        // getSubmittedCount 获取的是当前已经提交但是还未完成的任务的数量
        // 其值是队列中的数量加上正在运行的任务的数量。
        // 这块逻辑可能的情况就是,有些线程执行完了,空闲,那么我把任务放入队列里,就能立刻执行
        // 没有必要再创建线程了
        if (parent.getSubmittedCount()<=(parent.getPoolSize())) {
            return super.offer(o);
        }
        //if we have less threads than maximum force creation of a new thread
        // 这里就是实现创建线程的关键,return false就会去创建新线程
        if (parent.getPoolSize()<parent.getMaximumPoolSize()) {
            return false;
        }
        //if we reached here, we need to add it to the queue
        return super.offer(o);
    }

上文源码的parent是什么?

taskqueue.setParent(executor);

构造线程池处有一行代码,设置了阻塞队列的parent为线程池

常见面试题

execute和submit的区别

submit本质只是将线程体(可能是callable,可能是runnable)封装成FutureTask再去执行execute方法

  • execute只能提交Runnable类型的任务,没有返回值;
  • submit既能提交Runnable类型任务也能提交Callable类型任务,返回Future类型。
  • execute方法提交的任务异常是直接抛出的;
  • submit方法是是捕获了异常的,当调用FutureTask的get方法时,才会抛出异常。

这种处理异常的方式不是被线程池所决定的,而是被FutureTask在try块捕获并封装到结果中,从而在get时才抛出。

为什么Worker要继承AQS实现锁

这个锁是为了说明线程工作状态,无锁代表空闲,有锁代表正在执行任务。并且Worker是不可重入的。目的是不希望正在工作中的线程被中断。

提交任务的方式有哪些

有4种invoke,3种submit,和一个execute。

submit是对execute的增强,主要是用FutureTask对线程体做了封装从而能够获取异步执行的结果。

invoke是对功能的扩展,支持提交多个任务,并且所有(All)/某个(Any)任务执行完毕才返回,它们也都用FutureTask做了封装,支持获取执行结果。

invokeAll:执行所有任务

线程池会把 list 里面的任务一个个的去执行,执行完成后返回一个 Futures 列表。

invokeAll方法:

  1. 把传入进来的任务封装为一个 FutureTask 对象,先放到一个 ArrayList 里面,然后调用 execute 方法,也就是扔到线程池里面去执行。
  2. 遍历每个Future 对象,调用get方法阻塞直到得到Future的结果

invokeAll与shutdownNow潜在的bug

  • 在invokeAll期间执行shutdownNow方法,会导致有的任务阻塞在Future.get。因此程序无法退出。

shutdownNow返回未完成任务的列表,我们对list内的每个FutureTask执行cancel方法,从而使程序正常退出。

但采用了DiscardPolicy,你没有任何地方让这个被静默处理的 Future 抛出异常,也没用任何地方能调用它的 cancel 方法,所以这里就会一直阻塞。

说一下线程池里面的几把锁

除了mainlock和Worker,阻塞队列里也有两把锁

线程池如何处理异常

转载自:https://juejin.cn/post/7282692887665836086
评论
请登录