likes
comments
collection
share

线程并发中的工具类(一):Fork-Join

作者站长头像
站长
· 阅读数 3

线程并发中的工具类fork-join

  • fork-join

    • 概述:
      • 一般情况下使用多线程:Thread/线程池
      • 引入fork-join的好处:
        • 在使用时,完全屏蔽掉了Thread/Runnable相关的知识
        • 只要遵循它的规范那么就可以写出成一个很好的并发程序
      • fork-join的场景:分而治之
        • 可以分割成相同子结构,且子结构之间无关联
        • 常见分而治之算法:快排,归并,二分
        • 大数据的起源(MapRdios,后面就不用这个了)还是分而治之的
        • 外部排序:归并排序的一种衍生
          • 内存只有50MB,要去排10000个数字;
            • 使用归并排序,利用内存去处理
              1. 分成子表
              2. 子表从根上进行两两合并
  • Fork-join原理

    • 流程

      • fork:将原来的任务分成若干个子任务交给线程执行,粒度由用户决定
      • join:子任务处理完后,进行合并
    • 示意图:

      线程并发中的工具类(一):Fork-Join

    • 概念:

      • 工作密取:因为任务量一般是多余线程数的,那么每一个线程就有一个自己的任务列表;当有些线程执行完后,就悄悄地跑到其他线程的任务列表里面去拿任务,做完以后又给他放回去
        • 充分利用线程的威力,自己就不要去自定义工作密取了
      • 为什么会出现这种工作密取呢?
        • 粒度是程序猿决定的,那为什么会有线程提前完成呢?
          • 任务粒度确实是样的,但是每个任务计算量不同
          • 所以某些线程会提前结束
  • Fork-join实战:在处理SQL/Excel查询的时候很不错

    • 整体流程图:

      线程并发中的工具类(一):Fork-Join

    • 流程:

      1. 拿到ForkJoinPool:多个线程各自有自己的任务队列
      2. ForkJoinTask:JDK帮助我们抽象出了任务
        • 这个是一个抽象类
          • JDK继续抽象出了RecursiveTask与RecursiveAction(就跟MutableLiveData一样)
            • RecursiveTask:支持泛型,有返回结果
            • RecursiveAction:不支持泛型,没有返回结果
            • 但是这个还是抽象类
        • 在使用Fork-Join的时候,通常去继承RecursiveTask与RecursiveAction
      3. 将这个任务通过invoke或者commit提交到ForkJoinPool里面去
      4. 最后调用join,就能拿到大任务的执行结果
    • 具体在内部是如何进行拆分的?

      • 因为这两个是抽象类,例如在使用RecursiveTask的时候,我们需要重写其中的抽象方法(compute)

        • Fork-join只是框架而已,具体的任务怎么拆,任务粒度是多大,这些都是由程序猿进行决定的,那么我们的业务逻辑就需要写在compute里面

        • compute方法执行流程:

          • 示意图:

            线程并发中的工具类(一):Fork-Join

    • 细节:

      • Fork-Join返回值:
        • 定义成Task就有返回值
        • 定义成Action就没有返回值
      • Fork-Join的同步/异步性
        • 同步:以invoke进行提交
        • 异步:以excute进行提交
  • Fork-Join实战:使用Fork-Join同步方法,统计整形数组中的所有元素和

    • 结论:

      • 对比Fork-Join(同步方法)与单线程:子任务中无耗时操作

        • 结果:单线程是比多线程快的
        • 原因:Fork-Join从本质上来讲是一种递归,函数递归意味着方法递归,意味着会有频繁的入栈/弹栈;同时在处理这种计算问题的时候,单线程是一往无前的,而Fork-Join由于其并发性会导致线程上下文频繁切换(一次切换约为2万个CPU周期)
      • 对比Fork-Join(同步方法)与单线程:子任务中含有耗时操作

        • 结果:单线程是比多线程慢的
        • 原因:多线程并发执行
      • 对比Fork-Join(同步方法)与单线程(归并排序)

        • 对比理由:二者均是分治思想
        • 结果:在数据量大,且子任务中含有耗时操作时Fork-Join明显优于归并
    • 代码展示:

      • MakeArray

         package cn.enjoyedu.ch2.forkjoin.sum;
         ​
         import java.util.Random;
         //定义了一个长度为4000的整形数组,采用随机数进行填充
         public class MakeArray {
             //数组长度
             public static final int ARRAY_LENGTH  = 4000;
         ​
             public static int[] makeArray() {
         ​
                 //new一个随机数发生器
                 Random r = new Random();
                 int[] result = new int[ARRAY_LENGTH];
                 for(int i=0;i<ARRAY_LENGTH;i++){
                     //用随机数填充数组
                     result[i] =  r.nextInt(ARRAY_LENGTH*3);
                 }
                 return result;
         ​
             }
         }
        
      • 单线程进行执行:SumNormal

         package cn.enjoyedu.ch2.forkjoin.sum;
         ​
         import cn.enjoyedu.tools.SleepTools;
         ​
         public class SumNormal {
             //采用单线程进行累加操作
             public static void main(String[] args) {
                 int count = 0;
                 int[] src = MakeArray.makeArray();
         ​
                 long start = System.currentTimeMillis();
                 for(int i= 0;i<src.length;i++){
                      //模拟:每次计算(子任务)中没有耗时操作
                     //SleepTools.ms(1);
                     count = count + src[i];
                 }
                 System.out.println("The count is "+count
                         +" spend time:"+(System.currentTimeMillis()-start)+"ms");       
             }
         }
        
      • 单线运行截图1:每次计算(子任务)中没有耗时操作

        线程并发中的工具类(一):Fork-Join

      • 单线程运行截图2:每次计算(子任务)中含有耗时操作--->放开哪行代码注释

        线程并发中的工具类(一):Fork-Join

      • 采用Fork-Join:SumArray

         package cn.enjoyedu.ch2.forkjoin.sum;
         ​
         import cn.enjoyedu.tools.SleepTools;
         ​
         import java.util.concurrent.ForkJoinPool;
         import java.util.concurrent.RecursiveTask;
         ​
         public class SumArray {
             //需要返回值,继承RecursiveTask,并实现泛型,
             // 这个东西本质上来说是一个递归,是方法反复递归调用,
             // 并且采用多线程处理计算任务会带来任务切换,会带来频繁的上下文切换,那么在子任务中如果有耗时操作就用Fork-Join
         ​
             private static class SumTask extends RecursiveTask<Integer>{
         ​
                 //阈值:规定任务拆分到多小的时候就行了
                 private final static int THRESHOLD = MakeArray.ARRAY_LENGTH/10;
                 private int[] src;
                 private int fromIndex;
                 private int toIndex;
         ​
                 public SumTask(int[] src, int fromIndex, int toIndex) {
                     this.src = src;
                     this.fromIndex = fromIndex;
                     this.toIndex = toIndex;
                 }
         ​
                 //实现compute方法
                 @Override
                 protected Integer compute() {
                     //判断任务大小满足条件
                     if (toIndex - fromIndex < THRESHOLD){
                         System.out.println(" from index = "+fromIndex+" toIndex="+toIndex);
                         int count = 0;
                         for(int i= fromIndex;i<=toIndex;i++){
                             //每次计算之前休眠1ms,就能体现出Fork-Join的威力并且当计算量越大威力越好
                            SleepTools.ms(1);
                              count = count + src[i];
                         }
                         return count;
                     }else{
                         //大于规定的条件,继续拆分
                         //fromIndex....mid.....toIndex
                         int mid = (fromIndex+toIndex)/2;
                         //将任务拆分,折半,注意边界不能重合
                         SumTask left = new SumTask(src,fromIndex,mid);
                         SumTask right = new SumTask(src,mid+1,toIndex);
                         //将左右子任务交给池执行
                         invokeAll(left,right);
                         //拿到子任务的值
                         return left.join()+right.join();
                     }
                 }
             }
         ​
         ​
             public static void main(String[] args) {
         ​
                 //获取ForkJoinPool实例
                 ForkJoinPool pool = new ForkJoinPool();
                 int[] src = MakeArray.makeArray();
         ​
                 //new出Task的实例(因为此时需要返回值,需要打印出数组的和),交给池子进行执行的,
                 SumTask innerFind = new SumTask(src,0,src.length-1);
         ​
                 long start = System.currentTimeMillis();
         ​
                 //同步执行
                 pool.invoke(innerFind);
                 //System.out.println("Task is Running.....");
         ​
                 System.out.println("The count is "+innerFind.join()
                         +" spend time:"+(System.currentTimeMillis()-start)+"ms");
         ​
             }
         }
        
      • Fork-Join运行截图1:在每次计算(子任务)中不增添耗时操作

        线程并发中的工具类(一):Fork-Join

      • Fork-Join运行截图2:在每次计算(子任务)中增添耗时操作

        线程并发中的工具类(一):Fork-Join


  • Fork-Join实战:采用其异步方法(commit提交子任务给池子):遍历指定目录寻找一个特定类型的文件

    • 实现细节:

      • 到底是同步方法还是异步方法?

        //就是异步的了
        pool.excute();
        
      • task.join():

        • 异步方法同主线程并发执行:
          • 这个是异步方法(由其提交方式决定的)那么这个是可以跟主线程并发执行的,是可以查到主线程之后的;
        • 这个是带阻塞的:
          • 那么主线程中,这行代码后面的代码,需要等这个执行完才执行
      • task.invokeAll():这个是将所有的子任务全部扔到那个池子里面去的

        • 存在参数重载:三种实现
          • 两个参数的:
            • 就是传入左右边界,实现分治
          • 三个参数的:前两个是边界,最后一个是集合
            • 传入一个集合,并且会返回一个集合
      • 关于异步提交:内在机制差不多

        • pool.submit():将任务以异步形式传到这个池子里面,带有返回值
        • pool.excute():将任务以异步形式传到这个池子里面,不带返回值
    • 代码实现:

      package cn.enjoyedu.ch2.forkjoin;
      
      import java.io.File;
      import java.util.ArrayList;
      import java.util.List;
      import java.util.concurrent.ForkJoinPool;
      import java.util.concurrent.RecursiveAction;
      
      /**
       *类说明:遍历指定目录(含子目录)找寻指定类型文件
       */
      public class FindDirsFiles extends RecursiveAction {
      
          //外部传入路径
          private File path;
      
          public FindDirsFiles(File path) {
              this.path = path;
          }
      
          @Override
          protected void compute() {
              List<FindDirsFiles> subTasks = new ArrayList<>();
      
              File[] files = path.listFiles();
              if (files!=null){
                  for (File file : files) {
                      if (file.isDirectory()) {
                          // 对每个子目录都新建一个子任务。
                          subTasks.add(new FindDirsFiles(file));
                      } else {
                          // 遇到文件,检查。
                          if (file.getAbsolutePath().endsWith("txt")){
                              System.out.println("文件:" + file.getAbsolutePath());
                          }
                      }
                  }
                  if (!subTasks.isEmpty()) {
                      // 在当前的 ForkJoinPool 上调度所有的子任务。
                      //invokeAll是有三个实现的(两个参数的,类似二分;
                      // 三个参数的:最后一个参数是一个集合,传进去一个集合,结果还是以集合形式传出来)
                      for (FindDirsFiles subTask : invokeAll(subTasks)) {
                          subTask.join();
                      }
                  }
              }
          }
      
          public static void main(String [] args){
              try {
                  // 用一个 ForkJoinPool 实例调度总任务
                  ForkJoinPool pool = new ForkJoinPool();
                  //不要求返回值,扩展自Action
                  FindDirsFiles task = new FindDirsFiles(new File("H:/"));
      
                  //异步提交
                  pool.execute(task);
      
                  //主线程的业务逻辑
                  System.out.println("Task is Running......");
                  Thread.sleep(1);
                  int otherWork = 0;
                  for(int i=0;i<100;i++){
                      otherWork = otherWork+i;
                  }
                  System.out.println("Main Thread done sth......,otherWork="+otherWork);
                  //需要拿到子任务的话,在这个地方拿;并且这个是异步线程(join带阻塞的),
                  // 会发现,fork-join线程与主线程之间是可以并行的,类似CMS垃圾处理器的标记整理算法
                  task.join();
                  System.out.println("Task end");//但是这句话就不一样了,需要等上面的哪一行代码执行完后才行
              } catch (Exception e) {
                  // TODO Auto-generated catch block
                  e.printStackTrace();
              }
          }
      }
      
    • 运行截图:

      线程并发中的工具类(一):Fork-Join