4.2 - ChannelHandler
Created by : Mr Dk.
2021 / 02 / 19 17:53
Ningbo, Zhejiang, China
ChannelHandler
截获或处理 I/O 事件,并转发到 ChannelPipeline
的下一个 ChannelHandler
中。其本身并没有提供很多的函数,而是由 ChannelInboundHandler
和 ChannelOutboundHandler
分别定义了出入站的 I/O 事件处理方式。
Definition
/**
* Handles an I/O event or intercepts an I/O operation, and forwards it to its next handler in
* its {@link ChannelPipeline}.
*
* <h3>Sub-types</h3>
* <p>
* {@link ChannelHandler} itself does not provide many methods, but you usually have to implement one of its subtypes:
* <ul>
* <li>{@link ChannelInboundHandler} to handle inbound I/O events, and</li>
* <li>{@link ChannelOutboundHandler} to handle outbound I/O operations.</li>
* </ul>
* </p>
* <p>
* Alternatively, the following adapter classes are provided for your convenience:
* <ul>
* <li>{@link ChannelInboundHandlerAdapter} to handle inbound I/O events,</li>
* <li>{@link ChannelOutboundHandlerAdapter} to handle outbound I/O operations, and</li>
* <li>{@link ChannelDuplexHandler} to handle both inbound and outbound events</li>
* </ul>
* </p>
* <p>
* For more information, please refer to the documentation of each subtype.
* </p>
*
* <h3>The context object</h3>
* <p>
* A {@link ChannelHandler} is provided with a {@link ChannelHandlerContext}
* object. A {@link ChannelHandler} is supposed to interact with the
* {@link ChannelPipeline} it belongs to via a context object. Using the
* context object, the {@link ChannelHandler} can pass events upstream or
* downstream, modify the pipeline dynamically, or store the information
* (using {@link AttributeKey}s) which is specific to the handler.
*
* <h3>State management</h3>
*
* A {@link ChannelHandler} often needs to store some stateful information.
* The simplest and recommended approach is to use member variables:
* <pre>
* public interface Message {
* // your methods here
* }
*
* public class DataServerHandler extends {@link SimpleChannelInboundHandler}<Message> {
*
* <b>private boolean loggedIn;</b>
*
* {@code @Override}
* public void channelRead0({@link ChannelHandlerContext} ctx, Message message) {
* if (message instanceof LoginMessage) {
* authenticate((LoginMessage) message);
* <b>loggedIn = true;</b>
* } else (message instanceof GetDataMessage) {
* if (<b>loggedIn</b>) {
* ctx.writeAndFlush(fetchSecret((GetDataMessage) message));
* } else {
* fail();
* }
* }
* }
* ...
* }
* </pre>
* Because the handler instance has a state variable which is dedicated to
* one connection, you have to create a new handler instance for each new
* channel to avoid a race condition where a unauthenticated client can get
* the confidential information:
* <pre>
* // Create a new handler instance per channel.
* // See {@link ChannelInitializer#initChannel(Channel)}.
* public class DataServerInitializer extends {@link ChannelInitializer}<{@link Channel}> {
* {@code @Override}
* public void initChannel({@link Channel} channel) {
* channel.pipeline().addLast("handler", <b>new DataServerHandler()</b>);
* }
* }
*
* </pre>
*
* <h4>Using {@link AttributeKey}s</h4>
*
* Although it's recommended to use member variables to store the state of a
* handler, for some reason you might not want to create many handler instances.
* In such a case, you can use {@link AttributeKey}s which is provided by
* {@link ChannelHandlerContext}:
* <pre>
* public interface Message {
* // your methods here
* }
*
* {@code @Sharable}
* public class DataServerHandler extends {@link SimpleChannelInboundHandler}<Message> {
* private final {@link AttributeKey}<{@link Boolean}> auth =
* {@link AttributeKey#valueOf(String) AttributeKey.valueOf("auth")};
*
* {@code @Override}
* public void channelRead({@link ChannelHandlerContext} ctx, Message message) {
* {@link Attribute}<{@link Boolean}> attr = ctx.attr(auth);
* if (message instanceof LoginMessage) {
* authenticate((LoginMessage) o);
* <b>attr.set(true)</b>;
* } else (message instanceof GetDataMessage) {
* if (<b>Boolean.TRUE.equals(attr.get())</b>) {
* ctx.writeAndFlush(fetchSecret((GetDataMessage) o));
* } else {
* fail();
* }
* }
* }
* ...
* }
* </pre>
* Now that the state of the handler is attached to the {@link ChannelHandlerContext}, you can add the
* same handler instance to different pipelines:
* <pre>
* public class DataServerInitializer extends {@link ChannelInitializer}<{@link Channel}> {
*
* private static final DataServerHandler <b>SHARED</b> = new DataServerHandler();
*
* {@code @Override}
* public void initChannel({@link Channel} channel) {
* channel.pipeline().addLast("handler", <b>SHARED</b>);
* }
* }
* </pre>
*
*
* <h4>The {@code @Sharable} annotation</h4>
* <p>
* In the example above which used an {@link AttributeKey},
* you might have noticed the {@code @Sharable} annotation.
* <p>
* If a {@link ChannelHandler} is annotated with the {@code @Sharable}
* annotation, it means you can create an instance of the handler just once and
* add it to one or more {@link ChannelPipeline}s multiple times without
* a race condition.
* <p>
* If this annotation is not specified, you have to create a new handler
* instance every time you add it to a pipeline because it has unshared state
* such as member variables.
* <p>
* This annotation is provided for documentation purpose, just like
* <a href="http://www.javaconcurrencyinpractice.com/annotations/doc/">the JCIP annotations</a>.
*
* <h3>Additional resources worth reading</h3>
* <p>
* Please refer to the {@link ChannelHandler}, and
* {@link ChannelPipeline} to find out more about inbound and outbound operations,
* what fundamental differences they have, how they flow in a pipeline, and how to handle
* the operation in your application.
*/
public interface ChannelHandler {
}
Lifecycle
在 ChannelHandler
加入 / 移除 ChannelHandlerContext
(加入 / 移除 ChannelPipeline
) 时被调用。
/**
* Gets called after the {@link ChannelHandler} was added to the actual context and it's ready to handle events.
*/
void handlerAdded(ChannelHandlerContext ctx) throws Exception;
/**
* Gets called after the {@link ChannelHandler} was removed from the actual context and it doesn't handle events
* anymore.
*/
void handlerRemoved(ChannelHandlerContext ctx) throws Exception;
ChannelInboundHandler
为处理入站事件设计的 ChannelHandler
。
Definition
/**
* {@link ChannelHandler} which adds callbacks for state changes. This allows the user
* to hook in to state changes easily.
*/
public interface ChannelInboundHandler extends ChannelHandler {
}
Lifecycle
在 ChannelHandlerContext
对应的 Channel
的状态发生变化时被调用。
/**
* The {@link Channel} of the {@link ChannelHandlerContext} was registered with its {@link EventLoop}
*/
void channelRegistered(ChannelHandlerContext ctx) throws Exception;
/**
* The {@link Channel} of the {@link ChannelHandlerContext} was unregistered from its {@link EventLoop}
*/
void channelUnregistered(ChannelHandlerContext ctx) throws Exception;
/**
* The {@link Channel} of the {@link ChannelHandlerContext} is now active
*/
void channelActive(ChannelHandlerContext ctx) throws Exception;
/**
* The {@link Channel} of the {@link ChannelHandlerContext} was registered is now inactive and reached its
* end of lifetime.
*/
void channelInactive(ChannelHandlerContext ctx) throws Exception;
Read
在当前 Channel
从远程接收到消息时被调用。
/**
* Invoked when the current {@link Channel} has read a message from the peer.
*/
void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception;
/**
* Invoked when the last message read by the current read operation has been consumed by
* {@link #channelRead(ChannelHandlerContext, Object)}. If {@link ChannelOption#AUTO_READ} is off, no further
* attempt to read an inbound data from the current {@link Channel} will be made until
* {@link ChannelHandlerContext#read()} is called.
*/
void channelReadComplete(ChannelHandlerContext ctx) throws Exception;
User Event
用户事件被触发时回调。
/**
* Gets called if an user event was triggered.
*/
void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception;
Writable
当 Channel
的可写状态改变时被调用。
/**
* Gets called once the writable state of a {@link Channel} changed. You can check the state with
* {@link Channel#isWritable()}.
*/
void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception;
Exception
在一个 Throwable
被抛出时回调。
/**
* Gets called if a {@link Throwable} was thrown.
*/
@Override
@SuppressWarnings("deprecation")
void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception;
ChannelOutboundHandler
为出站事件设计的 ChannelOutboundHandler
。
Definition
/**
* {@link ChannelHandler} which will get notified for IO-outbound-operations.
*/
public interface ChannelOutboundHandler extends ChannelHandler {
}
Bind
将 Channel
绑定到本地地址。
/**
* Called once a bind operation is made.
*
* @param ctx the {@link ChannelHandlerContext} for which the bind operation is made
* @param localAddress the {@link SocketAddress} to which it should bound
* @param promise the {@link ChannelPromise} to notify once the operation completes
* @throws Exception thrown if an error occurs
*/
void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception;
Connect
将 Channel
连接到远程地址。
/**
* Called once a connect operation is made.
*
* @param ctx the {@link ChannelHandlerContext} for which the connect operation is made
* @param remoteAddress the {@link SocketAddress} to which it should connect
* @param localAddress the {@link SocketAddress} which is used as source on connect
* @param promise the {@link ChannelPromise} to notify once the operation completes
* @throws Exception thrown if an error occurs
*/
void connect(
ChannelHandlerContext ctx, SocketAddress remoteAddress,
SocketAddress localAddress, ChannelPromise promise) throws Exception;
/**
* Called once a disconnect operation is made.
*
* @param ctx the {@link ChannelHandlerContext} for which the disconnect operation is made
* @param promise the {@link ChannelPromise} to notify once the operation completes
* @throws Exception thrown if an error occurs
*/
void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;
Close
/**
* Called once a close operation is made.
*
* @param ctx the {@link ChannelHandlerContext} for which the close operation is made
* @param promise the {@link ChannelPromise} to notify once the operation completes
* @throws Exception thrown if an error occurs
*/
void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;
Deregister
当 Channel
从当前的 EventLoop
解除注册时回调。
/**
* Called once a deregister operation is made from the current registered {@link EventLoop}.
*
* @param ctx the {@link ChannelHandlerContext} for which the close operation is made
* @param promise the {@link ChannelPromise} to notify once the operation completes
* @throws Exception thrown if an error occurs
*/
void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;
Read / Write
/**
* Intercepts {@link ChannelHandlerContext#read()}.
*/
void read(ChannelHandlerContext ctx) throws Exception;
/**
* Called once a write operation is made. The write operation will write the messages through the
* {@link ChannelPipeline}. Those are then ready to be flushed to the actual {@link Channel} once
* {@link Channel#flush()} is called
*
* @param ctx the {@link ChannelHandlerContext} for which the write operation is made
* @param msg the message to write
* @param promise the {@link ChannelPromise} to notify once the operation completes
* @throws Exception thrown if an error occurs
*/
void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception;
/**
* Called once a flush operation is made. The flush operation will try to flush out all previous written messages
* that are pending.
*
* @param ctx the {@link ChannelHandlerContext} for which the flush operation is made
* @throws Exception thrown if an error occurs
*/
void flush(ChannelHandlerContext ctx) throws Exception;
ChannelHandlerAdapter
ChannelHandler
接口的抽象类实现。
Definition
/**
* Skeleton implementation of a {@link ChannelHandler}.
*/
public abstract class ChannelHandlerAdapter implements ChannelHandler {
}
Sharable
判断当前 ChannelHandler
实例是否可以被共享。通过反射机制获取 ChannelHandler
上的 @Sharable
注解,并保存到 ThreadLocal
中作为缓存。如果不可被共享,那么抛出异常。
/**
* Throws {@link IllegalStateException} if {@link ChannelHandlerAdapter#isSharable()} returns {@code true}
*/
protected void ensureNotSharable() {
if (isSharable()) {
throw new IllegalStateException("ChannelHandler " + getClass().getName() + " is not allowed to be shared");
}
}
/**
* Return {@code true} if the implementation is {@link Sharable} and so can be added
* to different {@link ChannelPipeline}s.
*/
public boolean isSharable() {
/**
* Cache the result of {@link Sharable} annotation detection to workaround a condition. We use a
* {@link ThreadLocal} and {@link WeakHashMap} to eliminate the volatile write/reads. Using different
* {@link WeakHashMap} instances per {@link Thread} is good enough for us and the number of
* {@link Thread}s are quite limited anyway.
*
* See <a href="https://github.com/netty/netty/issues/2289">#2289</a>.
*/
Class<?> clazz = getClass();
Map<Class<?>, Boolean> cache = InternalThreadLocalMap.get().handlerSharableCache();
Boolean sharable = cache.get(clazz);
if (sharable == null) {
sharable = clazz.isAnnotationPresent(Sharable.class);
cache.put(clazz, sharable);
}
return sharable;
}
ChannelInboundHandlerAdapter / ChannelOutboundHandlerAdapter
ChannelInboundHandler
接口的默认实现。其中提供了接口内所有函数的默认实现:将操作传递到 ChannelPipeline
的下一个 ChannelHandler
中。子类可以继承该类,重写想要重新定制的函数。
/**
* Abstract base class for {@link ChannelInboundHandler} implementations which provide
* implementations of all of their methods.
*
* <p>
* This implementation just forward the operation to the next {@link ChannelHandler} in the
* {@link ChannelPipeline}. Sub-classes may override a method implementation to change this.
* </p>
* <p>
* Be aware that messages are not released after the {@link #channelRead(ChannelHandlerContext, Object)}
* method returns automatically. If you are looking for a {@link ChannelInboundHandler} implementation that
* releases the received messages automatically, please see {@link SimpleChannelInboundHandler}.
* </p>
*/
public class ChannelInboundHandlerAdapter extends ChannelHandlerAdapter implements ChannelInboundHandler {
}
/**
* Skeleton implementation of a {@link ChannelOutboundHandler}. This implementation just forwards each method call via
* the {@link ChannelHandlerContext}.
*/
public class ChannelOutboundHandlerAdapter extends ChannelHandlerAdapter implements ChannelOutboundHandler {
}
其默认行为即调用 ChannelHandlerContext
的相同函数实现转发,比如:
/**
* Calls {@link ChannelHandlerContext#fireChannelRegistered()} to forward
* to the next {@link ChannelInboundHandler} in the {@link ChannelPipeline}.
*
* Sub-classes may override this method to change behavior.
*/
@Skip
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelRegistered();
}
SimpleChannelInboundHandler
Netty 自身实现了一个简单的抽象类示例 ChannelInboundHandlerAdapter
,用于 显式地处理指定类型的消息。
Definition
/**
* {@link ChannelInboundHandlerAdapter} which allows to explicit only handle a specific type of messages.
*
* For example here is an implementation which only handle {@link String} messages.
*
* <pre>
* public class StringHandler extends
* {@link SimpleChannelInboundHandler}<{@link String}> {
*
* {@code @Override}
* protected void channelRead0({@link ChannelHandlerContext} ctx, {@link String} message)
* throws {@link Exception} {
* System.out.println(message);
* }
* }
* </pre>
*
* Be aware that depending of the constructor parameters it will release all handled messages by passing them to
* {@link ReferenceCountUtil#release(Object)}. In this case you may need to use
* {@link ReferenceCountUtil#retain(Object)} if you pass the object to the next handler in the {@link ChannelPipeline}.
*/
public abstract class SimpleChannelInboundHandler<I> extends ChannelInboundHandlerAdapter {
}
Constructor
类内维护了一个用于检测消息是否属于指定类型的匹配器。该匹配器在构造函数中被初始化。
private final TypeParameterMatcher matcher;
private final boolean autoRelease;
/**
* see {@link #SimpleChannelInboundHandler(Class, boolean)} with {@code true} as boolean value.
*/
protected SimpleChannelInboundHandler(Class<? extends I> inboundMessageType) {
this(inboundMessageType, true);
}
/**
* Create a new instance
*
* @param inboundMessageType The type of messages to match
* @param autoRelease {@code true} if handled messages should be released automatically by passing them to
* {@link ReferenceCountUtil#release(Object)}.
*/
protected SimpleChannelInboundHandler(Class<? extends I> inboundMessageType, boolean autoRelease) {
matcher = TypeParameterMatcher.get(inboundMessageType);
this.autoRelease = autoRelease;
}
Read
重写了读取消息的 channelRead()
函数,在函数内使用 matcher
对消息的类型进行判断。如果消息属于指定的类型,那么调用抽象函数 channelRead0()
读取指定类型的消息。该函数由子类根据指定的对象类型自行实现。
/**
* Returns {@code true} if the given message should be handled. If {@code false} it will be passed to the next
* {@link ChannelInboundHandler} in the {@link ChannelPipeline}.
*/
public boolean acceptInboundMessage(Object msg) throws Exception {
return matcher.match(msg);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
boolean release = true;
try {
if (acceptInboundMessage(msg)) {
@SuppressWarnings("unchecked")
I imsg = (I) msg;
channelRead0(ctx, imsg);
} else {
release = false;
ctx.fireChannelRead(msg);
}
} finally {
if (autoRelease && release) {
ReferenceCountUtil.release(msg);
}
}
}
/**
* Is called for each message of type {@link I}.
*
* @param ctx the {@link ChannelHandlerContext} which this {@link SimpleChannelInboundHandler}
* belongs to
* @param msg the message to handle
* @throws Exception is thrown if an error occurred
*/
protected abstract void channelRead0(ChannelHandlerContext ctx, I msg) throws Exception;