Mr Dk.'s BlogMr Dk.'s Blog
  • 🦆 About Me
  • ⛏️ Technology Stack
  • 🔗 Links
  • 🗒️ About Blog
  • Algorithm
  • C++
  • Compiler
  • Cryptography
  • DevOps
  • Docker
  • Git
  • Java
  • Linux
  • MS Office
  • MySQL
  • Network
  • Operating System
  • Performance
  • PostgreSQL
  • Productivity
  • Solidity
  • Vue.js
  • Web
  • Wireless
  • 🐧 How Linux Works (notes)
  • 🐧 Linux Kernel Comments (notes)
  • 🐧 Linux Kernel Development (notes)
  • 🐤 μc/OS-II Source Code (notes)
  • ☕ Understanding the JVM (notes)
  • ⛸️ Redis Implementation (notes)
  • 🗜️ Understanding Nginx (notes)
  • ⚙️ Netty in Action (notes)
  • ☁️ Spring Microservices (notes)
  • ⚒️ The Annotated STL Sources (notes)
  • ☕ Java Development Kit 8
GitHub
  • 🦆 About Me
  • ⛏️ Technology Stack
  • 🔗 Links
  • 🗒️ About Blog
  • Algorithm
  • C++
  • Compiler
  • Cryptography
  • DevOps
  • Docker
  • Git
  • Java
  • Linux
  • MS Office
  • MySQL
  • Network
  • Operating System
  • Performance
  • PostgreSQL
  • Productivity
  • Solidity
  • Vue.js
  • Web
  • Wireless
  • 🐧 How Linux Works (notes)
  • 🐧 Linux Kernel Comments (notes)
  • 🐧 Linux Kernel Development (notes)
  • 🐤 μc/OS-II Source Code (notes)
  • ☕ Understanding the JVM (notes)
  • ⛸️ Redis Implementation (notes)
  • 🗜️ Understanding Nginx (notes)
  • ⚙️ Netty in Action (notes)
  • ☁️ Spring Microservices (notes)
  • ⚒️ The Annotated STL Sources (notes)
  • ☕ Java Development Kit 8
GitHub
  • ⚙️ Netty in Action
    • 1 - NIO Transportation Model
    • 2 - ByteBuf
    • 3 - Netty Thread Model
    • 3.1 - EventExecutorGroup
    • 3.1 - EventLoopGroup
    • 3.3 - EventExecutor
    • 3.4 - EventLoop
    • 4 - Channel Concept
    • 4.1 - Channel
    • 4.2 - ChannelHandler
    • 4.3 - ChannelHandlerContext
    • 4.4 - ChannelPipeline
    • 5.1 - Future
    • 5.2 - CompleteFuture
    • 5.3 - FutureListener

4.2 - ChannelHandler

Created by : Mr Dk.

2021 / 02 / 19 17:53

Ningbo, Zhejiang, China


ChannelHandler

截获或处理 I/O 事件,并转发到 ChannelPipeline 的下一个 ChannelHandler 中。其本身并没有提供很多的函数,而是由 ChannelInboundHandler 和 ChannelOutboundHandler 分别定义了出入站的 I/O 事件处理方式。

Definition

/**
 * Handles an I/O event or intercepts an I/O operation, and forwards it to its next handler in
 * its {@link ChannelPipeline}.
 *
 * <h3>Sub-types</h3>
 * <p>
 * {@link ChannelHandler} itself does not provide many methods, but you usually have to implement one of its subtypes:
 * <ul>
 * <li>{@link ChannelInboundHandler} to handle inbound I/O events, and</li>
 * <li>{@link ChannelOutboundHandler} to handle outbound I/O operations.</li>
 * </ul>
 * </p>
 * <p>
 * Alternatively, the following adapter classes are provided for your convenience:
 * <ul>
 * <li>{@link ChannelInboundHandlerAdapter} to handle inbound I/O events,</li>
 * <li>{@link ChannelOutboundHandlerAdapter} to handle outbound I/O operations, and</li>
 * <li>{@link ChannelDuplexHandler} to handle both inbound and outbound events</li>
 * </ul>
 * </p>
 * <p>
 * For more information, please refer to the documentation of each subtype.
 * </p>
 *
 * <h3>The context object</h3>
 * <p>
 * A {@link ChannelHandler} is provided with a {@link ChannelHandlerContext}
 * object.  A {@link ChannelHandler} is supposed to interact with the
 * {@link ChannelPipeline} it belongs to via a context object.  Using the
 * context object, the {@link ChannelHandler} can pass events upstream or
 * downstream, modify the pipeline dynamically, or store the information
 * (using {@link AttributeKey}s) which is specific to the handler.
 *
 * <h3>State management</h3>
 *
 * A {@link ChannelHandler} often needs to store some stateful information.
 * The simplest and recommended approach is to use member variables:
 * <pre>
 * public interface Message {
 *     // your methods here
 * }
 *
 * public class DataServerHandler extends {@link SimpleChannelInboundHandler}&lt;Message&gt; {
 *
 *     <b>private boolean loggedIn;</b>
 *
 *     {@code @Override}
 *     public void channelRead0({@link ChannelHandlerContext} ctx, Message message) {
 *         if (message instanceof LoginMessage) {
 *             authenticate((LoginMessage) message);
 *             <b>loggedIn = true;</b>
 *         } else (message instanceof GetDataMessage) {
 *             if (<b>loggedIn</b>) {
 *                 ctx.writeAndFlush(fetchSecret((GetDataMessage) message));
 *             } else {
 *                 fail();
 *             }
 *         }
 *     }
 *     ...
 * }
 * </pre>
 * Because the handler instance has a state variable which is dedicated to
 * one connection, you have to create a new handler instance for each new
 * channel to avoid a race condition where a unauthenticated client can get
 * the confidential information:
 * <pre>
 * // Create a new handler instance per channel.
 * // See {@link ChannelInitializer#initChannel(Channel)}.
 * public class DataServerInitializer extends {@link ChannelInitializer}&lt;{@link Channel}&gt; {
 *     {@code @Override}
 *     public void initChannel({@link Channel} channel) {
 *         channel.pipeline().addLast("handler", <b>new DataServerHandler()</b>);
 *     }
 * }
 *
 * </pre>
 *
 * <h4>Using {@link AttributeKey}s</h4>
 *
 * Although it's recommended to use member variables to store the state of a
 * handler, for some reason you might not want to create many handler instances.
 * In such a case, you can use {@link AttributeKey}s which is provided by
 * {@link ChannelHandlerContext}:
 * <pre>
 * public interface Message {
 *     // your methods here
 * }
 *
 * {@code @Sharable}
 * public class DataServerHandler extends {@link SimpleChannelInboundHandler}&lt;Message&gt; {
 *     private final {@link AttributeKey}&lt;{@link Boolean}&gt; auth =
 *           {@link AttributeKey#valueOf(String) AttributeKey.valueOf("auth")};
 *
 *     {@code @Override}
 *     public void channelRead({@link ChannelHandlerContext} ctx, Message message) {
 *         {@link Attribute}&lt;{@link Boolean}&gt; attr = ctx.attr(auth);
 *         if (message instanceof LoginMessage) {
 *             authenticate((LoginMessage) o);
 *             <b>attr.set(true)</b>;
 *         } else (message instanceof GetDataMessage) {
 *             if (<b>Boolean.TRUE.equals(attr.get())</b>) {
 *                 ctx.writeAndFlush(fetchSecret((GetDataMessage) o));
 *             } else {
 *                 fail();
 *             }
 *         }
 *     }
 *     ...
 * }
 * </pre>
 * Now that the state of the handler is attached to the {@link ChannelHandlerContext}, you can add the
 * same handler instance to different pipelines:
 * <pre>
 * public class DataServerInitializer extends {@link ChannelInitializer}&lt;{@link Channel}&gt; {
 *
 *     private static final DataServerHandler <b>SHARED</b> = new DataServerHandler();
 *
 *     {@code @Override}
 *     public void initChannel({@link Channel} channel) {
 *         channel.pipeline().addLast("handler", <b>SHARED</b>);
 *     }
 * }
 * </pre>
 *
 *
 * <h4>The {@code @Sharable} annotation</h4>
 * <p>
 * In the example above which used an {@link AttributeKey},
 * you might have noticed the {@code @Sharable} annotation.
 * <p>
 * If a {@link ChannelHandler} is annotated with the {@code @Sharable}
 * annotation, it means you can create an instance of the handler just once and
 * add it to one or more {@link ChannelPipeline}s multiple times without
 * a race condition.
 * <p>
 * If this annotation is not specified, you have to create a new handler
 * instance every time you add it to a pipeline because it has unshared state
 * such as member variables.
 * <p>
 * This annotation is provided for documentation purpose, just like
 * <a href="http://www.javaconcurrencyinpractice.com/annotations/doc/">the JCIP annotations</a>.
 *
 * <h3>Additional resources worth reading</h3>
 * <p>
 * Please refer to the {@link ChannelHandler}, and
 * {@link ChannelPipeline} to find out more about inbound and outbound operations,
 * what fundamental differences they have, how they flow in a  pipeline,  and how to handle
 * the operation in your application.
 */
public interface ChannelHandler {

}

Lifecycle

在 ChannelHandler 加入 / 移除 ChannelHandlerContext (加入 / 移除 ChannelPipeline) 时被调用。

/**
 * Gets called after the {@link ChannelHandler} was added to the actual context and it's ready to handle events.
 */
void handlerAdded(ChannelHandlerContext ctx) throws Exception;

/**
 * Gets called after the {@link ChannelHandler} was removed from the actual context and it doesn't handle events
 * anymore.
 */
void handlerRemoved(ChannelHandlerContext ctx) throws Exception;

ChannelInboundHandler

为处理入站事件设计的 ChannelHandler。

Definition

/**
 * {@link ChannelHandler} which adds callbacks for state changes. This allows the user
 * to hook in to state changes easily.
 */
public interface ChannelInboundHandler extends ChannelHandler {

}

Lifecycle

在 ChannelHandlerContext 对应的 Channel 的状态发生变化时被调用。

/**
 * The {@link Channel} of the {@link ChannelHandlerContext} was registered with its {@link EventLoop}
 */
void channelRegistered(ChannelHandlerContext ctx) throws Exception;

/**
 * The {@link Channel} of the {@link ChannelHandlerContext} was unregistered from its {@link EventLoop}
 */
void channelUnregistered(ChannelHandlerContext ctx) throws Exception;

/**
 * The {@link Channel} of the {@link ChannelHandlerContext} is now active
 */
void channelActive(ChannelHandlerContext ctx) throws Exception;

/**
 * The {@link Channel} of the {@link ChannelHandlerContext} was registered is now inactive and reached its
 * end of lifetime.
 */
void channelInactive(ChannelHandlerContext ctx) throws Exception;

Read

在当前 Channel 从远程接收到消息时被调用。

/**
 * Invoked when the current {@link Channel} has read a message from the peer.
 */
void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception;

/**
 * Invoked when the last message read by the current read operation has been consumed by
 * {@link #channelRead(ChannelHandlerContext, Object)}.  If {@link ChannelOption#AUTO_READ} is off, no further
 * attempt to read an inbound data from the current {@link Channel} will be made until
 * {@link ChannelHandlerContext#read()} is called.
 */
void channelReadComplete(ChannelHandlerContext ctx) throws Exception;

User Event

用户事件被触发时回调。

/**
 * Gets called if an user event was triggered.
 */
void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception;

Writable

当 Channel 的可写状态改变时被调用。

/**
 * Gets called once the writable state of a {@link Channel} changed. You can check the state with
 * {@link Channel#isWritable()}.
 */
void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception;

Exception

在一个 Throwable 被抛出时回调。

/**
 * Gets called if a {@link Throwable} was thrown.
 */
@Override
@SuppressWarnings("deprecation")
void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception;

ChannelOutboundHandler

为出站事件设计的 ChannelOutboundHandler。

Definition

/**
 * {@link ChannelHandler} which will get notified for IO-outbound-operations.
 */
public interface ChannelOutboundHandler extends ChannelHandler {

}

Bind

将 Channel 绑定到本地地址。

/**
 * Called once a bind operation is made.
 *
 * @param ctx           the {@link ChannelHandlerContext} for which the bind operation is made
 * @param localAddress  the {@link SocketAddress} to which it should bound
 * @param promise       the {@link ChannelPromise} to notify once the operation completes
 * @throws Exception    thrown if an error occurs
 */
void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception;

Connect

将 Channel 连接到远程地址。

/**
 * Called once a connect operation is made.
 *
 * @param ctx               the {@link ChannelHandlerContext} for which the connect operation is made
 * @param remoteAddress     the {@link SocketAddress} to which it should connect
 * @param localAddress      the {@link SocketAddress} which is used as source on connect
 * @param promise           the {@link ChannelPromise} to notify once the operation completes
 * @throws Exception        thrown if an error occurs
 */
void connect(
        ChannelHandlerContext ctx, SocketAddress remoteAddress,
        SocketAddress localAddress, ChannelPromise promise) throws Exception;

/**
 * Called once a disconnect operation is made.
 *
 * @param ctx               the {@link ChannelHandlerContext} for which the disconnect operation is made
 * @param promise           the {@link ChannelPromise} to notify once the operation completes
 * @throws Exception        thrown if an error occurs
 */
void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;

Close

/**
 * Called once a close operation is made.
 *
 * @param ctx               the {@link ChannelHandlerContext} for which the close operation is made
 * @param promise           the {@link ChannelPromise} to notify once the operation completes
 * @throws Exception        thrown if an error occurs
 */
void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;

Deregister

当 Channel 从当前的 EventLoop 解除注册时回调。

/**
 * Called once a deregister operation is made from the current registered {@link EventLoop}.
 *
 * @param ctx               the {@link ChannelHandlerContext} for which the close operation is made
 * @param promise           the {@link ChannelPromise} to notify once the operation completes
 * @throws Exception        thrown if an error occurs
 */
void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;

Read / Write

/**
 * Intercepts {@link ChannelHandlerContext#read()}.
 */
void read(ChannelHandlerContext ctx) throws Exception;

/**
 * Called once a write operation is made. The write operation will write the messages through the
 * {@link ChannelPipeline}. Those are then ready to be flushed to the actual {@link Channel} once
 * {@link Channel#flush()} is called
 *
 * @param ctx               the {@link ChannelHandlerContext} for which the write operation is made
 * @param msg               the message to write
 * @param promise           the {@link ChannelPromise} to notify once the operation completes
 * @throws Exception        thrown if an error occurs
 */
void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception;

/**
 * Called once a flush operation is made. The flush operation will try to flush out all previous written messages
 * that are pending.
 *
 * @param ctx               the {@link ChannelHandlerContext} for which the flush operation is made
 * @throws Exception        thrown if an error occurs
 */
void flush(ChannelHandlerContext ctx) throws Exception;

ChannelHandlerAdapter

ChannelHandler 接口的抽象类实现。

Definition

/**
 * Skeleton implementation of a {@link ChannelHandler}.
 */
public abstract class ChannelHandlerAdapter implements ChannelHandler {

}

Sharable

判断当前 ChannelHandler 实例是否可以被共享。通过反射机制获取 ChannelHandler 上的 @Sharable 注解,并保存到 ThreadLocal 中作为缓存。如果不可被共享,那么抛出异常。

/**
 * Throws {@link IllegalStateException} if {@link ChannelHandlerAdapter#isSharable()} returns {@code true}
 */
protected void ensureNotSharable() {
    if (isSharable()) {
        throw new IllegalStateException("ChannelHandler " + getClass().getName() + " is not allowed to be shared");
    }
}

/**
 * Return {@code true} if the implementation is {@link Sharable} and so can be added
 * to different {@link ChannelPipeline}s.
 */
public boolean isSharable() {
    /**
     * Cache the result of {@link Sharable} annotation detection to workaround a condition. We use a
     * {@link ThreadLocal} and {@link WeakHashMap} to eliminate the volatile write/reads. Using different
     * {@link WeakHashMap} instances per {@link Thread} is good enough for us and the number of
     * {@link Thread}s are quite limited anyway.
     *
     * See <a href="https://github.com/netty/netty/issues/2289">#2289</a>.
     */
    Class<?> clazz = getClass();
    Map<Class<?>, Boolean> cache = InternalThreadLocalMap.get().handlerSharableCache();
    Boolean sharable = cache.get(clazz);
    if (sharable == null) {
        sharable = clazz.isAnnotationPresent(Sharable.class);
        cache.put(clazz, sharable);
    }
    return sharable;
}

ChannelInboundHandlerAdapter / ChannelOutboundHandlerAdapter

ChannelInboundHandler 接口的默认实现。其中提供了接口内所有函数的默认实现:将操作传递到 ChannelPipeline 的下一个 ChannelHandler 中。子类可以继承该类,重写想要重新定制的函数。

/**
 * Abstract base class for {@link ChannelInboundHandler} implementations which provide
 * implementations of all of their methods.
 *
 * <p>
 * This implementation just forward the operation to the next {@link ChannelHandler} in the
 * {@link ChannelPipeline}. Sub-classes may override a method implementation to change this.
 * </p>
 * <p>
 * Be aware that messages are not released after the {@link #channelRead(ChannelHandlerContext, Object)}
 * method returns automatically. If you are looking for a {@link ChannelInboundHandler} implementation that
 * releases the received messages automatically, please see {@link SimpleChannelInboundHandler}.
 * </p>
 */
public class ChannelInboundHandlerAdapter extends ChannelHandlerAdapter implements ChannelInboundHandler {

}
/**
 * Skeleton implementation of a {@link ChannelOutboundHandler}. This implementation just forwards each method call via
 * the {@link ChannelHandlerContext}.
 */
public class ChannelOutboundHandlerAdapter extends ChannelHandlerAdapter implements ChannelOutboundHandler {

}

其默认行为即调用 ChannelHandlerContext 的相同函数实现转发,比如:

/**
 * Calls {@link ChannelHandlerContext#fireChannelRegistered()} to forward
 * to the next {@link ChannelInboundHandler} in the {@link ChannelPipeline}.
 *
 * Sub-classes may override this method to change behavior.
 */
@Skip
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
    ctx.fireChannelRegistered();
}

SimpleChannelInboundHandler

Netty 自身实现了一个简单的抽象类示例 ChannelInboundHandlerAdapter,用于 显式地处理指定类型的消息。

Definition

/**
 * {@link ChannelInboundHandlerAdapter} which allows to explicit only handle a specific type of messages.
 *
 * For example here is an implementation which only handle {@link String} messages.
 *
 * <pre>
 *     public class StringHandler extends
 *             {@link SimpleChannelInboundHandler}&lt;{@link String}&gt; {
 *
 *         {@code @Override}
 *         protected void channelRead0({@link ChannelHandlerContext} ctx, {@link String} message)
 *                 throws {@link Exception} {
 *             System.out.println(message);
 *         }
 *     }
 * </pre>
 *
 * Be aware that depending of the constructor parameters it will release all handled messages by passing them to
 * {@link ReferenceCountUtil#release(Object)}. In this case you may need to use
 * {@link ReferenceCountUtil#retain(Object)} if you pass the object to the next handler in the {@link ChannelPipeline}.
 */
public abstract class SimpleChannelInboundHandler<I> extends ChannelInboundHandlerAdapter {

}

Constructor

类内维护了一个用于检测消息是否属于指定类型的匹配器。该匹配器在构造函数中被初始化。

private final TypeParameterMatcher matcher;
private final boolean autoRelease;

/**
 * see {@link #SimpleChannelInboundHandler(Class, boolean)} with {@code true} as boolean value.
 */
protected SimpleChannelInboundHandler(Class<? extends I> inboundMessageType) {
    this(inboundMessageType, true);
}

/**
 * Create a new instance
 *
 * @param inboundMessageType    The type of messages to match
 * @param autoRelease           {@code true} if handled messages should be released automatically by passing them to
 *                              {@link ReferenceCountUtil#release(Object)}.
 */
protected SimpleChannelInboundHandler(Class<? extends I> inboundMessageType, boolean autoRelease) {
    matcher = TypeParameterMatcher.get(inboundMessageType);
    this.autoRelease = autoRelease;
}

Read

重写了读取消息的 channelRead() 函数,在函数内使用 matcher 对消息的类型进行判断。如果消息属于指定的类型,那么调用抽象函数 channelRead0() 读取指定类型的消息。该函数由子类根据指定的对象类型自行实现。

/**
 * Returns {@code true} if the given message should be handled. If {@code false} it will be passed to the next
 * {@link ChannelInboundHandler} in the {@link ChannelPipeline}.
 */
public boolean acceptInboundMessage(Object msg) throws Exception {
    return matcher.match(msg);
}

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    boolean release = true;
    try {
        if (acceptInboundMessage(msg)) {
            @SuppressWarnings("unchecked")
            I imsg = (I) msg;
            channelRead0(ctx, imsg);
        } else {
            release = false;
            ctx.fireChannelRead(msg);
        }
    } finally {
        if (autoRelease && release) {
            ReferenceCountUtil.release(msg);
        }
    }
}

/**
 * Is called for each message of type {@link I}.
 *
 * @param ctx           the {@link ChannelHandlerContext} which this {@link SimpleChannelInboundHandler}
 *                      belongs to
 * @param msg           the message to handle
 * @throws Exception    is thrown if an error occurred
 */
protected abstract void channelRead0(ChannelHandlerContext ctx, I msg) throws Exception;
Edit this page on GitHub
Prev
4.1 - Channel
Next
4.3 - ChannelHandlerContext