FutureTask源码分析

1.本文默认,都是已使用过FutreTask类以及线程池

1.FutureTask 是什么?

Future类图.png

我们可以看到Futuetask 是继承了RunnableFuture接口,然后RunnableFuture继承了Future和Runnable接口,Runnable接口就不多说了,而这个Future接口是一个异步计算类接口,它提供了获取结果、判断是否执行完成、取消任务等接口,所以FutureTask其实就是一个支持异步任务获取结果的一个异步任务类,其实内部还使用一个callable接口这个接口是Runnable接口的进阶版它也是创建线程的一种方式只是他支持返回结果以及抛出异常,FutureTask它支持callable接口以及Runnable接口提交任务,内部使用了一个RunnableAdapter接口做适配.

2.FutureTask如何使用?

  1. 提交的是Runnable 的任务
1
2
3
4
5
6
typescript复制代码FutureTask<String> task = new FutureTask<String>(new Runnable() {
@Override
public void run() {
//处理逻辑
}
},"");
  1. 提交的是Callable的任务
1
2
3
4
5
6
7
typescript复制代码FutureTask<String> callableTask = new FutureTask<String>(new Callable<String>() {

@Override
public String call() throws Exception {
return "callable";
}
});
  1. 基于函数式编程提交任务(其实就是callable)
1
2
3
4
arduino复制代码FutureTask<String> functionTask = new FutureTask<String>(()->{
//处理逻辑
return "demo";
});

最后把任务提交给线程池处理即可

3.FutureTask源码分析

3.1 FutureTask构建函数以及相关变量

FutureTask类是一个一个stat变量用来记录当前任务执行的状态:新建、正在计算(完成的中间)、完成、取消、正在中断、中断等几个状态。
构造函数有两个,一个是基于Callable接口提交任务,一个基于Runnable,但是FutureTask的任务是callable所以,它需要通过适配器类进行适配。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
java复制代码全局状态,用来判断当前任务处于那个阶段
private volatile int state;
处于新建状态
private static final int NEW = 0;
任务处理完成中间状态,马上就可以获得结果
private static final int COMPLETING = 1;
任务完成时的状态
private static final int NORMAL = 2;
任务异常后的状态
private static final int EXCEPTIONAL = 3;
任务被取消状态
private static final int CANCELLED = 4;
中断时的中间状态
private static final int INTERRUPTING = 5;
中断状态
private static final int INTERRUPTED = 6;

(具体执行任务的 接口带返回值的Callable接口
private Callable<V> callable;
任务结果:可能是正常结果,也可能是异常信息
private Object outcome; // non-volatile, protected by state reads/writes
执行callable的线程
private volatile Thread runner;
在等待的线程,是一个单链表
private volatile WaitNode waiters;

callable的构造器函数
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW; // ensure visibility of callable
}

//基于runnable接口的构造函数,runnable没有返回值,所以需要传入一个结果类型。
public FutureTask(Runnable runnable, V result) {
// 调用适配器类Executors去返回一个callable接口的子类。
this.callable = Executors.callable(runnable, result);
this.state = NEW; // ensure visibility of callable
}

//Executors.callable,这里用到了适配器模式
public static <T> Callable<T> callable(Runnable task, T result) {
if (task == null)
throw new NullPointerException();
//创建了一个适配器类返回
return new RunnableAdapter<T>(task, result); //适配器只需要实现callable接口即可
}

3.2 获取任务结果的get方法

FutureTask的get()获取结果的方法,有两个一个是不指定时间获取,一直阻塞到任务执行完成,第二个是可以指定时间获取结果,如果获取不到就会抛出异常信息和返回异常信息。

get方法做的操作主要有几个:

  • 调用awaitDone方法进行获取任务状态,该方法主要进行几个判断:
  1. 判断线程是否已经中断,如果在一九被中断,则从等待线程链表节点中移除,然后抛出中断移除
  2. 判断当前任务状态,如果状态是已经完成状态,则把当前线程节点的线程置空,然后返回任务状态
  3. 如果当前结果正在计算中(完成前的中间状态),则让当前线程进行礼让(让出cpu)
  4. 如果当前节点是null ,则构建一个waiterNode节点(默认是当前线程)
  5. 如果节点没入队,则把waiteNode节点入队
  6. 如果等待有时间限制,则判断是否超过时间,如果超过则移除节点,返回状态,否则通过LockSupport挂起线程(分有时间限制和没时间限制)
  • 调用report方法处理结果,该方法通过判断当前任务状态处理什么状态返回返回不同的结果。

下面源码分析:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
java复制代码/**
* 获取结果,如果任务没完成则调用awaitDone()方法等待任务完成,返回一个状态然后让report处理结果
*/
public V get() throws InterruptedException, ExecutionException {
int s = state;
if (s <= COMPLETING)
s = awaitDone(false, 0L);
//最终调用report返回结果
return report(s);
}

/**
* 指点时间返回
*/
public V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
if (unit == null)
throw new NullPointerException();
int s = state;
if (s <= COMPLETING &&
(s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
throw new TimeoutException();
return report(s);
}

//主要是通过中断和超时等待结果或者正常完成或者终止。
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
//获取处理时间
final long deadline = timed ? System.nanoTime() + nanos : 0L;

WaitNode q = null;
//判断节点是否在链表中
boolean queued = false;
//自旋
for (;;) {
//1.线程已经中断判断是否已经被中断,如果中断则移除该任务
if (Thread.interrupted()) {
removeWaiter(q);
throw new InterruptedException();
}

//2.如果线程已经完成,则直接把当前waitNode置空,并且返回当前状态
int s = state;
if (s > COMPLETING) {
if (q != null)
q.thread = null;
return s;
}
//如果当前状态处于计算机结果中,则线程进行礼让(让出cpu时间片)
else if (s == COMPLETING) // cannot time out yet
Thread.yield();
//如果节点等null,则创建节点
else if (q == null)
q = new WaitNode();
//如果发现不在链表中则添加到链表头部
else if (!queued)
queued = UNSAFE.compareAndSwapObject(this, waitersOffset,q.next = waiters, q);
//如果等待有时间限制,则判断是否已经超时,如果超时则移除节点,并返回状态
else if (timed) {
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
removeWaiter(q);
return state;
}
//如果没超时则,设置一个有效时间,通过LockSupport把当前线程挂起.
LockSupport.parkNanos(this, nanos);
}
else
//挂起线程。
LockSupport.park(this);
}
}


//内部方法主要用来返回结果,它会判断当前状态是否是正常,如果正常直接返回,否则抛出一个异常
private V report(int s) throws ExecutionException {
Object x = outcome;
if (s == NORMAL)
return (V)x;
if (s >= CANCELLED)
throw new CancellationException();
throw new ExecutionException((Throwable)x);
}

3.3 任务取消方法Cancel

FutureTask 取消任务方法主要是做了几个操作:

  • 如果任务状态新建并且通过cas修改任务状态为中断或者取消状态失败则直接返回,如果修改成功,则进行一个中断处理,然后通过Unsafe方法 需要任务状态设为中断状态
  • 最后调用finishCompletion方法进行收尾工作,其实就是把线程等待节点处理。
    • finishCompletion方法主要是通过Unsafe方法把waiteNode节点轮流置空和把线程唤醒, 处理完成后,会回调一个done方法,这个方法是个钩子方法,让子类去做扩展操作的,在guava中ListenableFuture中就通过这个done方法执行监听器执行链的

下面是源码分析:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
typescript复制代码//用于取消任务执行,是否支持中断取消
public boolean cancel(boolean mayInterruptIfRunning) {
//如果表示新建状态,或者通过CAS修改状态为中断或者取消状态失败,则直接返回取消失败
if (!(state == NEW &&
UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
return false;
try { //如果是可中断处理的则获取执行callable的线程执行中断处理
if (mayInterruptIfRunning) {
try {
Thread t = runner;
if (t != null)
t.interrupt();
} finally { // 调用UNSAFE修改stat状态的偏移量为中断中断状态。
UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
}
}
} finally {
//最后调用finishCompletion 处理结果。
finishCompletion();
}
return true;
}

//判断是否以及取消
public boolean isCancelled() {
return state >= CANCELLED;
}
//判断是否以及完成任务,会发现其他几种除了中间状态其实都代表着任务已经完成了(可能是异常完成或者中断)
public boolean isDone() {
return state != NEW;
}

//任务执行完成或者取消后调用该方法收尾。
private void finishCompletion() {
// 这里主要是移除所以等待线程,等待GC回收
for (WaitNode q; (q = waiters) != null;) {
if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
for (;;) {
Thread t = q.thread;
if (t != null) {
q.thread = null;
//把等待线程唤醒。
LockSupport.unpark(t);
}
WaitNode next = q.next;
if (next == null)
break;
q.next = null; // unlink to help gc
q = next;
}
break;
}
}
//这里调用了一个done方法这个方法是抽象方法主要是用于扩展用的,如果你在任务执行完成后还需要做一些其他操作则可以实现该接口处理,
//guava中ListenablFuture就是基于这个接口任务完成后回调done方法去执行监听器执行处理。
done();

callable = null; // 置空等待回收
}


//抽象方法留给子类扩展使用,如果完成任务后需要做额外操作可以通过该接口扩展。
protected void done() { }

3.4 FutureTask的run方法以及Set方法

FutureTask的run方法是重新了接口中的run方法,线程池中创建线程出来执行任务就是调用run方法去执行,run方法的主要操作有几个

  • 判断执行任务的线程状态是否是当前线程以及当前任务的状态,通过cas把当前线程设置为执行任务线程,如果设置失败证明线程已经被其他线程处理了,或者当前任务状态表示新建也是表示已经有其他线程处理了。
  • 如果是为执行的任务,则再次判断状态已经任务callable是否合法,如果校验通过,则调用callable的call方法执行热为奴,如果出现异常则调用setException方法把异常处理
  • 如果正常执行完成,则通过set方法设置结构到全局结构中。
  • 最后把执行任务线程置空,避免重复执行任务(因为是并发执行),,然后会调用一个handlePOossibleCancellationinterrupt来判断是否被其他线程取消任务修改状态
    set方法主要是用来处理结果的,一个是通过判断当前任务状态,如果状态是完成的中间状态的着把结果赋值给全局任务结果outCome ,然后把状态修改为完成,最后调用finishCompletion方法处理收尾工作。
  • runAndReset方法和run方法类似,只这个方法是处理定时任务线程池执行任务才会调用这个,这个方法执行完成后会把任务状态设置为新建状态,为下一次定时执行做准备。

下面是源码分析:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
typescript复制代码 //该方法是当线程执行完run方法之后会调用set方法使用cas修改状态、设置结果,处理善后工作.
protected void set(V v) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = v;
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
finishCompletion();
}
}

//处理异常结果的方法
protected void setException(Throwable t) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = t;
UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
finishCompletion();
}
}

//实现了RunnableFuture接口的run方法,线程池内部执行是会调用该接口
//这个接口干了啥?
//1. 先判断当前线程的状态,如果表示new则证明已经被调用过
//
//2. 如果没执行过,尝试把当前线程设置为执行这个任务的线程,并且 通过cas把执行线程修改成功则,继续执行,否则cas修改失败证明,这个执行任务线程已经被其他线程设置(已经有线程执行这个操作了)
//修改成功后,执行 callable的call方法执行任务。
public void run() {
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
result = c.call();
ran = true;
} catch (Throwable ex) {//如果异常则把异常信息处理到结果中
result = null;
ran = false;
setException(ex);
}
//如果正常完成着调用set方法把结果设置到全局outCome
if (ran)
set(result);
}
} finally {
//最后执行完后要置空,防止并非调用call或者run方法
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;//重新获取,可能有其他线程修改了,然后进行中断中间状态或者中断状态
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}

//处理来只取消的中断处理,如果处于中断中则调用线程礼让(方法)方法让出cpu
private void handlePossibleCancellationInterrupt(int s) {

if (s == INTERRUPTING)
while (state == INTERRUPTING)
Thread.yield(); // wait out pending interrupt
}

//该方法是定时线程池中执行run方法会调用这个方法,定时任务中执行一次完成后需要把状态重新设置为new,为下一轮执行做准备。
protected boolean runAndReset() {
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return false;
boolean ran = false;
int s = state;
try {
Callable<V> c = callable;
if (c != null && s == NEW) {
try {
c.call(); // don't set result
ran = true;
} catch (Throwable ex) {
setException(ex);
}
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
return ran && s == NEW;
}

3.5 WaitNode内部类及remodeWaitNode方法

waitNode类是一个简单的单链表,主要是用来存储一个堆栈中的等待线程,而waitNode方法主要是进行节点移除操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
ini复制代码  /**
* 单链表结构,用来记录一个堆栈中的等待线程
*/
static final class WaitNode {
volatile Thread thread;
volatile WaitNode next;
WaitNode() { thread = Thread.currentThread(); }
}



//移除等待的线程节点
private void removeWaiter(WaitNode node) {
if (node != null) {
node.thread = null;
retry:
for (;;) { // restart on removeWaiter race
for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
s = q.next;
if (q.thread != null)
pred = q;
else if (pred != null) {
pred.next = s;
if (pred.thread == null) // check for race
continue retry;
}
else if (!UNSAFE.compareAndSwapObject(this, waitersOffset,
q, s))
continue retry;
}
break;
}
}
}

// Unsafe 获取 任务状态stat ,已经当前任务执行的线程和等待的线程
private static final sun.misc.Unsafe UNSAFE;
private static final long stateOffset;
private static final long runnerOffset;
private static final long waitersOffset;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> k = FutureTask.class;
stateOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("state"));
runnerOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("runner"));
waitersOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("waiters"));
} catch (Exception e) {
throw new Error(e);
}
}
  1. 总结
    FuturTask 是一个可以获取任务结果和处理移除的异步任务类,它需要结合线程池来使用,任务的执行入口其实是在线程池中,线程池通过调用FutureTask实现的run方法,然后FutureTask同Unsafe类处理任务状态以及任务线程来保证线程只有一次,同时还预留了扩展接口done,让我们可以对FutureTask类进行扩展,而guava中的ListenableFutre就是基于FutureTask的done接口进行了扩展实现了future的监听器。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

0%