likes
comments
collection
share

JUC(2)线程池一览

作者站长头像
站长
· 阅读数 38

前言

本篇主要介绍线程池相关的知识点和面试题知识。

一、什么是线程池

说到线程池,我们可以类比 JDBC 连接池。在之前我们操作数据库连接的时候,有一个创建连接和销毁连接的过程,这个资源开销是比较大的。而真的业务即拿到连接后执行的语句消耗的资源通常是较小的,为了减少在创建连接销毁连接的资源开销,提高系统的性能,我们引入了 JDBC 连接池,系统启动的时候,先放入很多连接放入池子里面,使用的时候,直接从连接池中获取一个,完毕之后返回到池子里面,继续给后续调用者使用,这就整体提高了系统的性能。

线程池和数据库连接池的原理也差不多,创建线程去处理业务,可能创建线程的时间比处理业务的时间还长一些,所以我们就可以使用线程池。创建线程变成了从线程池中获取一个空闲的线程,然后使用,关闭线程变成了将线程归还到线程池中。

二、为什么使用线程池

线程池做的工作主要是控制运行的线程数量,处理过程中将任务放入队列,然后在线程创建后启动这些任务,如果线程数量超过了最大数量,超出数量的线程进行阻塞队列排队等待,执行其他线程执行完毕,再依次执行。

主要特点:线程复用、控制最大并发数、管理线程

  • 1、降低资源消耗,减少资源创建和销毁的性能开销;
  • 2、提高响应速度,当任务到达时,任务可以不需要等待线程创建就立即执行;
  • 3、提高线程的可管理性,线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。

\

备注:就记住这三个好处即可。来自《java 并发编程艺术》

三、线程池的使用

常见的线程池有 FixedThreadPool,SingleThreadExecutor、CachedThreadPool 和 ScheduledThreadPool。这几个都是 ExecutorService(线程池)实例。

3.1 Executors.newFixedThreadPool(int)

newFixedThreadPool创建的线程池 corePoolSize 和 maximumPoolSize 值是相等的,它使用的是LinkedBlockingQueue 执行长期任务性能好,创建一个线程池,一池有N个固定的线程,有固定线程数的线程.

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}
  • 适用场景:适合处理 CPU 密集型的任务,在长期被工作线程使用的情况下,核心线程不会被回收。但在任务比较多的时候会导致 OOM,不会拒绝任务。

3.2 Executors.newSingleThreadExecutor()

newSingleThreadExecutor 创建的线程池corePoolSize和maximumPoolSize值都是1,它使用的是LinkedBlockingQueue一个任务一个任务的执行,一池一线程.

public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}
  • 适用场景:适合串行执行任务的场景。

3.3 Executors.newCachedThreadPool()

newCachedThreadPool创建的线程池将corePoolSize设置为0,将maximumPoolSize设置为Integer.MAX_VALUE,它使用的是SynchronousQueue,也就是说来了任务就创建线程运行,当线程空闲超过60秒,就销毁线程。

执行很多短期异步任务,线程池根据需要创建新线程,但在先前构建的线程可用时将重用它们。可扩容,遇强则强

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * 线程池
 * Arrays
 * Collections
 * Executors
 */
public class MyThreadPoolDemo {

    public static void main(String[] args) {
        //List list = new ArrayList();
        //List list = Arrays.asList("a","b");
        //固定数的线程池,一池五线程

//       ExecutorService threadPool =  Executors.newFixedThreadPool(5); //一个银行网点,5个受理业务的窗口
//       ExecutorService threadPool =  Executors.newSingleThreadExecutor(); //一个银行网点,1个受理业务的窗口
       ExecutorService threadPool =  Executors.newCachedThreadPool(); //一个银行网点,可扩展受理业务的窗口

        //10个顾客请求
        try {
            for (int i = 1; i <=10; i++) {
                threadPool.execute(()->{
                    System.out.println(Thread.currentThread().getName()+"\t 办理业务");
                });
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            threadPool.shutdown();
        }

    }
}

四、ThreadPoolExecutor 底层原理

JUC(2)线程池一览

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}
  • corePoolSize :核心线程大小。当提交一个任务到线程池时,线程池会创建一个线程来执行任务,即使有其他空闲线程可以处理任务也会创建新线程,等到工作的线程数大于核心线程数时就不会在创建了,如果调用了线程池的 prestartAllCoreThreads 方法,线程池会提前把核心线程都创造好,并启动。
  • maximumPoolSize:线程池允许创建的最大线程数,此值必须大于等于1。如果队列满了,并且已创建的线程数小于最大线程数,则线程池会再创建新的线程执行任务。如果我们使用了无界队列,那么所有的任务会假如队列,这个参数就没有什么效果了。
  • keepAliveTime :多余的空闲线程的存活时间,当前线程池中线程数量超过 corePoolSize 时,当空闲时间,达到 KeepAliveTime 时,多余线程会被销毁直到只剩下 corePoolSize 个线程为止,如果任务很多,并且每个任务的执行时间比较短,避免线程重复创建和回收,可以调大这个时间,提高线程的利用率。
  • unit : keepAliveTime 的时间单位,可以选择的单位有 天、小时、分钟、毫秒、微妙、千分之一毫秒和纳秒。
  • workQueue : 任务队列,被提交但尚未被执行的任务,用于缓存待处理任务的阻塞队列
  • threadFactory:表示生成线程池中工作线程的线程工厂,用于创建线程,一般默认的即可。也可以通过线程工厂给每个创建出来的线程设置更有意义的名字
  • handler:拒绝策略,表示当队列满了,并且工作线程大于等于线程池的最大线程数(maximumPoolSize)时如何拒绝请求执行的 runnable 的策略。

JUC(2)线程池一览

线程池的执行原理是根据线程池的参数设置有关,比如核心线程数,最大线程数,阻塞队列等。每当有新的任务来临时,它的任务是这样的。

调用线程池的 execute 方法处理任务,执行 execute 方法的过程:

  • 1、判断线程池中运行的线程个数是否小于核心线程数,如果是,则创建线程来处理任务,如果否,则执行下步;
  • 2、尝试将任务放入 workQueue 指定的队列中,如果队列满了,则执行下步;
  • 3、判断线程池中运行的线程数是否小于 maximumPoolSize,如果是:则新增线程处理当前传入的任务,否:将任务传递给 handler 对象的拒绝策略方法处理。
  • 4、当一个线程完成任务时,它会从队列中取下一个任务来执行;
  • 5、当一个线程无事可做超过一定时间,线程会判断:如果当前运行的线程数大于 核心线程数,那么这个线程就会停掉;所以线程池的所有任务完成后,它最终会收缩到 核心线程数的大小。

五、拒绝策略

5.1 线程池的拒绝策略

当阻塞队列排满了,且线程池中的线程数达到了最大线程数,这个时候就需要拒绝策略机制来处理新来的请求任务。

JDK 内置的四种拒绝策略

  • AbortPolicy(默认):直接抛出RejectedExecutionException异常阻止系统正常运行,拒绝任务,并抛出异常
  • CallerRunsPolicy:“调用者运行”一种调节机制,该策略既不会抛弃任务,也不会抛出异常,而是调用任务线程执行当前任务
  • DiscardOldestPolicy:抛弃队列中等待最久的任务,然后把当前任务加入队列中,尝试再次提交任务
  • DiscardPolicy:该策略默默地丢弃无法处理的任务,不予任何处理也不抛出异常。如果允许任务丢失,这是最好的一种策略

以上四种策略均实现了 RejectedExecutionHandle 接口

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class Demo5 {
    static class Task implements Runnable {
        String name;
        public Task(String name) {
            this.name = name;
        }
        @Override
        public void run() {
            System.out.println(Thread.currentThread().getName() + "处理" + this.name);
            try {
                TimeUnit.SECONDS.sleep(5);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        @Override
        public String toString() {
            return "Task{" +
                    "name='" + name + ''' +
                    '}';
        }
    }
    public static void main(String[] args) {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(1,
                1,
                60L,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(1),
                Executors.defaultThreadFactory(),
                (r, executors) -> {
                    //自定义饱和策略
                    //记录一下无法处理的任务
                    System.out.println("无法处理的任务:" + r.toString());
                });
        for (int i = 0; i < 5; i++) {
            executor.execute(new Task("任务-" + i));
        }
        executor.shutdown();
    }
}
无法处理的任务:Task{name='任务-2'}
无法处理的任务:Task{name='任务-3'}
pool-1-thread-1处理任务-0
无法处理的任务:Task{name='任务-4'}
pool-1-thread-1处理任务-1

输出结果中可以看到有3个任务进入了饱和策略中,记录了任务的日志,对于无法处理多任务,我们最好能够记录一下,让开发人员能够知道。任务进入了饱和策略,说明线程池的配置可能不是太合理,或者机器的性能有限,需要做一些优化调整。

5.2 生产中合理的设置参数

要想合理的配置线程池,要分析任务的特性,可以从以下几个角度分析:

  • 任务的性质:CPU 密集型任务、IO 密集型任务和混合型任务
  • 任务的优先级:高、中、低;
  • 任务的执行时间:长、中、短
  • 任务的依赖性:是否依赖其他的系统资源,如数据库连接

性质不同任务可以用不同规模的线程池分开处理。

  • CPU密集型任务应该尽可能小的线程,如配置cpu数量+1个线程的线程池。
  • 由于IO密集型任务并不是一直在执行任务,不能让cpu闲着,则应配置尽可能多的线程,如:cup数量*2。
  • 混合型的任务,如果可以拆分,将其拆分成一个CPU密集型任务和一个IO密集型任务,只要这2个任务执行的时间相差不是太大,那么分解后执行的吞吐量将高于串行执行的吞吐量。可以通过Runtime.getRuntime().availableProcessors()方法获取cpu数量。优先级不同任务可以对线程池采用优先级队列来处理,让优先级高的先执行。

使用队列的时候建议使用有界队列,有界队列增加了系统的稳定性,如果采用无解队列,任务太多的时候可能导致系统OOM,直接让系统宕机。

线程池汇总线程大小对系统的性能有一定的影响,我们的目标是希望系统能够发挥最好的性能,过多或者过小的线程数量无法有消息的使用机器的性能。咋Java Concurrency inPractice书中给出了估算线程池大小的公式:

Ncpu = CUP的数量
Ucpu = 目标CPU的使用率,0<=Ucpu<=1
W/C = 等待时间与计算时间的比例
为保存处理器达到期望的使用率,最有的线程池的大小等于:
Nthreads = Ncpu × Ucpu × (1+W/C)

1、CPU 密集型

// 查看CPU核数
System. out .println(Runtime. getRuntime ().availableProcessors());

JUC(2)线程池一览

2、IO 密集型

由于 IO 密集型任务线程并不是一直执行任务,则赢配置尽可能多的线程,如 CPU 核数 * 2

JUC(2)线程池一览

六、BlockingQueue 阻塞队列

JUC(2)线程池一览

线程1往阻塞队列里添加元素,线程2从阻塞队列里移除元素

当队列是空的,从队列中获取元素的操作将会被阻塞

当队列是满的,从队列中添加元素的操作将会被阻塞

试图从空的队列中获取元素的线程将会被阻塞,直到其他线程往空的队列插入新的元素

试图向已满的队列中添加新元素的线程将会被阻塞,直到其他线程从队列中移除一个或多个元素或者完全清空,使队列变得空闲起来并后续新增

种类分析

  • ArrayBlockingQueue:是一个基于数组结构的有界阻塞队列,此队列按照先进先出原则对元素进行排序
  • LinkedBlockingQueue:由链表结构组成的有界(但大小默认值为integer.MAX_VALUE)阻塞队列,此队列按照先进先出排序元素,吞吐量通常要高于ArrayBlockingQueue。静态工厂方法Executors.newFixedThreadPool使用了这个队列。
  • PriorityBlockingQueue:支持优先级排序的无界阻塞队列。
  • DelayQueue:使用优先级队列实现的延迟无界阻塞队列。
  • SynchronousQueue:不存储元素的阻塞队列,也即单个元素的队列,每个插入操作必须等到另外一个线程调用移除操作,否则插入操作一直处理阻塞状态,吞吐量通常要高于
  • LinkedBlockingQueue,静态工厂方法Executors.newCachedThreadPool使用这个队列
  • LinkedTransferQueue:由链表组成的无界阻塞队列。
  • LinkedBlockingDeque:由链表组成的双向阻塞队列。

注意:建议使用有界队列。有界队列能增加系统的稳定性和预警能力,可以根据需要设大一点。

七、自定义线程池

/**
 * @author xiao lei
 * @Date 2020/11/6
 * @module
 */
public class CustomThreadPoolExecutor {
    private ThreadPoolExecutor poolExecutor = null;
    /**
     * 线程池初始化方法
     * <p>
     * corePoolSize 核心线程池大小----1
     * maximumPoolSize 最大线程池大小----3
     * keepAliveTime 线程池中超过corePoolSize数目的空闲线程最大存活时间----30+单位TimeUnit
     * TimeUnit keepAliveTime时间单位----TimeUnit.MINUTES
     * workQueue 阻塞队列----new ArrayBlockingQueue<Runnable>(5)====5容量的阻塞队列
     * threadFactory 新建线程工厂----new CustomThreadFactory()====定制的线程工厂
     * rejectedExecutionHandler 当提交任务数超过maxmumPoolSize+workQueue之和时,
     * 即当提交第41个任务时(前面线程都没有执行完,此测试方法中用sleep(100)),
     * 任务会交给RejectedExecutionHandler来处理
     */
    public void init() {
        poolExecutor = new ThreadPoolExecutor(
                7,
                7,
                30,
                TimeUnit.MINUTES,
                new ArrayBlockingQueue<Runnable>(300),
                new CustomThreadFactory(),
                new CustomRejectedExecutionHandler());
    }
    public void destory() {
        if (poolExecutor != null) {
            poolExecutor.shutdownNow();
        }
    }
​
    public ExecutorService getCustomThreadPoolExecutor() {
        return this.poolExecutor;
    }

    private class CustomThreadFactory implements ThreadFactory {

        private AtomicInteger count = new AtomicInteger(0);
        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(r);
            String threadName = CustomThreadPoolExecutor.class.getSimpleName() + count.addAndGet(1);
            System.out.println(threadName);
            t.setName(threadName);
            return t;
        }
    }

    private class CustomRejectedExecutionHandler implements RejectedExecutionHandler {
        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            try {
                // 核心改造点,由blockingqueue的offer改成put阻塞方法
                executor.getQueue().put(r);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

spring中使用,可动态配置

@EnableConfigurationProperties(ThreadPoolConfigProperties.class)
@Configuration
public class MyThreadConfig {


    @Bean
    public ThreadPoolExecutor threadPoolExecutor(ThreadPoolConfigProperties pool) {
        return new ThreadPoolExecutor(
                pool.getCoreSize(),
                pool.getMaxSize(),
                pool.getKeepAliveTime(),
                TimeUnit.SECONDS,
                new LinkedBlockingDeque<>(100000),
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.AbortPolicy()
        );
    }

}
@ConfigurationProperties(prefix = "xiaoleimall.thread")
@Component
@Data
@Primary
public class ThreadPoolConfigProperties {

    private Integer coreSize;

    private Integer maxSize;

    private Integer keepAliveTime;


}
#配置线程池
xiaoleimall.thread.coreSize=20
xiaoleimall.thread.maxSize=200
xiaoleimall.thread.keepAliveTime=10

八、阿里开发规范

1、【强制】在创建线程或线程池时,请指定有意义的线程名称,方便出错时回溯。

    public class TimeTaskThread extends Thread{
        public TimeTaskThread(){
            super.setName("TimeTaskThread");
        }
    }

2、【强制】线程资源必须通过线程池提供,不允许在应用中自行显示创建线程

【说明】:使用线程池的好处是减少资源开销。如果不使用线程池,有可能造成系统创建大量同类线程而导致消耗完内存或者cpu切换太频繁的问题。

3、【强制】线程池不允许使用 Executors 创建,而是通过 ThreadPoolExecutor的方式创建,这样的处理方式能让编写代码的工程师更加明确线程池的运行规则,规避资源耗尽的风险。

说明:

  • Executors 返回的线程池对象的弊端如下
    • 1)FixedThreadPool 和 SingleThreadPool:允许的请求队列长度为 Integer.MAX_VALUE,可能会堆积大量的请求,从而导致 OOM
    • 2)CachedThreadPoll 和 ScheduledThreadPool:允许的创建线程数量为 Integer.MAX_VALUE,可能会创建大量的线程,从而导致 OOM
转载自:https://juejin.cn/post/7129013968038789156
评论
请登录