likes
comments
collection
share

深入浅出线程池

作者站长头像
站长
· 阅读数 16

为什么要用线程池?

  1. 通过重复利用已经存在的线程,降低线程的开销
  2. 防止线程无休止的创建,方便并发数的管控,防止cpu过度切换
  3. 提高系统相应速度,利用已经存在的线程,无需创建

简单线程池的使用

对于 ThreadPoolExecutor 这个类相比大家也并不陌生吧,它常常用于创建各种线程池。需要定义 核心线程池大小 ,最大线程池大小,最大存活时间,和利用什么结构存储去生成线程池。

ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1,1,1,TimeUnit.SECONDS,new ArrayBlockingQueue(1000) {});

最常用的两个方法就是execute(),和submit()两个方法了

        threadPoolExecutor.execute(()->{
            System.out.println("execute task");
        });

        Future<?> submitTask = threadPoolExecutor.submit(() -> {
            System.out.println("submit task");
        });

execute** **没有返回值,如果runnable 方法里面存在异常,会直接抛出,submit是将异常返回给futureTask,只有调用

submitTask.get();

才能看到异常。这两个方法也都是可以达到异步的效果的。

业务使用

拿消息通知业务来举例。现在公司内部有一些需要审核的消息和要执行的操作。 而这些需要审核的业务耗时是很长的,且暂时我们只是进行对审核的业务“发出”这样的一个操作,因此无需返回值。此时,我们就可以把它丢入线程池去异步执行。

public class AsyncThreadPool {

    public static final ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
            1,
            1,
            1,
            TimeUnit.SECONDS, new ArrayBlockingQueue<>(1000));

    /**
     * 发出审核+执行操作
     * @return
     */
    static boolean doAudit(){
        System.out.println("执行操作");

        threadPoolExecutor.execute(()->{
            try {
                //模拟发出审核的耗时比较长
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            System.out.println("发出审核");
        });
        return true;
    }



    public static void main(String[] args) {
        doAudit();
    }
}

深入理解线程池原理

如果想要深入理解线程池原理,就不得不又回到我们开头讲到的一些传入线程池的几个参数了。

  1. corePoolSize:核心线程数
  2. maximumPoolSize:最大线程数
  3. workCount:工作线程数

我们先看这三个参数,这也是线程池的核心。

故事讲解

下面我我们用最生动的方式,讲述任务进入线程池内部执行流程。 你是一号任务,你需要一个线程去执行你自己。因此,你进入了线程的房间——线程池。你敲敲门,询问当前房间的大小。 房间内的人回答:现在的核心线程是空闲的,你可以进来让核心线程执行你! 二号任务进来了,再次询问。 房间内的人回答:现在的核心线程已经满了的,你站在门口等吧! 于是,二号线程就会进入任务队列中进行等待。 三号任务进来了,再次询问。 房间内的人回答:现在的核心线程已经满了的,任务队列也满了!但是房间内还没有达到最大线程数, 所以,我就勉为其难创建一些非核心线程(worker线程) 去执行你吧! 于是,三号任务进入非核心线程里被处理 四号任务有点倒霉了。再次询问。 房间内的人回答:现在的核心线程已经满了的,任务队列也满了!且达到最大线程数, 所以,请你离开吧! 四号任务就被无情拒绝了!!

小总结

  • 当新任务进来的时候,线程会优先分配给空闲的核心线程。
  • 如果核心线程都处于忙碌状态,那就会被放入一个队列中。
  • 如果队列满了,就会放入非核心线程处理。
  • 如果非核心线程都无法处理,那么线程池自然会采取拒绝策略。

对于这些参数,比如 corePoolSize(核心线程数),需要根据I/O密集型和CPU密集型任务去设置。网上常说会有一套公式去定义这些参数,但是其实这些参数也需要动态调整才算合理。

Spring线程池的使用

通常,在Spring中,我们会利用 Async 注解去实现异步。 长这样:

    @Async
    public void testAsync(){
        System.out.println(Thread.currentThread().getName()+" async test");
    }

它的底层使用的是自带的线程池。叫 applicationTaskExecutor。它有一个致命的缺陷。 深入浅出线程池 很离谱的是,这个线程池的maxPoolSize很大。按照我们刚才的故事讲解,几乎所有任务来了都会被放入非核心线程里,因此会导致oom异常。 为了避免这样的情况发生,我们需要去替换这个自定义线程池。

@Configuration
public class AsyncExecuteConfig extends AsyncConfigurerSupport {

    @Bean
    public ThreadPoolTaskExecutor asyncExecutor() {
        ThreadPoolTaskExecutor threadPool = new ThreadPoolTaskExecutor();
        threadPool.setCorePoolSize(3);
        threadPool.setMaxPoolSize(3);
        threadPool.setWaitForTasksToCompleteOnShutdown(true);
        threadPool.setAwaitTerminationSeconds(60 * 15);
        return threadPool;
    }

    @Override
    public Executor getAsyncExecutor() {
        return asyncExecutor();
    }
}