Mr Dk.'s Blog

4.3 - ChannelHandlerContext

Created by : Mr Dk.

2021 / 02 / 20 22:37

Ningbo, Zhejiang, China


ChannelHandlerContext

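A ChannelHandlerContext lets a ChannelHandler interact with its ChannelPipeline and with the other ChannelHandlers in that pipeline:

  • Notify the next ChannelHandler in the ChannelPipeline
  • Modify the ChannelPipeline dynamically

It can also be used to store state.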

Definition

ChannelHandlerContext extends the ChannelInboundInvoker and ChannelOutboundInvoker interfaces, so it inherits every method used to invoke the callbacks of ChannelInboundHandler and ChannelOutboundHandler. It also extends AttributeMap.

/**
 * Enables a {@link ChannelHandler} to interact with its {@link ChannelPipeline}
 * and other handlers. Among other things a handler can notify the next {@link ChannelHandler} in the
 * {@link ChannelPipeline} as well as modify the {@link ChannelPipeline} it belongs to dynamically.
 *
 * <h3>Notify</h3>
 *
 * You can notify the closest handler in the same {@link ChannelPipeline} by calling one of the various methods
 * provided here.
 *
 * Please refer to {@link ChannelPipeline} to understand how an event flows.
 *
 * <h3>Modifying a pipeline</h3>
 *
 * You can get the {@link ChannelPipeline} your handler belongs to by calling
 * {@link #pipeline()}.  A non-trivial application could insert, remove, or
 * replace handlers in the pipeline dynamically at runtime.
 *
 * <h3>Retrieving for later use</h3>
 *
 * You can keep the {@link ChannelHandlerContext} for later use, such as
 * triggering an event outside the handler methods, even from a different thread.
 * <pre>
 * public class MyHandler extends {@link ChannelDuplexHandler} {
 *
 *     <b>private {@link ChannelHandlerContext} ctx;</b>
 *
 *     public void beforeAdd({@link ChannelHandlerContext} ctx) {
 *         <b>this.ctx = ctx;</b>
 *     }
 *
 *     public void login(String username, password) {
 *         ctx.write(new LoginMessage(username, password));
 *     }
 *     ...
 * }
 * </pre>
 *
 * <h3>Storing stateful information</h3>
 *
 * {@link #attr(AttributeKey)} allow you to
 * store and access stateful information that is related with a {@link ChannelHandler} / {@link Channel} and its
 * context. Please refer to {@link ChannelHandler} to learn various recommended
 * ways to manage stateful information.
 *
 * <h3>A handler can have more than one {@link ChannelHandlerContext}</h3>
 *
 * Please note that a {@link ChannelHandler} instance can be added to more than
 * one {@link ChannelPipeline}.  It means a single {@link ChannelHandler}
 * instance can have more than one {@link ChannelHandlerContext} and therefore
 * the single instance can be invoked with different
 * {@link ChannelHandlerContext}s if it is added to one or more {@link ChannelPipeline}s more than once.
 * Also note that a {@link ChannelHandler} that is supposed to be added to multiple {@link ChannelPipeline}s should
 * be marked as {@link io.netty.channel.ChannelHandler.Sharable}.
 *
 * <h3>Additional resources worth reading</h3>
 * <p>
 * Please refer to the {@link ChannelHandler}, and
 * {@link ChannelPipeline} to find out more about inbound and outbound operations,
 * what fundamental differences they have, how they flow in a  pipeline,  and how to handle
 * the operation in your application.
 */
public interface ChannelHandlerContext extends AttributeMap, ChannelInboundInvoker, ChannelOutboundInvoker {

}

Pipeline

These accessors return the objects bound to the ChannelHandlerContext, such as its Channel, ChannelHandler, and ChannelPipeline.

/**
 * Return the {@link Channel} which is bound to the {@link ChannelHandlerContext}.
 */
Channel channel();

/**
 * Returns the {@link EventExecutor} which is used to execute an arbitrary task.
 */
EventExecutor executor();

/**
 * The unique name of the {@link ChannelHandlerContext}.The name was used when then {@link ChannelHandler}
 * was added to the {@link ChannelPipeline}. This name can also be used to access the registered
 * {@link ChannelHandler} from the {@link ChannelPipeline}.
 */
String name();

/**
 * The {@link ChannelHandler} that is bound this {@link ChannelHandlerContext}.
 */
ChannelHandler handler();

/**
 * Return {@code true} if the {@link ChannelHandler} which belongs to this context was removed
 * from the {@link ChannelPipeline}. Note that this method is only meant to be called from with in the
 * {@link EventLoop}.
 */
boolean isRemoved();

/**
 * Return the assigned {@link ChannelPipeline}
 */
ChannelPipeline pipeline();

/**
 * Return the assigned {@link ByteBufAllocator} which will be used to allocate {@link ByteBuf}s.
 */
ByteBufAllocator alloc();

AbstractChannelHandlerContext

An abstract class that implements the ChannelHandlerContext interface.

Definition

abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, ResourceLeakHint {

}

State

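A single volatile int field records the state of the ChannelHandlerContext; it is updated atomically through an AtomicIntegerFieldUpdater.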

private static final AtomicIntegerFieldUpdater<AbstractChannelHandlerContext> HANDLER_STATE_UPDATER =
        AtomicIntegerFieldUpdater.newUpdater(AbstractChannelHandlerContext.class, "handlerState");

/**
 * {@link ChannelHandler#handlerAdded(ChannelHandlerContext)} is about to be called.
 */
private static final int ADD_PENDING = 1;
/**
 * {@link ChannelHandler#handlerAdded(ChannelHandlerContext)} was called.
 */
private static final int ADD_COMPLETE = 2;
/**
 * {@link ChannelHandler#handlerRemoved(ChannelHandlerContext)} was called.
 */
private static final int REMOVE_COMPLETE = 3;
/**
 * Neither {@link ChannelHandler#handlerAdded(ChannelHandlerContext)}
 * nor {@link ChannelHandler#handlerRemoved(ChannelHandlerContext)} was called.
 */
private static final int INIT = 0;

private volatile int handlerState = INIT;
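
To make the state field more concrete, here is a minimal, self-contained sketch of how such a field could be advanced with compare-and-set. It does not depend on Netty. The class name `HandlerStateSketch` and the methods `setAddPending`, `setAddComplete`, and `setRemoved` are assumptions made for illustration; the excerpt above only shows the constants and the updater.

```java
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

class HandlerStateSketch {
    static final int INIT = 0;
    static final int ADD_PENDING = 1;
    static final int ADD_COMPLETE = 2;
    static final int REMOVE_COMPLETE = 3;

    // The updater needs a volatile int field, looked up by name via reflection.
    private static final AtomicIntegerFieldUpdater<HandlerStateSketch> STATE_UPDATER =
            AtomicIntegerFieldUpdater.newUpdater(HandlerStateSketch.class, "handlerState");

    private volatile int handlerState = INIT;

    /** INIT -> ADD_PENDING; fails if the state has already moved on. */
    boolean setAddPending() {
        return STATE_UPDATER.compareAndSet(this, INIT, ADD_PENDING);
    }

    /** Any state except REMOVE_COMPLETE -> ADD_COMPLETE, retried until the CAS succeeds. */
    boolean setAddComplete() {
        for (;;) {
            int oldState = handlerState;
            if (oldState == REMOVE_COMPLETE) {
                // A removed context must never become "added" again.
                return false;
            }
            if (STATE_UPDATER.compareAndSet(this, oldState, ADD_COMPLETE)) {
                return true;
            }
        }
    }

    /** Removal is terminal, so a plain volatile write is enough in this sketch. */
    void setRemoved() {
        handlerState = REMOVE_COMPLETE;
    }

    int state() {
        return handlerState;
    }

    public static void main(String[] args) {
        HandlerStateSketch ctx = new HandlerStateSketch();
        System.out.println(ctx.setAddPending());  // first transition out of INIT
        System.out.println(ctx.setAddComplete());
        ctx.setRemoved();
        System.out.println(ctx.setAddComplete()); // rejected after removal
    }
}
```

Using a static field updater instead of an AtomicInteger per instance avoids allocating one extra object per context, which is a plausible reason for this design when a pipeline holds many contexts.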

Next

The class holds references to the previous and the next ChannelHandlerContext, which forms a logical doubly linked list.

volatile AbstractChannelHandlerContext next;
volatile AbstractChannelHandlerContext prev;

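This list is used to propagate events: for each callback defined in ChannelInboundHandler and ChannelOutboundHandler, the context invokes the same-named method on the next handler in the same direction.

  1. Starting from the current ChannelHandlerContext, walk the list to find the next ChannelHandlerContext in the same direction (inbound / outbound)
  2. Get the executor of the ChannelHandlerContext that will handle the event next
  3. If the current thread is that executor's EventLoop thread, invoke the same-named method on the next ChannelHandlerContext immediately; otherwise add the call to the EventLoop's task queue to run later

The following methods walk the ChannelHandlerContext list in the inbound (next) or outbound (prev) direction until they find the next context in that direction that handles the event: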
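
Step 3 above can be sketched in isolation. The following is a simplified model rather than Netty code: `Loop` is a hypothetical stand-in for an EventLoop built on a single-threaded `ExecutorService`, and `invoke` and `demo` are illustrative names.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class InEventLoopSketch {

    /** Hypothetical stand-in for an EventLoop: one thread plus a task queue. */
    static final class Loop {
        private volatile Thread thread;
        private final ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "sketch-loop");
            t.setDaemon(true);
            thread = t; // remember the loop thread so inEventLoop() can compare against it
            return t;
        });

        boolean inEventLoop() {
            return Thread.currentThread() == thread;
        }

        void execute(Runnable task) {
            executor.execute(task);
        }

        void shutdown() {
            executor.shutdown();
        }
    }

    /** Run now when already on the loop thread, otherwise enqueue for later. */
    static void invoke(Loop loop, Runnable handlerCall) {
        if (loop.inEventLoop()) {
            handlerCall.run();
        } else {
            loop.execute(handlerCall);
        }
    }

    static List<String> demo() {
        Loop loop = new Loop();
        List<String> log = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch done = new CountDownLatch(1);
        // Called from an outside thread: the task is queued and runs on the loop thread.
        invoke(loop, () -> {
            log.add("outer start, inEventLoop=" + loop.inEventLoop());
            // Called from the loop thread itself: runs inline, before "outer end".
            invoke(loop, () -> log.add("inner"));
            log.add("outer end");
            done.countDown();
        });
        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            loop.shutdown();
        }
        return new ArrayList<>(log);
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Because the nested call runs inline on the loop thread, "inner" is logged between "outer start" and "outer end". Had it been queued instead, it could only run after the outer task finished.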

private AbstractChannelHandlerContext findContextInbound(int mask) {
    AbstractChannelHandlerContext ctx = this;
    EventExecutor currentExecutor = executor();
    do {
        ctx = ctx.next;
    } while (skipContext(ctx, currentExecutor, mask, MASK_ONLY_INBOUND));
    return ctx;
}

private AbstractChannelHandlerContext findContextOutbound(int mask) {
    AbstractChannelHandlerContext ctx = this;
    EventExecutor currentExecutor = executor();
    do {
        ctx = ctx.prev;
    } while (skipContext(ctx, currentExecutor, mask, MASK_ONLY_OUTBOUND));
    return ctx;
}

private static boolean skipContext(
    AbstractChannelHandlerContext ctx, EventExecutor currentExecutor, int mask, int onlyMask) {
    // Ensure we correctly handle MASK_EXCEPTION_CAUGHT, which is not included in onlyMask
    // (MASK_ONLY_INBOUND / MASK_ONLY_OUTBOUND)
    return (ctx.executionMask & (onlyMask | mask)) == 0 ||
        // We can only skip if the EventExecutor is the same as otherwise we need to ensure we offload
        // everything to preserve ordering.
        //
        // See https://github.com/netty/netty/issues/10067
        (ctx.executor() == currentExecutor && (ctx.executionMask & mask) == 0);
}
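
The traversal is easier to follow on a reduced model. The sketch below keeps only the mask check and drops the executor comparison done by skipContext. The bit values, the `Ctx` class, and the handler names are hypothetical and are not Netty's real constants or types. Unlike Netty, where the head and tail contexts terminate every walk, the sketch also guards against running off the end of the list.

```java
class MaskTraversalSketch {
    // Hypothetical bit values, one bit per event type.
    static final int MASK_CHANNEL_ACTIVE = 1;
    static final int MASK_CHANNEL_READ = 1 << 1;
    static final int MASK_WRITE = 1 << 2;
    static final int MASK_ALL = MASK_CHANNEL_ACTIVE | MASK_CHANNEL_READ | MASK_WRITE;

    static final class Ctx {
        final String name;
        final int executionMask; // events this context's handler actually handles
        Ctx next;
        Ctx prev;

        Ctx(String name, int executionMask) {
            this.name = name;
            this.executionMask = executionMask;
        }

        /** Inbound events travel along next; skip contexts that do not handle the event. */
        Ctx findContextInbound(int mask) {
            Ctx ctx = this;
            do {
                ctx = ctx.next;
            } while (ctx != null && (ctx.executionMask & mask) == 0);
            return ctx;
        }

        /** Outbound events travel along prev; skip contexts that do not handle the event. */
        Ctx findContextOutbound(int mask) {
            Ctx ctx = this;
            do {
                ctx = ctx.prev;
            } while (ctx != null && (ctx.executionMask & mask) == 0);
            return ctx;
        }
    }

    /** Link the given contexts into a doubly linked list, in order. */
    static void link(Ctx... ctxs) {
        for (int i = 0; i + 1 < ctxs.length; i++) {
            ctxs[i].next = ctxs[i + 1];
            ctxs[i + 1].prev = ctxs[i];
        }
    }

    public static void main(String[] args) {
        Ctx head = new Ctx("head", MASK_ALL);
        Ctx decoder = new Ctx("decoder", MASK_CHANNEL_READ);
        Ctx encoder = new Ctx("encoder", MASK_WRITE);
        Ctx logic = new Ctx("logic", MASK_CHANNEL_ACTIVE | MASK_CHANNEL_READ);
        Ctx tail = new Ctx("tail", MASK_ALL);
        link(head, decoder, encoder, logic, tail);

        System.out.println(head.findContextInbound(MASK_CHANNEL_ACTIVE).name);
        System.out.println(head.findContextInbound(MASK_CHANNEL_READ).name);
        System.out.println(tail.findContextOutbound(MASK_WRITE).name);
    }
}
```

In this model a channelActive event fired from head skips decoder and encoder and lands on logic, while a write started at tail skips logic and lands on encoder.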

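Once the next ChannelHandlerContext in the same direction has been found, its executor is obtained and the same-named method is invoked:

  • If the current thread is the EventLoop thread, the call is made immediately
  • If the current thread is not the EventLoop thread, the call is wrapped in a Runnable task and added to the executor's task queue

@Override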
public ChannelHandlerContext fireChannelActive() {
    invokeChannelActive(findContextInbound(MASK_CHANNEL_ACTIVE));
    return this;
}

static void invokeChannelActive(final AbstractChannelHandlerContext next) {
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        next.invokeChannelActive();
    } else {
        executor.execute(new Runnable() {
            @Override
            public void run() {
                next.invokeChannelActive();
            }
        });
    }
}

private void invokeChannelActive() {
    if (invokeHandler()) {
        try {
            ((ChannelInboundHandler) handler()).channelActive(this);
        } catch (Throwable t) {
            invokeExceptionCaught(t);
        }
    } else {
        fireChannelActive();
    }
}

/**
 * Makes best possible effort to detect if {@link ChannelHandler#handlerAdded(ChannelHandlerContext)} was called
 * yet. If not return {@code false} and if called or could not detect return {@code true}.
 *
 * If this method returns {@code false} we will not invoke the {@link ChannelHandler} but just forward the event.
 * This is needed as {@link DefaultChannelPipeline} may already put the {@link ChannelHandler} in the linked-list
 * but not called {@link ChannelHandler#handlerAdded(ChannelHandlerContext)}.
 */
private boolean invokeHandler() {
    // Store in local variable to reduce volatile reads.
    int handlerState = this.handlerState;
    return handlerState == ADD_COMPLETE || (!ordered && handlerState == ADD_PENDING);
}
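
The effect of invokeHandler() can be shown with a small model: a context whose handlerAdded() has not completed is passed over and the event simply moves on. This is a sketch under stated assumptions, not Netty code. `Ctx` and `demo` are hypothetical names, `ordered` is treated as a plain boolean because the excerpt does not show how Netty derives it, and every context forwards the event unconditionally, whereas a real handler decides for itself whether to propagate.

```java
import java.util.ArrayList;
import java.util.List;

class InvokeHandlerSketch {
    static final int INIT = 0;
    static final int ADD_PENDING = 1;
    static final int ADD_COMPLETE = 2;
    static final int REMOVE_COMPLETE = 3;

    static final class Ctx {
        final String name;
        final boolean ordered;  // assumption: stands in for the `ordered` field used above
        final int handlerState;
        Ctx next;

        Ctx(String name, boolean ordered, int handlerState) {
            this.name = name;
            this.ordered = ordered;
            this.handlerState = handlerState;
        }

        /** Same condition as in the excerpt above. */
        boolean invokeHandler() {
            return handlerState == ADD_COMPLETE || (!ordered && handlerState == ADD_PENDING);
        }

        void invokeChannelActive(List<String> invoked) {
            if (invokeHandler()) {
                invoked.add(name); // stands in for handler.channelActive(this)
            }
            // Simplification: always forward to the next context.
            if (next != null) {
                next.invokeChannelActive(invoked);
            }
        }
    }

    /** Returns the names of the contexts whose handler was actually invoked. */
    static List<String> demo() {
        Ctx a = new Ctx("a", true, ADD_COMPLETE);
        Ctx b = new Ctx("b", true, ADD_PENDING);
        Ctx c = new Ctx("c", false, ADD_PENDING);
        Ctx d = new Ctx("d", true, REMOVE_COMPLETE);
        a.next = b;
        b.next = c;
        c.next = d;

        List<String> invoked = new ArrayList<>();
        a.invokeChannelActive(invoked);
        return invoked;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Here only a and c are invoked: b is still pending on an ordered executor and d has already been removed, so both are bypassed.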

DefaultChannelHandlerContext

Nearly all member variables are already maintained in AbstractChannelHandlerContext. This class extends the abstract class and only holds the corresponding ChannelHandler.

final class DefaultChannelHandlerContext extends AbstractChannelHandlerContext {

    private final ChannelHandler handler;

    DefaultChannelHandlerContext(
            DefaultChannelPipeline pipeline, EventExecutor executor, String name, ChannelHandler handler) {
        super(pipeline, executor, name, handler.getClass());
        this.handler = handler;
    }

    @Override
    public ChannelHandler handler() {
        return handler;
    }
}