并行与并发
关于并发与并行,需要弄清楚的是,并行关注于多个任务同时进行,而并发则通过调度来不停的切换多个任务执行,而实质上多个任务不是同时执的。并发,英文单词为:Concurrent。并行的英文单词为:parallel。如果想对并发和并行有一个比较直观的认识,可以参考下面这张图片:
并行与并发
Fork/Join 框架与 Java Stream API
Fork/Join框架属于并行框架,关于Fork/Join框架的一些内容,可以参考这篇文章:Java Fork/Join并行框架。简单来说,Fork/Join框架可以将大的任务切分为足够小的任务,然后将小任务分配给不同的线程来执行,而线程之间通过工作窃取算法来协调资源,提前昨晚任务的线程可以去“窃取”其他还没有做完任务的线程的任务,而每一个线程都会持有一个双端队列,里面存储着分配给自己的任务,Fork/Join框架在实现上,为了防止线程之间的竞争,线程在消费分配给自己的任务时,是从队列头取任务的,而“窃取”线程则从队列尾部取任务。
Fork/Join框架通过fork方法来分割大任务,通过使用join来获取小任务的结果,然后组合成大任务的结果。关于Fork/Join任务模型,可以参考下面的图片:
Fork/Join的任务模型
关于Java Stream API的相关内容,可以参考该文章:Java Streams API。
Stream在实现上使用了Fork/Join框架来实现并发,所以使用Stream我们可以在不知不觉间就使得我们的程序跑得飞快,究其原因就是Stream使用了Fork/Join并发框架来处理任务,当然,你需要显示的指定Stream为parallel,否则Stream默认都是串行流。比如对于Collection,你可以使用parallelStream来转换为一个并发流,或者使用stream方法转换为串行流,然后使用parallel操作使得串行流变为并发流。本文的重点是剖析Stream是如何使用Fork/Join来做并发的。
Stream的并发实现细节
在了解了Fork/Join并发框架和Java Stream之后,首要的问题就是:Stream是如何使用Fork/Join框架来做到并发的?其实对于使用者来说,了解Stream就是通过Fork/Join框架来做的就好了,但是如果想要深入了解一下Fork/Join框架的实践,以及Java Stream的设计方法,那么去读一下实现的源码还是很有必要的,下文中的分析仅代表个人观点!
需要注意的一点是,Java Stream的操作分为两类,也可以分为三类,具体的细节可以参考该文章:Java Streams API。一个简单的判断一个操作是否是Terminal操作还是Intermediate操作的方法是,如果操作返回的是一个新的Stream,那么就是一个Intermediate操作,否则就是一个Terminal操作。
- Intermediate:一个流可以后面跟随零个或多个 intermediate 操作。其目的主要是打开流,做出某种程度的数据操作,然后返回一个新的流,交给下一个操作使用。这类操作都是惰性化的(lazy),就是说,仅仅调用到这类方法,并没有真正开始流的遍历。
- Terminal:一个流只能有一个 terminal 操作,当这个操作执行后,流就被使用“光”了,无法再被操作。所以这必定是流的最后一个操作。Terminal 操作的执行,才会真正开始流的遍历,并且会生成一个结果,或者一个 side effect。
- 还有一种操作被称为 short-circuiting。用以指:
+ 对于一个 intermediate 操作,如果它接受的是一个无限大(infinite/unbounded)的 Stream,但返回一个 有限的新 Stream。
+ 对于一个 terminal 操作,如果它接受的是一个无限大的 Stream,但能在有限的时间计算出结果。
Java Stream对四种类型的Terminal操作使用了Fork/Join实现了并发操作,下面的图片展示了这四种操作类型:
支持并行的四种Stream操作
我们首先来走一遍Stream操作的执行路径,下面的代码是我们想要做的操作流,下文会根据该代码示例来跟踪Stream的执行路径:
1 | 复制代码 Stream.of(1,2,3,4) |
解释一下,上面的代码想要实现的功能是将(1,2,3,4)这四个数字每一个都变为其自身的两倍,然后收集这些元素到一个ArrayList中返回。这是一个非常简单的功能,下面是上面的操作流的执行路径:
1 | 复制代码 |
上面的五个步骤是经过一些省略的,需要注意的一点是,intermediate类型的操作仅仅将操作加到一个upstream里面,具体的原文描述如下:
1 | 复制代码 |
比如上面我们的操作中的map操作,实际上只是将操作加到一个intermediate链条上面,不会立刻执行。重点是第五步,Stream是如何使用Fork/Join来实现并发的。evaluate这个方法至关重要,在方法里面会分开处理,对于设置了并发标志的操作流,会使用Fork/Join来并发执行操作任务,而对于没有打开并发标志的操作流,则串行执行操作。
Fork/Join框架的核心方法是一个叫做compute的方法,下面分析一个forEach操作如何通过Fork/Join框架来实现并发,通过追踪代码,可以发现forEach的并发版本其实是一个交由一个ForEachTask对象来做,而ForEachTask类中实现了compute方法:
1 | 复制代码// Similar to AbstractTask but doesn't need to track child tasks |
在上面的代码中将大任务拆成成了小任务,那哪里收集了这些小任务呢?看下面的代码:
1 | 复制代码 @Override |
可以看到调用了invoke方法,而对invoke的描述如下:
1 | 复制代码 * Commences performing this task, awaits its completion if |
不是说Fork/Join框架嘛?那有了fork为什么没有join而是invoke呢?下面是对join方法的描述:
1 | 复制代码 |
根据join的描述,我们知道还可以使用get方法来获取结果,但是get方法会抛出异常而join和invoke方法都不会抛出异常,而是将异常报告给ForkJoinTask,让ForkJoinTask来抛出异常。
本文转载自: 掘金