java Future 源码详解

xiaoxiao2021-02-28  40

缘起

java线程池提供了几种执行线程的方式,其中Future是可以获取到线程的返回值的,那么他是怎么做到的呢?获取线程的返回值又有什么应用场景呢?本文带你走进Future,探究获取线程返回值的奥秘。

Future的应用场景

比如我们登陆一个电商网站,登陆后网站需要去用户接口查询我们的名字等信息,假设这个接口耗时3s。然后又要去积分接口查询我们的积分信息,这个接口耗时5s.那么按照常规操作我们的代码示意图如下。 那么接下来我们使用线程的方式实现 那么这里引出的问题是:我们是用线程去请求接口的,那么如何拿到这个线程的返回值呢?这里就要用到Future

使用方式:

ExecutorService executorService = Executors.newSingleThreadExecutor(); Future future = executorService.submit(new impleCallable()); String sss = (String) future.get(); System.out.println(sss); public class impleCallable implements Callable{ @Override public Object call() throws Exception { return "我是实现了Callable的一个线程"; } }

输出:

我是实现了Callable的一个线程

可以看到,通过Future可以获取到线程的返回值,那么他是如何做到的呢?下面我们来看源码。

源码:

下面看调用的submit方法

public <T> Future<T> submit(Callable<T> task) { if (task == null) throw new NullPointerException(); RunnableFuture<T> ftask = newTaskFor(task); execute(ftask); return ftask; } protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { return new FutureTask<T>(callable); }

可以看到是把我们实现了Callabe的对象封装成了一个FutureTask对象,并且调用了线程池的execute()方法,咦。。这个方法不是我们平常把线程丢到线程池里执行的方法吗?为啥我们调用的是线程池的submit()方法最终是调用了execute()方法呢?这个FutureTask何许人也?下面来看。

public class FutureTask<V> implements RunnableFuture<V> { public void run() { try { Callable<V> c = callable; if (c != null && state == NEW) { V result; try { result = c.call(); } catch (Throwable ex) { } if (ran) set(result); } } } } protected void set(V v) { if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { outcome = v; } }

可以看到这个FutureTask本身也是实现Runnable接口,也就是他也是一个线程,看他的run方法可以看到他最终调用的是我们submit(T)的时候传过来的对象的call方法,并且把返回值赋值给result.然后调用set方法把result复制给outcome。至此,线程执行完毕,并且返回值也赋值给了outcome.那么如何获取这个outcome呢,下面我们看FutureTask的get()方法

public V get() throws InterruptedException, ExecutionException { int s = state; if (s <= COMPLETING) s = awaitDone(false, 0L);//等待线程执行完毕 return report(s); }

get方法会调用awaitDone

private int awaitDone(boolean timed, long nanos) throws InterruptedException { final long deadline = timed ? System.nanoTime() + nanos : 0L; WaitNode q = null; boolean queued = false; for (;;) { if (Thread.interrupted()) { removeWaiter(q); throw new InterruptedException(); } int s = state; if (s > COMPLETING) { if (q != null) q.thread = null; return s; } else if (s == COMPLETING) // cannot time out yet Thread.yield(); else if (q == null) q = new WaitNode(); else if (!queued) queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q); else if (timed) { nanos = deadline - System.nanoTime(); if (nanos <= 0L) { removeWaiter(q); return state; } LockSupport.parkNanos(this, nanos); } else LockSupport.park(this); } }

awaitDone方法对主线程进行阻塞,通过LockSupport的park或parkNanos挂起线程,那么挂起的线程什么时候被唤醒呢? 我们看到之前的run方法中会调用一个set方法

protected void set(V v) { if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { outcome = v; UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state finishCompletion(); } }

set调用了finishCompletion();

private void finishCompletion() { // assert state > COMPLETING; for (WaitNode q; (q = waiters) != null;) { if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) { for (;;) { Thread t = q.thread; if (t != null) { q.thread = null; //关键代码,唤醒调用get的时候堵塞的主线程 LockSupport.unpark(t); } WaitNode next = q.next; if (next == null) break; q.next = null; // unlink to help gc q = next; } break; } } done(); callable = null; // to reduce footprint }

LockSupport.unpark(t);是唤醒堵塞的主线程的方法,唤醒之后,堵塞在get方法处的主线程就会执行report方法,从而拿到线程的执行结果。

private V report(int s) throws ExecutionException { Object x = outcome; if (s == NORMAL) return (V)x; if (s >= CANCELLED) throw new CancellationException(); throw new ExecutionException((Throwable)x); }

可以看到,get方法就是把outcome给返回出去了。

理解

可以理解为,我们submit一个Callable的时候,实际上是新建了一个线程(FutureTask)来执行我们提交的任务。而这个新建的线程(FutureTask)中含有一个变量outcome用来存储我们提交的任务的返回值。当调用FutureTask的get()的时候会检查我们的任务执行完毕没有,如果没有执行完毕就堵塞:park(Object blocker) 表示阻塞主线程。等待执行完毕之后调用unpark(Thread thread) 唤醒主线程线程,参数thread指定线程对象。

转载请注明原文地址: https://www.6miu.com/read-2621088.html

最新回复(0)