FutureTask详解
# 简介
FutureTask
表示一个可取消的异步计算。此类提供 Future
的基本实现,具有启动和取消计算、查询计算是否完成以及检索计算结果的方法。只有在计算完成时才能检索结果;如果计算尚未完成,get
方法将阻塞。计算完成后,无法重新启动或取消计算(除非使用 runAndReset 调用计算)。
FutureTask
可用于包装 Callable
或 Runnable
对象。因为 FutureTask
实现了 Runnable
,所以可以将 FutureTask 提交给 Executor
执行。
# 类关系
# Futrue
Future
类定义了上面提到的FutureTask
的那些功能,FutureTask
是具体的实现类。Future
定义如下
public interface Future<V> {
/**
* 尝试取消执行任务,如果任务已完成、已取消或由于其他原因无法取消,则返回false。
* 如果成功,并且在调用时此任务尚未启动,则此任务不会再运行。
* 如果任务已经开始,则mayInterruptRunning参数决定是否中断执行任务的线程。
*
* 此方法返回后,调用isDone将始终返回true,调用isCancelled将始终返回true。
*/
boolean cancel(boolean mayInterruptIfRunning);
/**
* 判断任务是否被取消。如果任务在结束(正常执行结束或者执行异常结束)前被取消则返回true,否则返回false。
*/
boolean isCancelled();
/**
* 判断任务是否已经完成,如果完成则返回true,否则返回false。需要注意的是:任务执行过程中发生异常、任务被取消也属于任务已完成,也会返回true。
*/
boolean isDone();
/**
* 等待执行完成并获取结果
* @throws CancellationException 任务被取消
* @throws ExecutionException 任务执行报错
* @throws InterruptedException 线程中断
*/
V get() throws InterruptedException, ExecutionException;
/**
* 等待任务完成直到超时,返回结果。
* @throws CancellationException 任务被取消报错
* @throws ExecutionException 执行报错
* @throws InterruptedException 线程中断报错
* @throws TimeoutException 等待超时报错
*/
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
# Callable
FutureTask
类中有Callable
类型成员变量,用于将无返回值的Runnable
类型的任务和有返回值的Callable
类型任务统一到一个变量上。这个从FutureTask
的构造函数上可以看出:
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW; // ensure visibility of callable
}
public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result);
this.state = NEW; // ensure visibility of callable
}
2
3
4
5
6
7
8
9
10
# WaitNode
FutureTask
类中的一个成员变量,一个简单的链表,用于记录等待获取执行结果的线程。
static final class WaitNode {
volatile Thread thread;
volatile WaitNode next;
WaitNode() { thread = Thread.currentThread(); }
}
2
3
4
5
# 任务状态
此任务的运行状态,最初为 NEW。只有在方法 set
(设置执行结果)、setException
(设置异常)和 cancel
(取消任务)中,运行状态才会转换为终止状态。在完成期间,状态可能会呈现瞬态值 COMPLETING(在设置结果时)或 INTERRUPTING(仅在中断运行程序以满足 cancel(true) 时)。从这些中间状态到最终状态的转换使用低消耗的有序/惰性写入,因为值是唯一的并且无法进一步修改。
可能的状态转换:
NEW -> COMPLETING -> NORMAL
NEW -> COMPLETING -> EXCEPTIONAL
NEW -> CANCELED
NEW -> INTERRUPTING -> INTERRUPTED
private volatile int state;
private static final int NEW = 0;
private static final int COMPLETING = 1;
private static final int NORMAL = 2;
private static final int EXCEPTIONAL = 3;
private static final int CANCELLED = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED = 6;
2
3
4
5
6
7
8
需要指出的是COMPLETING
这个状态并不是表示任务正在执行中,它是一个瞬时状态,当任务执行完成,先将状态设置为COMPLETING,然后记录执行结果,再将状态设置为NORMAL
或EXCEPTIONAL
。
protected void set(V v) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = v;
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
finishCompletion();
}
}
protected void setException(Throwable t) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = t;
UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
finishCompletion();
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
# 执行任务
执行FutureTask
的run
方法,不过实际的任务是封装在callable
成员变量中。
public void run() {
//不是初始化状态不执行
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
//执行真正的任务
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
//执行出错,记录报错
setException(ex);
}
if (ran)
//执行成功,记录结果
set(result);
}
} finally {
// 执行完毕,重置执行线程,防止并发调用
runner = null;
// 重新读取一遍状态,因为可能在任务执行时被其他线程并发中断
int s = state;
if (s >= INTERRUPTING)
//正在被其他线程中断,等待中断完成
handlePossibleCancellationInterrupt(s);
}
}
private void handlePossibleCancellationInterrupt(int s) {
//自旋等待其他设置中断的线程将任务状态设置为INTERRUPTED
if (s == INTERRUPTING)
while (state == INTERRUPTING)
Thread.yield(); // wait out pending interrupt
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
FutureTask
还提供了一个protected
的方法runAndReset
,该方法执行任务,但不设置结果,为需要执行多次的任务而设计。如果执行出错则设置异常,跟新状态。
protected boolean runAndReset() {
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return false;
boolean ran = false;
int s = state;
try {
Callable<V> c = callable;
if (c != null && s == NEW) {
try {
c.call(); // don't set result
ran = true;
} catch (Throwable ex) {
setException(ex);
}
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
return ran && s == NEW;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
# 取消任务
尝试取消执行此任务。如果任务已经完成、已被取消或由于其他原因无法取消,则此尝试将失败。如果成功,并且在调用cancel时此任务尚未启动,则此任务永远不会运行。如果任务已经启动,则mayInterruptIfRunning
参数确定是否应中断执行此任务的线程以尝试停止任务。
public boolean cancel(boolean mayInterruptIfRunning) {
if (!(state == NEW &&
UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
return false;
try { // in case call to interrupt throws exception
if (mayInterruptIfRunning) {
try {
Thread t = runner;
if (t != null)
t.interrupt();
} finally { // final state
UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
}
}
} finally {
finishCompletion();
}
return true;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
# 获取结果
public V get() throws InterruptedException, ExecutionException {
int s = state;
if (s <= COMPLETING)
s = awaitDone(false, 0L);
return report(s);
}
public V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
if (unit == null)
throw new NullPointerException();
int s = state;
if (s <= COMPLETING &&
(s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
throw new TimeoutException();
return report(s);
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
如果任务还未完成,则调用awaitDone
等待完成。之后调用report
方法获取执行结果。
# awaitDone
将当前线程的等待节点加入等待队列,然后通过LockSupport
阻塞当前线程,实现等待。
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
final long deadline = timed ? System.nanoTime() + nanos : 0L;
WaitNode q = null;
boolean queued = false;
for (;;) {
//线程中断 那么任务取消 删除等待获取结果的线程节点
if (Thread.interrupted()) {
removeWaiter(q);
throw new InterruptedException();
}
int s = state;
//已完成
if (s > COMPLETING) {
if (q != null)
q.thread = null;
return s;
}
else if (s == COMPLETING) // cannot time out yet
//正在设置执行结果,那么先让出时间片
Thread.yield();
else if (q == null)
//走到这里那么需要等待结果,先初始化一个等待节点
q = new WaitNode();
else if (!queued)
//走到这里,说明等待节点已创建,添加到等待队列
queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q);
else if (timed) {
//设置了超时时间
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
//超时后被唤醒,将当前节点冲等待队列中删除
removeWaiter(q);
return state;
}
LockSupport.parkNanos(this, nanos);
}
else
LockSupport.park(this);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
# report
获取结果可能返回执行结果,也可能是抛出任务执行时的异常。
private V report(int s) throws ExecutionException {
Object x = outcome;
if (s == NORMAL)
return (V)x;
if (s >= CANCELLED)
throw new CancellationException();
throw new ExecutionException((Throwable)x);
}
2
3
4
5
6
7
8
# 唤醒等待线程
任务进行到最终状态时,需要唤醒正在等待结果的线程,在cancel
、set
、setException
方法中最后都会执行finishCompletion
private void finishCompletion() {
// assert state > COMPLETING;
//遍历等待节点
for (WaitNode q; (q = waiters) != null;) {
if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
for (;;) {
Thread t = q.thread;
if (t != null) {
q.thread = null;
//唤醒等待线程
LockSupport.unpark(t);
}
WaitNode next = q.next;
if (next == null)
break;
q.next = null; // unlink to help gc
q = next;
}
break;
}
}
//任务执行完毕的回调方法
done();
callable = null; // to reduce footprint
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
# 任务执行回调
FutureTask
提供了回调方法done
,在任务执行完成时调用。方法本身为空实现,需要时可以自定义FutureTask
重写该方法。
该方法位于finishCompletion
中,所以这里的完成指的是走到最终状态。