1.前置知识
首先要了解Futures.successFulAsList方法原理,我们需要一下前置知识.
- ListenableFuture这个接口,它是基于Future的子接口,而Guava 在的异步任务类基本都是实现了这个接口实现的,而具体异步任务类ListenableFutureTask除了实现了ListenableFuture这个接口之外还继承了jdk中的异步任务类FutureTask,
- 简单来说ListenableFutureTask 其实就是对FutureTask进行了扩展封装。
2.Futures执行流程图:
所以我们要看到Futures.successFulAsList的源码和原理还需要了解FutureTask的原理。(相关原理可以参考 FutureTask参考)
3.对于ListenableFutureTask这个类的分析
ListenableFutureTask是guava扩展futureTask的一个很重要的类,后续guava的Futures.successFulList为什么能把多个futureTask合并也是基于该实现类,所以这个类非常重要,下面开始对这个类进行分析:
1 | typescript复制代码public class ListenableFutureTask<V> extends FutureTask<V> |
从上面源码我们可以看到,ListenableFutureTask其实就是对FutureTask进行扩展了,它是如何扩展?可以看到done()方法,该方法在FutureTask执行完成后会自动调用该钩子方法,表示FutureTask任务已经完成,然后就可以执行监听器链了。另外ListenableFutre是有一个监听器执行列表该表是一个单链表,用于执行回调方法用的。
4.Futures.successFulAsList 的简单使用
首先Futures.successFulAsList它的作用,当我们创建多个异步任务的时候,我们可能需要一个个的调用get方法获取结果,而且这个给获取结果的过程可能是阻塞的(任务没完成时),而且需要我们一个个的去调用,那么successFulAsList的作用就是帮助我们把多个future的聚合成一个ListtenableFutre
然后我们再通过调用聚会后的这个ListenableFutre就可以获取到所有futre的结果集了(整个整合过程也是异步处理的)。
1 | ini复制代码下面是简单用法。 |
3.successFulAsList 的原理及源码分析
首先当successFulAsList传入多个future时后会调用 Futures.lsitFuture这个方法,这个方法其实没干啥,就是创建了一个用来整合多个future的CombinedFuture类以及一个future结果处理的FutureCombiner接口的处理逻辑
- listFuture这个方法处理主要是通过把future集传递给CombinedFutre这个类来处理ListenableFuture集,
- 这个CombinedFuture功能和名字一样就是联合几个future集一起处理,一般需要传入一个Future集,以及执行监听器任务的线程池,
- 同时需要传入FutureCombiner接口的实现类,这个接口主要是用来把CombinedFutre内部执行完Future集之后的结果是如何处理的,
- 而下面这个是把结果集通过一个list存储起来返回,因为这个ListenableFuture其实主要功能就是可以提交一个List的Future去处理并且返回多个Future执行结果集,而不需要类似于Java中FutureTask,一个一个获取结果,然后存储在list中,guava其实就是把这一般进行封装,同时扩展了future执行完成后可以进行这些future的监听器,以及回调函数。
1 | swift复制代码//传入多个Futres然后以及线程池去处理,返回一个futeres 执行成功的结果集。 |
3.1 CombinedFuture类分析
CombinedFuture主要是给多个Future设置监听器,然后通过线程池执行监听器链,每个监听器都会调用setOneValue这个方法,然后在这个方法里进行自旋获取future的结果。
我们先看看CombinedFuture类的相关成员变量:
1 | scala复制代码/CombinedFuture是一个把多个future联合处理的一个静态内部类 |
这个init其实就是对上面的成员变量进行初始化以及为各个ListenableFuture设置监听器来回调setOneValue ,同时也会设置一个监听器是处理future结束进行清理工作的
1 | java复制代码 protected void init(final Executor listenerExecutor) { |
上图中的第二个addListener增加监听器后会跑到ListenableFutureTask 这个具体实现类中添加监听器,它有一个executionList监听器成员变量,这个变量是一个单链表的执行器类
下面这个是ListenableFutureTask 的addListener:
1 | typescript复制代码 @Override |
下面来看一下ExecutionList这个类是如何处理新增的监听器的:
1 | java复制代码//executionList 和字面意识一样是一个执行列表,他其实就是包含多个Runnable的list加线程池的组合 |
在上面这个ExecutionList这个类中先add方法判断这个执行链是否已经开始执行了,如果没开始执行他会先添加到链表头,然后返回,如果已经开始执行了则调用executeListener这个方法让线程池执行。
那么这些Future的监听器是何时开始执行的?
(这个可以去看我另外一篇博客FutureTask源码分析,里面会提到:)
这个其实就是给future设置的监听器处理,然后内部有一个促发回调机制,促发之后就执行监听器方法会回调setOneValue这个方法。
下面看看setOneValue方法是是怎么处理的:
1 | scss复制代码 /** |
每次future的监听器执行后会执行setOneValue方法,这个方法会判断当前这个future是否已经执行过,如果没完成任务则调用getUniterruptible这个方法不可中断的自旋调用future.get方法获取结果
然后存储在临时结果集的value中。然后当执行到最后一个是会通过set方法把整个聚会后的结果集赋值到futureTask 的value。
1 | java复制代码//Uninterruptibles类中的getUniterruptible 不间断的调用get方法 其实就是类似于Future中get方法获取任务执行的结果, |
总结:successFulList是如何合并多个Future结果的:successFulAsList这个方法核心原理: 利用ListenableFuture 这个接口的监听器方法然后 每个这个接口的实现类都会有有一个ExecutionList类的成员变量,然后这个ExecutionList内部一个封装着Runnable以及线程池的单链表,除此之外这个ExecutionList还有一个变量executed,这个变量是用来控制对于ListenableFuture相关实现类新增监听器时,是否立刻执行还是添加在链表里面,另外当ListenableFuture这个类的子任务类,当任务的future类完成时就会促发调用ExecutionList这个类的execute这个方法进行执行每个future的监听器,在SuccessFulList这个方法中通过创建CombinedFuture类去给各个任务的future添加监听器去调用setOneValue这个方法,然后这个方法去获取future结果.当最后一个任务Future结果获取完成后,就把结果设置到顶层的
本文转载自: 掘金