Java并发编程面试7:Fork/Join框架-ForkJoinPool和RecursiveTaskFork/Join框
引言
正文
Fork/Join框架是Java 7中引入的一个用于并行执行任务的框架,它的设计目标是充分利用多核处理器的计算能力。Fork/Join框架基于“分而治之”的原则,将大任务分割成小任务,小任务可以并行执行,最后合并各个小任务的结果以产生大任务的结果。
核心组件
Fork/Join框架主要由以下几个核心组件构成:
-
ForkJoinPool:这是执行Fork/Join任务的线程池。它使用工作窃取算法来平衡工作负载,即空闲的线程可以从其他繁忙线程的队列中“窃取”任务来执行。
-
ForkJoinTask:这是要执行的任务的基类。有两个常用的子类:
- RecursiveAction:用于没有返回结果的任务。
- RecursiveTask:用于有返回结果的任务。
工作原理
Fork/Join框架的工作原理可以分为两个主要步骤:Fork(分解)和Join(合并)。
-
Fork(分解):将一个大任务分解成若干个小任务,直到这些小任务足够简单,可以顺利执行且无需进一步分解。
-
Join(合并):执行小任务并合并其结果,形成原始大任务的结果。
使用方法
要使用Fork/Join框架,通常需要执行以下步骤:
-
创建一个继承自
RecursiveTask
(有返回值)或RecursiveAction
(无返回值)的类。 -
实现
compute
方法,这个方法包含了任务的逻辑。如果任务足够小就直接执行,否则将任务分解(fork)成更小的任务。 -
在必要时,调用
fork
方法来异步执行新创建的子任务。 -
调用
join
方法来等待子任务的完成并获取其结果。 -
最终合并这些结果(如果有的话)。
优点
-
利用多核处理器: Fork/Join框架旨在充分利用多核处理器的能力,可以显著提高并行任务的性能。
-
工作窃取算法: 使用工作窃取算法(work-stealing)来平衡不同线程之间的工作负载。当一个线程完成了所有任务时,它可以从其他线程的任务队列中取任务来执行,从而提高线程利用率。
-
递归任务分解: 适合处理可以递归分解为更小任务的问题,特别是对于计算密集型任务,可以很好地提升性能。
-
简化并行编程: 相比于直接使用线程和Runnable任务,Fork/Join框架提供了一种更简单、更高层次的并行编程模型。
-
可扩展性: 在多处理器环境中,Fork/Join框架提供了良好的可扩展性,因为它可以根据系统资源动态地适应任务的执行。
缺点
-
适用性有限: Fork/Join框架主要适用于可分解的递归问题,对于不可分解的任务或者I/O密集型任务,可能不会带来性能上的提升。
-
调试困难: 并行执行的任务可能使得调试变得更加复杂,因为任务的执行不是顺序的,而是并发的。
-
任务管理开销: 如果任务分解得过细,那么任务管理的开销可能会超过任务执行的开销,从而降低性能。
-
内存消耗: Fork/Join框架可能会创建大量的任务对象,这可能会对垃圾收集器造成压力,增加内存消耗。
-
异常处理复杂: 在Fork/Join框架中处理异常可能比在单线程环境中更复杂。如果一个子任务失败了,它可能需要取消其他的子任务。
-
线程竞争: 尽管工作窃取算法可以减少线程间的竞争,但在某些情况下,大量线程同时尝试窃取任务或访问共享资源时,仍然可能会出现竞争。
-
适应性: 对于某些类型的问题,比如那些不容易分解为独立子任务的问题,或者需要频繁进行任务同步的问题,Fork/Join可能不是最佳选择。
总的来说,Fork/Join框架是一个强大的工具,适合处理可以分解为更小部分的大型计算任务。然而,它并不适用于所有类型的并行问题,因此在选择使用Fork/Join框架之前,应该根据具体问题的特点进行权衡。
示例代码
以下是使用Fork/Join框架的一个简单示例:
package com.dereksmart.crawling.core;
import java.util.Random;
import java.util.concurrent.RecursiveTask;
import java.util.concurrent.ForkJoinPool;
/**
* @Author derek_smart
* @Date 2024/8/15 7:55
* @Description ForkJoin统计计算测试类
*/
public class SumTask extends RecursiveTask<Long> {
private static final int THRESHOLD = 1000;
private long[] array;
private int start;
private int end;
public SumTask(long[] array, int start, int end) {
this.array = array;
this.start = start;
this.end = end;
}
@Override
protected Long compute() {
long sum = 0;
if (end - start <= THRESHOLD) { // 任务足够小直接计算
for (int i = start; i < end; i++) {
sum += array[i];
}
} else { // 任务大则分解
int middle = (start + end) / 2;
SumTask subtask1 = new SumTask(array, start, middle);
SumTask subtask2 = new SumTask(array, middle, end);
subtask1.fork(); // 异步执行第一个子任务
subtask2.fork(); // 异步执行第二个子任务
sum = subtask1.join() + subtask2.join(); // 等待子任务完成,并合并结果
}
return sum;
}
public static void main(String[] args) {
ForkJoinPool pool = new ForkJoinPool();
long[] array = new long[1001]; // 示例数组
Random random = new Random();
for (int i = 0; i < array.length; i++) {
array[i] = random.nextLong(); // 或者使用其他值填充数组
}
// 初始化数组
SumTask task = new SumTask(array, 0, array.length);
long sum = pool.invoke(task); // 执行任务并获取结果
System.out.println("Sum: " + sum);
}
}
注意事项
使用Fork/Join框架时,有几个要点需要注意:
- 任务的分解应该是有效的,以避免产生过多的任务,导致系统开销增大。
- 应该避免在
ForkJoinTask
中使用同步控制结构,如synchronized
或ReentrantLock
,因为这可能导致死锁。 ForkJoinPool
默认的并行级别等于运行时的可用处理器数量。可以通过构造函数来自定义并行级别。ForkJoinTask
不应该执行阻塞操作,因为这可能导致线程池中的线程耗尽。
Fork/Join框架是Java并发编程的一个强大工具,它能够帮助开发者编写出高效、可伸缩的并行代码。
ForkJoinPool和RecursiveTask 核心概述
ForkJoinPoolForkJoinPool
是 Fork/Join 框架的核心,它是一个专门为了执行 ForkJoinTask
任务而设计的线程池。这个线程池利用了工作窃取算法(work-stealing algorithm)来提高CPU的利用率和减少线程间的竞争。
工作原理
在 ForkJoinPool
中,每个参与的线程都维护着一个双端队列(deque),用来存放任务。当一个线程的队列为空时,它可以从其他线程的队列的末尾“窃取”一个任务来执行。这种策略减少了线程间的交互,提高了效率,尤其是当有大量小任务需要执行时。
主要方法
invoke(ForkJoinTask<T> task)
:同步执行指定的任务,并返回结果。execute(ForkJoinTask<?> task)
:异步执行指定的任务。submit(ForkJoinTask<T> task)
:提交一个任务以供执行,并返回一个Future
来获取结果。
构造函数
ForkJoinPool
有几个构造函数,允许自定义线程池的特性,如并行级别(通常与机器的CPU核心数相等)、工厂方法(用于创建线程)和未捕获异常的处理器等。
RecursiveTask
RecursiveTask
是 ForkJoinTask
的一个抽象子类,用于有返回结果的任务。需要实现 compute
方法来定义任务的逻辑。如果任务足够小,可以直接计算结果;如果任务太大,应该将其分解(fork)为更小的任务。
compute方法
compute
方法是必须实现的方法,它定义了任务的计算逻辑。在 compute
方法中,可以检查任务是否足够小以便直接处理,或者应该进一步分解(fork)成子任务。
fork和join
fork()
:异步执行任务。当调用fork()
方法时,RecursiveTask
会安排在ForkJoinPool
中执行。join()
:等待并获取任务的结果。当调用join()
方法时,执行join()
的线程会等待任务完成,并返回计算的结果。
示例
以下是一个使用 RecursiveTask
的简单示例,它演示了如何使用 ForkJoinPool
和 RecursiveTask
来递归地计算一个数组的和:
import java.util.concurrent.RecursiveTask;
import java.util.concurrent.ForkJoinPool;
public class SumArray extends RecursiveTask<Integer> {
private final int[] array;
private final int start;
private final int end;
public SumArray(int[] array, int start, int end) {
this.array = array;
this.start = start;
this.end = end;
}
@Override
protected Integer compute() {
final int threshold = 10; // 设置任务分解的阈值
if (end - start < threshold) {
// 任务足够小, 直接计算
int sum = 0;
for (int i = start; i < end; i++) {
sum += array[i];
}
return sum;
} else {
// 任务过大, 继续分解
int mid = (start + end) / 2;
SumArray leftTask = new SumArray(array, start, mid);
SumArray rightTask = new SumArray(array, mid, end);
leftTask.fork(); // 异步执行左侧任务
rightTask.fork(); // 异步执行右侧任务
// 合并结果
return leftTask.join() + rightTask.join();
}
}
public static void main(String[] args) {
int[] array = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10}; // 示例数组
ForkJoinPool pool = new ForkJoinPool(); // 创建ForkJoinPool实例
SumArray task = new SumArray(array, 0, array.length); // 创建任务
int sum = pool.invoke(task); // 执行任务并获取结果
System.out.println("Sum: " + sum); // 输出结果
}
}
在这个例子中,SumArray
任务会递归地将数组分成更小的部分,直到每个部分的大小小于设定的阈值。然后它会计算这些小部分的和,并通过 join()
方法将结果合并起来。
注意事项
- 在任务执行过程中,尽量避免使用同步控制结构,因为它们可能会引起不必要的阻塞,降低并行效率。
fork()
应该尽可能地被调用在compute
方法的开始部分,并且紧跟着join()
,这样可以最大程度地利用线程池中的线程。- 适当地选择任务分解的阈值是至关重要的,阈值过小会导致过多的任务创建和调度开销,阈值过大则无法充分利用多核处理器的优势。
ForkJoin 具体示例
Java中的parallelStream
是Java 8引入的一个功能,它属于Stream API的一部分。parallelStream
本质上是一个数据并行处理的工具,它允许开发者以声明式的方式来编写并行处理代码。当对一个集合调用parallelStream()
方法时,会得到一个并行流(Parallel Stream)。这个并行流背后的机制是Fork/Join框架,它将数据分割成多个小块,然后利用多个线程并行处理这些小块,最后将结果合并起来。这种方式可以有效地利用多核处理器的计算资源,提高数据处理的速度。
工作原理
-
数据分割:并行流使用Fork/Join框架将任务分割成更小的任务,直到它们足够小可以并行处理。
-
任务执行:分割出的小任务被提交到Fork/Join框架的线程池中,这个线程池默认的线程数目等于处理器核心数目。
-
结果合并:处理完毕后,小任务的结果被合并起来,以生成最终的结果。
使用示例
List<String> myList = Arrays.asList("a1", "a2", "b1", "c2", "c1");
myList.parallelStream()
.filter(s -> s.startsWith("c"))
.map(String::toUpperCase)
.sorted()
.forEach(System.out::println);
在这个例子中,parallelStream
会并行地执行过滤、映射和排序操作,并且最终的输出顺序可能与元素在原始列表中的顺序不同,因为操作是并行执行的。
优点
- 简单易用:只需将
stream()
替换为parallelStream()
即可实现并行处理。 - 自动并行化:不需要显式地管理线程或执行器服务。
缺点
- 不总是更快:并行处理并不总是比顺序处理快,特别是在数据量较小或者任务本身就不适合并行化的情况下。
- 线程安全:在使用并行流时,需要确保执行的操作不会引起线程安全问题。
- 合并成本:并行流在处理结束后需要合并结果,这个合并过程有时会很昂贵,尤其是当处理的操作涉及到状态变更或复杂的合并逻辑时。
总结
parallelStream
是一个便捷的工具,可以将数据处理任务自动并行化,以利用多核处理器的性能。然而,并行流不适用于所有场景,开发者在使用时应该评估其适用性,并注意线程安全和性能影响。
Fork/Join框架与多线程并发
Fork/Join框架与多线程并发都是解决并行计算问题的方法,它们之间有一些关联之处:1. 基于线程的执行:无论是传统的多线程并发编程还是使用Fork/Join框架,它们的基础都是线程。在多核处理器上,线程可以并发执行,从而提高程序的执行效率。
-
目标相同:两者的主要目标都是为了利用多核处理器的能力,通过将任务分配给多个线程来提高应用程序的性能。
-
任务分割:在多线程并发编程中,开发者通常需要手动分解任务并分配给线程。而在Fork/Join框架中,这一过程通过递归的方式自动进行,框架负责将大任务分解为小任务并分配给线程执行。
-
线程管理:在传统的多线程并发编程中,开发者需要自己管理线程的生命周期,包括创建、启动、同步和终止线程。而在Fork/Join框架中,这些都由ForkJoinPool(线程池)来管理。ForkJoinPool使用工作窃取算法来动态地重新分配任务,以保持所有线程的忙碌状态并减少线程之间的竞争。
-
同步机制:在多线程并发编程中,开发者需要使用各种同步机制,如
synchronized
关键字、ReentrantLock
、CountDownLatch
、CyclicBarrier
、Semaphore
等,来控制线程间的协作和资源共享。在Fork/Join框架中,同步是通过任务的fork()
和join()
操作来实现的,框架内部处理了大部分同步的复杂性。 -
异常处理:多线程并发编程中的异常处理可能会比较复杂,因为异常需要在不同线程间传播和处理。Fork/Join框架提供了一些机制,如
ForkJoinTask
的getException()
方法,来帮助处理并行任务中发生的异常。
尽管Fork/Join框架与多线程并发有很多相似之处,但Fork/Join框架提供了一种更高层次的抽象,它简化了并行任务的分解和线程管理,特别适合于那些可以递归分解为子任务的问题。而传统的多线程并发编程则提供了更大的灵活性,适用于更广泛的并发场景,但通常需要更多的线程管理和同步控制。
基于Fork/Join框架并行快速排序
实现一个并行快速排序算法(Parallel Quicksort)。快速排序是一种分而治之的排序算法,它通过一个分区操作将要排序的数组分割成两个子数组,然后递归地对子数组进行快速排序。
以下是使用Fork/Join框架实现的并行快速排序的示例代码:
package com.dereksmart.crawling.core;
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.ForkJoinPool;
/**
* @Author derek_smart
* @Date 2024/8/15 8:15
* @Description ForkJoin并行快速排序
*/
public class ParallelQuickSort extends RecursiveAction {
private int[] array;
private int left;
private int right;
public ParallelQuickSort(int[] array) {
this(array, 0, array.length - 1);
}
public ParallelQuickSort(int[] array, int left, int right) {
this.array = array;
this.left = left;
this.right = right;
}
@Override
protected void compute() {
if (left < right) {
int pivotIndex = partition(array, left, right);
// 递归排序左边子数组
ParallelQuickSort leftTask = new ParallelQuickSort(array, left, pivotIndex - 1);
// 递归排序右边子数组
ParallelQuickSort rightTask = new ParallelQuickSort(array, pivotIndex + 1, right);
// 并行执行两个子任务
invokeAll(leftTask, rightTask);
}
}
private int partition(int[] array, int left, int right) {
int pivot = array[right];
int i = (left - 1);
for (int j = left; j < right; j++) {
if (array[j] <= pivot) {
i++;
// 交换 array[i] 和 array[j]
int temp = array[i];
array[i] = array[j];
array[j] = temp;
}
}
// 交换 array[i+1] 和 array[right] (或 pivot)
int temp = array[i + 1];
array[i + 1] = array[right];
array[right] = temp;
return i + 1;
}
public static void parallelQuickSort(int[] array) {
ForkJoinPool pool = new ForkJoinPool();
pool.invoke(new ParallelQuickSort(array));
}
public static void main(String[] args) {
int[] array = {3, 5, 2, 1, 4, 6, 9, 8, 7, 0}; // 示例数组
parallelQuickSort(array); // 执行并行快速排序
for (int value : array) {
System.out.print(value + " ");
}
}
}
ParallelQuickSort
类继承自 RecursiveAction
,因为不需要返回排序结果(数组本身被就地排序)。compute
方法实现了快速排序的逻辑,当子数组足够小(即 left
索引小于 right
索引)时,它会进行分区操作,然后创建两个新的 ParallelQuickSort
任务来分别对左右子数组进行排序。通过调用 invokeAll(leftTask, rightTask)
,这两个任务会被并行执行。
partition
方法实现了快速排序的分区操作,它选择最右边的元素作为基准(pivot),然后将数组中的元素重新排列,使得所有小于基准的元素都在基准的左边,而所有大于基准的元素都在基准的右边,然后返回基准的新索引。
parallelQuickSort
静态方法是客户端代码调用的入口点,它创建了一个 ForkJoinPool
并执行了排序任务。
在 main
方法中,创建了一个示例数组,并调用 parallelQuickSort
方法来执行排序。排序完成后,打印排序后的数组。
请注意,虽然并行快速排序在多核处理器上可能会比顺序快速排序快,但由于Fork/Join框架的开销以及快速排序本身的高效性,对于较小的数组,使用并行快速排序可能不会带来显著的性能提升。此外,快速排序的性能也依赖于选择的基准值,不恰当的基准选择可能会导致性能下降。
总结
Fork/Join框架是一个强大的工具,它提供了一种简单的方法来利用现代多核处理器的并行处理能力。通过递归地分解任务,并使用ForkJoinPool
来管理任务的执行,开发者可以编写出高效的并行代码。然而,框架的有效性取决于任务的类型和大小,以及任务之间的依赖关系。对于计算密集型和可以递归分解的任务,Fork/Join框架通常能够提供显著的性能提升。
转载自:https://juejin.cn/post/7402845471645859880