likes
comments
collection
share

面试官:请叙述ThreadPoolExecutor的执行过程与原理

作者站长头像
站长
· 阅读数 11

ThreadPoolExecutor的执行过程与原理

核心方法一:execute

每次使用线程池都是从execute执行任务,只知道通过该方法就可以开启多线程并执行任务,但一直不清楚其背后执行的逻辑与原理,下面通过 ThreadPoolExecutor 的核心源码解析了解它的执行过程

public class ThreadPoolExecutor extends AbstractExecutorService {
    // 核心线程池大小
    private final int corePoolSize;
    // 最大线程池大小
    private final int maximumPoolSize;
    // 线程空闲超时时间
    private final long keepAliveTime;
    // 时间单位
    private final TimeUnit unit;
    // 等待队列,用于保存等待执行的任务
    private final BlockingQueue<Runnable> workQueue;
    // 线程池中当前线程数
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    // 线程工厂,用于创建新的线程
    private final ThreadFactory threadFactory;
    // 拒绝策略,用于处理无法处理的任务
    private final RejectedExecutionHandler handler;
    // 等待任务的线程是否可以中断
    private volatile boolean allowCoreThreadTimeOut;
    // 已经完成任务的计数器
    private final AtomicLong completedTaskCount = new AtomicLong();

    // 构造方法
    public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
                              long keepAliveTime, TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.keepAliveTime = keepAliveTime;
        this.unit = unit;
        this.workQueue = workQueue;
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

    // 执行任务
    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        int c = ctl.get();
        // 如果线程数小于核心池大小,则创建新的线程来执行任务
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        // 如果线程池处于运行状态,将任务添加到等待队列
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            // 如果线程池已经停止或者任务队列已满,移除任务并执行拒绝策略
            if (! isRunning(recheck) && remove(command))
                reject(command);
            // 如果线程池中没有线程,则创建新的线程来执行任务
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        // 如果无法将任务添加到等待队列,则创建新的线程来执行任务,如果失败则执行拒绝策略
        else if (!addWorker(command, false))
            reject(command);
    }

    // 关闭线程池
    public void shutdown() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();
            advanceRunState(SHUTDOWN);
            interruptIdleWorkers();
            onShutdown(); 
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
    }
}

从上述源码execute方法中可以看出,其中最重要的方法是addWorker,该方法是创建线程和执行任务的核心,看看该方法的源码:

核心方法二:addWorker

private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get(); // 获取线程池的 ctl 状态值
        int rs = runStateOf(c); // 获取线程池的运行状态

        // 如果线程池的状态为 SHUTDOWN 并且下列三个条件同时满足,则不添加新的工作线程
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;

        for (;;) {
            int wc = workerCountOf(c); // 获取线程池中的工作线程数
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // Re-read ctl
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }

    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        // 通过execute可得知,firstTask就是传进来的任务
        w = new Worker(firstTask);
        // Worker中thread成员变量是通过 getThreadFactory().newThread(this); 构建出来的Thread
        // Work被当作任务(this)传入Thread,所以w.thread; 就相当于new Thread(w);
        // 所以当t.start() 的时候,其实就会执行worker中的run方法,run方法调用了runWorker方法
        // runWorker方法也是线程池为什么会一直阻塞等待任务的方法。
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int rs = runStateOf(ctl.get());

                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                // 执行任务
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

Worker实现了Runnable接口,所以对于线程池来说,worker就是一个待执行的任务。Worker类自身的源码很简单,Worker构造函数中做了两件事:

  1. 将传入task赋值给 firstTask
  2. 通过 getThreadFactory().newThread(this); 构建出来 Thread 并将自身作为任务

在addWorker方法中再通过 Worker 构建出来的Thread执行,看上述源码中t.start(),由于worker作为task,所以开启线程时会执行Worker的run方法,worker的run方法调用runWorker方法,runWorker方法也是线程池开启如果没有手动调用关闭,那么会一直存在,源码如下:

核心方法三:runWorker

final void runWorker(Worker w) {
    // 获取当前线程
    Thread wt = Thread.currentThread();
    // 获取 Worker 对象的 firstTask 属性
    Runnable task = w.firstTask;
    w.firstTask = null;
    // Worker 的 lock() 方法是一个独占式的获取锁,因此如果已经有线程获取到了锁,当前线程就会一直阻塞在这里,直到获取到锁
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        // 如果第一个任务不为空,则执行第一个任务
        while (task != null || (task = getTask()) != null) {
            // 加锁,防止任务执行过程中被中断
            w.lock();
            // 如果线程已经被中断,抛出异常
            if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted())
                wt.interrupt();
            try {
                // 执行任务
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        // 执行结束,如果线程未被中断,则执行 workerDone() 方法进行清理工作
        processWorkerExit(w, completedAbruptly);
    }
}

从addWorker中得知,firstTask 是我们传进来需要被执行的任务,在runWorker方法中,他再次取worker中的firstTask并执行。

结论

通过源码分析得知从 execute -> addWorker -> runWorker,忽略其中的细节,Worker贯穿了线程池执行流程的全部,弄清楚Worker类的作用,想必对线程池的执行流程就会很清晰了。

转载自:https://juejin.cn/post/7202188243113263162
评论
请登录