likes
comments
collection
share

Java线程池源码解读

作者站长头像
站长
· 阅读数 18

线程池是Java并发编程中的一个重要组件,它能够有效地管理线程的生命周期和执行,从而避免了频繁创建和销毁线程的开销。在本文中,我们将详细解读Java线程池的实现源码。

  1. 线程池的基本实现

Java线程池的基本实现是通过ThreadPoolExecutor类来完成的。ThreadPoolExecutor是一个线程池的核心类,它实现了Executor接口并提供了线程池的完整功能。下面是ThreadPoolExecutor类的构造函数:

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { // 省略代码 }
}

其中,corePoolSize是线程池的核心线程数,maximumPoolSize是线程池的最大线程数,keepAliveTime和unit是非核心线程的存活时间和时间单位,workQueue是任务队列,threadFactory用于创建线程,handler是拒绝策略,用于处理任务队列已满时的任务拒绝。

线程池的执行过程如下:

  1. 当任务数量小于核心线程数时,创建核心线程来处理任务。
  2. 当任务数量大于核心线程数时,将任务放入任务队列中。
  3. 当任务队列已满且线程数小于最大线程数时,创建新的线程来处理任务。
  4. 当线程数达到最大线程数时,执行拒绝策略。

下面我们来详细解读ThreadPoolExecutor类的实现源码。

  1. 线程池的实现细节

ThreadPoolExecutor类实现了ExecutorService接口,它提供了submit、invokeAll、invokeAny等方法,用于提交任务和管理线程池。下面我们来看一下ThreadPoolExecutor类中的一些重要方法。

2.1 execute方法

execute方法是ThreadPoolExecutor类中最重要的方法之一,用于执行任务。下面是execute方法的源码:

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    /*
     * Proceed in 3 steps:
     *
     * 1. If fewer than corePoolSize threads are running, try to
     * start a new thread with the given command as its first
     * task.  The call to addWorker atomically checks runState and
     * workerCount, and so prevents false alarms that would add
     * threads when it shouldn't, by returning false.
     *
     * 2. If a task can be successfully queued, then we still need
     * to double-check whether we should have added a thread
     * (because existing ones died since last checking) or that
     * the pool shut down since entry into this method. So we
     * recheck state and if necessary roll back the enqueuing if
     * stopped, or start a new thread if there are none.
     *
     * 3. If we cannot queue task, then we add a new thread. This covers both cases when the * queue is full and when the queue is "small" (i.e., when * queuing would normally be allowed but construction has * not yet completed). We don't actually know whether the * task is going to be executed until workerThread * actually handles it, so we compensate for workers that * exit just after checking before blocking by rechecking * runState. */ 
     int c = ctl.get(); 
     if (workerCountOf(c) < corePoolSize) { 
         if (addWorker(command, true)) return; c = ctl.get(); 
     } 
     if (isRunning(c) && workQueue.offer(command)) { 
         int recheck = ctl.get();
         if (! isRunning(recheck) && remove(command)) reject(command); 
         else if (workerCountOf(recheck) == 0) addWorker(null, false); 
     } 
     else if (! addWorker(command, false)) reject(command); 
   }

可以看到,execute方法中主要做了以下几件事情:

  1. 如果当前线程数小于核心线程数,尝试通过addWorker方法创建新的线程来处理任务。
  2. 如果任务队列未满,则将任务加入任务队列中。
  3. 如果任务队列已满但线程数小于最大线程数,尝试通过addWorker方法创建新的线程来处理任务。
  4. 如果任务队列已满且线程数已达到最大线程数,执行拒绝策略。 其中,addWorker方法用于创建新的线程来处理任务。下面我们来看一下addWorker方法的实现。

2.2 addWorker方法 addWorker方法用于创建新的线程来处理任务,其源码如下:

private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (; ; ) {
        int c = ctl.get();
        int rs = runStateOf(c); // Check if queue empty only if necessary. 
        if (rs >= SHUTDOWN && !(rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty())) return false;
        for (; ; ) {
            int wc = workerCountOf(c);
            if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false;
            if (compareAndIncrementWorkerCount(c)) break retry;
            c = ctl.get(); // Re-read ctl 
            if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop 
        }
    }
    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // Recheck while holding lock. 
                // Back out on ThreadFactory failure or if 
                // shut down before lock acquired. 
                int c = ctl.get();
                int rs = runStateOf(c);
                if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable 
                        throw new IllegalThreadStateException();
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize) largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (!workerStarted) addWorkerFailed(w);
    }
    return workerStarted;
}

可以看到,addWorker方法主要做了以下几件事情:

  1. 检查线程池的状态,如果线程池的状态为SHUTDOWN或STOP,则返回false,不再创建新的线程。
  2. 如果当前线程数小于最大线程数,则通过compareAndIncrementWorkerCount方法增加线程数,并继续执行下面的代码。
  3. 创建新的Worker对象,并将其加入到workers集合中。
  4. 如果worker添加成功,则启动新的线程。 其中,Worker类是线程池中最重要的类之一,我们接下来来看一下Worker类的实现。

2.3 Worker类 Worker类是线程池中真正执行任务的类,其源码如下:

private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable {
    /**
     * This class will never be serialized, but we provide a
     * serialVersionUID to suppress a javac warning.
     */
    private static final long serialVersionUID = 6138294804551838833L;

    /**
     * Thread this worker is running in.  Null if factory fails.
     */
    final Thread thread;
    /**
     * Initial task to run.  Possibly null.
     */
    Runnable firstTask;
    /**
     * Per-thread task counter
     */
    volatile long completedTasks;

    /**
     * Creates with given first task and thread from ThreadFactory.
     *
     * @param firstTask the first task (null if none)
     */
    Worker(Runnable firstTask) {
        setState(-1); // inhibit interrupts until runWorker
        this.firstTask = firstTask;
        this.thread = getThreadFactory().newThread(this);
    }

    /**
     * Delegates main run loop to outer runWorker
     */
    public void run() {
        runWorker(this);
    }

    // Lock methods
    //
    // The value 0 represents the unlocked state.
    // The value 1 represents the locked state.

    protected boolean isHeldExclusively() {
        return getState() != 0;
    }

    protected boolean tryAcquire(int unused) {
        if (compareAndSetState(0, 1)) {
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
        }
        return false;
    }

    protected boolean tryRelease(int unused) {
        setExclusiveOwnerThread(null);
        setState(0);
        return true;
    }

    public void lock() {
        acquire(1);
    }

    public boolean tryLock() {
        return tryAcquire(1);
    }

    public void unlock() {
        release(1);
    }

    public boolean isLocked() {
        return isHeldExclusively();
    }

    void interruptIfStarted() {
        Thread t;
        if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
            try {
                t.interrupt();
            } catch (SecurityException ignore) {
            }
        }
    }
}

可以看到,Worker类主要做了以下几件事情:

  1. 实现了Runnable接口,用于执行任务。
  2. 实现了AbstractQueuedSynchronizer类,用实现线程的同步,保证同一时间只有一个线程在执行任务。
  3. 每个Worker对象都有一个firstTask属性,表示Worker执行的第一个任务。
  4. 每个Worker对象都有一个completedTasks属性,表示Worker执行的任务数量。

在Worker类中,最重要的方法是runWorker方法,它是线程池中真正执行任务的方法,其源码如下:


final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        while (task != null || (task = getTask()) != null) {
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted.  This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            if ((runStateAtLeast(ctl.get(), STOP) ||
                    (Thread.interrupted() &&
                            runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x;
                    throw x;
                } catch (Error x) {
                    thrown = x;
                    throw x;
                } catch (Throwable x) {
                    thrown = x;
                    throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

可以看到,runWorker方法主要做了以下几件事情:

  1. 获取Worker对象的firstTask属性,并将其置为null。
  2. 允许中断,执行任务。
  3. 如果线程池状态为STOP或SHUTDOWN,并且当前线程没有被中断,则中断当前线程。
  4. 执行任务,并在任务执行前后调用beforeExecute和afterExecute方法。
  5. 在任务执行完成后,更新Worker对象的completedTasks属性,并将Worker对象解锁。
  6. 循环执行任务,直到线程池状态为STOP或SHUTDOWN。

通过以上源码解读,我们可以对Java线程池的实现原理有更加深入的理解。