5.1 - Future
Created by : Mr Dk.
2021 / 02 / 22 0:56 🌙
Ningbo, Zhejiang, China
About
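Future is the core concept of asynchronous computation: it represents the result of an asynchronous computation. The interface provides methods to check whether the computation has completed, to retrieve its result, and to cancel its execution.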
JDK Future
Netty's Future extends the JDK's native Future interface, so the JDK interface is covered first. FutureTask is an implementation of the Future interface that also implements Runnable, so it can be run by an executor.
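To make the relationship between submit() and FutureTask concrete, here is a minimal sketch that runs the same computation both ways. The class FutureTaskDemo and the helper slowSquare are hypothetical names used only for illustration.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;

class FutureTaskDemo {
    // Hypothetical stand-in for a slow computation.
    static int slowSquare(int n) {
        return n * n;
    }

    // Runs the computation once via submit() and once via an explicit FutureTask.
    static int[] runBoth(int n) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            // submit() wraps the Callable and hands back a Future.
            Future<Integer> submitted = executor.submit(() -> slowSquare(n));

            // FutureTask is both a Future and a Runnable, so execute() accepts it.
            FutureTask<Integer> task = new FutureTask<>(() -> slowSquare(n));
            executor.execute(task);

            // get() blocks until each result is available.
            return new int[] { submitted.get(), task.get() };
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        int[] results = runBoth(7);
        System.out.println(results[0] + " " + results[1]); // prints "49 49"
    }
}
```

Either way the caller ends up holding a Future; the explicit FutureTask form is mainly useful when the task object has to be created before an executor is chosen.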
Definition
/**
* A {@code Future} represents the result of an asynchronous
* computation. Methods are provided to check if the computation is
* complete, to wait for its completion, and to retrieve the result of
* the computation. The result can only be retrieved using method
* {@code get} when the computation has completed, blocking if
* necessary until it is ready. Cancellation is performed by the
* {@code cancel} method. Additional methods are provided to
* determine if the task completed normally or was cancelled. Once a
* computation has completed, the computation cannot be cancelled.
* If you would like to use a {@code Future} for the sake
* of cancellability but not provide a usable result, you can
* declare types of the form {@code Future<?>} and
* return {@code null} as a result of the underlying task.
*
* <p>
* <b>Sample Usage</b> (Note that the following classes are all
* made-up.)
* <pre> {@code
* interface ArchiveSearcher { String search(String target); }
* class App {
* ExecutorService executor = ...
* ArchiveSearcher searcher = ...
* void showSearch(final String target)
* throws InterruptedException {
* Future<String> future
* = executor.submit(new Callable<String>() {
* public String call() {
* return searcher.search(target);
* }});
* displayOtherThings(); // do other things while searching
* try {
* displayText(future.get()); // use future
* } catch (ExecutionException ex) { cleanup(); return; }
* }
* }}</pre>
*
* The {@link FutureTask} class is an implementation of {@code Future} that
* implements {@code Runnable}, and so may be executed by an {@code Executor}.
* For example, the above construction with {@code submit} could be replaced by:
* <pre> {@code
* FutureTask<String> future =
* new FutureTask<String>(new Callable<String>() {
* public String call() {
* return searcher.search(target);
* }});
* executor.execute(future);}</pre>
*
* <p>Memory consistency effects: Actions taken by the asynchronous computation
* <a href="package-summary.html#MemoryVisibility"> <i>happen-before</i></a>
* actions following the corresponding {@code Future.get()} in another thread.
*
* @see FutureTask
* @see Executor
* @since 1.5
* @author Doug Lea
* @param <V> The result type returned by this Future's {@code get} method
*/
public interface Future<V> {
}
Cancellation
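Attempts to cancel an asynchronous task. Cancellation fails if the task has already completed. If the task has already started, the mayInterruptIfRunning parameter controls whether the thread running it is interrupted. isDone() returns true once the task has completed, whether normally, with an exception, or through cancellation; isCancelled() returns true if the task was cancelled before it completed normally.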
/**
* Attempts to cancel execution of this task. This attempt will
* fail if the task has already completed, has already been cancelled,
* or could not be cancelled for some other reason. If successful,
* and this task has not started when {@code cancel} is called,
* this task should never run. If the task has already started,
* then the {@code mayInterruptIfRunning} parameter determines
* whether the thread executing this task should be interrupted in
* an attempt to stop the task.
*
* <p>After this method returns, subsequent calls to {@link #isDone} will
* always return {@code true}. Subsequent calls to {@link #isCancelled}
* will always return {@code true} if this method returned {@code true}.
*
* @param mayInterruptIfRunning {@code true} if the thread executing this
* task should be interrupted; otherwise, in-progress tasks are allowed
* to complete
* @return {@code false} if the task could not be cancelled,
* typically because it has already completed normally;
* {@code true} otherwise
*/
boolean cancel(boolean mayInterruptIfRunning);
/**
* Returns {@code true} if this task was cancelled before it completed
* normally.
*
* @return {@code true} if this task was cancelled before it completed
*/
boolean isCancelled();
/**
* Returns {@code true} if this task completed.
*
* Completion may be due to normal termination, an exception, or
* cancellation -- in all of these cases, this method will return
* {@code true}.
*
* @return {@code true} if this task completed
*/
boolean isDone();
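The three methods above can be observed together in a small experiment. This is a sketch using only the JDK; CancelDemo and its method names are made up for illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class CancelDemo {
    // Returns { cancel() result, isCancelled(), isDone() } for a task cancelled while running.
    static boolean[] cancelRunningTask() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        CountDownLatch started = new CountDownLatch(1);
        try {
            Future<?> future = executor.submit(() -> {
                started.countDown();
                try {
                    Thread.sleep(60_000); // pretend to work for a long time
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // cancel(true) lands here
                }
            });
            started.await(); // make sure the task has actually started
            boolean cancelled = future.cancel(true); // true: interrupt the worker thread
            return new boolean[] { cancelled, future.isCancelled(), future.isDone() };
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            executor.shutdownNow();
        }
    }

    // Returns the same triple for a task that had already finished before cancel().
    static boolean[] cancelFinishedTask() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Future<String> future = executor.submit(() -> "done");
            future.get(); // wait for normal completion
            boolean cancelled = future.cancel(true); // too late, nothing to cancel
            return new boolean[] { cancelled, future.isCancelled(), future.isDone() };
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        boolean[] running = cancelRunningTask();
        boolean[] finished = cancelFinishedTask();
        System.out.println(running[0] + " " + running[1] + " " + running[2]);   // true true true
        System.out.println(finished[0] + " " + finished[1] + " " + finished[2]); // false false true
    }
}
```

Note that isDone() is true in both cases: a cancelled task counts as completed, which is why isCancelled() is needed to tell the two apart.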
Get
Waits for the asynchronous task to finish, optionally with a timeout, and returns its result.
/**
* Waits if necessary for the computation to complete, and then
* retrieves its result.
*
* @return the computed result
* @throws CancellationException if the computation was cancelled
* @throws ExecutionException if the computation threw an
* exception
* @throws InterruptedException if the current thread was interrupted
* while waiting
*/
V get() throws InterruptedException, ExecutionException;
/**
* Waits if necessary for at most the given time for the computation
* to complete, and then retrieves its result, if available.
*
* @param timeout the maximum time to wait
* @param unit the time unit of the timeout argument
* @return the computed result
* @throws CancellationException if the computation was cancelled
* @throws ExecutionException if the computation threw an
* exception
* @throws InterruptedException if the current thread was interrupted
* while waiting
* @throws TimeoutException if the wait timed out
*/
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
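The timed get() has four possible outcomes, and a caller normally has to handle each one. A minimal sketch, assuming a hypothetical task that simply sleeps for a configurable time (TimedGetDemo and fetch are illustrative names):

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class TimedGetDemo {
    // Runs a task lasting taskMillis and waits at most timeoutMillis for its result.
    static String fetch(long taskMillis, long timeoutMillis) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Future<String> future = executor.submit(() -> {
                Thread.sleep(taskMillis); // stand-in for slow work
                return "result";
            });
            try {
                return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
            } catch (TimeoutException e) {
                future.cancel(true); // the result is no longer wanted
                return "timed out";
            } catch (ExecutionException e) {
                return "failed: " + e.getCause(); // the task itself threw
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // the waiting thread was interrupted
                return "interrupted";
            }
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(fetch(0, 2_000));  // task finishes well within the limit
        System.out.println(fetch(5_000, 50)); // task outlives the limit
    }
}
```

A TimeoutException does not stop the task by itself, so the sketch cancels the future explicitly once the result is no longer needed.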
Netty Future
Netty's Future extends java.util.concurrent.Future and adds functionality on top of the JDK's native Future.
Definition
/**
* The result of an asynchronous operation.
*/
@SuppressWarnings("ClassNameSameAsAncestorName")
public interface Future<V> extends java.util.concurrent.Future<V> {
}
Status
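Netty's Future distinguishes task states more finely. For a task that has completed, it additionally reports whether the task completed successfully. If the task completed but failed, the Throwable that describes the cause of the failure can be retrieved as well.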
/**
* Returns {@code true} if and only if the I/O operation was completed
* successfully.
*/
boolean isSuccess();
/**
* returns {@code true} if and only if the operation can be cancelled via {@link #cancel(boolean)}.
*/
boolean isCancellable();
/**
* Returns the cause of the failed I/O operation if the I/O operation has
* failed.
*
* @return the cause of the failure.
* {@code null} if succeeded or this future is not
* completed yet.
*/
Throwable cause();
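To see why these status methods are convenient, consider what it takes to answer the same questions with only the JDK interface: the outcome is visible only through the exception that get() throws. The sketch below uses the JDK's CompletableFuture rather than Netty, and the helpers causeOf, isSuccess and describe are hypothetical; they only approximate what Netty exposes directly.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

class StatusDemo {
    // Hypothetical helper: recover the failure cause from a JDK Future without blocking.
    static Throwable causeOf(Future<?> future) {
        if (!future.isDone()) {
            return null; // still running: no cause yet
        }
        try {
            future.get(); // already done, so this returns or throws immediately
            return null;  // completed normally
        } catch (ExecutionException e) {
            return e.getCause(); // the task threw
        } catch (CancellationException e) {
            return e; // the task was cancelled
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return e;
        }
    }

    // Hypothetical helper: done and without a failure cause.
    static boolean isSuccess(Future<?> future) {
        return future.isDone() && causeOf(future) == null;
    }

    static String describe(Future<?> future) {
        if (!future.isDone()) {
            return "pending";
        }
        return isSuccess(future)
                ? "success"
                : "failed: " + causeOf(future).getClass().getSimpleName();
    }

    static String[] demo() {
        CompletableFuture<String> pending = new CompletableFuture<>();
        CompletableFuture<String> ok = CompletableFuture.completedFuture("ok");
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("boom"));
        CompletableFuture<String> cancelled = new CompletableFuture<>();
        cancelled.cancel(false);
        return new String[] {
            describe(pending), describe(ok), describe(failed), describe(cancelled)
        };
    }

    public static void main(String[] args) {
        for (String line : demo()) {
            System.out.println(line);
        }
    }
}
```

With Netty's interface the try/catch disappears: the caller reads isSuccess() and cause() instead of decoding exceptions.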
Listeners
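Netty's Future adds support for listeners. Any number of listeners can be registered on a Future; when its isDone() returns true (that is, when the task completes), the registered listeners are notified and their callbacks are invoked.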
/**
* Adds the specified listener to this future. The
* specified listener is notified when this future is
* {@linkplain #isDone() done}. If this future is already
* completed, the specified listener is notified immediately.
*/
Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);
/**
* Adds the specified listeners to this future. The
* specified listeners are notified when this future is
* {@linkplain #isDone() done}. If this future is already
* completed, the specified listeners are notified immediately.
*/
Future<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
/**
* Removes the first occurrence of the specified listener from this future.
* The specified listener is no longer notified when this
* future is {@linkplain #isDone() done}. If the specified
* listener is not associated with this future, this method
* does nothing and returns silently.
*/
Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);
/**
* Removes the first occurrence for each of the listeners from this future.
* The specified listeners are no longer notified when this
* future is {@linkplain #isDone() done}. If the specified
* listeners are not associated with this future, this method
* does nothing and returns silently.
*/
Future<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
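The listener contract described above (notify on completion, notify immediately when already complete, remove the first occurrence) can be modelled in a few lines. This is a deliberately simplified, hypothetical model and not Netty's implementation: ListenableResult, its Consumer-based listener type and complete() are all assumptions, and listeners simply run on whichever thread completes the result or registers the listener.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class ListenerDemo {
    // Hypothetical, simplified model of listener notification; not Netty's code.
    static class ListenableResult<V> {
        private final List<Consumer<ListenableResult<V>>> listeners = new ArrayList<>();
        private boolean done;
        private V value;

        // Registers a listener; if the result is already complete, notifies it right away.
        ListenableResult<V> addListener(Consumer<ListenableResult<V>> listener) {
            boolean notifyNow;
            synchronized (this) {
                notifyNow = done;
                if (!done) {
                    listeners.add(listener);
                }
            }
            if (notifyNow) {
                listener.accept(this);
            }
            return this;
        }

        // Removes the first occurrence of the listener; does nothing if it is absent.
        synchronized ListenableResult<V> removeListener(Consumer<ListenableResult<V>> listener) {
            listeners.remove(listener);
            return this;
        }

        // Completes the result once and notifies every registered listener.
        void complete(V v) {
            List<Consumer<ListenableResult<V>>> toNotify;
            synchronized (this) {
                if (done) {
                    return;
                }
                value = v;
                done = true;
                toNotify = new ArrayList<>(listeners);
                listeners.clear();
            }
            for (Consumer<ListenableResult<V>> listener : toNotify) {
                listener.accept(this);
            }
        }

        synchronized V getNow() {
            return value;
        }
    }

    static List<String> run() {
        List<String> log = new ArrayList<>();
        ListenableResult<String> result = new ListenableResult<>();
        Consumer<ListenableResult<String>> removed = r -> log.add("removed listener");

        result.addListener(r -> log.add("before: " + r.getNow()));
        result.addListener(removed).removeListener(removed); // never notified
        result.complete("pong");
        result.addListener(r -> log.add("after: " + r.getNow())); // notified immediately
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints "[before: pong, after: pong]"
    }
}
```

Returning this from addListener and removeListener mirrors the chaining style of the interface above, where each method returns the Future itself.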
Synchronization
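Blocks until the asynchronous task behind the Future completes. The sync() variants also rethrow the cause of the failure if the task failed, whereas the await() variants only wait.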
/**
* Waits for this future until it is done, and rethrows the cause of the failure if this future
* failed.
*/
Future<V> sync() throws InterruptedException;
/**
* Waits for this future until it is done, and rethrows the cause of the failure if this future
* failed.
*/
Future<V> syncUninterruptibly();
/**
* Waits for this future to be completed.
*
* @throws InterruptedException
* if the current thread was interrupted
*/
Future<V> await() throws InterruptedException;
/**
* Waits for this future to be completed without
* interruption. This method catches an {@link InterruptedException} and
* discards it silently.
*/
Future<V> awaitUninterruptibly();
/**
* Waits for this future to be completed within the
* specified time limit.
*
* @return {@code true} if and only if the future was completed within
* the specified time limit
*
* @throws InterruptedException
* if the current thread was interrupted
*/
boolean await(long timeout, TimeUnit unit) throws InterruptedException;
/**
* Waits for this future to be completed within the
* specified time limit.
*
* @return {@code true} if and only if the future was completed within
* the specified time limit
*
* @throws InterruptedException
* if the current thread was interrupted
*/
boolean await(long timeoutMillis) throws InterruptedException;
/**
* Waits for this future to be completed within the
* specified time limit without interruption. This method catches an
* {@link InterruptedException} and discards it silently.
*
* @return {@code true} if and only if the future was completed within
* the specified time limit
*/
boolean awaitUninterruptibly(long timeout, TimeUnit unit);
/**
* Waits for this future to be completed within the
* specified time limit without interruption. This method catches an
* {@link InterruptedException} and discards it silently.
*
* @return {@code true} if and only if the future was completed within
* the specified time limit
*/
boolean awaitUninterruptibly(long timeoutMillis);
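The difference between the await-style and sync-style methods is easiest to see on a failed task. The following is a hypothetical, simplified model built on a CountDownLatch, not Netty's implementation; WaitableResult, succeed() and fail() are assumed names, and wrapping the cause in an IllegalStateException is a simplification made for this sketch.

```java
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

class WaitDemo {
    // Hypothetical, simplified model of the two waiting styles; not Netty's code.
    static class WaitableResult {
        private final CountDownLatch done = new CountDownLatch(1);
        private volatile Throwable cause;

        void succeed() {
            done.countDown();
        }

        void fail(Throwable t) {
            cause = t;
            done.countDown();
        }

        // await-style: block until completion, never inspect the outcome.
        WaitableResult await() throws InterruptedException {
            done.await();
            return this;
        }

        // Timed await-style: reports whether completion happened within the limit.
        boolean await(long timeout, TimeUnit unit) throws InterruptedException {
            return done.await(timeout, unit);
        }

        // sync-style: block until completion, then surface a failure to the caller.
        WaitableResult sync() throws InterruptedException {
            await();
            if (cause != null) {
                // Simplification for this sketch: wrap the cause in an unchecked exception.
                throw new IllegalStateException(cause);
            }
            return this;
        }

        Throwable cause() {
            return cause;
        }
    }

    static String run() {
        WaitableResult pending = new WaitableResult(); // never completed
        WaitableResult failed = new WaitableResult();
        new Thread(() -> failed.fail(new IOException("connection refused"))).start();
        try {
            boolean inTime = pending.await(50, TimeUnit.MILLISECONDS); // gives up after 50 ms
            failed.await(); // returns normally even though the task failed
            String afterAwait = "await ok, cause=" + failed.cause().getMessage();
            try {
                failed.sync();
                return "unexpected: sync() returned";
            } catch (IllegalStateException e) {
                return inTime + "; " + afterAwait + "; sync threw " + e.getCause().getMessage();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

After an await-style call the caller still has to check the outcome, which is why it usually pairs with isSuccess() and cause(); a sync-style call turns the failure into an exception at the call site.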
Get Now
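Retrieves the result of the asynchronous task without blocking. If the task has not completed yet, null is returned. Because null can also be a legitimate result, check isDone() instead of relying on the returned value alone.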
/**
* Return the result without blocking. If the future is not done yet this will return {@code null}.
*
* As it is possible that a {@code null} value is used to mark the future as successful you also need to check
* if the future is really done with {@link #isDone()} and not rely on the returned {@code null} value.
*/
V getNow();
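The ambiguity of a null return is worth a quick demonstration. The sketch below uses the JDK's CompletableFuture.getNow(valueIfAbsent) as a stand-in, since it is also non-blocking; it is not Netty's getNow(), which takes no argument. GetNowDemo and inspect are hypothetical names.

```java
import java.util.concurrent.CompletableFuture;

class GetNowDemo {
    // Reads the result without blocking and uses isDone() to disambiguate null.
    static String inspect(CompletableFuture<String> future) {
        String now = future.getNow(null); // never blocks; null stands in for "absent"
        if (now != null) {
            return "value: " + now;
        }
        return future.isDone() ? "done with null result" : "not done yet";
    }

    static String[] demo() {
        CompletableFuture<String> pending = new CompletableFuture<>();
        CompletableFuture<String> nullResult = CompletableFuture.completedFuture(null);
        CompletableFuture<String> ready = CompletableFuture.completedFuture("hello");
        return new String[] { inspect(pending), inspect(nullResult), inspect(ready) };
    }

    public static void main(String[] args) {
        for (String line : demo()) {
            System.out.println(line);
        }
    }
}
```

The first two futures both yield null from the non-blocking read, and only isDone() separates "no result yet" from "the result is null".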
Abstract Future
AbstractFuture is an abstract class that implements the Future interface.
Definition
/**
* Abstract {@link Future} implementation which does not allow for cancellation.
*
* @param <V>
*/
public abstract class AbstractFuture<V> implements Future<V> {
}
Get
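Implements both the timed and untimed versions of get(). Each first blocks until the task finishes, then checks whether cause() returns a Throwable to decide whether the task completed successfully. If it did, the non-blocking getNow() is called to fetch and return the result. Otherwise a CancellationException is rethrown as is, and any other cause is wrapped in an ExecutionException. The timed version throws a TimeoutException if the wait expires before the task finishes.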
@Override
public V get() throws InterruptedException, ExecutionException {
    await();
    Throwable cause = cause();
    if (cause == null) {
        return getNow();
    }
    if (cause instanceof CancellationException) {
        throw (CancellationException) cause;
    }
    throw new ExecutionException(cause);
}
@Override
public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
    if (await(timeout, unit)) {
        Throwable cause = cause();
        if (cause == null) {
            return getNow();
        }
        if (cause instanceof CancellationException) {
            throw (CancellationException) cause;
        }
        throw new ExecutionException(cause);
    }
    throw new TimeoutException();
}
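The get() methods above follow a template-method shape: the blocking get() is assembled entirely from three primitives (await(), cause() and getNow()) that subclasses supply. The miniature below reproduces that shape with hypothetical classes MiniFuture and CompletedMiniFuture so that the three outcomes can be run in isolation; it is a sketch for illustration, not Netty's class hierarchy.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;

class GetDemo {
    // Hypothetical miniature: get() is built only from await(), cause() and getNow().
    abstract static class MiniFuture<V> {
        abstract void await() throws InterruptedException;
        abstract Throwable cause();
        abstract V getNow();

        V get() throws InterruptedException, ExecutionException {
            await(); // block until the task is done
            Throwable cause = cause();
            if (cause == null) {
                return getNow(); // success: hand back the result
            }
            if (cause instanceof CancellationException) {
                throw (CancellationException) cause; // cancellation passes through unchanged
            }
            throw new ExecutionException(cause); // any other failure is wrapped
        }
    }

    // Hypothetical subclass: a future that is already complete when created.
    static class CompletedMiniFuture<V> extends MiniFuture<V> {
        private final V value;
        private final Throwable cause;

        CompletedMiniFuture(V value, Throwable cause) {
            this.value = value;
            this.cause = cause;
        }

        @Override void await() { /* already complete, nothing to wait for */ }
        @Override Throwable cause() { return cause; }
        @Override V getNow() { return value; }
    }

    static String outcome(MiniFuture<String> future) {
        try {
            return "value: " + future.get();
        } catch (ExecutionException e) {
            return "ExecutionException wrapping " + e.getCause().getClass().getSimpleName();
        } catch (CancellationException e) {
            return "CancellationException";
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    static String[] demo() {
        return new String[] {
            outcome(new CompletedMiniFuture<String>("ok", null)),
            outcome(new CompletedMiniFuture<String>(null, new IllegalStateException("boom"))),
            outcome(new CompletedMiniFuture<String>(null, new CancellationException())),
        };
    }

    public static void main(String[] args) {
        for (String line : demo()) {
            System.out.println(line);
        }
    }
}
```

Keeping get() in the abstract base means every concrete future gets JDK-compatible blocking semantics for free, as long as it implements the three primitives consistently.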