5.1 - Future

Created by : Mr Dk.

2021 / 02 / 22 0:56 🌙

Ningbo, Zhejiang, China


About

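Future is a core concept in asynchronous computing: it represents the result of an asynchronous computation. The interface provides methods to check whether the computation has completed, to retrieve its result, and to cancel its execution.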

JDK Future

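Netty's Future extends the JDK's native Future interface, so let's look at the JDK interface first. FutureTask is an implementation of the Future interface that also implements Runnable, so it can be run by an executor.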

Definition

/**
 * A {@code Future} represents the result of an asynchronous
 * computation.  Methods are provided to check if the computation is
 * complete, to wait for its completion, and to retrieve the result of
 * the computation.  The result can only be retrieved using method
 * {@code get} when the computation has completed, blocking if
 * necessary until it is ready.  Cancellation is performed by the
 * {@code cancel} method.  Additional methods are provided to
 * determine if the task completed normally or was cancelled. Once a
 * computation has completed, the computation cannot be cancelled.
 * If you would like to use a {@code Future} for the sake
 * of cancellability but not provide a usable result, you can
 * declare types of the form {@code Future<?>} and
 * return {@code null} as a result of the underlying task.
 *
 * <p>
 * <b>Sample Usage</b> (Note that the following classes are all
 * made-up.)
 * <pre> {@code
 * interface ArchiveSearcher { String search(String target); }
 * class App {
 *   ExecutorService executor = ...
 *   ArchiveSearcher searcher = ...
 *   void showSearch(final String target)
 *       throws InterruptedException {
 *     Future<String> future
 *       = executor.submit(new Callable<String>() {
 *         public String call() {
 *             return searcher.search(target);
 *         }});
 *     displayOtherThings(); // do other things while searching
 *     try {
 *       displayText(future.get()); // use future
 *     } catch (ExecutionException ex) { cleanup(); return; }
 *   }
 * }}</pre>
 *
 * The {@link FutureTask} class is an implementation of {@code Future} that
 * implements {@code Runnable}, and so may be executed by an {@code Executor}.
 * For example, the above construction with {@code submit} could be replaced by:
 *  <pre> {@code
 * FutureTask<String> future =
 *   new FutureTask<String>(new Callable<String>() {
 *     public String call() {
 *       return searcher.search(target);
 *   }});
 * executor.execute(future);}</pre>
 *
 * <p>Memory consistency effects: Actions taken by the asynchronous computation
 * <a href="package-summary.html#MemoryVisibility"> <i>happen-before</i></a>
 * actions following the corresponding {@code Future.get()} in another thread.
 *
 * @see FutureTask
 * @see Executor
 * @since 1.5
 * @author Doug Lea
 * @param <V> The result type returned by this Future's {@code get} method
 */
public interface Future<V> {

}
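
The two constructions mentioned in the Javadoc above (submit() versus an explicit FutureTask) can be tried side by side. This is only a minimal sketch: the class name FutureTaskSketch and the helper slowSquare are made up for illustration and stand in for a real, slow computation.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;

class FutureTaskSketch {

    // Hypothetical stand-in for a slow computation.
    static int slowSquare(int x) {
        return x * x;
    }

    // Runs the same Callable through submit() and through an explicit FutureTask.
    static int[] runBoth(int input) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Callable<Integer> task = () -> slowSquare(input);

            // Variant 1: the executor wraps the Callable and hands back a Future.
            Future<Integer> submitted = executor.submit(task);

            // Variant 2: build the FutureTask ourselves; it is both a Runnable and a Future.
            FutureTask<Integer> futureTask = new FutureTask<>(task);
            executor.execute(futureTask);

            // get() blocks until each computation has completed.
            return new int[] { submitted.get(), futureTask.get() };
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        int[] results = runBoth(7);
        System.out.println(results[0] + " " + results[1]); // prints "49 49"
    }
}
```

Either way the caller ends up holding a Future; the explicit FutureTask is mostly useful when the task object has to exist before it is handed to an executor.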

Cancellation

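cancel() attempts to cancel the execution of an asynchronous task. The attempt fails if the task has already completed or has already been cancelled. If the task has already started, the mayInterruptIfRunning parameter controls whether the thread executing it is interrupted. isDone() returns true once the task has completed, whether normally, with an exception, or through cancellation; isCancelled() returns true if the task was cancelled before it completed normally.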

/**
 * Attempts to cancel execution of this task.  This attempt will
 * fail if the task has already completed, has already been cancelled,
 * or could not be cancelled for some other reason. If successful,
 * and this task has not started when {@code cancel} is called,
 * this task should never run.  If the task has already started,
 * then the {@code mayInterruptIfRunning} parameter determines
 * whether the thread executing this task should be interrupted in
 * an attempt to stop the task.
 *
 * <p>After this method returns, subsequent calls to {@link #isDone} will
 * always return {@code true}.  Subsequent calls to {@link #isCancelled}
 * will always return {@code true} if this method returned {@code true}.
 *
 * @param mayInterruptIfRunning {@code true} if the thread executing this
 * task should be interrupted; otherwise, in-progress tasks are allowed
 * to complete
 * @return {@code false} if the task could not be cancelled,
 * typically because it has already completed normally;
 * {@code true} otherwise
 */
boolean cancel(boolean mayInterruptIfRunning);

/**
 * Returns {@code true} if this task was cancelled before it completed
 * normally.
 *
 * @return {@code true} if this task was cancelled before it completed
 */
boolean isCancelled();

/**
 * Returns {@code true} if this task completed.
 *
 * Completion may be due to normal termination, an exception, or
 * cancellation -- in all of these cases, this method will return
 * {@code true}.
 *
 * @return {@code true} if this task completed
 */
boolean isDone();
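
The interplay of cancel(), isCancelled() and isDone() is easiest to see on a task that is already running. The sketch below uses only the JDK; the class name CancelSketch and the 60-second sleep that pretends to be work are assumptions made for the example.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class CancelSketch {

    // Returns { first cancel(), isCancelled(), isDone(), second cancel() }.
    static boolean[] cancelRunningTask() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        CountDownLatch started = new CountDownLatch(1);
        try {
            Future<?> future = executor.submit(() -> {
                started.countDown();
                try {
                    Thread.sleep(60_000); // pretend to work for a long time
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // woken up by cancel(true)
                }
            });

            started.await(); // make sure the task has really started

            boolean first = future.cancel(true);  // interrupts the worker thread
            boolean second = future.cancel(true); // already cancelled, so this fails
            return new boolean[] { first, future.isCancelled(), future.isDone(), second };
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        boolean[] r = cancelRunningTask();
        System.out.println(r[0] + " " + r[1] + " " + r[2] + " " + r[3]); // prints "true true true false"
    }
}
```

Note that isDone() is true after cancellation even though the task never produced a result, which is exactly the case the isDone() Javadoc calls out.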

Get

Waits (optionally with a timeout) for the asynchronous task to finish and retrieves its result.

/**
 * Waits if necessary for the computation to complete, and then
 * retrieves its result.
 *
 * @return the computed result
 * @throws CancellationException if the computation was cancelled
 * @throws ExecutionException if the computation threw an
 * exception
 * @throws InterruptedException if the current thread was interrupted
 * while waiting
 */
V get() throws InterruptedException, ExecutionException;

/**
 * Waits if necessary for at most the given time for the computation
 * to complete, and then retrieves its result, if available.
 *
 * @param timeout the maximum time to wait
 * @param unit the time unit of the timeout argument
 * @return the computed result
 * @throws CancellationException if the computation was cancelled
 * @throws ExecutionException if the computation threw an
 * exception
 * @throws InterruptedException if the current thread was interrupted
 * while waiting
 * @throws TimeoutException if the wait timed out
 */
V get(long timeout, TimeUnit unit)
    throws InterruptedException, ExecutionException, TimeoutException;
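
A common way to use the timed get() is to fall back to a default value when the result is not ready in time. The following is a sketch under that assumption; the class TimedGetSketch, the method getOrFallback and its parameters are hypothetical names, not part of any library.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class TimedGetSketch {

    // Returns the task's result, or the fallback if it is not ready within timeoutMillis.
    static String getOrFallback(long taskMillis, long timeoutMillis, String fallback) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        Future<String> future = executor.submit(() -> {
            Thread.sleep(taskMillis); // simulated work
            return "done";
        });
        try {
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            future.cancel(true); // the result is no longer wanted
            return fallback;
        } catch (ExecutionException e) {
            // The task itself threw; the original exception is the cause.
            throw new IllegalStateException(e.getCause());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(getOrFallback(0, 5_000, "late"));  // prints "done"
        System.out.println(getOrFallback(5_000, 50, "late")); // prints "late"
    }
}
```

Each of the four exception types declared on get(long, TimeUnit) describes a different outcome, so handling them separately, as above, keeps a timeout from being confused with a failed task.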

Netty Future

Netty's Future extends java.util.concurrent.Future and adds functionality on top of the JDK's native Future.

Definition

/**
 * The result of an asynchronous operation.
 */
@SuppressWarnings("ClassNameSameAsAncestorName")
public interface Future<V> extends java.util.concurrent.Future<V> {

}

Status

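Netty's Future distinguishes the execution state of an asynchronous task in more detail. For a task that is done, it further tells whether the task completed successfully. If the task is done but failed, the Throwable that represents the cause of the failure can also be obtained.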

/**
 * Returns {@code true} if and only if the I/O operation was completed
 * successfully.
 */
boolean isSuccess();

/**
 * returns {@code true} if and only if the operation can be cancelled via {@link #cancel(boolean)}.
 */
boolean isCancellable();

/**
 * Returns the cause of the failed I/O operation if the I/O operation has
 * failed.
 *
 * @return the cause of the failure.
 *         {@code null} if succeeded or this future is not
 *         completed yet.
 */
Throwable cause();

Listeners

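Netty's Future adds support for listeners. Any number of listeners can be registered on a Future; once the Future's isDone() returns true (that is, the task is done), the registered listeners are notified and their callbacks are invoked.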

/**
 * Adds the specified listener to this future.  The
 * specified listener is notified when this future is
 * {@linkplain #isDone() done}.  If this future is already
 * completed, the specified listener is notified immediately.
 */
Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);

/**
 * Adds the specified listeners to this future.  The
 * specified listeners are notified when this future is
 * {@linkplain #isDone() done}.  If this future is already
 * completed, the specified listeners are notified immediately.
 */
Future<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners);

/**
 * Removes the first occurrence of the specified listener from this future.
 * The specified listener is no longer notified when this
 * future is {@linkplain #isDone() done}.  If the specified
 * listener is not associated with this future, this method
 * does nothing and returns silently.
 */
Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);

/**
 * Removes the first occurrence for each of the listeners from this future.
 * The specified listeners are no longer notified when this
 * future is {@linkplain #isDone() done}.  If the specified
 * listeners are not associated with this future, this method
 * does nothing and returns silently.
 */
Future<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
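
The notification rule from the Javadoc above (a listener added before completion is notified on completion, a listener added afterwards is notified immediately) can be observed without Netty on the classpath. The sketch below deliberately swaps in the JDK's CompletableFuture.whenComplete as a stand-in for addListener; it is not Netty's API, it does not model which thread Netty uses to run listeners, and the class name ListenerSketch is made up.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

class ListenerSketch {

    // Records the order in which registrations and callbacks happen.
    static List<String> demo() {
        List<String> log = new ArrayList<>();
        CompletableFuture<String> future = new CompletableFuture<>();

        // Registered before completion: the callback waits for complete().
        future.whenComplete((value, cause) -> log.add("early:" + value));
        log.add("registered");

        future.complete("ok"); // the early callback runs here

        // Registered after completion: the callback runs right away.
        future.whenComplete((value, cause) -> log.add("late:" + value));
        log.add("end");
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints "[registered, early:ok, late:ok, end]"
    }
}
```

The callback receives both a value and a cause, which roughly corresponds to checking isSuccess() and cause() on the future handed to a Netty listener.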

Synchronization

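Blocks until the asynchronous task behind the Future is done. sync() and syncUninterruptibly() additionally rethrow the cause of the failure if the task failed, whereas the await() family only waits.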

/**
 * Waits for this future until it is done, and rethrows the cause of the failure if this future
 * failed.
 */
Future<V> sync() throws InterruptedException;

/**
 * Waits for this future until it is done, and rethrows the cause of the failure if this future
 * failed.
 */
Future<V> syncUninterruptibly();

/**
 * Waits for this future to be completed.
 *
 * @throws InterruptedException
 *         if the current thread was interrupted
 */
Future<V> await() throws InterruptedException;

/**
 * Waits for this future to be completed without
 * interruption.  This method catches an {@link InterruptedException} and
 * discards it silently.
 */
Future<V> awaitUninterruptibly();

/**
 * Waits for this future to be completed within the
 * specified time limit.
 *
 * @return {@code true} if and only if the future was completed within
 *         the specified time limit
 *
 * @throws InterruptedException
 *         if the current thread was interrupted
 */
boolean await(long timeout, TimeUnit unit) throws InterruptedException;

/**
 * Waits for this future to be completed within the
 * specified time limit.
 *
 * @return {@code true} if and only if the future was completed within
 *         the specified time limit
 *
 * @throws InterruptedException
 *         if the current thread was interrupted
 */
boolean await(long timeoutMillis) throws InterruptedException;

/**
 * Waits for this future to be completed within the
 * specified time limit without interruption.  This method catches an
 * {@link InterruptedException} and discards it silently.
 *
 * @return {@code true} if and only if the future was completed within
 *         the specified time limit
 */
boolean awaitUninterruptibly(long timeout, TimeUnit unit);

/**
 * Waits for this future to be completed within the
 * specified time limit without interruption.  This method catches an
 * {@link InterruptedException} and discards it silently.
 *
 * @return {@code true} if and only if the future was completed within
 *         the specified time limit
 */
boolean awaitUninterruptibly(long timeoutMillis);

Get Now

Retrieves the result of the asynchronous task without blocking. If the task is not done yet, null is returned.

/**
 * Return the result without blocking. If the future is not done yet this will return {@code null}.
 *
 * As it is possible that a {@code null} value is used to mark the future as successful you also need to check
 * if the future is really done with {@link #isDone()} and not rely on the returned {@code null} value.
 */
V getNow();
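
The caveat in the Javadoc above is that null is ambiguous: it can mean "not done yet" or "done, and the result is null". The sketch below reproduces that ambiguity with the JDK's CompletableFuture, whose getNow takes an explicit default value; passing null there mimics Netty's no-argument getNow(). This is a stand-in, not Netty code, and the names GetNowSketch and describe are hypothetical.

```java
import java.util.concurrent.CompletableFuture;

class GetNowSketch {

    // Classifies a future without ever blocking on it.
    static String describe(CompletableFuture<String> future) {
        String value = future.getNow(null); // null if the future is not done yet
        if (value != null) {
            return "value:" + value;
        }
        // null alone is not conclusive, so isDone() has to break the tie.
        return future.isDone() ? "done-with-null" : "pending";
    }

    public static void main(String[] args) {
        System.out.println(describe(new CompletableFuture<>()));                        // prints "pending"
        System.out.println(describe(CompletableFuture.<String>completedFuture(null)));  // prints "done-with-null"
        System.out.println(describe(CompletableFuture.completedFuture("x")));           // prints "value:x"
    }
}
```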

Abstract Future

An abstract class that implements the Future interface.

Definition

/**
 * Abstract {@link Future} implementation which does not allow for cancellation.
 *
 * @param <V>
 */
public abstract class AbstractFuture<V> implements Future<V> {

}

Get

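Implements get() both with and without a timeout. Each variant first blocks until the task is done, then checks cause() to decide whether the task completed successfully. If it did (cause() is null), the non-blocking getNow() is called to fetch and return the result. Otherwise a CancellationException is rethrown as is, and any other cause is wrapped in an ExecutionException. The timed variant throws a TimeoutException if the task is not done in time.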

@Override
public V get() throws InterruptedException, ExecutionException {
    await();

    Throwable cause = cause();
    if (cause == null) {
        return getNow();
    }
    if (cause instanceof CancellationException) {
        throw (CancellationException) cause;
    }
    throw new ExecutionException(cause);
}

@Override
public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
    if (await(timeout, unit)) {
        Throwable cause = cause();
        if (cause == null) {
            return getNow();
        }
        if (cause instanceof CancellationException) {
            throw (CancellationException) cause;
        }
        throw new ExecutionException(cause);
    }
    throw new TimeoutException();
}
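
To see why get() can be written purely in terms of await(), cause() and getNow(), here is a stripped-down future that supplies those three primitives. MiniFuture is a hypothetical class written for this note, not a Netty class; it uses a CountDownLatch for waiting and makes no attempt to guard against being completed twice.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;

// Hypothetical, minimal future; NOT part of Netty.
class MiniFuture<V> {
    private final CountDownLatch done = new CountDownLatch(1);
    private volatile V result;
    private volatile Throwable cause;

    // Completion methods (assumed to be called at most once).
    void succeed(V value) { result = value; done.countDown(); }
    void fail(Throwable t) { cause = t; done.countDown(); }

    // The three primitives that get() is built on.
    void await() throws InterruptedException { done.await(); }
    Throwable cause() { return cause; }
    V getNow() { return result; }

    // Same shape as the get() shown above: wait, inspect cause(), then getNow().
    V get() throws InterruptedException, ExecutionException {
        await();
        Throwable c = cause();
        if (c == null) {
            return getNow();
        }
        if (c instanceof CancellationException) {
            throw (CancellationException) c;
        }
        throw new ExecutionException(c);
    }

    // Maps each possible outcome of get() to a short description.
    static String outcome(MiniFuture<String> future) {
        try {
            return "value:" + future.get();
        } catch (CancellationException e) {
            return "cancelled";
        } catch (ExecutionException e) {
            return "failed:" + e.getCause().getMessage();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    public static void main(String[] args) {
        MiniFuture<String> ok = new MiniFuture<>();
        ok.succeed("hi");
        System.out.println(outcome(ok)); // prints "value:hi"

        MiniFuture<String> bad = new MiniFuture<>();
        bad.fail(new IllegalStateException("boom"));
        System.out.println(outcome(bad)); // prints "failed:boom"
    }
}
```

Because get() only relies on the three primitives, a subclass that supplies them gets both get() variants for free, which appears to be the point of putting this logic in an abstract base class.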