5.2 - CompleteFuture
Created by : Mr Dk.
2021 / 02 / 22 11:21
Ningbo, Zhejiang, China
CompleteFuture
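A further abstract implementation of the abstract class AbstractFuture. It represents the result of an asynchronous task that has already completed.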
Definition
/**
* A skeletal {@link Future} implementation which represents a {@link Future} which has been completed already.
*/
public abstract class CompleteFuture<V> extends AbstractFuture<V> {
}
Executor
The class holds an executor, which is used to run the callbacks (listeners) of the asynchronous task.
private final EventExecutor executor;

/**
 * Creates a new instance.
 *
 * @param executor the {@link EventExecutor} associated with this future
 */
protected CompleteFuture(EventExecutor executor) {
    this.executor = executor;
}

/**
 * Return the {@link EventExecutor} which is used by this {@link CompleteFuture}.
 */
protected EventExecutor executor() {
    return executor;
}
Listeners
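Because the asynchronous task behind a CompleteFuture has already completed, a listener is notified as soon as it is added: it is handed to the future's executor instead of being stored, so its callback is invoked right away. Removing a listener therefore has no effect.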
@Override
public Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener) {
    DefaultPromise.notifyListener(executor(), this, ObjectUtil.checkNotNull(listener, "listener"));
    return this;
}

@Override
public Future<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners) {
    for (GenericFutureListener<? extends Future<? super V>> l:
            ObjectUtil.checkNotNull(listeners, "listeners")) {
        if (l == null) {
            break;
        }
        DefaultPromise.notifyListener(executor(), this, l);
    }
    return this;
}

@Override
public Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener) {
    // NOOP
    return this;
}

@Override
public Future<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners) {
    // NOOP
    return this;
}
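The listener behaviour described above can be tried out without Netty on the classpath. The following is only a minimal sketch: CompletedValue and CompletedValueDemo are hypothetical stand-ins written for this note, not Netty classes, and a plain java.util.concurrent.Executor takes the place of the EventExecutor.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.function.Consumer;

// Hypothetical, simplified stand-in for a completed future; not Netty's class.
final class CompletedValue<V> {
    private final V value;
    private final Executor executor;

    CompletedValue(V value, Executor executor) {
        this.value = value;
        this.executor = executor;
    }

    // The result already exists, so the listener is handed to the executor
    // right away instead of being stored for later.
    CompletedValue<V> addListener(Consumer<? super V> listener) {
        if (listener == null) {
            throw new NullPointerException("listener");
        }
        executor.execute(() -> listener.accept(value));
        return this;
    }

    // Nothing was ever stored, so there is nothing to remove.
    CompletedValue<V> removeListener(Consumer<? super V> listener) {
        return this;
    }
}

final class CompletedValueDemo {
    // Registers two listeners on a completed value and returns what they saw.
    static List<String> run() {
        List<String> seen = new ArrayList<>();
        Executor direct = Runnable::run; // runs each task on the calling thread
        CompletedValue<String> done = new CompletedValue<>("pong", direct);
        Consumer<String> first = v -> seen.add("first:" + v);
        done.addListener(first)
            .removeListener(first) // no effect: "first" has already run
            .addListener(v -> seen.add("second:" + v));
        return seen;
    }
}
```

With the direct executor used here, both listeners have already run by the time run() returns, and the removeListener call in between changes nothing.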
Synchronization
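Because the asynchronous task behind a CompleteFuture has already completed, the synchronization methods can return immediately. The only difference between them is the interrupt check: the interruptible await variants throw InterruptedException if the current thread has been interrupted, while the uninterruptible variants skip the check. The timed variants always return true, since the task is already done.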
@Override
public Future<V> await() throws InterruptedException {
    if (Thread.interrupted()) {
        throw new InterruptedException();
    }
    return this;
}

@Override
public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
    if (Thread.interrupted()) {
        throw new InterruptedException();
    }
    return true;
}

@Override
public Future<V> sync() throws InterruptedException {
    return this;
}

@Override
public Future<V> syncUninterruptibly() {
    return this;
}

@Override
public boolean await(long timeoutMillis) throws InterruptedException {
    if (Thread.interrupted()) {
        throw new InterruptedException();
    }
    return true;
}

@Override
public Future<V> awaitUninterruptibly() {
    return this;
}

@Override
public boolean awaitUninterruptibly(long timeout, TimeUnit unit) {
    return true;
}

@Override
public boolean awaitUninterruptibly(long timeoutMillis) {
    return true;
}
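To make the interrupt check concrete, here is a small sketch that mirrors the two styles of method shown above. AlreadyDone and AlreadyDoneDemo are hypothetical names used only for illustration. One detail worth noticing is that Thread.interrupted() clears the interrupt flag as a side effect, so the flag is no longer set after await() has thrown.

```java
// Hypothetical stand-in for the await methods of a completed future.
final class AlreadyDone {
    // Interruptible wait: nothing to wait for, but a pending interrupt is
    // still reported. Thread.interrupted() also clears the interrupt flag.
    AlreadyDone await() throws InterruptedException {
        if (Thread.interrupted()) {
            throw new InterruptedException();
        }
        return this;
    }

    // Uninterruptible wait: returns at once and leaves the flag untouched.
    AlreadyDone awaitUninterruptibly() {
        return this;
    }
}

final class AlreadyDoneDemo {
    // Returns a short trace of what happens when the caller is interrupted.
    static String run() {
        AlreadyDone done = new AlreadyDone();
        StringBuilder trace = new StringBuilder();

        Thread.currentThread().interrupt(); // simulate a pending interrupt
        done.awaitUninterruptibly();
        trace.append("flagAfterUninterruptible=")
             .append(Thread.currentThread().isInterrupted());

        try {
            done.await();
            trace.append(",await=returned");
        } catch (InterruptedException e) {
            trace.append(",await=threw");
        }
        trace.append(",flagAfterAwait=")
             .append(Thread.currentThread().isInterrupted());
        return trace.toString();
    }
}
```

Calling AlreadyDoneDemo.run() yields "flagAfterUninterruptible=true,await=threw,flagAfterAwait=false".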
Cancellable
Because the task has already completed, it can no longer be cancelled.
@Override
public boolean isDone() {
    return true;
}

@Override
public boolean isCancellable() {
    return false;
}

@Override
public boolean isCancelled() {
    return false;
}

/**
 * {@inheritDoc}
 *
 * @param mayInterruptIfRunning this value has no effect in this implementation.
 */
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
    return false;
}
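The JDK's own CompletableFuture treats an already completed future in a comparable way, which allows a quick experiment without Netty. This is an analogy only: CompletableFuture is not part of Netty's Future hierarchy, and CancelCompletedDemo is a hypothetical name.

```java
import java.util.concurrent.CompletableFuture;

final class CancelCompletedDemo {
    // Tries to cancel a JDK future that is already complete and reports
    // { isDone, result of cancel(), isCancelled }.
    static boolean[] run() {
        CompletableFuture<String> done = CompletableFuture.completedFuture("ok");
        boolean cancelled = done.cancel(true); // too late: the result is set
        return new boolean[] { done.isDone(), cancelled, done.isCancelled() };
    }
}
```

The returned array is { true, false, false }, matching the three values hard-coded in CompleteFuture.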
ChannelFuture
The ChannelFuture interface extends the Future interface. Its main addition is a method for obtaining the Channel associated with the Future.
Definition
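The Javadoc describes in detail the states a ChannelFuture can be in, and the values its member methods should return in each state:
- Uncompleted
- Completed (succeeded)
- Completed (failed)
- Completed (cancelled)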
/**
* The result of an asynchronous {@link Channel} I/O operation.
* <p>
* All I/O operations in Netty are asynchronous. It means any I/O calls will
* return immediately with no guarantee that the requested I/O operation has
* been completed at the end of the call. Instead, you will be returned with
* a {@link ChannelFuture} instance which gives you the information about the
* result or status of the I/O operation.
* <p>
* A {@link ChannelFuture} is either <em>uncompleted</em> or <em>completed</em>.
* When an I/O operation begins, a new future object is created. The new future
* is uncompleted initially - it is neither succeeded, failed, nor cancelled
* because the I/O operation is not finished yet. If the I/O operation is
* finished either successfully, with failure, or by cancellation, the future is
* marked as completed with more specific information, such as the cause of the
* failure. Please note that even failure and cancellation belong to the
* completed state.
* <pre>
*                                      +---------------------------+
*                                      | Completed successfully    |
*                                      +---------------------------+
*                                 +---->      isDone() = true      |
* +--------------------------+    |    |   isSuccess() = true      |
* |        Uncompleted       |    |    +===========================+
* +--------------------------+    |    | Completed with failure    |
* |      isDone() = false    |    |    +---------------------------+
* |   isSuccess() = false    |----+---->      isDone() = true      |
* | isCancelled() = false    |    |    |       cause() = non-null  |
* |       cause() = null     |    |    +===========================+
* +--------------------------+    |    | Completed by cancellation |
*                                 |    +---------------------------+
*                                 +---->      isDone() = true      |
*                                      | isCancelled() = true      |
*                                      +---------------------------+
* </pre>
*
* Various methods are provided to let you check if the I/O operation has been
* completed, wait for the completion, and retrieve the result of the I/O
* operation. It also allows you to add {@link ChannelFutureListener}s so you
* can get notified when the I/O operation is completed.
*
* <h3>Prefer {@link #addListener(GenericFutureListener)} to {@link #await()}</h3>
*
* It is recommended to prefer {@link #addListener(GenericFutureListener)} to
* {@link #await()} wherever possible to get notified when an I/O operation is
* done and to do any follow-up tasks.
* <p>
* {@link #addListener(GenericFutureListener)} is non-blocking. It simply adds
* the specified {@link ChannelFutureListener} to the {@link ChannelFuture}, and
* I/O thread will notify the listeners when the I/O operation associated with
* the future is done. {@link ChannelFutureListener} yields the best
* performance and resource utilization because it does not block at all, but
* it could be tricky to implement a sequential logic if you are not used to
* event-driven programming.
* <p>
* By contrast, {@link #await()} is a blocking operation. Once called, the
* caller thread blocks until the operation is done. It is easier to implement
* a sequential logic with {@link #await()}, but the caller thread blocks
* unnecessarily until the I/O operation is done and there's relatively
* expensive cost of inter-thread notification. Moreover, there's a chance of
* dead lock in a particular circumstance, which is described below.
*
* <h3>Do not call {@link #await()} inside {@link ChannelHandler}</h3>
* <p>
* The event handler methods in {@link ChannelHandler} are usually called by
* an I/O thread. If {@link #await()} is called by an event handler
* method, which is called by the I/O thread, the I/O operation it is waiting
* for might never complete because {@link #await()} can block the I/O
* operation it is waiting for, which is a dead lock.
* <pre>
* // BAD - NEVER DO THIS
* {@code @Override}
* public void channelRead({@link ChannelHandlerContext} ctx, Object msg) {
*     {@link ChannelFuture} future = ctx.channel().close();
*     future.awaitUninterruptibly();
*     // Perform post-closure operation
*     // ...
* }
*
* // GOOD
* {@code @Override}
* public void channelRead({@link ChannelHandlerContext} ctx, Object msg) {
*     {@link ChannelFuture} future = ctx.channel().close();
*     future.addListener(new {@link ChannelFutureListener}() {
*         public void operationComplete({@link ChannelFuture} future) {
*             // Perform post-closure operation
*             // ...
*         }
*     });
* }
* </pre>
* <p>
* In spite of the disadvantages mentioned above, there are certainly the cases
* where it is more convenient to call {@link #await()}. In such a case, please
* make sure you do not call {@link #await()} in an I/O thread. Otherwise,
* {@link BlockingOperationException} will be raised to prevent a dead lock.
*
* <h3>Do not confuse I/O timeout and await timeout</h3>
*
* The timeout value you specify with {@link #await(long)},
* {@link #await(long, TimeUnit)}, {@link #awaitUninterruptibly(long)}, or
* {@link #awaitUninterruptibly(long, TimeUnit)} are not related with I/O
* timeout at all. If an I/O operation times out, the future will be marked as
* 'completed with failure,' as depicted in the diagram above. For example,
* connect timeout should be configured via a transport-specific option:
* <pre>
* // BAD - NEVER DO THIS
* {@link Bootstrap} b = ...;
* {@link ChannelFuture} f = b.connect(...);
* f.awaitUninterruptibly(10, TimeUnit.SECONDS);
* if (f.isCancelled()) {
*     // Connection attempt cancelled by user
* } else if (!f.isSuccess()) {
*     // You might get a NullPointerException here because the future
*     // might not be completed yet.
*     f.cause().printStackTrace();
* } else {
*     // Connection established successfully
* }
*
* // GOOD
* {@link Bootstrap} b = ...;
* // Configure the connect timeout option.
* <b>b.option({@link ChannelOption}.CONNECT_TIMEOUT_MILLIS, 10000);</b>
* {@link ChannelFuture} f = b.connect(...);
* f.awaitUninterruptibly();
*
* // Now we are sure the future is completed.
* assert f.isDone();
*
* if (f.isCancelled()) {
*     // Connection attempt cancelled by user
* } else if (!f.isSuccess()) {
*     f.cause().printStackTrace();
* } else {
*     // Connection established successfully
* }
* </pre>
*/
public interface ChannelFuture extends Future<Void> {
}
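The state diagram in the Javadoc above can be read as a small decision procedure. The sketch below assumes a hypothetical helper, FutureStates.describe, that takes the four accessor values as plain arguments so that it runs without Netty. It checks them in the same order as the GOOD example in the Javadoc: completion first, then cancellation, then failure.

```java
final class FutureStates {
    // Maps the four accessor values onto the four states of the diagram.
    // Checking "done" first avoids touching the cause of an uncompleted
    // future, where it would still be null.
    static String describe(boolean done, boolean success,
                           boolean cancelled, Throwable cause) {
        if (!done) {
            return "uncompleted";
        }
        if (cancelled) {
            return "completed by cancellation";
        }
        if (!success) {
            return "completed with failure: " + cause.getMessage();
        }
        return "completed successfully";
    }
}
```

For example, describe(true, false, false, new java.io.IOException("connection refused")) gives "completed with failure: connection refused", while describe(false, false, false, null) gives "uncompleted".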
Channel
Returns the Channel on which the I/O operation associated with this Future takes place.
/**
* Returns a channel where the I/O operation associated with this
* future takes place.
*/
Channel channel();
Is Void
A void ChannelFuture does not allow calling the synchronization methods or adding listeners.
/**
* Returns {@code true} if this {@link ChannelFuture} is a void future and so not allow to call any of the
* following methods:
* <ul>
* <li>{@link #addListener(GenericFutureListener)}</li>
* <li>{@link #addListeners(GenericFutureListener[])}</li>
* <li>{@link #await()}</li>
* <li>{@link #await(long, TimeUnit)} ()}</li>
* <li>{@link #await(long)} ()}</li>
* <li>{@link #awaitUninterruptibly()}</li>
* <li>{@link #sync()}</li>
* <li>{@link #syncUninterruptibly()}</li>
* </ul>
*/
boolean isVoid();
CompleteChannelFuture
An abstract class that extends CompleteFuture and implements the ChannelFuture interface. It represents a ChannelFuture that has already completed.
Definition
/**
* A skeletal {@link ChannelFuture} implementation which represents a
* {@link ChannelFuture} which has been completed already.
*/
abstract class CompleteChannelFuture extends CompleteFuture<Void> implements ChannelFuture {
}
Executor
The class holds the Channel associated with the Future. If no Executor was passed to CompleteFuture (that is, it is null), the Executor falls back to the EventLoop of the Channel.
private final Channel channel;

/**
 * Creates a new instance.
 *
 * @param channel the {@link Channel} associated with this future
 */
protected CompleteChannelFuture(Channel channel, EventExecutor executor) {
    super(executor);
    this.channel = ObjectUtil.checkNotNull(channel, "channel");
}

@Override
protected EventExecutor executor() {
    EventExecutor e = super.executor();
    if (e == null) {
        return channel().eventLoop();
    } else {
        return e;
    }
}
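The fallback in executor() is a small pattern of its own: prefer an explicitly supplied executor, otherwise borrow the one owned by the channel. A minimal sketch follows, using hypothetical stand-ins (NamedExecutor, FakeChannel, CompletedChannelResult) and the JDK Executor interface instead of Netty's types.

```java
import java.util.Objects;
import java.util.concurrent.Executor;

// Hypothetical executor that carries a name so the demo can tell which
// one was picked.
final class NamedExecutor implements Executor {
    final String name;
    NamedExecutor(String name) { this.name = name; }
    @Override public void execute(Runnable task) { task.run(); }
}

// Hypothetical stand-in for a channel that owns an event loop.
final class FakeChannel {
    private final NamedExecutor eventLoop;
    FakeChannel(NamedExecutor eventLoop) { this.eventLoop = eventLoop; }
    NamedExecutor eventLoop() { return eventLoop; }
}

// Hypothetical stand-in for a completed, channel-bound future.
final class CompletedChannelResult {
    private final FakeChannel channel;
    private final NamedExecutor executor; // may be null

    CompletedChannelResult(FakeChannel channel, NamedExecutor executor) {
        this.channel = Objects.requireNonNull(channel, "channel");
        this.executor = executor;
    }

    // Use the explicit executor if there is one, else the channel's loop.
    NamedExecutor executor() {
        return executor != null ? executor : channel.eventLoop();
    }
}

final class ExecutorFallbackDemo {
    // Returns the name of the executor chosen without and with an
    // explicit executor.
    static String[] run() {
        FakeChannel ch = new FakeChannel(new NamedExecutor("eventLoop"));
        String withoutOwn = new CompletedChannelResult(ch, null).executor().name;
        String withOwn = new CompletedChannelResult(ch, new NamedExecutor("custom"))
                .executor().name;
        return new String[] { withoutOwn, withOwn };
    }
}
```

Here run() returns { "eventLoop", "custom" }: the channel's loop is used only when no executor was given.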
SucceededChannelFuture
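A concrete implementation of CompleteChannelFuture that represents the successful result of an asynchronous Channel task. Because success means that no exception was thrown, cause() returns null and isSuccess() returns true.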
/**
* The {@link CompleteChannelFuture} which is succeeded already. It is
* recommended to use {@link Channel#newSucceededFuture()} instead of
* calling the constructor of this future.
*/
final class SucceededChannelFuture extends CompleteChannelFuture {

    /**
     * Creates a new instance.
     *
     * @param channel the {@link Channel} associated with this future
     */
    SucceededChannelFuture(Channel channel, EventExecutor executor) {
        super(channel, executor);
    }

    @Override
    public Throwable cause() {
        return null;
    }

    @Override
    public boolean isSuccess() {
        return true;
    }
}
FailedChannelFuture
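Another concrete implementation of CompleteChannelFuture, representing the failed result of an asynchronous Channel task. The class holds the Throwable that was thrown when the task failed, and sync() / syncUninterruptibly() rethrow it.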
/**
* The {@link CompleteChannelFuture} which is failed already. It is
* recommended to use {@link Channel#newFailedFuture(Throwable)}
* instead of calling the constructor of this future.
*/
final class FailedChannelFuture extends CompleteChannelFuture {

    private final Throwable cause;

    /**
     * Creates a new instance.
     *
     * @param channel the {@link Channel} associated with this future
     * @param cause   the cause of failure
     */
    FailedChannelFuture(Channel channel, EventExecutor executor, Throwable cause) {
        super(channel, executor);
        this.cause = ObjectUtil.checkNotNull(cause, "cause");
    }

    @Override
    public Throwable cause() {
        return cause;
    }

    @Override
    public boolean isSuccess() {
        return false;
    }

    @Override
    public ChannelFuture sync() {
        PlatformDependent.throwException(cause);
        return this;
    }

    @Override
    public ChannelFuture syncUninterruptibly() {
        PlatformDependent.throwException(cause);
        return this;
    }
}
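Note that sync() in the class above declares no checked exception, yet it rethrows whatever Throwable was stored. One well-known way to get this effect in plain Java is the "sneaky throw" idiom sketched below. This is an assumption about the general technique, not a copy of PlatformDependent.throwException, whose actual implementation may differ. AlreadyFailed and AlreadyFailedDemo are hypothetical names.

```java
import java.io.IOException;

// Hypothetical stand-in for a future that has already failed.
final class AlreadyFailed {
    private final Throwable cause;

    AlreadyFailed(Throwable cause) {
        if (cause == null) {
            throw new NullPointerException("cause");
        }
        this.cause = cause;
    }

    Throwable cause() { return cause; }

    // await-style call: only waits for completion, never throws the cause.
    AlreadyFailed awaitUninterruptibly() { return this; }

    // sync-style call: waits (nothing to wait for here), then rethrows.
    AlreadyFailed syncUninterruptibly() {
        AlreadyFailed.<RuntimeException>rethrow(cause);
        return this; // never reached, but required by the compiler
    }

    // "Sneaky throw": the generic cast is unchecked, so the compiler lets a
    // checked exception escape a method that does not declare it.
    @SuppressWarnings("unchecked")
    private static <E extends Throwable> void rethrow(Throwable t) throws E {
        throw (E) t;
    }
}

final class AlreadyFailedDemo {
    // Shows that await returns normally while sync rethrows the cause.
    static String run() {
        AlreadyFailed failed = new AlreadyFailed(new IOException("connection reset"));
        failed.awaitUninterruptibly(); // returns normally
        try {
            failed.syncUninterruptibly();
            return "no exception";
        } catch (Exception e) {
            return e.getClass().getSimpleName() + ": " + e.getMessage();
        }
    }
}
```

Calling AlreadyFailedDemo.run() returns "IOException: connection reset". The practical consequence for callers is that the await methods only report completion, while the sync methods also surface the failure as an exception.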