likes
comments
collection
share

被线上的线程池疯狂折磨,深究线程池原理并根据业务场景更改其工作流程文章介绍 本文会从为什么使用线程池、如何使用线程池、线

作者站长头像
站长
· 阅读数 40

文章介绍

写这篇文章是因为我们后台有一个看板页面,并且指标非常多,查询条件也非常复杂导致没法做宽表只能实时从各个数据源查询出来并计算;想让接口响应时间更短一些更改了很多次线程池参数,但都达不到理想状态,为此研究了一下线程池原理,并且实现了一个适合自己业务场景的线程池工作。 本文会从为什么使用线程池、如何使用线程池、线程池的整体工作流程、线程池源码、手动实现一个适合实时io型密集型的线程池。

线程池

java中创建线程需要分配一定的内存和系统资源,频繁的创建销毁线程会带来较大的系统开销;同时创建大量的线程也会使cpu需要消耗更多的时间来管理线程,而影响整体性能;同时因为无法控制线程的数量,可能导致并发过多影响效率以及响应时间等一系列的问题。此时就需要我们有一个可以集中管理线程的工具线程池。 线程池的主要优点包括: 1.降低资源消耗:通过重复利用已创建的线程,减少了线程创建和销毁的开销。 2.提高响应速度:任务无需等待线程创建就能立即执行。 3.提高线程的可管理性:可以统一管理线程的数量、状态等。

使用

public static void main(String[] args) {
    ThreadPoolExecutor executor = new ThreadPoolExecutor(
            1,
            2,
            1,
            TimeUnit.SECONDS,
            new ArrayBlockingQueue<>(1),
            new ThreadFactory() {
                @Override
                public Thread newThread(Runnable r) {
                    Thread t = new Thread(r);
                    // ...
                    return t;
                }
            },
            new ThreadPoolExecutor.AbortPolicy()
    );

    executor.execute(任务);
    executor.submit(有返回结果的任务);
    
    executor.shutdown(销毁);
    executor.shutdownNow(立即销毁);
}

通过上述代码可以创建一个线程池,当然大部分人会直接使用spring自带的,那么我们来看一下传入构造方法中的各个参数以及他们的含义:

public ThreadPoolExecutor(int corePoolSize, -- 核心线程数
                          int maximumPoolSize, -- 最大线程数
                          long keepAliveTime, -- 空闲时间,空闲超过该时间会被收回
                          TimeUnit unit, -- 空闲时间的单位
                          BlockingQueue<Runnable> workQueue, -- 队列,用来暂存任务
                          ThreadFactory threadFactory, -- 线程工厂,用来创建线程
                          RejectedExecutionHandler handler -- 拒绝策略
                          )

在接下来的原理详解中会逐步分析各个参数的含义以及线程池如何执行与销毁的。

原理

ThreadPoolExecutor状态属性

打开ThreadPoolExecutor类直接看到的就是以下属性,熟悉以下属性的含义可以让我们理解ThreadPoolExecutor源码更加容易。

// AtomicInteger,写操作用CAS实现,保证了原子性
// ctl维护这线程池的2个核心内容:
// 1:线程池状态(高3位,维护着线程池状态)
// 2:工作线程数量(核心线程+非核心线程,低29位,维护着工作线程个数)
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

// Integer.SIZE = 32 , COUNT_BITS = 29;
private static final int COUNT_BITS = Integer.SIZE - 3;

// 工作线程的最大个数 
// 数字1 往左移动29位,再减1。
// (0010 0000 0000 0000 0000 0000 0000 0000) - 1 
//   0001 1111 1111 1111 1111 1111 1111 1111 
private static final int COUNT_MASK = (1 << COUNT_BITS) - 1;

// 线程池运行状态
// 运行中 1110 0000 0000 0000 0000 0000 0000 0000 
private static final int RUNNING    = -1 << COUNT_BITS;

// 不会接收新任务,但是会处理正在执行以及工作队列中的任务
// 1110 0000 0000 0000 0000 0000 0000 0000 
private static final int SHUTDOWN   =  0 << COUNT_BITS;

// 不会接收新任务,会立即中断正在执行任务的线程,不会处理工作队列中的任务
// 运行中 0010 0000 0000 0000 0000 0000 0000 0000 
private static final int STOP       =  1 << COUNT_BITS;

// 工作队列任务都处理完成,工作线程都移除,执行用户实现的 `terminated`
// 运行中 0100 0000 0000 0000 0000 0000 0000 0000 
private static final int TIDYING    =  2 << COUNT_BITS;

// 完全销毁状态
// 运行中 0110 0000 0000 0000 0000 0000 0000 0000 
private static final int TERMINATED =  3 << COUNT_BITS;

// 获取线程池状态 通过ctl与~COUNT_MASK,例如现在是运行中状态有1+2+4=7个工作线程
// 1110 0000 0000 0000 0000 0000 0000 0111
// 1110 0000 0000 0000 0000 0000 0000 0000 
// 结果是RUNNING
private static int runStateOf(int c)     { return c & ~COUNT_MASK; }

// 获取工作线程个数,例如现在是运行中有1+2+4=7个工作线程
// 1110 0000 0000 0000 0000 0000 0000 0111 
// 0001 1111 1111 1111 1111 1111 1111 1111 
// 结果是7
private static int workerCountOf(int c)  { return c & COUNT_MASK; }

这里配上一张线程状态流转图帮助大家理解:

被线上的线程池疯狂折磨,深究线程池原理并根据业务场景更改其工作流程文章介绍 本文会从为什么使用线程池、如何使用线程池、线

execute方法

无论是调用execute还是submit最终都是调用execute方法进行任务执行,两者的区别是submit对任务封装了一层RunnableFuture。execute代码如下:

public void execute(Runnable command) {
    // 如果任务是空,抛出空指针
    if (command == null)
        throw new NullPointerException();
        
    // 获取到ctl的值
    int c = ctl.get();
    
    // 通过ctl拿到工作线程个数,判断是否小于核心线程
    if (workerCountOf(c) < corePoolSize) {
    
        // 添加一个携带任务的核心线程;用第二个参数true、false来判断是否为核心线程。
        if (addWorker(command, true))
            // 添加成功,则执行成功。
            return;
            
        // 添加失败,重新获取ctl,再次检查状态
        c = ctl.get();
    }
    // 核心线程数已经到了最大值;添加核心线程时,线程执行了showdown/showdownNow
    // 判断线程是否还在运行 && 是运行,在任务队列中添加任务。
    if (isRunning(c) && workQueue.offer(command)) {
    
        // 再次获取状态
        int recheck = ctl.get();
        
        // 检查状态,如果不是运行中状态 && 从工作队列移除任务
        if (! isRunning(recheck) && remove(command))
            // 执行拒绝策略
            reject(command);
        // 如果工作线程数为0。 例如核心线程数设置为0 || 核心线程允许超时销毁
        else if (workerCountOf(recheck) == 0)
            // 创建一个不携带任务的非核心线程来处理任务
            addWorker(null, false);
    }
    
    // 不是运行中状态或者任务队列添加满任务了,添加非核心线程(携带任务)来处理任务。
    else if (!addWorker(command, false))
    
        // 非核心线程添加失败,执行拒绝策略。
        reject(command);
}

通过上述方法,可以很清楚这是非常常见的面试题,线程池的执行流程:

  1. 当提交一个新任务到线程池时,线程池会检查当前运行的线程数是否少于核心线程数(corePoolSize)。
  2. 如果当前运行的线程数少于核心线程数,则创建一个新线程来执行任务,即使有空闲的核心线程也创建新的线程。
  3. 如果当前运行的线程数等于或者多于核心线程数,但任务队列未满,则将任务添加到任务队列中等待执行。
  4. 如果任务队列已满,线程池会检查当前运行的线程数是否少于最大线程数(maximumPoolSize)。
  5. 如果当前运行的线程数少于最大线程数,则创建新的非核心线程来执行任务。
  6. 如果当前运行的线程数等于最大线程数,且任务队列已满,则根据拒绝策略来处理这个任务。
  7. 队列中如果线程数等于0,则创建一个非核心线程来处理队列中的任务。

addWorker 添加线程

private boolean addWorker(Runnable firstTask, boolean core) {
    // 循环标识
    retry:
    // 获取ctl数值
    for (int c = ctl.get();;) {
        
/**
* runStateAtLeast方法就是比较一下两个状态的大小,这里用来判断线程池状态
* private static boolean runStateAtLeast(int c, int s) {
*    return c >= s;
*}
**/     
        // 这里比较难理解: 线程池的状态需要大于SHUTDOWN状态 && 三个条件需要至少满足其中一个条件。
        // 第一个是STOP,只有调用showdownNow,或者任务都处理完才会大于等于STOP
        // 第二个是当firstTask不等于空
        // 第三个是任务队列为空
        // 以上是创建线程失败。
        // 第二第三可以转换一下代码为 
        // ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())
        // 当状态为SHUTDOWN时,携带的任务是空,并且任务队列不为空,需要添加成功。如果不是这个状态,添加失败。 此时是因为调用showdown时需要处理任务队列中的任务。firstTask为空,说明此时线程池中没有工作线程,如果添加失败的话,那么任务队列中的任务永远无法被执行。 下面还有类似的逻辑。
        if (runStateAtLeast(c, SHUTDOWN)
            && (runStateAtLeast(c, STOP)
                || firstTask != null
                || workQueue.isEmpty()))
            return false;
        
        // 判断工作线程个数
        for (;;) {
            // 获取工作线程个数是否超过 核心线程工作线程数 / 最大核心线程数
            if (workerCountOf(c)
                >= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))
                return false;
            // 对工作线程+1操作
            if (compareAndIncrementWorkerCount(c))
                // 数量添加成功,直接跳到外部循环。
                break retry;
                
            //添加失败,重新获取ctl
            c = ctl.get();  // Re-read ctl
            // 判断线程池是否已经开始shotdown
            if (runStateAtLeast(c, SHUTDOWN))
                // shutdown,重新走一遍上树流程
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }

    // 工作线程启动标识
    boolean workerStarted = false;
    // 工作线程添加标识
    boolean workerAdded = false;
    // 工作线程
    Worker w = null;
    try {
        // 创建工作线程,携带任务
        w = new Worker(firstTask);
        // 获取worker内的w.thread
        final Thread t = w.thread;
        
        // 防止实现的线程工厂有bug,创建一个空线程
        if (t != null) {
            // 启动线程过程中,防止线程池被销毁从而漏掉该线程,使其变为游离态。
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // 再次获取ctl
                int c = ctl.get();
                
                // 运行中状态 || 小于STOP状态,也就是SHUTDOWN状态,上面有解释。
                if (isRunning(c) || 
                    (runStateLessThan(c, STOP) && firstTask == null)) {

                    // 如果线程状态不是就绪状态,抛出异常。防止线程工作启动线程
                    if (t.getState() != Thread.State.NEW)
                        throw new IllegalThreadStateException();
                    
                    // 添加工作线程到workers中(hashSet)
                    workers.add(w);
                    
                    // 添加线程标识成功
                    workerAdded = true;
                    
                    // 获取线程数,设置largestPoolSize
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                }
            } finally {
                // 释放锁资源
                mainLock.unlock();
            }
            // 工作线程添加成功
            if (workerAdded) {
                // 启动线程,并更改线程启动标识
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        // 如果没有执行到线程启动标识设置为true
        if (! workerStarted)
        
            // 走添加工作线程失败逻辑
            addWorkerFailed(w);
    }
    // 返回工作线程启动标识
    return workerStarted;
}
private void addWorkerFailed(Worker w) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        // 线程不为空
        if (w != null)
            // 从工作线程队列中移除工作线程
            workers.remove(w);
        // 工作线程减1
        decrementWorkerCount();
        // 尝试执行`tryTerminate`,结束当前线程池。
        tryTerminate();
    } finally {
        mainLock.unlock();
    }
}

runWorker 执行任务

private final class Worker
    extends AbstractQueuedSynchronizer
    implements Runnable
{
    
    final Thread thread;
    
    Runnable firstTask;
    
    Worker(Runnable firstTask) {
        // 互斥资源设置为-1
        setState(-1); 
        // 设置当前任务
        this.firstTask = firstTask;
        // 通过线程池创建线程
        this.thread = getThreadFactory().newThread(this);
    }

    // 因为实现了Runnable,调用thread.start就会走到这里
    public void run() {
        runWorker(this);
    }
final void runWorker(Worker w) {
    // 拿到当前线程
    Thread wt = Thread.currentThread();
    // 获取存放的任务
    Runnable task = w.firstTask;
    // 将任务清空
    w.firstTask = null;
    // 允许中断
    w.unlock(); // allow interrupts
    
    // 突然退出标识
    boolean completedAbruptly = true;
    try {
        // 如果worker自身携带任务则直接执行,否则去任务队列中取。
        while (task != null || (task = getTask()) != null) {
            w.lock();
            // 判断线程池状态是否大于等于STOP。如果是 则要中断当前线程
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                // 中断
                wt.interrupt();
            try {
                // 钩子函数
                beforeExecute(wt, task);
                try {
                    // 执行任务
                    task.run();
                    // 后值钩子函数
                    afterExecute(task, null);
                } catch (Throwable ex) {
                    //  后值钩子函数
                    afterExecute(task, ex);
                    throw ex;
                }
            } finally {
                // 任务设置为空
                task = null;
                // 完成任务数量加一
                w.completedTasks++;
                w.unlock();
            }
        }
        // 突然退出标识为false
        completedAbruptly = false;
    } finally {
        // 
        processWorkerExit(w, completedAbruptly);
    }
}

processWorkerExit 工作线程退出

private void processWorkerExit(Worker w, boolean completedAbruptly) {
    // 不是正常退出
    if (completedAbruptly) 
        // 工作线程数量减1
        decrementWorkerCount();

    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        // 将当前工作线程完成的任务数量,加给整个线程池完成的任务数
        completedTaskCount += w.completedTasks;
        // 移除当前工作线程
        workers.remove(w);
    } finally {
        mainLock.unlock();
    }

    // 尝试结束当前线程池
    tryTerminate();
    
    // 获取线程池状态
    int c = ctl.get();
    // 判断线程池状态还为停止
    if (runStateLessThan(c, STOP)) {
        // 如果是正常退出
        if (!completedAbruptly) {
            // 是否允许核心线程超时,如果允许的话则为0,不允许则为corePoolSize
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            // 当工作线程为0,并且任务队列中还有任务
            if (min == 0 && ! workQueue.isEmpty())
                // 把最小工作线程设置为1
                min = 1;
            // 如果工作线程数量大于1,则返回
            if (workerCountOf(c) >= min)
                // 直接返回
                return; 
        }
        
        // 根据上面的判断,走到这里的逻辑是 工作线程数为0,并且任务队列中还有任务。
        // 添加空任务的非核心线程,处理任务队列中的任务。
        addWorker(null, false);
    }
}

getTask 工作线程获取任务

private Runnable getTask() {
    // 超时标识
    boolean timedOut = false; 
    for (;;) {
        // 获取状态
        int c = ctl.get();

        // 线程池状态判断
        // 如果线程池状态为SHUTDOWN && 工作队列为空
        // 如果线程池状态为STOP
        if (runStateAtLeast(c, SHUTDOWN)
            && (runStateAtLeast(c, STOP) || workQueue.isEmpty())) {
            // 减少当前线程数量
            decrementWorkerCount();
            return null;
        }
        
        // 获取工作线程数量
        int wc = workerCountOf(c);

        // 判断核心线程是否允许超时?
        // 工作线程个数是否大于核心线程数
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
        
        //(工作线程是否超过最大线程数||允许超时并且已经超时)&&(线程数量大于1 || 工作队列为null )   因为工作线程大于1时,还有其他线程可以执行,当工作队列为空时,就算没有线程了也无所谓
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            // 工作线程数有问题,数量减少1。
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
            // 如果是非核心,走poll,拉取工作队列任务,
            // 如果是核心线程,走take一直阻塞,拉取工作队列任务
            
            // 当工作队列没有任务时,这时就会被Condition通过await阻塞线程
            // 当有任务添加到工作线程后,这是添加完任务后,就会用过Condition.signal唤醒阻塞的线程
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            // 执行的poll方法,并且在指定时间没拿到任务,
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

reject 拒绝策略

final void reject(Runnable command) {
    handler.rejectedExecution(command, this);
}

拒绝策略完全是执行各种实现RejectedExecutionHandler的策略类。线程池提供的拒绝策略: 拒绝策略:线程池提供的拒绝策略,一般不适合你的业务场景时,你就自己定义即可。

  • AbortPolicy:抛出异常!
  • CallerRunsPolicy:让提交任务的线程处理这个任务!
  • DiscardPolicy:啥也不做,任务没了!
  • DiscardOldestPolicy:扔掉队列最前面的任务,尝试把当前任务添加进去!

实现适合自身业务的线程池

背景

在平时业务开发过程中,有一些数据查询场景需要并行查询一些任务返回给前端;此时就会有一些并行任务,希望每一个查询任务进入到线程池中都立刻会有线程来处理,而不是放到任务队列中,因为只要放到任务队列中等待线程take,那么整个接口的响应时间就会增加。

此时如何设置核心线程数、最大线程数、最大队列是一个很头疼的问题,刚开始引入了hippo4j,它是一个可以动态的修改线程线程池属性的框架,我们通过观察线程池的各种指标对线程池属性进行修改,最后的结果就是核心线程数量非常多,因为只有队列满了才会创建非核心线程,如果队列设置为0又担心任务太多,导致任务都丢弃了。此时就想更改一下线程池的工作流程,希望来一个任务就创建一个线程,一直创建到核心线程池数为止,后续再来任务判断当前活跃的线程+队列中任务数量是否小于核心线程数,如果小于放入到队列中,如果大于直接创建非核心线程池;这样就能保证所有的任务都立刻有线程执行。

实现

开始是想重写execute方法,当开始写的时候发现很多方法是私有的都没办法直接调用;此时就想到了可以从加入队列入手,只需要把队列的offer方法重写即可。

自定义队列队列:

public class TestTaskQueue<R extends Runnable> extends LinkedBlockingQueue<Runnable> {

    private static final long serialVersionUID = -2635853580887179627L;

    private ThreadPoolExecutor executor;

    public TestTaskQueue(int capacity) {
        super(capacity);
    }

    public void setExecutor(ThreadPoolExecutor exec) {
        executor = exec;
    }

    @Override
    public boolean offer(Runnable runnable) {
        // 必须设置线程池,需要从线程池中获取数据信息。
        if (executor == null) {
            throw new RejectedExecutionException("The task queue does not have executor!");
        }
        
        // 获取当前线程数量
        int currentPoolThreadSize = executor.getPoolSize();
        // 获取活跃线程数量 + 队列数量 < 核心线程数
        if (executor.getActiveCount() + size() < currentPoolThreadSize) {
            // 入队,有空余核心线程可以进行处理
            return super.offer(runnable);
        }
        
        // 如果当前线程数量小于最大线程数量,则返回false,会直接创建非核心线程。
        if (currentPoolThreadSize < executor.getMaximumPoolSize()) {
            return false;
        }
        
        // 如果当前线程达到最大线程数,则直接放入队列中
        return super.offer(runnable);
    }

    
    public boolean retryOffer(Runnable o, long timeout, TimeUnit unit) throws InterruptedException {
        if (executor.isShutdown()) {
            throw new RejectedExecutionException("Executor is shutdown!");
        }
        return super.offer(o, timeout, unit);
    }
}

使用方式:


TestTaskQueue<Runnable> testTaskQueue = new TestTaskQueue<>(DEFAULT_QUEUE_CAPACITY);

ThreadPoolExecutor executor = new ThreadPoolExecutor(
        DEFAULT_POOL_SIZE, DEFAULT_MAX_POOL_SIZE, DEFAULT_THREAD_TIMEOUT, TimeUnit.MINUTES, testTaskQueue,
        new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r);
            }
        },
        new ThreadPoolExecutor.CallerRunsPolicy());
testTaskQueue.setExecutor(executor);

转载自:https://juejin.cn/post/7402444509152952332
评论
请登录