Mr Dk.'s BlogMr Dk.'s Blog
  • 🦆 About Me
  • ⛏️ Technology Stack
  • 🔗 Links
  • 🗒️ About Blog
  • Algorithm
  • C++
  • Compiler
  • Cryptography
  • DevOps
  • Docker
  • Git
  • Java
  • Linux
  • MS Office
  • MySQL
  • Network
  • Operating System
  • Performance
  • PostgreSQL
  • Productivity
  • Solidity
  • Vue.js
  • Web
  • Wireless
  • 🐧 How Linux Works (notes)
  • 🐧 Linux Kernel Comments (notes)
  • 🐧 Linux Kernel Development (notes)
  • 🐤 μc/OS-II Source Code (notes)
  • ☕ Understanding the JVM (notes)
  • ⛸️ Redis Implementation (notes)
  • 🗜️ Understanding Nginx (notes)
  • ⚙️ Netty in Action (notes)
  • ☁️ Spring Microservices (notes)
  • ⚒️ The Annotated STL Sources (notes)
  • ☕ Java Development Kit 8
GitHub
  • 🦆 About Me
  • ⛏️ Technology Stack
  • 🔗 Links
  • 🗒️ About Blog
  • Algorithm
  • C++
  • Compiler
  • Cryptography
  • DevOps
  • Docker
  • Git
  • Java
  • Linux
  • MS Office
  • MySQL
  • Network
  • Operating System
  • Performance
  • PostgreSQL
  • Productivity
  • Solidity
  • Vue.js
  • Web
  • Wireless
  • 🐧 How Linux Works (notes)
  • 🐧 Linux Kernel Comments (notes)
  • 🐧 Linux Kernel Development (notes)
  • 🐤 μc/OS-II Source Code (notes)
  • ☕ Understanding the JVM (notes)
  • ⛸️ Redis Implementation (notes)
  • 🗜️ Understanding Nginx (notes)
  • ⚙️ Netty in Action (notes)
  • ☁️ Spring Microservices (notes)
  • ⚒️ The Annotated STL Sources (notes)
  • ☕ Java Development Kit 8
GitHub
  • ⚙️ Netty in Action
    • 1 - NIO Transportation Model
    • 2 - ByteBuf
    • 3 - Netty Thread Model
    • 3.1 - EventExecutorGroup
    • 3.1 - EventLoopGroup
    • 3.3 - EventExecutor
    • 3.4 - EventLoop
    • 4 - Channel Concept
    • 4.1 - Channel
    • 4.2 - ChannelHandler
    • 4.3 - ChannelHandlerContext
    • 4.4 - ChannelPipeline
    • 5.1 - Future
    • 5.2 - CompleteFuture
    • 5.3 - FutureListener

5.2 - CompleteFuture

Created by : Mr Dk.

2021 / 02 / 22 11:21

Ningbo, Zhejiang, China


CompleteFuture

抽象类 AbstractFuture 的进一步抽象实现,表示一个已经完成的异步任务结果。

Definition

/**
 * A skeletal {@link Future} implementation which represents a {@link Future} which has been completed already.
 */
public abstract class CompleteFuture<V> extends AbstractFuture<V> {

}

Executor

类内维护一个执行器,用于执行异步任务的回调。

private final EventExecutor executor;

/**
 * Creates a new instance.
 *
 * @param executor the {@link EventExecutor} associated with this future
 */
protected CompleteFuture(EventExecutor executor) {
    this.executor = executor;
}

/**
 * Return the {@link EventExecutor} which is used by this {@link CompleteFuture}.
 */
protected EventExecutor executor() {
    return executor;
}

Listeners

由于 CompleteFuture 对应的异步任务已经完成,因此添加监听器时可以 立刻通知 监听器,立刻调用回调函数;移除监听器没有效果。

@Override
public Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener) {
    DefaultPromise.notifyListener(executor(), this, ObjectUtil.checkNotNull(listener, "listener"));
    return this;
}

@Override
public Future<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners) {
    for (GenericFutureListener<? extends Future<? super V>> l:
         ObjectUtil.checkNotNull(listeners, "listeners")) {

        if (l == null) {
            break;
        }
        DefaultPromise.notifyListener(executor(), this, l);
    }
    return this;
}

@Override
public Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener) {
    // NOOP
    return this;
}

@Override
public Future<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners) {
    // NOOP
    return this;
}

Synchronization

由于 CompleteFuture 对应的异步任务已经完成,因此同步函数可以立刻返回。区别在于判断当前线程是否收到了中断信号,以决定返回 true / false。

@Override
public Future<V> await() throws InterruptedException {
    if (Thread.interrupted()) {
        throw new InterruptedException();
    }
    return this;
}

@Override
public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
    if (Thread.interrupted()) {
        throw new InterruptedException();
    }
    return true;
}

@Override
public Future<V> sync() throws InterruptedException {
    return this;
}

@Override
public Future<V> syncUninterruptibly() {
    return this;
}

@Override
public boolean await(long timeoutMillis) throws InterruptedException {
    if (Thread.interrupted()) {
        throw new InterruptedException();
    }
    return true;
}

@Override
public Future<V> awaitUninterruptibly() {
    return this;
}

@Override
public boolean awaitUninterruptibly(long timeout, TimeUnit unit) {
    return true;
}

@Override
public boolean awaitUninterruptibly(long timeoutMillis) {
    return true;
}

Cancellable

由于任务已经完成,因此任务不可取消。

@Override
public boolean isDone() {
    return true;
}

@Override
public boolean isCancellable() {
    return false;
}

@Override
public boolean isCancelled() {
    return false;
}

/**
 * {@inheritDoc}
 *
 * @param mayInterruptIfRunning this value has no effect in this implementation.
 */
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
    return false;
}

ChannelFuture

ChannelFuture 接口继承自 Future 接口,只多定义函数以获取 Future 对应的 Channel 对象。

Definition

注释内详细说明了一个 ChannelFuture 可能进入的状态,以及成员函数应该返回的值:

  • 未完成
  • 完成 (成功)
  • 完成 (失败)
  • 完成 (被取消)
/**
 * The result of an asynchronous {@link Channel} I/O operation.
 * <p>
 * All I/O operations in Netty are asynchronous.  It means any I/O calls will
 * return immediately with no guarantee that the requested I/O operation has
 * been completed at the end of the call.  Instead, you will be returned with
 * a {@link ChannelFuture} instance which gives you the information about the
 * result or status of the I/O operation.
 * <p>
 * A {@link ChannelFuture} is either <em>uncompleted</em> or <em>completed</em>.
 * When an I/O operation begins, a new future object is created.  The new future
 * is uncompleted initially - it is neither succeeded, failed, nor cancelled
 * because the I/O operation is not finished yet.  If the I/O operation is
 * finished either successfully, with failure, or by cancellation, the future is
 * marked as completed with more specific information, such as the cause of the
 * failure.  Please note that even failure and cancellation belong to the
 * completed state.
 * <pre>
 *                                      +---------------------------+
 *                                      | Completed successfully    |
 *                                      +---------------------------+
 *                                 +---->      isDone() = true      |
 * +--------------------------+    |    |   isSuccess() = true      |
 * |        Uncompleted       |    |    +===========================+
 * +--------------------------+    |    | Completed with failure    |
 * |      isDone() = false    |    |    +---------------------------+
 * |   isSuccess() = false    |----+---->      isDone() = true      |
 * | isCancelled() = false    |    |    |       cause() = non-null  |
 * |       cause() = null     |    |    +===========================+
 * +--------------------------+    |    | Completed by cancellation |
 *                                 |    +---------------------------+
 *                                 +---->      isDone() = true      |
 *                                      | isCancelled() = true      |
 *                                      +---------------------------+
 * </pre>
 *
 * Various methods are provided to let you check if the I/O operation has been
 * completed, wait for the completion, and retrieve the result of the I/O
 * operation. It also allows you to add {@link ChannelFutureListener}s so you
 * can get notified when the I/O operation is completed.
 *
 * <h3>Prefer {@link #addListener(GenericFutureListener)} to {@link #await()}</h3>
 *
 * It is recommended to prefer {@link #addListener(GenericFutureListener)} to
 * {@link #await()} wherever possible to get notified when an I/O operation is
 * done and to do any follow-up tasks.
 * <p>
 * {@link #addListener(GenericFutureListener)} is non-blocking.  It simply adds
 * the specified {@link ChannelFutureListener} to the {@link ChannelFuture}, and
 * I/O thread will notify the listeners when the I/O operation associated with
 * the future is done.  {@link ChannelFutureListener} yields the best
 * performance and resource utilization because it does not block at all, but
 * it could be tricky to implement a sequential logic if you are not used to
 * event-driven programming.
 * <p>
 * By contrast, {@link #await()} is a blocking operation.  Once called, the
 * caller thread blocks until the operation is done.  It is easier to implement
 * a sequential logic with {@link #await()}, but the caller thread blocks
 * unnecessarily until the I/O operation is done and there's relatively
 * expensive cost of inter-thread notification.  Moreover, there's a chance of
 * dead lock in a particular circumstance, which is described below.
 *
 * <h3>Do not call {@link #await()} inside {@link ChannelHandler}</h3>
 * <p>
 * The event handler methods in {@link ChannelHandler} are usually called by
 * an I/O thread.  If {@link #await()} is called by an event handler
 * method, which is called by the I/O thread, the I/O operation it is waiting
 * for might never complete because {@link #await()} can block the I/O
 * operation it is waiting for, which is a dead lock.
 * <pre>
 * // BAD - NEVER DO THIS
 * {@code @Override}
 * public void channelRead({@link ChannelHandlerContext} ctx, Object msg) {
 *     {@link ChannelFuture} future = ctx.channel().close();
 *     future.awaitUninterruptibly();
 *     // Perform post-closure operation
 *     // ...
 * }
 *
 * // GOOD
 * {@code @Override}
 * public void channelRead({@link ChannelHandlerContext} ctx, Object msg) {
 *     {@link ChannelFuture} future = ctx.channel().close();
 *     future.addListener(new {@link ChannelFutureListener}() {
 *         public void operationComplete({@link ChannelFuture} future) {
 *             // Perform post-closure operation
 *             // ...
 *         }
 *     });
 * }
 * </pre>
 * <p>
 * In spite of the disadvantages mentioned above, there are certainly the cases
 * where it is more convenient to call {@link #await()}. In such a case, please
 * make sure you do not call {@link #await()} in an I/O thread.  Otherwise,
 * {@link BlockingOperationException} will be raised to prevent a dead lock.
 *
 * <h3>Do not confuse I/O timeout and await timeout</h3>
 *
 * The timeout value you specify with {@link #await(long)},
 * {@link #await(long, TimeUnit)}, {@link #awaitUninterruptibly(long)}, or
 * {@link #awaitUninterruptibly(long, TimeUnit)} are not related with I/O
 * timeout at all.  If an I/O operation times out, the future will be marked as
 * 'completed with failure,' as depicted in the diagram above.  For example,
 * connect timeout should be configured via a transport-specific option:
 * <pre>
 * // BAD - NEVER DO THIS
 * {@link Bootstrap} b = ...;
 * {@link ChannelFuture} f = b.connect(...);
 * f.awaitUninterruptibly(10, TimeUnit.SECONDS);
 * if (f.isCancelled()) {
 *     // Connection attempt cancelled by user
 * } else if (!f.isSuccess()) {
 *     // You might get a NullPointerException here because the future
 *     // might not be completed yet.
 *     f.cause().printStackTrace();
 * } else {
 *     // Connection established successfully
 * }
 *
 * // GOOD
 * {@link Bootstrap} b = ...;
 * // Configure the connect timeout option.
 * <b>b.option({@link ChannelOption}.CONNECT_TIMEOUT_MILLIS, 10000);</b>
 * {@link ChannelFuture} f = b.connect(...);
 * f.awaitUninterruptibly();
 *
 * // Now we are sure the future is completed.
 * assert f.isDone();
 *
 * if (f.isCancelled()) {
 *     // Connection attempt cancelled by user
 * } else if (!f.isSuccess()) {
 *     f.cause().printStackTrace();
 * } else {
 *     // Connection established successfully
 * }
 * </pre>
 */
public interface ChannelFuture extends Future<Void> {

}

Channel

获取该 Future 对应的 I/O 操作来自于哪一个 Channel。

/**
 * Returns a channel where the I/O operation associated with this
 * future takes place.
 */
Channel channel();

Is Void

一个 Void 的 ChannelFuture 不允许调用同步函数,或添加监听器。

/**
 * Returns {@code true} if this {@link ChannelFuture} is a void future and so not allow to call any of the
 * following methods:
 * <ul>
 *     <li>{@link #addListener(GenericFutureListener)}</li>
 *     <li>{@link #addListeners(GenericFutureListener[])}</li>
 *     <li>{@link #await()}</li>
 *     <li>{@link #await(long, TimeUnit)} ()}</li>
 *     <li>{@link #await(long)} ()}</li>
 *     <li>{@link #awaitUninterruptibly()}</li>
 *     <li>{@link #sync()}</li>
 *     <li>{@link #syncUninterruptibly()}</li>
 * </ul>
 */
boolean isVoid();

CompleteChannelFuture

一个实现了 ChannelFuture 接口的 CompleteFuture 抽象类实现,代表了一个已经完成的 ChannelFuture。

Definition

/**
 * A skeletal {@link ChannelFuture} implementation which represents a
 * {@link ChannelFuture} which has been completed already.
 */
abstract class CompleteChannelFuture extends CompleteFuture<Void> implements ChannelFuture {

}

Executor

类内维护了 Future 对应的 Channel。CompleteFuture 中维护的 Executor 来自于 Channel 的 EventLoop。

private final Channel channel;

/**
 * Creates a new instance.
 *
 * @param channel the {@link Channel} associated with this future
 */
protected CompleteChannelFuture(Channel channel, EventExecutor executor) {
    super(executor);
    this.channel = ObjectUtil.checkNotNull(channel, "channel");
}

@Override
protected EventExecutor executor() {
    EventExecutor e = super.executor();
    if (e == null) {
        return channel().eventLoop();
    } else {
        return e;
    }
}

SucceededChannelFuture

CompleteChannelFuture 的具体实现,表示 Channel 的异步任务执行成功的结果。由于成功意味着没有抛出异常,所以 cause() 返回 null,isSuccess() 返回 true。

/**
 * The {@link CompleteChannelFuture} which is succeeded already.  It is
 * recommended to use {@link Channel#newSucceededFuture()} instead of
 * calling the constructor of this future.
 */
final class SucceededChannelFuture extends CompleteChannelFuture {

    /**
     * Creates a new instance.
     *
     * @param channel the {@link Channel} associated with this future
     */
    SucceededChannelFuture(Channel channel, EventExecutor executor) {
        super(channel, executor);
    }

    @Override
    public Throwable cause() {
        return null;
    }

    @Override
    public boolean isSuccess() {
        return true;
    }
}

FailedChannelFuture

CompleteChannelFuture 的另一个具体实现,表示 Channel 的异步任务执行失败的结果。类内维护了任务执行失败所抛出的 Throwable 对象。

/**
 * The {@link CompleteChannelFuture} which is failed already.  It is
 * recommended to use {@link Channel#newFailedFuture(Throwable)}
 * instead of calling the constructor of this future.
 */
final class FailedChannelFuture extends CompleteChannelFuture {

    private final Throwable cause;

    /**
     * Creates a new instance.
     *
     * @param channel the {@link Channel} associated with this future
     * @param cause   the cause of failure
     */
    FailedChannelFuture(Channel channel, EventExecutor executor, Throwable cause) {
        super(channel, executor);
        this.cause = ObjectUtil.checkNotNull(cause, "cause");
    }

    @Override
    public Throwable cause() {
        return cause;
    }

    @Override
    public boolean isSuccess() {
        return false;
    }

    @Override
    public ChannelFuture sync() {
        PlatformDependent.throwException(cause);
        return this;
    }

    @Override
    public ChannelFuture syncUninterruptibly() {
        PlatformDependent.throwException(cause);
        return this;
    }
}
Edit this page on GitHub
Prev
5.1 - Future
Next
5.3 - FutureListener