线程并发中的工具类(一):Fork-Join
线程并发中的工具类fork-join
-
fork-join
- 概述:
- 一般情况下使用多线程:Thread/线程池
- 引入fork-join的好处:
- 在使用时,完全屏蔽掉了Thread/Runnable相关的知识
- 只要遵循它的规范那么就可以写出成一个很好的并发程序
- fork-join的场景:分而治之
- 可以分割成相同子结构,且子结构之间无关联
- 常见分而治之算法:快排,归并,二分
- 大数据的起源(MapRdios,后面就不用这个了)还是分而治之的
- 外部排序:归并排序的一种衍生
- 内存只有50MB,要去排10000个数字;
- 使用归并排序,利用内存去处理
- 分成子表
- 子表从根上进行两两合并
- 使用归并排序,利用内存去处理
- 内存只有50MB,要去排10000个数字;
- 概述:
-
Fork-join原理
-
流程
- fork:将原来的任务分成若干个子任务交给线程执行,粒度由用户决定
- join:子任务处理完后,进行合并
-
示意图:
-
概念:
- 工作密取:因为任务量一般是多余线程数的,那么每一个线程就有一个自己的任务列表;当有些线程执行完后,就悄悄地跑到其他线程的任务列表里面去拿任务,做完以后又给他放回去
- 充分利用线程的威力,自己就不要去自定义工作密取了
- 为什么会出现这种工作密取呢?
- 粒度是程序猿决定的,那为什么会有线程提前完成呢?
- 任务粒度确实是样的,但是每个任务计算量不同
- 所以某些线程会提前结束
- 粒度是程序猿决定的,那为什么会有线程提前完成呢?
- 工作密取:因为任务量一般是多余线程数的,那么每一个线程就有一个自己的任务列表;当有些线程执行完后,就悄悄地跑到其他线程的任务列表里面去拿任务,做完以后又给他放回去
-
-
Fork-join实战:在处理SQL/Excel查询的时候很不错
-
整体流程图:
-
流程:
- 拿到ForkJoinPool:多个线程各自有自己的任务队列
- ForkJoinTask:JDK帮助我们抽象出了任务
- 这个是一个抽象类
- JDK继续抽象出了RecursiveTask与RecursiveAction(就跟MutableLiveData一样)
- RecursiveTask:支持泛型,有返回结果
- RecursiveAction:不支持泛型,没有返回结果
- 但是这个还是抽象类
- JDK继续抽象出了RecursiveTask与RecursiveAction(就跟MutableLiveData一样)
- 在使用Fork-Join的时候,通常去继承RecursiveTask与RecursiveAction
- 这个是一个抽象类
- 将这个任务通过invoke或者commit提交到ForkJoinPool里面去
- 最后调用join,就能拿到大任务的执行结果
-
具体在内部是如何进行拆分的?
-
因为这两个是抽象类,例如在使用RecursiveTask的时候,我们需要重写其中的抽象方法(compute)
-
Fork-join只是框架而已,具体的任务怎么拆,任务粒度是多大,这些都是由程序猿进行决定的,那么我们的业务逻辑就需要写在compute里面
-
compute方法执行流程:
-
示意图:
-
-
-
-
细节:
- Fork-Join返回值:
- 定义成Task就有返回值
- 定义成Action就没有返回值
- Fork-Join的同步/异步性
- 同步:以invoke进行提交
- 异步:以excute进行提交
- Fork-Join返回值:
-
-
Fork-Join实战:使用Fork-Join同步方法,统计整形数组中的所有元素和
-
结论:
-
对比Fork-Join(同步方法)与单线程:子任务中无耗时操作
- 结果:单线程是比多线程快的
- 原因:Fork-Join从本质上来讲是一种递归,函数递归意味着方法递归,意味着会有频繁的入栈/弹栈;同时在处理这种计算问题的时候,单线程是一往无前的,而Fork-Join由于其并发性会导致线程上下文频繁切换(一次切换约为2万个CPU周期)
-
对比Fork-Join(同步方法)与单线程:子任务中含有耗时操作
- 结果:单线程是比多线程慢的
- 原因:多线程并发执行
-
对比Fork-Join(同步方法)与单线程(归并排序)
- 对比理由:二者均是分治思想
- 结果:在数据量大,且子任务中含有耗时操作时Fork-Join明显优于归并
-
-
代码展示:
-
MakeArray
package cn.enjoyedu.ch2.forkjoin.sum; import java.util.Random; //定义了一个长度为4000的整形数组,采用随机数进行填充 public class MakeArray { //数组长度 public static final int ARRAY_LENGTH = 4000; public static int[] makeArray() { //new一个随机数发生器 Random r = new Random(); int[] result = new int[ARRAY_LENGTH]; for(int i=0;i<ARRAY_LENGTH;i++){ //用随机数填充数组 result[i] = r.nextInt(ARRAY_LENGTH*3); } return result; } }
-
单线程进行执行:SumNormal
package cn.enjoyedu.ch2.forkjoin.sum; import cn.enjoyedu.tools.SleepTools; public class SumNormal { //采用单线程进行累加操作 public static void main(String[] args) { int count = 0; int[] src = MakeArray.makeArray(); long start = System.currentTimeMillis(); for(int i= 0;i<src.length;i++){ //模拟:每次计算(子任务)中没有耗时操作 //SleepTools.ms(1); count = count + src[i]; } System.out.println("The count is "+count +" spend time:"+(System.currentTimeMillis()-start)+"ms"); } }
-
单线运行截图1:每次计算(子任务)中没有耗时操作
-
单线程运行截图2:每次计算(子任务)中含有耗时操作--->放开哪行代码注释
-
采用Fork-Join:SumArray
package cn.enjoyedu.ch2.forkjoin.sum; import cn.enjoyedu.tools.SleepTools; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.RecursiveTask; public class SumArray { //需要返回值,继承RecursiveTask,并实现泛型, // 这个东西本质上来说是一个递归,是方法反复递归调用, // 并且采用多线程处理计算任务会带来任务切换,会带来频繁的上下文切换,那么在子任务中如果有耗时操作就用Fork-Join private static class SumTask extends RecursiveTask<Integer>{ //阈值:规定任务拆分到多小的时候就行了 private final static int THRESHOLD = MakeArray.ARRAY_LENGTH/10; private int[] src; private int fromIndex; private int toIndex; public SumTask(int[] src, int fromIndex, int toIndex) { this.src = src; this.fromIndex = fromIndex; this.toIndex = toIndex; } //实现compute方法 @Override protected Integer compute() { //判断任务大小满足条件 if (toIndex - fromIndex < THRESHOLD){ System.out.println(" from index = "+fromIndex+" toIndex="+toIndex); int count = 0; for(int i= fromIndex;i<=toIndex;i++){ //每次计算之前休眠1ms,就能体现出Fork-Join的威力并且当计算量越大威力越好 SleepTools.ms(1); count = count + src[i]; } return count; }else{ //大于规定的条件,继续拆分 //fromIndex....mid.....toIndex int mid = (fromIndex+toIndex)/2; //将任务拆分,折半,注意边界不能重合 SumTask left = new SumTask(src,fromIndex,mid); SumTask right = new SumTask(src,mid+1,toIndex); //将左右子任务交给池执行 invokeAll(left,right); //拿到子任务的值 return left.join()+right.join(); } } } public static void main(String[] args) { //获取ForkJoinPool实例 ForkJoinPool pool = new ForkJoinPool(); int[] src = MakeArray.makeArray(); //new出Task的实例(因为此时需要返回值,需要打印出数组的和),交给池子进行执行的, SumTask innerFind = new SumTask(src,0,src.length-1); long start = System.currentTimeMillis(); //同步执行 pool.invoke(innerFind); //System.out.println("Task is Running....."); System.out.println("The count is "+innerFind.join() +" spend time:"+(System.currentTimeMillis()-start)+"ms"); } }
-
Fork-Join运行截图1:在每次计算(子任务)中不增添耗时操作
-
Fork-Join运行截图2:在每次计算(子任务)中增添耗时操作
-
-
-
Fork-Join实战:采用其异步方法(commit提交子任务给池子):遍历指定目录寻找一个特定类型的文件
-
实现细节:
-
到底是同步方法还是异步方法?
//就是异步的了 pool.excute();
-
task.join():
- 异步方法同主线程并发执行:
- 这个是异步方法(由其提交方式决定的)那么这个是可以跟主线程并发执行的,是可以查到主线程之后的;
- 这个是带阻塞的:
- 那么主线程中,这行代码后面的代码,需要等这个执行完才执行
- 异步方法同主线程并发执行:
-
task.invokeAll():这个是将所有的子任务全部扔到那个池子里面去的
- 存在参数重载:三种实现
- 两个参数的:
- 就是传入左右边界,实现分治
- 三个参数的:前两个是边界,最后一个是集合
- 传入一个集合,并且会返回一个集合
- 两个参数的:
- 存在参数重载:三种实现
-
关于异步提交:内在机制差不多
- pool.submit():将任务以异步形式传到这个池子里面,带有返回值
- pool.excute():将任务以异步形式传到这个池子里面,不带返回值
-
-
代码实现:
package cn.enjoyedu.ch2.forkjoin; import java.io.File; import java.util.ArrayList; import java.util.List; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.RecursiveAction; /** *类说明:遍历指定目录(含子目录)找寻指定类型文件 */ public class FindDirsFiles extends RecursiveAction { //外部传入路径 private File path; public FindDirsFiles(File path) { this.path = path; } @Override protected void compute() { List<FindDirsFiles> subTasks = new ArrayList<>(); File[] files = path.listFiles(); if (files!=null){ for (File file : files) { if (file.isDirectory()) { // 对每个子目录都新建一个子任务。 subTasks.add(new FindDirsFiles(file)); } else { // 遇到文件,检查。 if (file.getAbsolutePath().endsWith("txt")){ System.out.println("文件:" + file.getAbsolutePath()); } } } if (!subTasks.isEmpty()) { // 在当前的 ForkJoinPool 上调度所有的子任务。 //invokeAll是有三个实现的(两个参数的,类似二分; // 三个参数的:最后一个参数是一个集合,传进去一个集合,结果还是以集合形式传出来) for (FindDirsFiles subTask : invokeAll(subTasks)) { subTask.join(); } } } } public static void main(String [] args){ try { // 用一个 ForkJoinPool 实例调度总任务 ForkJoinPool pool = new ForkJoinPool(); //不要求返回值,扩展自Action FindDirsFiles task = new FindDirsFiles(new File("H:/")); //异步提交 pool.execute(task); //主线程的业务逻辑 System.out.println("Task is Running......"); Thread.sleep(1); int otherWork = 0; for(int i=0;i<100;i++){ otherWork = otherWork+i; } System.out.println("Main Thread done sth......,otherWork="+otherWork); //需要拿到子任务的话,在这个地方拿;并且这个是异步线程(join带阻塞的), // 会发现,fork-join线程与主线程之间是可以并行的,类似CMS垃圾处理器的标记整理算法 task.join(); System.out.println("Task end");//但是这句话就不一样了,需要等上面的哪一行代码执行完后才行 } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); } } }
-
运行截图:
-
转载自:https://juejin.cn/post/7067214865877696548