# 线程池分析(1)|FutureTask源码解析

[TOC]

## 源码解析

维护了一个`state`状态值,并有如下状态值

```java
private static final int NEW          = 0;// 未开始,新任务
private static final int COMPLETING   = 1;// 正在完成(用户处理已完成,正在进行结果的属性赋值)
private static final int NORMAL       = 2;// 已完成,正常返回结果
private static final int EXCEPTIONAL  = 3;// 已完成,异常返回结果
private static final int CANCELLED    = 4;// 已完成,任务已取消
private static final int INTERRUPTING = 5;// 已完成,任务正在中断
private static final int INTERRUPTED  = 6;// 已完成,任务已中断
```



有如下属性

```java
// 正在执行该Future任务的线程,要知道一个任务可以被多个线程执行,所以执行该任务完毕后,也要清空到任务的初始化阶段
private volatile Thread runner;
// 等待节点(单链表形式,存有调用该任务get()方法后,所有的等待线程)
private volatile WaitNode waiters;
// callable(用户真正允许有返回值的业务逻辑内容)
private Callable<V> callable;
```

有如下方法

> public boolean cancel(boolean mayInterruptIfRunning)

```java
// 参数:是否应该中断的方式取消任务
public boolean cancel(boolean mayInterruptIfRunning) {
  // 如果不是崭新任务状态,或者任务状态值修改为正在中断或取消不可成功,则返回取消失败
        if (!(state == NEW &&
              UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
                  mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
            return false;
        try {  
          // 如果需要中断任务执行,就直接中断线程,完成后直接修改状态为:已中断
            if (mayInterruptIfRunning) {
                try {
                    Thread t = runner;
                    if (t != null)
                        t.interrupt();
                } finally { // final state
                    UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
                }
            }
        } finally {
          // 最终完成处理
            finishCompletion();
        }
        return true;
    }
```

> private void finishCompletion()

这里你可以学到如何遍历单链表

```java
private void finishCompletion() {
    // assert state > COMPLETING;
  // 任务处理完成后(状态为已取消、异常、中断),唤醒由于get而进入等待的所有线程拿到该任务结果继续执行
    for (WaitNode q; (q = waiters) != null;) {
      // 依次释放等待线程节点
        if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
            for (;;) {
              // 释放当前等待节点的线程
                Thread t = q.thread;
                if (t != null) {
                    q.thread = null;
                  // 唤醒这个等待线程
                    LockSupport.unpark(t);
                }
                WaitNode next = q.next;
              // 跳出循环判断
                if (next == null)
                    break;
								// 释放等待节点
                q.next = null; // unlink to help gc
              // 进入下一次循环
                q = next;
            }
            break;
        }
    }
		// 执行子类的done方法(由子类实现),例如:ExecutorCompletionService 利用它来维护了一个‘已完成任务’的队列,实现了边产生已完成任务边处理已完成任务的结果
    done();
// 释放callable
    callable = null;        // to reduce footprint
}
```

> public V get()

```java
public V get() throws InterruptedException, ExecutionException {
        int s = state;
  // 崭新任务,则需要等待完成
        if (s <= COMPLETING)
            s = awaitDone(false, 0L);
  // 报告任务(判断输出结果是正常值还是异常信息)
        return report(s);
    }
```

> private V report(int s)

```java
private V report(int s) throws ExecutionException {
        Object x = outcome;
  // 正常值,直接强转范性返回
        if (s == NORMAL)
            return (V)x;
  // 已经取消的任务,报错
        if (s >= CANCELLED)
            throw new CancellationException();
  // 其他报错
        throw new ExecutionException((Throwable)x);
    }

```

> private int awaitDone(boolean timed, long nanos)

```java
private int awaitDone(boolean timed, long nanos)
    throws InterruptedException {
  // 最长等待期限
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    WaitNode q = null;
    boolean queued = false;
    for (;;) {
      // 如果等待该任务执行完毕的线程,还在超时时间内,但被中断,则抛出中断异常
        if (Thread.interrupted()) {
            removeWaiter(q);
            throw new InterruptedException();
        }

        int s = state;
        if (s > COMPLETING) {
            // 表示已处理完成(用户内容已经处理完毕,并且已经将结果赋值到属性outcome上)
            if (q != null)
              // 但这个时候,我们却已经创建了一个等待节点(要明白任务执行线程与get()方法执行线程不互不干涉),那么我们就需要重新释放掉!
                q.thread = null;
          // 终止等待
            return s;
        }
        else if (s == COMPLETING) // cannot time out yet
          // 正在完成(用户逻辑内容已经执行完毕,正在进行outcome赋值),我们仅仅需要让出一下cpu,来等待完成
            Thread.yield();
        else if (q == null)
          // 能执行到这里,说明这是个新任务,应该直接创建等待节点
            q = new WaitNode();
        else if (!queued)
          // 能执行到这里,说明这是个新任务并且已经创建等待节点,带没有入等待单链表中,这时应该放入等待节点到等待单链表
            queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                 q.next = waiters, q);
        else if (timed) {
          // 执行到这里,说明这个已经放入等待单链表中的等待节点是有超时时间的,过了超时时间,就不应该存在于等待节点单链表中,应该将这个节点删除掉
            nanos = deadline - System.nanoTime();
            if (nanos <= 0L) {
              // 删除有超时时间的等待节点(过了超时时间,这个线程就不等待了)
                removeWaiter(q);
                return state;
            }
          // 使得当前线程await,等待被unpark唤醒
            LockSupport.parkNanos(this, nanos);
        }
        else
          // 直接阻塞当前线程
            LockSupport.park(this);
    }
}
```

> public void run()

```java
// 不是新的 || 不能将 runner 改为当前线程
if (state != NEW ||
    !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                 null, Thread.currentThread()))
    return;
try {
  // 可回调的
    Callable<V> c = callable;
    if (c != null && state == NEW) {
        V result;
      // 是否正常获取到处理结果标识
        boolean ran;
        try {
          // 获取结果(用户真正的内容处理逻辑,与结果返回逻辑)
            result = c.call();
            ran = true;
        } catch (Throwable ex) {
            result = null;
            ran = false;
          // 走异常结果处理(就是将状态改变一下,并将结果赋值给该任务的outcome属性)
            setException(ex);
        }
        if (ran)
          // 走正常结果处理(就是将状态改变一下,并将结果赋值给该任务的outcome属性)
            set(result);
    }
} finally {
// runner 运行必须赋值为null
    runner = null;
// 状态
    int s = state;
  // 如果正在中断,走中断处理
    if (s >= INTERRUPTING)
        handlePossibleCancellationInterrupt(s);
}
```

## 测试流程

```java
/**
 * 测试futureTask#get()
 */
@Test
public void test3() throws Exception {
  // 创建任务
    FutureTask<Integer> futureTask = new FutureTask<>(() -> {
        Thread.sleep(100000);
        return 1;
    });
  // 线程调用处理用户逻辑
    new Thread(futureTask).start();
  // 异步等待获取结果
    Integer integer = futureTask.get();
    System.out.println(integer);
}
```

1. 创建任务
2. 异步执行任务
3. 阻塞等待结果返回

幸运的是,这些操作都在`AbstractExecutorService`里面做了封装,甚至做了一些扩展

```java
java.util.concurrent.AbstractExecutorService#submit(java.lang.Runnable, T)
```

## 底层原理

## `FutureTask`使用执行流程

**`FutureTask`使用执行流程:**

- 定义`FurureTask`实例,会初始化内部的`Callable`,`Callable`是用户自定义的逻辑内容
- 实例定义完成后,我们可以开启一个新的线程将这个`FutureTask`任务传进去,调用线程的`start()`方法实现异步,则会执行`FutureTask`实例中的`run()`
- 另一方面,主要线程调`FutureTask`实例的 `get()`去等待结果
- 所有线程都可多次调用该任务实例的`get()`方法,在改任务中这些线程是**以等待线程单链表的形式存储在内部**

### `get()` 底层原理

- **任务为完成状态:**直接调用`report()`处理调用`run()`后返回原生结果的属性`outcome`,返回结果
- **任务未完成状态:**死循环,等待结果
  - 任务未完成时:创建该线程的`WaitNode`实例,放在``waiters调单链表的头节点,并使用`LockSupport.park(this)`使await该线程
  - 任务正在完成时,直接调用`Thread.yield()`,让出一下cpu资源,重新抢CPU资源(因为这个时候是已经处理完用户内容,进入了一个结果赋值的状态,马上就能获取到结果)
  - 任务完成时,如果已经创建了该线程的`WaitNode`实例就释放该`WaitNode`
  - 若想要获取该任务结果的线程,在等待过程中,线程中断,则会抛出中断异常
  - 若指定了等待时常,底层则直接用:`LockSupport.parkNanos(this, nanos)`-->`UNSAFE.park(false, nanos)`

### `run()`底层原理

- 获取当前线程设置为该新任务runner,无法设置完成则不执行
- 获取内部`Callable`调用并执行
  - **返回结果正常**
    - 设置状态正在完成、设置改任务outcome属性值为返回结果值,设置改任务执行完成一切正常
    - 释放所有调用改任务`get`方法后,等待的线程(底层就是unsafe的unpark操作唤醒线程的)
    - 唤醒之后,释放等待的节点
    - 调用子类的任务完成模板方法
    - 释放`Callable`
  - **返回结果异常**
- 释放`runner`
- **如果状态为中断的话,有处理中断程序**

**总结:**

- 所有调用该任务`get()`的线程,都会依次**头插法的方式进入**一个`WaitNode`**内部线程等待单链表**中,供`run()`完成结果处理后,依次**循环单链表进行线程唤醒继续执行**
- 底层主要**使用`UNSAFE.park()`与`UNSAFE.unpark`方法**进行线程的结果等待
- 该类仅代表一个有返回值的异步任务