java8(五)并行流之分支/合并(fork/join)框架

「这是我参与11月更文挑战的第3天,活动详情查看:2021最后一次更文挑战」。

并行流背后使用的基础架构是Java 7中引入的分支/合并框架。我们会在本文仔细研究分支/合并框架。

分支/合并框架的目的是以递归方式将可以并行的任务拆分成更小的任务,然后将每个子任务的结果合并起来生成整体结果。它是 ExecutorService 接口的一个实现,它把子任务分配给线程池(称为 ForkJoinPool )中的工作线程。

image.png

一、RecursiveTask

要把任务提交到这个池,必须创建 RecursiveTask 的一个子类,其中 R 是并行化任务(以及所有子任务)产生的结果类型,或者如果任务不返回结果,则是 RecursiveAction 类型。

要定义 RecursiveTask, 只需实现它唯一的抽象方法compute :

1
java复制代码protected abstract R compute();

在我们实现这个方法时,需要同时定义将任务拆分成子任务的逻辑,以及无法再拆分或不方便再拆分时,生成
单个子任务结果的逻辑。

这个方法的实现类似于下面的伪代码:

1
2
3
4
5
6
7
java复制代码if (任务足够小或不可分) {
顺序计算该任务
} else {
将任务分成两个子任务
递归调用本方法,拆分每个子任务,等待所有子任务完成
合并每个子任务的结果
}

递归任务拆分过程如下所示:

image.png

分支/合并框架实例:为一个数字范围Long[]求和

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
java复制代码import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;
import java.util.stream.LongStream;

import static com.cloud.bssp.java8.stream.TestStreamParallel.measureSumPerf;

/**
* @description: 使用ForkJoinPool
* @author:weirx
* @date:2021/10/25 14:10
* @version:3.0
*/
public class TestRecursiveTask extends RecursiveTask<Long> {

/**
* 要求和的数组
*/
private final long[] numbers;

/**
* 子任务求和的数组的开始位置
*/
private int start;

/**
* 子任务求和的数组的结束位置
*/
private int end;

/**
* 私有构造,用于以递归方式为主任务创建子任务
*
* @param numbers
* @param start
* @param end
*/
private TestRecursiveTask(long[] numbers, int start, int end) {
this.numbers = numbers;
this.start = start;
this.end = end;
}

/**
* 公共函数用于构建主任务
*
* @param numbers
*/
public TestRecursiveTask(long[] numbers) {
this.numbers = numbers;
}

/**
* 任务拆分的数组最大值
*/
public static final long THRESHOLD = 10000L;

@Override
protected Long compute() {
int length = end - start;
if (length <= THRESHOLD) {
// 如果大小小于等于阈值,则顺序计算
return computeSequentially();
} else {
//创建一个子任务,为数组的前一半求和
TestRecursiveTask left = new TestRecursiveTask(numbers, start, start + length / 2);
//利用另一个ForkJoinPool线程异步执行新创建的子任务
left.fork();
//创建一个子任务,为数组的后一半求和
TestRecursiveTask right = new TestRecursiveTask(numbers, start + length / 2, end);
// 同步执行第二个子任务
Long compute = right.compute();
//读取第一个子任务的结果,没有完成则等待
Long join = left.join();
//结果合并
return compute + join;
}
}

/**
* 当子任务不可拆分时计算结果的简单算法
*
* @return
*/
private Long computeSequentially() {
long sum = 0;
for (int i = start; i < end; i++) {
sum += numbers[i];
}
return sum;
}

/**
* 并行对前n个自然数求和
*
* @param n
* @return
*/
public static long forkJoinSum(long n) {
long[] numbers = LongStream.rangeClosed(1, n).toArray();
ForkJoinTask<Long> task = new TestRecursiveTask(numbers);
return new ForkJoinPool().invoke(task);
}

public static void main(String[] args) {
System.out.println("ForkJoin sum done in: " + measureSumPerf(
TestRecursiveTask::forkJoinSum, 10000000) + " msecs");
}

}

输出结果:

1
bash复制代码ForkJoin sum done in: 64 msecs

这个性能看起来比用并行流的版本要差,但这只是因为必须先要把整个数字流都放进一个long[] ,之后才能在任务中使用它。

二、Fork/join的最佳用法

虽然分支/合并框架还算简单易用,不幸的是它也很容易被误用。以下是几个有效使用它的最佳做法:

1)对一个任务调用 join 方法会阻塞调用方,直到该任务做出结果。因此,有必要在两个子任务的计算都开始之后再调用它。否则,你得到的版本会比原始的顺序算法更慢更复杂,因为每个子任务都必须等待另一个子任务完成才能启动。

2)不应该在 RecursiveTask 内部使用 ForkJoinPool 的 invoke 方法。相反,你应该始终直接调用 compute 或 fork 方法,只有顺序代码才应该用 invoke 来启动并行计算。

3) 对子任务调用 fork 方法可以把它排进 ForkJoinPool 。同时对左边和右边的子任务调用fork()似乎很自然,但这样做的效率要比直接对其中一个调用 compute 低。调用compute你可以为其中一个子任务重用同一线程,从而避免在线程池中多分配一个任务造成的开销。

4)调试分支/合并框架的并行计算代码可能有点棘手。特别是你平常都在你喜欢的IDE里面看栈跟踪(stack trace)来找问题,但放在分支/合并计算上就不行了,因为调用 compute的线程并不是概念上的调用方,后者是调用 fork 的那个。

5)和并行流一样,你不应理所当然地认为在多核处理器上使用分支/合并框架就比顺序计算快。一个任务可以分解成多个独立的子任务,才能让性能在并行化时有所提升。所有这些子任务的运行时间都应该比分出新任务所花的时长。

三、工作窃取

工作窃取为何被提出?

如前面的例子,我们指定数组的大小是10000L,即允许任务被拆分为每个数组大小为10000,共1000个任务。

在理想的情况下,每个任务完成的时间应该是相同的,这样在多核cpu的前提下,我们能保证每个核处理的时间都是相同的。

实际情况中,每个子任务花费的时间可以说是天差地别,磁盘,网络,或等等很多的因素导致。

Fork/Join框架为了解决这个提出,提出了工作窃取(work stealing)的概念。

在实际应用中,这意味着这些任务差不多被平均分配到 ForkJoinPool 中的所有线程上。每个线程都为分配给它的任务保存一个双向链式队列,每完成一个任务,就会从队列头上取出下一个任务开始执行。

基于前面所述的原因,某个线程可能早早完成了分配给它的所有任务,也就是它的队列已经空了,而其他的线程还很忙。这时,这个线程并没有闲下来,而是随机选了一个别的线程,从队列的尾巴上“偷走”一个任务。这个过程一直继续下去,直到所有的任务都执行完毕,所有的队列都清空。这就是为什么要划成许多小任务而不是少数几个大任务,这有助于更好地在工作线程之间平衡负载。

一般来说,这种工作窃取算法用于在池中的工作线程之间重新分配和平衡任务。如下图展示了这个过程。当工作线程队列中有一个任务被分成两个子任务时,一个子任务就被闲置的工作线程“偷走”了。如前所述,这个过程可以不断递归,直到规定子任务应顺序执行的条件为真。

image.png

四、Spliterator

那么Stream是如何实现并行的呢?我们并不需要手动去实现Fork/join,这就意味着,肯定有一种自动机制来为你拆分流。这种新的自动机制称为 Spliterator。

Spliterator 是Java 8中加入的另一个新接口;这个名字代表“可分迭代器”(splitableiterator)。和 Iterator 一样, Spliterator 也用于遍历数据源中的元素,但它是为了并行执行而设计的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
java复制代码public interface Spliterator<T> {

/**
* tryAdvance 方法的行为类似于普通的Iterator ,因为它会按顺序一个一个使用 Spliterator 中的元素,
* 并且如果有其他元素要遍历就返回 true
*/
boolean tryAdvance(Consumer<? super T> action);

/**
* 专为 Spliterator 接口设计的,因为它可以把一些元素划出去分
* 给第二个 Spliterator (由该方法返回),让它们两个并行处理。
*/
Spliterator<T> trySplit();

/**
* estimateSize 方法估计还剩下多少元素要遍历
*/
long estimateSize();

int characteristics();
}

4.1 拆分过程

将 Stream 拆分成多个部分的算法是一个递归过程,这个框架不断对 Spliterator 调用 trySplit直到它返回 null ,表明它处理的数据结构不能再分割,流程如下描述。

1)第一步是对第一个Spliterator 调用 trySplit ,生成第二个 Spliterator 。

2)第二步对这两个 Spliterator 调用trysplit ,这样总共就有了四个 Spliterator 。

3)第三步,对当前所有的Spliterator 调用trysplit ,当所有的trysplit 都返回null,则表示拆分结束。

4.2 Spliterator特性

Spliterator的拆分过程也收到其本身的特性所影响,特性是通过characteristics()方法来声明的。

Spliterator 接口声明的最后一个抽象方法是 characteristics ,它将返回一个 int ,代表 Spliterator 本身特性集的编码。

有如下特性:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
arduino复制代码    /**
* 元素有既定的顺序(例如 List ),因此 Spliterator 在遍历和划分时也会遵循这一顺序
*/
public static final int ORDERED = 0x00000010;

/**
* 对于任意一对遍历过的元素 x 和 y , x.equals(y) 返回 false
*/
public static final int DISTINCT = 0x00000001;

/**
* 遍历的元素按照一个预定义的顺序排序
*/
public static final int SORTED = 0x00000004;

/**
* 该 Spliterator 由一个已知大小的源建立(例如 Set ),因此 estimatedSize() 返回的是准确值
*/
public static final int SIZED = 0x00000040;

/**
* 保证遍历的元素不会为 null
*/
public static final int NONNULL = 0x00000100;

/**
* Spliterator 的数据源不能修改。这意味着在遍历时不能添加、删除或修改任何元素
*/
public static final int IMMUTABLE = 0x00000400;

/**
* 该 Spliterator 的数据源可以被其他线程同时修改而无需同步
*/
public static final int CONCURRENT = 0x00001000;

/**
* 该 Spliterator 和所有从它拆分出来的 Spliterator 都是 SIZED
*/
public static final int SUBSIZED = 0x00004000;

这里做个简单了解就好了,限于篇幅暂时不深入了。


同学们看到这,觉得有学到一丁点知识的给个赞吧~

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

0%