likes
comments
collection
share

JUC(5) : ForkJoinPool | 线程的极致管理

作者站长头像
站长
· 阅读数 37

一、前言

前文介绍了线程的异步编排工具类 CompletableFuture 的使用,使用它能够很好的完成线程任务的编排工作,但同时,我们也注意到,其使用的默认线程池是 ForkJoinPool.commonPool() 的方法。则这个线程池是共用的,而为了业务之间互不影响,比如 A 业务秒杀并发量大,占用了大多数的线程,那 B 业务再使用这个线程池的话,就无法很好的推进下去。

JUC(5) : ForkJoinPool | 线程的极致管理

观其源码也知在并行流时用到了 ForkJoinPool 的公共线程池。ForkJoinPool 是专门设计用于 Fork/Join 的线程池,什么是 Fork/Join 呢?这个 ForkJoinPool 这个线程池和我们学的 ThreadPoolExcutor 有啥区别呢?下图为ForkJoinPool 与我们熟知的线程池间的继承关系。下文,我们将解答这些疑惑,并学习相关概念并上手使用。

JUC(5) : ForkJoinPool | 线程的极致管理

二、ForkJoinPool

2.1 概述

首先,查看该类介绍,该类是 Doug Lea 在JDK1.7版本中添加的新的线程池。Fork为分叉,Join 为连接,而其设计的思想也确实是先分开处理后合并处理的工作原理。核心思想就是把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务的结果。

Fork:把一个复杂任务进行分拆,大事化小
Join:把分拆任务的结果进行合并

这个思想就很像 Hadoop 中 MapReduce 的工作原理,都是先组装成各个 Map ,然后在 Reduce 中完成合并。 ForkJoinPool 也是使用了分而治之的算法,用相对较少的线程处理大量的任务,将大任务一拆为二,以此类推,每个子任务再分成两半,知道达到阈值(自己设定)然后组成 N 个具有父子层级关系的任务。这时候 ForkJoinPool 就会安排合理的工作线程,例如 8个线程,让它们不断得去干活,先把它们的最下层的小任务干完,接着再去干小任务的父任务,一层层直到任务结束。

JUC(5) : ForkJoinPool | 线程的极致管理

由此,可知既然它擅长干一些能拆分成父子任务的工作。所以例如快速排序、二叉查找等任务。最适合的是计算密集型的任务,比如是数据量很大的一个统计,如果是存在 IO交互,线程间同步,那就不太适合了。

2.2 累加案例

同样是线程池,为什么还要加个 ForkJoinPool 呢?你与我们老牌线程池有什么区别?就考虑一个场景,如果要计算 1-10000000 的复杂任务计算,如果使用一个原有的线程池 ThreadPool ,它只能有一个线程上场,而线程池里的其他线程就在那干瞪眼,这时候,为了进一步解决此类场景的问题,ForkJoinPool 提出了,下面我们通过一个计算案例简单了解下其是如何使用的。

public class ForkJoinPoolDemo {
    public static void main(String[] args) {
        singleDeal(1,100000000L);
        System.out.println("==============");
        ForkJoinDeal(1,100000000L);
    }
​
    private static void singleDeal(int start,long end) {
        long startTime = System.currentTimeMillis();
        long sum=0;
        for (int i = start; i <= end; i++) {
            // 累加
            sum +=i;
        }
        System.out.println(sum);
        System.out.println("单线程消耗的时间:"+(System.currentTimeMillis() - startTime));
    }
​
    private static void ForkJoinDeal(int start,long end) {
        long startTime = System.currentTimeMillis();
        //定义任务
        TaskExample taskExample = new TaskExample(start, end);
        //定义执行对象
        ForkJoinPool forkJoinPool = new ForkJoinPool(4);
        //加入任务执行
        ForkJoinTask<Long> result = forkJoinPool.submit(taskExample);
        //输出结果
        try {
            System.out.println(result.get());
        }catch (Exception e){
            e.printStackTrace(
        );
        }finally {
            forkJoinPool.shutdown();
            System.out.println("ForkJoin 消耗的时间:"+(System.currentTimeMillis() - startTime));
        }
    }
}
@Slf4j
public class TaskExample extends RecursiveTask<Long> {
    private long start;
    private long end;
    private long sum;
    /**
     * 构造函数
     * @param start
     * @param end
     */
    public TaskExample(long start, long end){
        this.start = start;
        this.end = end;
    }
    @Override
    protected Long compute() {
        // 大于 100 个数相加切分,小于直接加
        if(end -start <10000){
            for (long i = start; i <= end; i++) {
                // 累加
                sum +=i;
            }
        }else{
            // 切分为 2 块
            long middle = (start +end)/2;
            // 递归调用,切分为 2 个小任务
            TaskExample taskExample = new TaskExample(start, middle);
            TaskExample taskExample2 = new TaskExample(middle+1,end);
            // 执行:异步
            taskExample.fork();
            taskExample2.fork();
            // 同步阻塞获取执行结果
            sum = taskExample.join() + taskExample2.join();
        }
        return sum;
    }
}

JUC(5) : ForkJoinPool | 线程的极致管理

观察上述代码,使用 4个线程,去计算任务,将一个大的计算任务拆分为 0—10000 、10000-20000依次类推,然后最终计算结果被依次合并,得到最终的结果。

也看到 ForkJoin 比使用单线程计算会慢很多,但如果改变任务的切分粒度,比如提到10w 一个计算任务,那效率就会提示一些,所以,ForkJoin 在面对一些复杂的计算型任务时可以考虑,一般情况下也用不上。

2.3 ThreadPool 与 ForkJoinPool

事实上,它更多的是原有线程池的一个补充,在特定的场景下,使用它会更合适。下面对两者进行一个简单对比:

  • 应用场景:

    • ThreadPool :常用于线程并发,阻塞延时较长的任务,这张任务一般要求线程个数较多。
    • ForkJoinPool:用于大任务可分解成小任务的情况下,一般是处理计算密集型任务。
  • 基本原理对比:

    • ThreadPool:所有线程共用一个队列,然后这些线程排着队去干活,来一个任务,派一个线程去,避免了线程的创建和销毁的开销。

    JUC(5) : ForkJoinPool | 线程的极致管理

    • ForkJoinPool: 每个线程都是一个队列,可以用极少的线程干非常多的父子任务,然后将每个任务的结果合并进而得出最终结果。

    JUC(5) : ForkJoinPool | 线程的极致管理

工作窃取机制

ForkJoinPool 提供了一个有效利用线程池机制的方法,就是当一个任务执行完成后,就是自动从队列尾部获取新的任务去执行,这样在任务量很大的时候,CPU 多的计算机会表现出很好的性能。

JUC(5) : ForkJoinPool | 线程的极致管理

如图中的 A、B线程,分别从队列中读取任务,然后放入(push)进自己的队列中,同时也取出(pop)自己队列的任务进行消费,那为什么又放又取呢?不直接取最右边的任务?这主要是设计者为了CPU缓存的命中率,然后B 线程空闲时,可以偷取(poll) A 线程的未执行的任务。提高 线程工作效率。

三个API 方法:

方法名说明
invoke(ForkJoinTask)提交任务并一直阻塞直到任务执行完成返回合并结果
execute(ForkJoinTask)异步执行任务,无返回值。
submit(ForkJoinTask)异步执行任务,返回task本身,可以通过task.get()方法获取合并之后的结果。

2.4 ForkJoinTask

上面的计算案例中,我们创建出一个类,继承了 RecursiveTask ,然后交给 ForkJoinPool 提交任务。因为,ForkJoinPool 只会处理 ForkJoinTask 的任务。而我们用的 RecursiveTask 是其重要的实现类

JUC(5) : ForkJoinPool | 线程的极致管理

我们要使用 Fork/Join 框架,首先需要创建一个 ForkJoin 任务。该类提供了在任务中执行 fork 和 join 的机制。通常情况下我们不需要直接集成 ForkJoinTask 类,只需要继承它的子类,Fork/Join 框架提供了几个子类:

  • RecursiveAction:用于没有返回结果的任务

  • RecursiveTask : 用于有返回结果的任务

  • CountedCompleter : 无返回结果,完成任务后可以触发回调

JUC(5) : ForkJoinPool | 线程的极致管理

FrokJoinTask 提供了两个重要的方法:

  • fork:让 task 异步执行
  • join:让 task 同步执行,可以获取返回值

ForkJoinPool 由 ForkJoinTask 数组和 ForkJoinWorkerThread 数组组成,ForkJoinTask 数组负责将存放以及将程序提交给 ForkJoinPool,而 ForkJoinWorkerThread 负责执行这些任务。

三、小结

对于 ForkJoinPool 相关的知识点需要记住以下几点:

  • CompletableFuture 在使用并行流计算的时候会调用 ForkJoinPool 的commonPool 方法,但这个方法可能会被很多任务共同使用,所以要谨慎使用,使用自己创建的线程池。
  • ForkJoinPool 是针对计算密集型的任务比较适合,如果数据量不大,没有使用 ForkJoinPool的必要,单线程也许更快。
  • ForkJoinPool 的执行效率与任务分片的粒度和线程数和数据量都有关联,需要仔细评估使用