Guava的FuturessuccessFulAsList

1.前置知识

首先要了解Futures.successFulAsList方法原理,我们需要一下前置知识.

  • ListenableFuture这个接口,它是基于Future的子接口,而Guava 在的异步任务类基本都是实现了这个接口实现的,而具体异步任务类ListenableFutureTask除了实现了ListenableFuture这个接口之外还继承了jdk中的异步任务类FutureTask,
  • 简单来说ListenableFutureTask 其实就是对FutureTask进行了扩展封装。

2.Futures执行流程图:

Futures执行流程.png

所以我们要看到Futures.successFulAsList的源码和原理还需要了解FutureTask的原理。(相关原理可以参考 FutureTask参考)

3.对于ListenableFutureTask这个类的分析

ListenableFutureTask是guava扩展futureTask的一个很重要的类,后续guava的Futures.successFulList为什么能把多个futureTask合并也是基于该实现类,所以这个类非常重要,下面开始对这个类进行分析:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
typescript复制代码public class ListenableFutureTask<V> extends FutureTask<V>
implements ListenableFuture<V> {

// The execution list to hold our listeners.
private final ExecutionList executionList = new ExecutionList();

public static <V> ListenableFutureTask<V> create(Callable<V> callable) {
return new ListenableFutureTask<V>(callable);
}
@Override
public void addListener(Runnable listener, Executor exec) {
executionList.add(listener, exec);
}

//任务执行结束后就会调用ExecutionList的execute方法就会开始执行监听器(这个监听器就行下面那个问题的答案,何时开始执行任务future的监听器类获取结果。)
@Override
protected void done() {
executionList.execute();
}

从上面源码我们可以看到,ListenableFutureTask其实就是对FutureTask进行扩展了,它是如何扩展?可以看到done()方法,该方法在FutureTask执行完成后会自动调用该钩子方法,表示FutureTask任务已经完成,然后就可以执行监听器链了。另外ListenableFutre是有一个监听器执行列表该表是一个单链表,用于执行回调方法用的。

4.Futures.successFulAsList 的简单使用

首先Futures.successFulAsList它的作用,当我们创建多个异步任务的时候,我们可能需要一个个的调用get方法获取结果,而且这个给获取结果的过程可能是阻塞的(任务没完成时),而且需要我们一个个的去调用,那么successFulAsList的作用就是帮助我们把多个future的聚合成一个ListtenableFutre
然后我们再通过调用聚会后的这个ListenableFutre就可以获取到所有futre的结果集了(整个整合过程也是异步处理的)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
ini复制代码下面是简单用法。

ListeningExecutorService listeningExecutorService = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(5));
ListenableFuture<String> lf1 = listeningExecutorService.submit(() -> {
return "listFuture1";
});
ListenableFuture<String> lf2 = listeningExecutorService.submit(() -> {
return "listFuture2";
});
List<ListenableFuture<String>> listenableFutures = Lists.newArrayList(lf1, lf2);
ListenableFuture<List<String>> listListenableFuture = Futures.successfulAsList(listenableFutures);
Thread thread = new Thread(() -> {
while (true) {
System.out.println(Thread.currentThread().getName()+"并发工作中.....");
if (listListenableFuture.isDone()){
break;
}
}
},"test-thread");

System.out.println(listListenableFuture.get());

3.successFulAsList 的原理及源码分析

首先当successFulAsList传入多个future时后会调用 Futures.lsitFuture这个方法,这个方法其实没干啥,就是创建了一个用来整合多个future的CombinedFuture类以及一个future结果处理的FutureCombiner接口的处理逻辑

  • listFuture这个方法处理主要是通过把future集传递给CombinedFutre这个类来处理ListenableFuture集,
  • 这个CombinedFuture功能和名字一样就是联合几个future集一起处理,一般需要传入一个Future集,以及执行监听器任务的线程池,
  • 同时需要传入FutureCombiner接口的实现类,这个接口主要是用来把CombinedFutre内部执行完Future集之后的结果是如何处理的,
  • 而下面这个是把结果集通过一个list存储起来返回,因为这个ListenableFuture其实主要功能就是可以提交一个List的Future去处理并且返回多个Future执行结果集,而不需要类似于Java中FutureTask,一个一个获取结果,然后存储在list中,guava其实就是把这一般进行封装,同时扩展了future执行完成后可以进行这些future的监听器,以及回调函数。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
swift复制代码//传入多个Futres然后以及线程池去处理,返回一个futeres 执行成功的结果集。
@Beta
public static <V> ListenableFuture<List<V>> successfulAsList(
Iterable<? extends ListenableFuture<? extends V>> futures) {
//先通过拷贝元素把ListenableFuture集变成一个不可变,然后调用listFuture,传入listenableFuture集,是否全部的必须成功以及线程池,listFuture正方法是把futures做处理的
return listFuture(ImmutableList.copyOf(futures), false, directExecutor());
}

private static <V> ListenableFuture<List<V>> listFuture(
ImmutableList<ListenableFuture<? extends V>> futures,
boolean allMustSucceed, Executor listenerExecutor) {
return new CombinedFuture<V, List<V>>(
futures, allMustSucceed, listenerExecutor,
//这直接new了一个接口然后写了如何把future结果结合起来返回。
new FutureCombiner<V, List<V>>() {
@Override
//由于这些可能存在则npe问题,所以内部处理的时候采用了Optional来处理避免NPE
public List<V> combine(List<Optional<V>> values) {
List<V> result = Lists.newArrayList();
for (Optional<V> element : values) {
//如果是异常或者没有数据,则添加null处理
result.add(element != null ? element.orNull() : null);
}
//最后
return Collections.unmodifiableList(result);
}
});
}

3.1 CombinedFuture类分析

CombinedFuture主要是给多个Future设置监听器,然后通过线程池执行监听器链,每个监听器都会调用setOneValue这个方法,然后在这个方法里进行自旋获取future的结果。

我们先看看CombinedFuture类的相关成员变量:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
scala复制代码/CombinedFuture是一个把多个future联合处理的一个静态内部类
private static class CombinedFuture<V, C> extends AbstractFuture<C> {
private static final Logger logger = Logger.getLogger(CombinedFuture.class.getName());
//ListenableFuture 任务集
ImmutableCollection<? extends ListenableFuture<? extends V>> futures;
//是否必须区别执行成功,SuccessASFuture只返回执行成功的,allAsFuture需要区别执行成功,有一个失败都返回失败结果
final boolean allMustSucceed;
//记录剩余需要执行的future个数
final AtomicInteger remaining;
//把futre任务结果处理的一个类
FutureCombiner<V, C> combiner;
List<Optional<V>> values;
//异常锁
final Object seenExceptionsLock = new Object();
//存储出现的异常信息
Set<Throwable> seenExceptions;

//构造函数初始化相关参数并且调用init开始执行获取future结果
CombinedFuture(
ImmutableCollection<? extends ListenableFuture<? extends V>> futures,
boolean allMustSucceed, Executor listenerExecutor,
FutureCombiner<V, C> combiner) {
//先初始化参数
this.futures = futures;
this.allMustSucceed = allMustSucceed;
//初始化没执行的future个数为future集的大小,主要这个类是原子类,是线程安全的。
this.remaining = new AtomicInteger(futures.size());
this.combiner = combiner;
//存储结果的一个list,会先根据futures的大小创建
this.values = Lists.newArrayListWithCapacity(futures.size());
//初始化方法,主要是作业是获取future结果,并且增加CombinedFuture类的监听器,future集执行完成之后相关参数清除。
//而且它必须在构造函数结尾调用,init方法使用了很多ConbinedFuture的成员变量。
init(listenerExecutor);
}

这个init其实就是对上面的成员变量进行初始化以及为各个ListenableFuture设置监听器来回调setOneValue ,同时也会设置一个监听器是处理future结束进行清理工作的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
java复制代码    protected void init(final Executor listenerExecutor) {
//先添加一个监听器处理futre执行完成后进行相关清理工作
addListener(new Runnable() {
@Override
public void run() {
// 遍历调用future.cancel方法处理.
if (CombinedFuture.this.isCancelled()) {
for (ListenableFuture<?> future : CombinedFuture.this.futures) {
future.cancel(CombinedFuture.this.wasInterrupted());
}
}

//把futures的引用是否,等待gc回收
CombinedFuture.this.futures = null;

// By now the values array has either been set as the Future's value,
// or (in case of failure) is no longer useful.
CombinedFuture.this.values = null;

// The combiner may also hold state, so free that as well
CombinedFuture.this.combiner = null;
}
}, directExecutor());

//如果futures是空的则调用AbstractFuture抽象类上的set方法把把空的不可变结果集设置为Future的结果,内部有一个基于AQS的子类Sync通过cas处理
if (futures.isEmpty()) {
//先调用FutureCombiner的combine方法把这个结果遍历添加的list里面(开头那个接口里有combine相关处理流程)
set(combiner.combine(ImmutableList.<Optional<V>>of()));
return;
}


// 它会先把结果集全部用null占位.
for (int i = 0; i < futures.size(); ++i) {
values.add(null);
}

//遍历future任务集,并且为每个future添加监听器,
//future任务都完成之后就会调用监听器,
int i = 0;
for (final ListenableFuture<? extends V> listenable : futures) {
final int index = i++;
listenable.addListener(new Runnable() {
@Override
public void run() {
//setOneValue是调用linstenable.get()(其实就是future.get())
setOneValue(index, listenable);
}
}, listenerExecutor);
}
}

上图中的第二个addListener增加监听器后会跑到ListenableFutureTask 这个具体实现类中添加监听器,它有一个executionList监听器成员变量,这个变量是一个单链表的执行器类

下面这个是ListenableFutureTask 的addListener:

1
2
3
4
typescript复制代码  @Override
public void addListener(Runnable listener, Executor exec) {
executionList.add(listener, exec);
}

下面来看一下ExecutionList这个类是如何处理新增的监听器的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
java复制代码//executionList 和字面意识一样是一个执行列表,他其实就是包含多个Runnable的list加线程池的组合
//前置知识线程池的执行流程
public final class ExecutionList {

@VisibleForTesting static final Logger log = Logger.getLogger(ExecutionList.class.getName());
//一个封装了Runnable 和线程池的单链表类
@GuardedBy("this")
private RunnableExecutorPair runnables;
//判断这个执行列表类的runnables这个单链表是否已经被执行过
@GuardedBy("this")
private boolean executed;

/** Creates a new, empty {@link ExecutionList}. */
public ExecutionList() {}

//添加一个执行任务,需要提供一个基于runnable接口的实现类以及一个线程池
public void add(Runnable runnable, Executor executor) {

Preconditions.checkNotNull(runnable, "Runnable was null.");
Preconditions.checkNotNull(executor, "Executor was null.");

//在执行链表添加节点时,通过synchronized锁住这个类,避免线程安全问题
synchronized (this) {
if (!executed) {
runnables = new RunnableExecutorPair(runnable, executor, runnables);
return;
}
}
//默认添加完之后会,参数把这个任务丢个线程池去处理,并不一定能处理成功,可能会存储在线程池的阻塞队列里
executeListener(runnable, executor);
}

//具体执行方法,他会先判断是否已经调用过execute这个方法执行链表,如果没有,
//则拿到执行链表的表头用一个临时链表存储表头,然后把这个类的链表置空
//由于添加链表节点的时候采用的是头插法,所以需要进行一次链表反转,然后遍历链表把runnbale交给线程池处理,
//这个有个坑,如果不熟悉线程池执行流程的话,可能会以为,这里链表存储了一次,然后add完之后交给线程池要么执行,要么存放在阻塞队列里,
//会以为出现两次执行,但是如果看过线程池源码的话,其实你可以发现当runnable被线程执行完之后,会把这个runnable置空,然后结合Java的基础知识
//包装类型以及对象类型、数组类型等参数传递过程是传递对象的地址,所以在修改传递的对象时,存放在其他地方的引用也会发生改变,所以如果runnable执行过
//那么在executeList中的执行链中这个runnable是已经被置空了的,然后线程池在线程执行
public void execute() {
// Lock while we update our state so the add method above will finish adding
// any listeners before we start to run them.
RunnableExecutorPair list;
synchronized (this) {
if (executed) {
return;
}
//把表示表示为已经执行过
executed = true;
//拿到执行链表头
list = runnables;
//置空,等待GC回收
runnables = null; // allow GC to free listeners even if this stays around for a while.
}

//线程池Executor中线程执行任务的方法
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
//拿到 Worker 中的runnable
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
//这些是先执行当前worker节点中的任务,执行完之后会从任务队列里一直循环取任务执行任务,
//执行完一个runnable任务后会置为空等待GC回收
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
//链表反转
RunnableExecutorPair reversedList = null;
while (list != null) {
RunnableExecutorPair tmp = list;
list = list.next;
tmp.next = reversedList;
reversedList = tmp;
}
//遍历链表交给线程池执行
while (reversedList != null) {
executeListener(reversedList.runnable, reversedList.executor);
reversedList = reversedList.next;
}
}

//添加完成后尝试把这个任务丢个线程池处理
private static void executeListener(Runnable runnable, Executor executor) {
try {
executor.execute(runnable);
} catch (RuntimeException e) {
log.log(Level.SEVERE, "RuntimeException while executing runnable "
+ runnable + " with executor " + executor, e);
}
}

//静态内部类,封装runnable以及对应执行runnbale的线程池
private static final class RunnableExecutorPair {
final Runnable runnable;
final Executor executor;
@Nullable RunnableExecutorPair next;
//基于头插法添加
RunnableExecutorPair(Runnable runnable, Executor executor, RunnableExecutorPair next) {
this.runnable = runnable;
this.executor = executor;
this.next = next;
}
}
}

在上面这个ExecutionList这个类中先add方法判断这个执行链是否已经开始执行了,如果没开始执行他会先添加到链表头,然后返回,如果已经开始执行了则调用executeListener这个方法让线程池执行。

那么这些Future的监听器是何时开始执行的?

(这个可以去看我另外一篇博客FutureTask源码分析,里面会提到:)

这个其实就是给future设置的监听器处理,然后内部有一个促发回调机制,促发之后就执行监听器方法会回调setOneValue这个方法。
下面看看setOneValue方法是是怎么处理的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
scss复制代码   /**
* 根据指定索引index,把future的结果设置进values这个结果集。
*/
private void setOneValue(int index, Future<? extends V> future) {
List<Optional<V>> localValues = values;
//如果任务已经完成或者结果集为空,然后进行判断是否是需要所有的Future成功才处理返回结果集,否则这个一段逻辑不受影响
if (isDone() || localValues == null) {
// Some other future failed or has been cancelled, causing this one to
// also be cancelled or have an exception set. This should only happen
// if allMustSucceed is true or if the output itself has been
// cancelled.
checkState(allMustSucceed || isCancelled(),
"Future was done before all dependencies completed");
}

try {
//判断当前传入需要获取结果这个future是否已经完成.
checkState(future.isDone(),
"Tried to set value from future which is not done");
//调用不可中断方法自旋获取future结果
V returnValue = getUninterruptibly(future);
if (localValues != null) {
//然后如果结果存在则把结果设置为指定索引的位置
localValues.set(index, Optional.fromNullable(returnValue));
}
} catch (CancellationException e) {
if (allMustSucceed) {
// Set ourselves as cancelled. Let the input futures keep running
// as some of them may be used elsewhere.
cancel(false);
}
} catch (ExecutionException e) {
setExceptionAndMaybeLog(e.getCause());
} catch (Throwable t) {
setExceptionAndMaybeLog(t);
} finally {
//记录剩余没执行完成的future个数
int newRemaining = remaining.decrementAndGet();
checkState(newRemaining >= 0, "Less than 0 remaining futures");
//如果这个个数等于零证明是最后一个
//则通过combiner接口的实现方法去把结果联合起来并且设置到AbstractFuture中的value(这个value就是get()方法获取的结果集)
if (newRemaining == 0) {
FutureCombiner<V, C> localCombiner = combiner;
if (localCombiner != null && localValues != null) {
//调用AbstractFuture的set方法把 FutureCombine.combine处理后的结果集设置到value中.
set(localCombiner.combine(localValues));
} else {
checkState(isDone());
}
}
}
}
}

每次future的监听器执行后会执行setOneValue方法,这个方法会判断当前这个future是否已经执行过,如果没完成任务则调用getUniterruptible这个方法不可中断的自旋调用future.get方法获取结果
然后存储在临时结果集的value中。然后当执行到最后一个是会通过set方法把整个聚会后的结果集赋值到futureTask 的value。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
java复制代码//Uninterruptibles类中的getUniterruptible 不间断的调用get方法 其实就是类似于Future中get方法获取任务执行的结果,
//而且是不可被中断的,中断之后重新设置可中断标记为TRUE
public static <V> V getUninterruptibly(Future<V> future)
throws ExecutionException {
boolean interrupted = false;
try {
//自旋是为了如果被中断,重新设置中断标记后重新执行get,从而达到不可中断的效果
while (true) {
try {
return future.get();
} catch (InterruptedException e) {
interrupted = true;
}
}
} finally {
//任务执行完成之后才把当前线程中断
if (interrupted) {
Thread.currentThread().interrupt();
}
}
}

//AbstractFuture抽象类中set方法调用aqs子类syn.set设置结果集,然后开始调用执行监听器执行链
protected boolean set(@Nullable V value) {
//调用内部aqs子类的sync通过cas设置AbstractFutre的结果
boolean result = sync.set(value);
//如果设置成功则判断设置成功,如果成功则证明listenablefuture任务集执行完成,开始执行监听器执行链
if (result) {
executionList.execute();
}
return result;
}

总结:successFulList是如何合并多个Future结果的:successFulAsList这个方法核心原理: 利用ListenableFuture 这个接口的监听器方法然后 每个这个接口的实现类都会有有一个ExecutionList类的成员变量,然后这个ExecutionList内部一个封装着Runnable以及线程池的单链表,除此之外这个ExecutionList还有一个变量executed,这个变量是用来控制对于ListenableFuture相关实现类新增监听器时,是否立刻执行还是添加在链表里面,另外当ListenableFuture这个类的子任务类,当任务的future类完成时就会促发调用ExecutionList这个类的execute这个方法进行执行每个future的监听器,在SuccessFulList这个方法中通过创建CombinedFuture类去给各个任务的future添加监听器去调用setOneValue这个方法,然后这个方法去获取future结果.当最后一个任务Future结果获取完成后,就把结果设置到顶层的

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

0%