简单了解过CompletableFuture的API之后,接下来尝试着从源码的角度说明一下CompletableFuture的执行过程。包括:
- CompletableFuture是如何创建并执行一个异步任务的
- 如何创造一个被回调的任务
- 任务是如何被回调的
如何创建一个异步任务
以supplyAsync为例来说明CompletableFuture如何创建一个异步任务并运行;
supplyAsync的重载方法中,都通过调用asyncSupplyStage方法来创建任务,区别在于二者提供的线程池对象不同。一个使用默认的ForkJoinPool线程池,另一个使用调用者提供的线程池。
supplyAsync
1 | java复制代码 public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) { |
asyncSupplyStage提交任务的过程可以分为三步:
- 首先,创建了一个代表当前任务执行阶段的CompletableFuture对象并最终返回;
- 其次,将代表当前阶段的CompletableFuture对象与Supplier接口封装到AsyncSupply;
- 最终,将AsyncSupply对象提交到线程池中执行
这一波操作,与平时的使用Runnable、Callable接口创建异步任务提交到线程池执行基本一致。
1 | java复制代码 static <U> CompletableFuture<U> asyncSupplyStage(Executor e,Supplier<U> f) { |
任务如何被执行的
任务被提交到线程池中后,可能会通过exec方法或run方法被调用,这取决于具体的线程池实现。而AsyncSupply将任务逻辑都封装到了run方法中,Supplier接口的get方法也在这里被调用,我们封装的逻辑也在此时被执行。如果执行没有异常,那么通过completeValue方法将值封装到CompletableFuture中。如果执行出现异常,则通过completeThrowable方法封装一个异常的结果到CompletableFuture中。
1 | java复制代码static final class AsyncSupply<T> extends ForkJoinTask<Void> |
以上就是一个CompletableFuture异步任务的创建与执行过程。那么,如果需要在当前的异步任务完成时执行其他逻辑,CompletableFuture时如何实现的呢?
如何创建一个回调任务
要说清楚CompletableFuture时如何执行一个回调任务,需要先简单说明一下CompletableFuture的主要结构。
CompletableFuture的主要结构
CompletableFuture的源码中,除去注释,前两行源码是这个样子的。
1 | java复制代码public class CompletableFuture<T> implements Future<T>, CompletionStage<T> { |
result代表着CompletableFuture对象的执行结果。Completion类型的stack字段代表着需要回调的后续任务。Completion对象内部维护着一个next指针,可以指向下一个需要被回调的对象,所有需要被回调的对象组成了一个单项链表。链表的节点由Completion对象组成。而stack指向的是这个链表的头结点,也是最后一个入栈的Completion对象。
所以,被封装了回调任务的CompletableFuture对象应该长这个样子。
Completion对象里面封装的是需要被回调的任务逻辑。但是代表当前阶段的任务又在哪里?其实,CompletableFuture并不知道当前阶段的任务在哪里,而是返过来通过任务指向代表当前阶段的对象。Completion对象通过dep字段,持有代表当前任务阶段的CompletableFuture对象。所以,完成的调用链可能长这个样子:
封装回调任务
那么,任务时何时被封装到调用链中的呢?下面以uniApplyXXX方法为例进行说明。uniApplyXXX的三个重载方法最终都转发到了uniApplyStage方法中,区别在于传递的线程池参数不同。
uniApplyStage源码
1 | java复制代码private <V> CompletableFuture<V> uniApplyStage( |
uniApplyStage的源码中,完成将任务封装到stack中的目标,只用了两行代码:
1 | java复制代码 UniApply<T,V> c = new UniApply<T,V>(e, d, this, f); |
- 创建Completion对象(UniApply为其子类)
- push到stack队列中,成为新的头结点。
可以通过源码看到,任务并不是无条件的放到节点中的。满足以下其中一个条件,任务才会被放到调用链中:
- 当任务需要提交到线程池中时,无条件的创建Completion对象,push到stack链表中。
- 当uniAppply方法执行失败时,创建Completion对象,push到stack链表中。
uniApply的源码下面会讲到,为了不影响理解这里的逻辑,我们先忽略源码,理解其功能。uniApply的作用是判断任务是否满足执行条件,满足则执行封装的函数式接口。那么,这句话if (e != null || !d.uniApply(this, f, null))
中的uniApply方法的作用就很明显了。即,判断当前任务是否已经满足了执行条件,如果满足就直接执行。这样的操作省去了创建Completion对象和入栈的步骤。这里的“满足执行条件”,可以理解为依赖的前任任务是否已经执行完成。
tryFire方法
那么剩下的这个“尝试开火”方法是做什么的呢?当然是激发弹夹中的子弹了!stack就是CompletableFuture的弹夹(stack–>栈结构–>弹夹🤣),Completion就是子弹。
所以tryFire方法的作用就是尝试执行stack中的任务。此处的tryFire方法,通过刚刚创建的UniApply对象调用,并执行封装在其中的任务逻辑。
此处调用是为了避免任务完成入栈后,前置CompletableFuture已经执行完成,从而错过了回调的时机,导致当前的任务无法被触发的情况。
tryFire源码
1 | java复制代码final CompletableFuture<V> tryFire(int mode) { |
通过源码也可以再次确认,tyrFire方法其中的主要逻辑之一就是尝试执行封装的任务逻辑。
postFire源码
postFire主要用来处理任务执行完成的后续工作。如清理stack中的无效节点,嵌套调用时返回当前CompletableFuture对象或在非嵌套调用时执行postComplete方法,用来激发后续任务。
1 | java复制代码final CompletableFuture<T> postFire(CompletableFuture<?> a, int mode) { |
UniApply源码
前面已经说过,UniApply的作用就是判断任务是否满足执行条件,然后执行封装的函数式接口。这个过程大概可以分为四个部分:
- 判断前置任务是否完成;
- 判断前置任务是否有异常;
- 判断当前任务是否已经被其他线程声明了执行权限;
- 调用函数式接口中的方法,执行任务逻辑,并封装结果。
1 | java复制代码 final <S> boolean uniApply(CompletableFuture<S> a, |
如何触发回调任务
再说CompletableFuture的结构
虽然前面已经说过了结构的问题,但是实际的情况还要更复杂些。要想说清楚回调的问题,有必要将CompletableFuture的结构说的更准确一些。
前面说过,Completion对象通过dep指向代表当前阶段的CompletableFuture对象。但是没有说的是,这个CompletableFuture对象也可能会有自己的回调链(stack指向的单项链表)。因此,完整的回调结构可能长这个样子的。
什么时候触发回调
看过了CompletableFuture的回调结构,我们来看回调是如何被触发的。
supplyAsync的源码中,执行了这样一个方法d.postComplete();
,该方法就是触发后续任务的关键。
1 | java复制代码final void postComplete() { |
触发的过程可以分为一下几部
- 取下stack中的首节点:首先从当前CompletableFuture对象(this)中,获取到回调链stack。如果stack不为空,先获取首节点的引用,然后将stack通过CAS指向next。如果CAS更新成功,获取了头结点的执行权限,可以进行下一步。否则重复上述过程,直到成功取下一个节点或没有任务需要执行。
- 执行节点的任务逻辑:第一次取得头结点后
if (f != this)
显然是不成立的,先不考虑里面包含的逻辑。关注这行代码h.tryFire(NESTED)
。tryFire方法与前面说的一致,就是执行Completion中封装的任务逻辑。如果一切顺利,那么第一个需要被回调的任务就开始执行了。 - 重新赋值f:tryFire在嵌套调用时,如果Completion指向的CompletableFuture对象也有需要被回调的任务,那么tryFire方法会返回该CompletableFuture对象,否则返回null。因此,
f = (d = h.tryFire(NESTED)) == null ? this : d;
这句话的作用就是:如果有后续任务,依赖于当前执行的阶段,那么返回代表这个阶段的CompletableFuture对象,赋值给f。否则,f仍然指向this。 - 将递归调用转为循环调用:当f指向了下一阶段的CompletableFuture对象后,
if (f != this)
条件成立,执行pushStack方法。该方法把上一步tryFire返回的CompletableFuture对象的回调任务压入到了自己的stack栈中。通过while循环,直到所有的任务都被压入后,f.stack
的值变为null。此时,f被重新指向this继续回调后续的任务,直到所有的任务都被触发。这样做是为了将递归调用改为循环调用,防止递归过深。
过程如下图:
本文转载自: 掘金