Java 多线程 并行处理 Fork Join

总文档 :文章目录

Github : github.com/black-ant

Fork / Join 是一个工具框架 , 其核心思想在于将一个大运算切成多个小份 , 最大效率的利用资源 , 其主要涉及到三个类 : ForkJoinPool / ForkJoinTask / RecursiveTask

一 . Fork / Join 入门

什么是 Fork / Join
该框架是一个工具 , 通过分而治之的方式尝试将所有可用的处理器内核使用起来帮助加速并行处理

  • fork : 递归地将任务分解为较小的独立子任务 , 直到它们足够简单以便异步执行
  • join : 将所有子任务的结果递归的连接成单个结果

原理简述 :

  • Fork / Join 的执行是先把一个大任务分解(fork)成许多个独立的小任务,然后起多线程并行去处理这些小任务。处理完得到结果后再进行合并(join)就得到我们的最终结果。
  • Fork / Join 使用的算法为 work-stealing(工作窃取)
    • 该算法会把分解的小任务放在多个双端队列中,而线程在队列的头和尾部都可获取任务。
    • 当有线程把当前负责队列的任务处理完之后,它还可以从那些还没有处理完的队列的尾部窃取任务来处理

Fork / Join 线程池 :

  • ForkJoinPool : 用于管理 ForkJoinWorkerThread 类型的工作线程
    • 实现了 ExecutorService接口 的多线程处理器
    • 把一个大的任务划分为若干个小的任务并发执行,充分利用可用的资源,进而提高应用的执行效率

参考 @ blog.csdn.net/tyrroo/arti…

ForkJoin.png

二 . 说一说 RecursiveTask

RecursiveTask 是一种 ForkJoinTask 的递归实现 , 例如可以用于计算斐波那契数列 :

1
2
3
4
5
6
7
8
9
10
11
12
java复制代码 class Fibonacci extends RecursiveTask<Integer> {
final int n;
Fibonacci(int n) { this.n = n; }
Integer compute() {
if (n <= 1)
return n;
Fibonacci f1 = new Fibonacci(n - 1);
f1.fork();
Fibonacci f2 = new Fibonacci(n - 2);
return f2.compute() + f1.join();
}
}

RecursiveTask 继承了 ForkJoinTask 接口 ,其内部有几个主要的方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
java复制代码
// Node 1 : 返回结果 , 存放最终结果
V result;

// Node 2 : 抽象方法 compute , 用于计算最终结果
protected abstract V compute();

// Node 3 : 获取最终结果
public final V getRawResult() {
return result;
}

// Node 4 : 最终执行方法 , 这里是需要调用具体实现类compute
protected final boolean exec() {
result = compute();
return true;
}

常见使用方式:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
java复制代码@ 
public class ForkJoinPoolService extends RecursiveTask<Integer> {

private static final int THRESHOLD = 2; //阀值
private int start;
private int end;

public ForkJoinPoolService(Integer start, Integer end) {
this.start = start;
this.end = end;
}

@Override
protected Integer compute() {
int sum = 0;
boolean canCompute = (end - start) <= THRESHOLD;
if (canCompute) {
for (int i = start; i <= end; i++) {
sum += i;
}
} else {
int middle = (start + end) / 2;
ForkJoinPoolService leftTask = new ForkJoinPoolService(start, middle);
ForkJoinPoolService rightTask = new ForkJoinPoolService(middle + 1, end);
//执行子任务
leftTask.fork();
rightTask.fork();
//等待子任务执行完,并得到其结果
Integer rightResult = rightTask.join();
Integer leftResult = leftTask.join();
//合并子任务
sum = leftResult + rightResult;
}
return sum;
}

}

三 . Fork Join 用法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
java复制代码// 前提 : 需要继承 RecursiveTask<Integer> 类 , 且实现 compute 方法
public class ForkJoinPoolReferenceService extends RecursiveTask<Integer> {

private File file;
private Integer salt;

public ForkJoinPoolReferenceService(File file, Integer salt) {
this.file = file;
this.salt = salt;
}

@Override
protected Integer compute() {
return ForkFileUtils.read(file, salt);
}
}


// 方式一 : Fork Join 方式
ForkJoinPoolReferenceService rt = new ForkJoinPoolReferenceService(files.get(0), i);
rt.fork();
result = result + rt.join();

// 方式二 : submit 方式
ForkJoinPool forkJoinPool = new ForkJoinPool();
ForkJoinTask<Integer> forkJoinTask = forkJoinPool.submit(rt);
result = result + forkJoinTask.get();

三 . ForkJoinTask 用法

ForkJoinTask:代表fork/join里面任务类型,我们一般用它的两个子类RecursiveTask、RecursiveAction。这两个区别在于RecursiveTask任务是有返回值,RecursiveAction没有返回值。任务的处理逻辑包括任务的切分都集中在compute()方法里面。

四 . ForkJoinPool 线程池

作用 : ForkJoinPool为来自非ForkJoinTask客户端的提交提供入口点,以及管理和监视操作,最原始的任务都要交给它才能处理 .

主要功能包括 :

  • 负责控制整个fork/join有多少个workerThread,workerThread的创建,激活都是由它来掌控。
  • 负责workQueue队列的创建和分配,每当创建一个workerThread,它负责分配相应的workQueue。
  • 把接到的活都交给workerThread去处理,它可以说是整个frok/join的容器。

备注 :
ForkJoinPool不同于其他类型的ExecutorService,主要是因为它使用了窃取工作:池中的所有线程都试图找到并执行提交到池中的任务和/或其他活动任务创建的任务(如果没有工作,最终会阻塞等待工作)。

ForkJoinPool 基础用法

1
2
3
4
5
java复制代码ForkJoinPool forkJoinPool = new ForkJoinPool();
// 这是一个继承 ForkJoinTask 的类
ForkJoinPoolService countTask = new ForkJoinPoolService(1, 200);
ForkJoinTask<Integer> forkJoinTask = forkJoinPool.submit(countTask);
System.out.println(forkJoinTask.get());

当大多数任务衍生出其他子任务时,以及当许多小任务从外部客户端提交到池时,这使得高效处理成为可能。

特别是当在构造函数中将asyncMode设置为true时,ForkJoinPool s也可能适合用于从未连接的事件风格任务。

ForkJoin 前知识点 : ForkJoinWorkerThread

ForkJoinWorkerThread 为 fork/join里面真正干活的”工人”,本质是一个线程 ,其里面有一个ForkJoinPool.WorkQueue的队列存放着它要干的活,接活之前它要向ForkJoinPool注册(registerWorker),拿到相应的workQueue。然后就从workQueue里面拿任务出来处理。

ForkJoinWorkerThread 依附于ForkJoinPool而存活,如果ForkJoinPool的销毁了,它也会跟着结束。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
java复制代码
// Node 1 : 内部属性 , 一个是当前工作线程池
final ForkJoinPool pool; // the pool this thread works in
final ForkJoinPool.WorkQueue workQueue; // work-stealing mechanics

// Node 2 : WorkQueue 对象 , WorkQueue 是 ForkJoinPool 内部类
// 1 支持工作窃取和外部任务提交的队列
// 2 可以避免多个工作队列实例或多个队列数组共享缓存线
// 3 双端队列
static final class WorkQueue {
// TODO : 后续有必要会深入了解该类
}

// Node 3 : run 方法
Step 1 : 判断workQueue 是否为空
Step 2 : pool.runWorker(workQueue) 将 workQueue 加入 pool 池

// Node 4 : InnocuousForkJoinWorkerThread
private static final ThreadGroup innocuousThreadGroup =createThreadGroup();
?- 正如之前猜测的 , 线程组果然用在了这里

ForkJoinPool 代码深入

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
java复制代码
// Node 1 : 内部构建线程方式
static final class DefaultForkJoinWorkerThreadFactory
M- ForkJoinWorkerThread newThread(ForkJoinPool pool)
?- 通过 new ForkJoinWorkerThread(pool) 返回新 Thread

// Node 2 : 内部属性
ForkJoinWorkerThreadFactory defaultForkJoinWorkerThreadFactory;
?- 用于创建 ForkJoinWorkerThread ,可以被重写
RuntimePermission modifyThreadPermission;
?- 该对象用于控制启用和杀掉线程的权限
static final ForkJoinPool common;
?- 公共静态池
volatile long ctl;
?- 主要用于判断状态


// Node 3 : 默认属性
private static final long IDLE_TIMEOUT = 2000L * 1000L * 1000L;
?- 线程的初始超时值(以纳秒为单位)
private static final long TIMEOUT_SLOP = 20L * 1000L * 1000L;
?- 空闲超时时间
private static final int DEFAULT_COMMON_MAX_SPARES = 256
?- 在静态初始化期间commonMaxSpares的初始值
private static final int SPINS = 0
?- 在阻塞前自旋等待的次数




// Node 4 : 主要方法
> tryAddWorker :
主要1 :add = U.compareAndSwapLong(this, CTL, c, nc); CAS 判断状态
主要2 : createWorker(); 创建工作

> WorkQueue registerWorker(ForkJoinWorkerThread wt)
主要1 : WorkQueue w = new WorkQueue(this, wt);
主要2 : 推入 workQueues

> runWorker : 运行一个 worker
重点1 : t = scan(w, r) : 扫描并试图窃取一个顶级任务
重点2 : w.runTask(t); : 执行给定的任务和任何剩余的本地任务

> scan : 窃取逻辑
// TODO

> <T> ForkJoinTask<T> submit(ForkJoinTask<T> task)


// Node 5 :构造函数

public ForkJoinPool(int parallelism,
ForkJoinWorkerThreadFactory factory,
UncaughtExceptionHandler handler,
boolean asyncMode)
- parallelism : 可并行级别 , 即可并行运行的线程数量
- factory : 线程工厂
- handler : 异常捕获处理器。当执行的任务中出现异常,并从任务中被抛出时,就会被handler捕获
- asyncMode : 工作模式类型 , 存在于队列中的待执行任务,即可以使用先进先出的工作模式,也可以使用后进先出的工作模式

总结

总体来说还是不够深入 , 包括其中的性能 , invoke 实际上都还没有测试 , 实际上 ForkJoinPool 源码深入都不到一成 , 但是看源码看的有点头疼了 ,先这样了 , 后续会尽力把他完善清楚

参考与感谢

1
2
3
java复制代码[芋道源码](http://www.iocoder.cn/JUC/sike/aqs-3/)

[死磕系列](http://cmsblogs.com/?cat=151)

了解更多

Fork / Join 的性能问题

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

0%