likes
comments
collection
share

ThreadPoolExecutor线程池核心原理

作者站长头像
站长
· 阅读数 40

大家好,我是小趴菜,在面试中,我们经常会被面试官问到线程池的一些问题,比如

  • 1:线程池执行流程
  • 2:线程池是如何回收非核心线程
  • 3:线程池如何维护核心线程
  • 4:.........

今天我们来分析一下线程池的整体核心流程,帮助大家彻底了解线程池的底层核心原理

关于线程池的用法如下,我们要分析的入口就是submit()方法,这是将任务提交到线程池的方法

ThreadPoolExecutor threadPoolExecutor =
        new ThreadPoolExecutor(1,2,3000,TimeUnit.MILLISECONDS,new ArrayBlockingQueue<>(1),Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());

threadPoolExecutor.submit(() -> {
    try {
        TimeUnit.SECONDS.sleep(120);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
});

TimeUnit.SECONDS.sleep(1);
threadPoolExecutor.submit(() -> {
    try {
        TimeUnit.SECONDS.sleep(120);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
});

TimeUnit.SECONDS.sleep(1);
threadPoolExecutor.submit(() -> {
    try {
        TimeUnit.SECONDS.sleep(2);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
});

threadPoolExecutor.shutdown();
public Future<?> submit(Runnable task) {
    //判断提交的任务是否为空,为空就直接抛出空指针异常
    if (task == null) throw new NullPointerException();
    
    //将我们的任务封装成一个RunnableFuture,后续可以通过Future.get()方法获取我们任务的返回结果
    RunnableFuture<Void> ftask = newTaskFor(task, null);
    //核心方法,将任务提交给线程池执行
    execute(ftask);
    return ftask;
}

execute(ftask);最后进入ThreadPoolExecutor这个类中

public void execute(Runnable command) {
    //对我们的任务再次进行判空处理
    if (command == null)
        throw new NullPointerException();
    
    //拿到我们线程池中此时存活的线程数
    int c = ctl.get();
    //如果存活的线程数小于核心线程数
    if (workerCountOf(c) < corePoolSize) {
        
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    //如果此时线程池存活的线程数不小于核心线程数
    //isRunning(c):判断此时线程池的状态,如果是Running,就可以接收新的任务
    //workQueue.offer(command): 将任务放入队列
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            //线程池的拒绝策略,也就是拒绝了这个任务
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    
    //如果队列满了,也就是存活线程数不小于核心线程数并且队列也已经满了
    //也就是判断线程数是否大于线程池最大的线程数
    else if (!addWorker(command, false))
        //线程池的拒绝策略,也就是拒绝了这个任务
        reject(command);
}

在execute(ftask);方法中,出现最多的就是addWorker()这个方法,接下来我们看下这个方法的作用是什么

//firstTask:是我们要提交给线程池的任务
//core:true表示的是这个线程是核心线程,也就是线程数小于核心线程数时候创建的线程,
        false,表示是可以回收的线程,表示此时线程数已经不小于核心线程数了
private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    
    //对于我们要了解线程池核心原理,这个for(;;)其实没必要去深入了解,我们可以直接忽略
    //我们先把整体的流程搞明白,对于一些不重要的步骤我们可以直接忽略
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // 判断线程池的状态,有RUNNING,STOP,SHUTDOWN等一些状态,只要不是RUNNING状态
        //那么线程池就不会接收新的任务,所以这里直接会返回false,但是我们此时的
        //线程池状态是RUNNING,所以不会返回进入这个if分支
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;

        for (;;) {
            //拿到此时线程池存活的线程数
            int wc = workerCountOf(c);
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // Re-read ctl
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }

    //我们直接从这里开始分析
    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        //将我们的任务封装成一个Worker对象,此时这里就创建了一个新的线程
        w = new Worker(firstTask);
        //从Worker对象中拿到执行当前任务的线程
        final Thread t = w.thread;
        //判断线程是否为空
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
           
            mainLock.lock();
            try {
                
                int rs = runStateOf(ctl.get());
                
                //这里还是判断线程池的状态
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    //将我们创建的新的线程放入到workers中
                    //private final HashSet<Worker> workers = new HashSet<Worker>();
                    //其实workers就是一个Set集合
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                //释放锁
                mainLock.unlock();
            }
            if (workerAdded) {
                //调用线程的start()方法去真正执行我们的业务
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

因为我们的任务被封装成一个Worker的对象,而Worker又实现了Runnable接口,所以在调用start()方法时候,就会执行它的run()方法

ThreadPoolExecutor线程池核心原理

// Worker中的run()方法
public void run() {
    runWorker(this);
}
final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    //拿到我们提交给线程池的任务,注意:此时task不为空!!!!!!
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        //因为此时的task不为空,所以会直接进入到while循环中去
        while (task != null || (task = getTask()) != null) {
            w.lock();
            
            //判断线程是否被中断
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                //执行任务的前置业务逻辑,因为我们没有实现,所以这里不做任务处理
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    //真正去调用我们的业务方法
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    //执行任务的后置业务逻辑,因为我们没有实现,所以这里不做任务处理
                    afterExecute(task, thrown);
                }
            } finally {
                //此时将task设置为null
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

第一次进入while循环就结束了,结束之后将task设置为null,因为这是个while循环,所以会再次进入这个while循环中去

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        //再次进入到这个while循环中来,此时的task已经为null了,所以第一个判断为false
        //接下来就会执行task = getTask()方法
        while (task != null || (task = getTask()) != null) {
            w.lock();
            
            //判断线程是否被中断
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                //执行任务的前置业务逻辑,因为我们没有实现,所以这里不做任务处理
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    //真正去调用我们的业务方法
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    //执行任务的后置业务逻辑,因为我们没有实现,所以这里不做任务处理
                    afterExecute(task, thrown);
                }
            } finally {
                //此时将task设置为null
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}
private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?

    //这里是个死循环
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // 如果此时线程池状态等于RUNING,并且队列已经为空了,就直接返回null
        //但是此时我们的线程池状态是RUNNING,所以不会进入到这个if分支
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            //将线程池的线程数减1
            decrementWorkerCount();
            return null;
        }

        int wc = workerCountOf(c);

        //allowCoreThreadTimeOut:false 表示如果没有任务,那么核心线程在超时时间达到之后就会回收
        // true:表示核心线程不会被回收
        // 我们可以通过 threadPoolExecutor.allowCoreThreadTimeOut(false);来设置
        // allowCoreThreadTimeOut默认值就是false,所以timed的值就是由wc > corePoolSize来控制
        //如果此时存活的线程数大于核心线程数,那么就是true,否则就是false
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        //   如果此时存活的线程数大于最大线程数并且队列为空
        //   就执行compareAndDecrementWorkerCount(c),将线程数减1,直接返回null,
        //   返回null之后,上层while循环就直接退出,然后将这个线程回收
        //   这个if的作用就是如果此时线程数大于最大线程数了,此时队列又没有任务,这时候就将
        //   那么就直接将这些线程回收
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        //如果timed = true:表示此时的线程数大于核心线程数,但是小于最大线程数,此时直接调用
        // 阻塞队列的poll方法,并在我们设置的存活时间之后如果还没有获取到元素,就直接回收这条线程
        
        // 如果timed = false:表示此时的线程数小于或等于核心线程数,那么此时就调用阻塞队列的
        // take()阻塞方法,知道队列中有任务
        
        //所以核心线程之所以能存活,就是因为调用了take()方法一直阻塞在这里,
        //非核心线程是有阻塞时间的,超过这个时间没有任务就会被回收
        try {
            Runnable r = timed ?
                //非核心线程会回收的核心原理
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                //核心线程存活的核心原理
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}
转载自:https://juejin.cn/post/7258266800739237947
评论
请登录