Mr Dk.'s BlogMr Dk.'s Blog
  • 🦆 About Me
  • ⛏️ Technology Stack
  • 🔗 Links
  • 🗒️ About Blog
  • Algorithm
  • C++
  • Compiler
  • Cryptography
  • DevOps
  • Docker
  • Git
  • Java
  • Linux
  • MS Office
  • MySQL
  • Network
  • Operating System
  • Performance
  • PostgreSQL
  • Productivity
  • Solidity
  • Vue.js
  • Web
  • Wireless
  • 🐧 How Linux Works (notes)
  • 🐧 Linux Kernel Comments (notes)
  • 🐧 Linux Kernel Development (notes)
  • 🐤 μc/OS-II Source Code (notes)
  • ☕ Understanding the JVM (notes)
  • ⛸️ Redis Implementation (notes)
  • 🗜️ Understanding Nginx (notes)
  • ⚙️ Netty in Action (notes)
  • ☁️ Spring Microservices (notes)
  • ⚒️ The Annotated STL Sources (notes)
  • ☕ Java Development Kit 8
GitHub
  • 🦆 About Me
  • ⛏️ Technology Stack
  • 🔗 Links
  • 🗒️ About Blog
  • Algorithm
  • C++
  • Compiler
  • Cryptography
  • DevOps
  • Docker
  • Git
  • Java
  • Linux
  • MS Office
  • MySQL
  • Network
  • Operating System
  • Performance
  • PostgreSQL
  • Productivity
  • Solidity
  • Vue.js
  • Web
  • Wireless
  • 🐧 How Linux Works (notes)
  • 🐧 Linux Kernel Comments (notes)
  • 🐧 Linux Kernel Development (notes)
  • 🐤 μc/OS-II Source Code (notes)
  • ☕ Understanding the JVM (notes)
  • ⛸️ Redis Implementation (notes)
  • 🗜️ Understanding Nginx (notes)
  • ⚙️ Netty in Action (notes)
  • ☁️ Spring Microservices (notes)
  • ⚒️ The Annotated STL Sources (notes)
  • ☕ Java Development Kit 8
GitHub
  • ⚙️ Netty in Action
    • 1 - NIO Transportation Model
    • 2 - ByteBuf
    • 3 - Netty Thread Model
    • 3.1 - EventExecutorGroup
    • 3.1 - EventLoopGroup
    • 3.3 - EventExecutor
    • 3.4 - EventLoop
    • 4 - Channel Concept
    • 4.1 - Channel
    • 4.2 - ChannelHandler
    • 4.3 - ChannelHandlerContext
    • 4.4 - ChannelPipeline
    • 5.1 - Future
    • 5.2 - CompleteFuture
    • 5.3 - FutureListener

4.4 - ChannelPipeline

Created by : Mr Dk.

2021 / 02 / 21 01:12 🌙

Ningbo, Zhejiang, China


ChannelPipeline

ChannelPipeline 维护了拦截并处理一个 Channel 的所有入站、出站事件的 ChannelHandler 列表,实现了加强版的 拦截过滤模式,给用户完全自由的权限决定事件如何被处理,ChannelHandler 之间如何互相交互。

每个 Channel 都有着自己的 ChannelPipeline,当一个新的 Channel 被创建时,ChannelPipeline 也被创建。I/O 事件要么被 ChannelInboundHandler 处理,要么被 ChannelOutboundHandler 处理,然后通过调用 ChannelHandlerContext 中的传播函数被转发到最近的下一个 ChannelHandler 中。通常来说,入站数据一般来自对远程数据的输入 (比如 SocketChannel 的 read()),数据一般是通过产生或变换后形成出站数据的写请求,最终由 Channel 对应的 I/O 线程写出。ChannelPipeline 中应当至少有一个或更多的 ChannelHandler 来接收 I/O 事件 (读取) 以及请求 I/O 操作 (写入 / 关闭)。

入站的事件传播函数包含:

  • fireChannelRegistered()
  • fireChannelActive()
  • fireChannelRead(Object)
  • fireChannelReadComplete()
  • fireExceptionCaught(Throwable)
  • fireUserEventTriggered(Object)
  • fireChannelWritabilityChanged()
  • fireChannelInactive()
  • fireChannelUnregistered()

出站的事件传播函数包含:

  • bind(SocketAddress, ChannelPromise)
  • connect(SocketAddress, SocketAddress, ChannelPromise)
  • write(Object, ChannelPromise)
  • flush()
  • read()
  • disconnect(ChannelPromise)
  • close(ChannelPromise)
  • deregister(ChannelPromise)

也就是说,调用以上函数,会使事件沿 ChannelPipeline 继续传播下去。

Definition

/**
 * A list of {@link ChannelHandler}s which handles or intercepts inbound events and outbound operations of a
 * {@link Channel}.  {@link ChannelPipeline} implements an advanced form of the
 * <a href="https://www.oracle.com/technetwork/java/interceptingfilter-142169.html">Intercepting Filter</a> pattern
 * to give a user full control over how an event is handled and how the {@link ChannelHandler}s in a pipeline
 * interact with each other.
 *
 * <h3>Creation of a pipeline</h3>
 *
 * Each channel has its own pipeline and it is created automatically when a new channel is created.
 *
 * <h3>How an event flows in a pipeline</h3>
 *
 * The following diagram describes how I/O events are processed by {@link ChannelHandler}s in a {@link ChannelPipeline}
 * typically. An I/O event is handled by either a {@link ChannelInboundHandler} or a {@link ChannelOutboundHandler}
 * and be forwarded to its closest handler by calling the event propagation methods defined in
 * {@link ChannelHandlerContext}, such as {@link ChannelHandlerContext#fireChannelRead(Object)} and
 * {@link ChannelHandlerContext#write(Object)}.
 *
 * <pre>
 *                                                 I/O Request
 *                                            via {@link Channel} or
 *                                        {@link ChannelHandlerContext}
 *                                                      |
 *  +---------------------------------------------------+---------------+
 *  |                           ChannelPipeline         |               |
 *  |                                                  \|/              |
 *  |    +---------------------+            +-----------+----------+    |
 *  |    | Inbound Handler  N  |            | Outbound Handler  1  |    |
 *  |    +----------+----------+            +-----------+----------+    |
 *  |              /|\                                  |               |
 *  |               |                                  \|/              |
 *  |    +----------+----------+            +-----------+----------+    |
 *  |    | Inbound Handler N-1 |            | Outbound Handler  2  |    |
 *  |    +----------+----------+            +-----------+----------+    |
 *  |              /|\                                  .               |
 *  |               .                                   .               |
 *  | ChannelHandlerContext.fireIN_EVT() ChannelHandlerContext.OUT_EVT()|
 *  |        [ method call]                       [method call]         |
 *  |               .                                   .               |
 *  |               .                                  \|/              |
 *  |    +----------+----------+            +-----------+----------+    |
 *  |    | Inbound Handler  2  |            | Outbound Handler M-1 |    |
 *  |    +----------+----------+            +-----------+----------+    |
 *  |              /|\                                  |               |
 *  |               |                                  \|/              |
 *  |    +----------+----------+            +-----------+----------+    |
 *  |    | Inbound Handler  1  |            | Outbound Handler  M  |    |
 *  |    +----------+----------+            +-----------+----------+    |
 *  |              /|\                                  |               |
 *  +---------------+-----------------------------------+---------------+
 *                  |                                  \|/
 *  +---------------+-----------------------------------+---------------+
 *  |               |                                   |               |
 *  |       [ Socket.read() ]                    [ Socket.write() ]     |
 *  |                                                                   |
 *  |  Netty Internal I/O Threads (Transport Implementation)            |
 *  +-------------------------------------------------------------------+
 * </pre>
 * An inbound event is handled by the inbound handlers in the bottom-up direction as shown on the left side of the
 * diagram.  An inbound handler usually handles the inbound data generated by the I/O thread on the bottom of the
 * diagram.  The inbound data is often read from a remote peer via the actual input operation such as
 * {@link SocketChannel#read(ByteBuffer)}.  If an inbound event goes beyond the top inbound handler, it is discarded
 * silently, or logged if it needs your attention.
 * <p>
 * An outbound event is handled by the outbound handler in the top-down direction as shown on the right side of the
 * diagram.  An outbound handler usually generates or transforms the outbound traffic such as write requests.
 * If an outbound event goes beyond the bottom outbound handler, it is handled by an I/O thread associated with the
 * {@link Channel}. The I/O thread often performs the actual output operation such as
 * {@link SocketChannel#write(ByteBuffer)}.
 * <p>
 * For example, let us assume that we created the following pipeline:
 * <pre>
 * {@link ChannelPipeline} p = ...;
 * p.addLast("1", new InboundHandlerA());
 * p.addLast("2", new InboundHandlerB());
 * p.addLast("3", new OutboundHandlerA());
 * p.addLast("4", new OutboundHandlerB());
 * p.addLast("5", new InboundOutboundHandlerX());
 * </pre>
 * In the example above, the class whose name starts with {@code Inbound} means it is an inbound handler.
 * The class whose name starts with {@code Outbound} means it is a outbound handler.
 * <p>
 * In the given example configuration, the handler evaluation order is 1, 2, 3, 4, 5 when an event goes inbound.
 * When an event goes outbound, the order is 5, 4, 3, 2, 1.  On top of this principle, {@link ChannelPipeline} skips
 * the evaluation of certain handlers to shorten the stack depth:
 * <ul>
 * <li>3 and 4 don't implement {@link ChannelInboundHandler}, and therefore the actual evaluation order of an inbound
 *     event will be: 1, 2, and 5.</li>
 * <li>1 and 2 don't implement {@link ChannelOutboundHandler}, and therefore the actual evaluation order of a
 *     outbound event will be: 5, 4, and 3.</li>
 * <li>If 5 implements both {@link ChannelInboundHandler} and {@link ChannelOutboundHandler}, the evaluation order of
 *     an inbound and a outbound event could be 125 and 543 respectively.</li>
 * </ul>
 *
 * <h3>Forwarding an event to the next handler</h3>
 *
 * As you might noticed in the diagram shows, a handler has to invoke the event propagation methods in
 * {@link ChannelHandlerContext} to forward an event to its next handler.  Those methods include:
 * <ul>
 * <li>Inbound event propagation methods:
 *     <ul>
 *     <li>{@link ChannelHandlerContext#fireChannelRegistered()}</li>
 *     <li>{@link ChannelHandlerContext#fireChannelActive()}</li>
 *     <li>{@link ChannelHandlerContext#fireChannelRead(Object)}</li>
 *     <li>{@link ChannelHandlerContext#fireChannelReadComplete()}</li>
 *     <li>{@link ChannelHandlerContext#fireExceptionCaught(Throwable)}</li>
 *     <li>{@link ChannelHandlerContext#fireUserEventTriggered(Object)}</li>
 *     <li>{@link ChannelHandlerContext#fireChannelWritabilityChanged()}</li>
 *     <li>{@link ChannelHandlerContext#fireChannelInactive()}</li>
 *     <li>{@link ChannelHandlerContext#fireChannelUnregistered()}</li>
 *     </ul>
 * </li>
 * <li>Outbound event propagation methods:
 *     <ul>
 *     <li>{@link ChannelHandlerContext#bind(SocketAddress, ChannelPromise)}</li>
 *     <li>{@link ChannelHandlerContext#connect(SocketAddress, SocketAddress, ChannelPromise)}</li>
 *     <li>{@link ChannelHandlerContext#write(Object, ChannelPromise)}</li>
 *     <li>{@link ChannelHandlerContext#flush()}</li>
 *     <li>{@link ChannelHandlerContext#read()}</li>
 *     <li>{@link ChannelHandlerContext#disconnect(ChannelPromise)}</li>
 *     <li>{@link ChannelHandlerContext#close(ChannelPromise)}</li>
 *     <li>{@link ChannelHandlerContext#deregister(ChannelPromise)}</li>
 *     </ul>
 * </li>
 * </ul>
 *
 * and the following example shows how the event propagation is usually done:
 *
 * <pre>
 * public class MyInboundHandler extends {@link ChannelInboundHandlerAdapter} {
 *     {@code @Override}
 *     public void channelActive({@link ChannelHandlerContext} ctx) {
 *         System.out.println("Connected!");
 *         ctx.fireChannelActive();
 *     }
 * }
 *
 * public class MyOutboundHandler extends {@link ChannelOutboundHandlerAdapter} {
 *     {@code @Override}
 *     public void close({@link ChannelHandlerContext} ctx, {@link ChannelPromise} promise) {
 *         System.out.println("Closing ..");
 *         ctx.close(promise);
 *     }
 * }
 * </pre>
 *
 * <h3>Building a pipeline</h3>
 * <p>
 * A user is supposed to have one or more {@link ChannelHandler}s in a pipeline to receive I/O events (e.g. read) and
 * to request I/O operations (e.g. write and close).  For example, a typical server will have the following handlers
 * in each channel's pipeline, but your mileage may vary depending on the complexity and characteristics of the
 * protocol and business logic:
 *
 * <ol>
 * <li>Protocol Decoder - translates binary data (e.g. {@link ByteBuf}) into a Java object.</li>
 * <li>Protocol Encoder - translates a Java object into binary data.</li>
 * <li>Business Logic Handler - performs the actual business logic (e.g. database access).</li>
 * </ol>
 *
 * and it could be represented as shown in the following example:
 *
 * <pre>
 * static final {@link EventExecutorGroup} group = new {@link DefaultEventExecutorGroup}(16);
 * ...
 *
 * {@link ChannelPipeline} pipeline = ch.pipeline();
 *
 * pipeline.addLast("decoder", new MyProtocolDecoder());
 * pipeline.addLast("encoder", new MyProtocolEncoder());
 *
 * // Tell the pipeline to run MyBusinessLogicHandler's event handler methods
 * // in a different thread than an I/O thread so that the I/O thread is not blocked by
 * // a time-consuming task.
 * // If your business logic is fully asynchronous or finished very quickly, you don't
 * // need to specify a group.
 * pipeline.addLast(group, "handler", new MyBusinessLogicHandler());
 * </pre>
 *
 * Be aware that while using {@link DefaultEventLoopGroup} will offload the operation from the {@link EventLoop} it will
 * still process tasks in a serial fashion per {@link ChannelHandlerContext} and so guarantee ordering. Due the ordering
 * it may still become a bottle-neck. If ordering is not a requirement for your use-case you may want to consider using
 * {@link UnorderedThreadPoolEventExecutor} to maximize the parallelism of the task execution.
 *
 * <h3>Thread safety</h3>
 * <p>
 * A {@link ChannelHandler} can be added or removed at any time because a {@link ChannelPipeline} is thread safe.
 * For example, you can insert an encryption handler when sensitive information is about to be exchanged, and remove it
 * after the exchange.
 */
public interface ChannelPipeline
        extends ChannelInboundInvoker, ChannelOutboundInvoker, Iterable<Entry<String, ChannelHandler>> {

}

Add

将 ChannelHandler 添加到 ChannelPipeline 上。可选添加参数以指定执行 ChannelHandler 的 EventExecutorGroup。

/**
 * Inserts a {@link ChannelHandler} at the first position of this pipeline.
 *
 * @param name     the name of the handler to insert first
 * @param handler  the handler to insert first
 *
 * @throws IllegalArgumentException
 *         if there's an entry with the same name already in the pipeline
 * @throws NullPointerException
 *         if the specified handler is {@code null}
 */
ChannelPipeline addFirst(String name, ChannelHandler handler);

/**
 * Inserts a {@link ChannelHandler} at the first position of this pipeline.
 *
 * @param group    the {@link EventExecutorGroup} which will be used to execute the {@link ChannelHandler}
 *                 methods
 * @param name     the name of the handler to insert first
 * @param handler  the handler to insert first
 *
 * @throws IllegalArgumentException
 *         if there's an entry with the same name already in the pipeline
 * @throws NullPointerException
 *         if the specified handler is {@code null}
 */
ChannelPipeline addFirst(EventExecutorGroup group, String name, ChannelHandler handler);

/**
 * Appends a {@link ChannelHandler} at the last position of this pipeline.
 *
 * @param name     the name of the handler to append
 * @param handler  the handler to append
 *
 * @throws IllegalArgumentException
 *         if there's an entry with the same name already in the pipeline
 * @throws NullPointerException
 *         if the specified handler is {@code null}
 */
ChannelPipeline addLast(String name, ChannelHandler handler);

/**
 * Appends a {@link ChannelHandler} at the last position of this pipeline.
 *
 * @param group    the {@link EventExecutorGroup} which will be used to execute the {@link ChannelHandler}
 *                 methods
 * @param name     the name of the handler to append
 * @param handler  the handler to append
 *
 * @throws IllegalArgumentException
 *         if there's an entry with the same name already in the pipeline
 * @throws NullPointerException
 *         if the specified handler is {@code null}
 */
ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler);

/**
 * Inserts a {@link ChannelHandler} before an existing handler of this
 * pipeline.
 *
 * @param baseName  the name of the existing handler
 * @param name      the name of the handler to insert before
 * @param handler   the handler to insert before
 *
 * @throws NoSuchElementException
 *         if there's no such entry with the specified {@code baseName}
 * @throws IllegalArgumentException
 *         if there's an entry with the same name already in the pipeline
 * @throws NullPointerException
 *         if the specified baseName or handler is {@code null}
 */
ChannelPipeline addBefore(String baseName, String name, ChannelHandler handler);

/**
 * Inserts a {@link ChannelHandler} before an existing handler of this
 * pipeline.
 *
 * @param group     the {@link EventExecutorGroup} which will be used to execute the {@link ChannelHandler}
 *                  methods
 * @param baseName  the name of the existing handler
 * @param name      the name of the handler to insert before
 * @param handler   the handler to insert before
 *
 * @throws NoSuchElementException
 *         if there's no such entry with the specified {@code baseName}
 * @throws IllegalArgumentException
 *         if there's an entry with the same name already in the pipeline
 * @throws NullPointerException
 *         if the specified baseName or handler is {@code null}
 */
ChannelPipeline addBefore(EventExecutorGroup group, String baseName, String name, ChannelHandler handler);

/**
 * Inserts a {@link ChannelHandler} after an existing handler of this
 * pipeline.
 *
 * @param baseName  the name of the existing handler
 * @param name      the name of the handler to insert after
 * @param handler   the handler to insert after
 *
 * @throws NoSuchElementException
 *         if there's no such entry with the specified {@code baseName}
 * @throws IllegalArgumentException
 *         if there's an entry with the same name already in the pipeline
 * @throws NullPointerException
 *         if the specified baseName or handler is {@code null}
 */
ChannelPipeline addAfter(String baseName, String name, ChannelHandler handler);

/**
 * Inserts a {@link ChannelHandler} after an existing handler of this
 * pipeline.
 *
 * @param group     the {@link EventExecutorGroup} which will be used to execute the {@link ChannelHandler}
 *                  methods
 * @param baseName  the name of the existing handler
 * @param name      the name of the handler to insert after
 * @param handler   the handler to insert after
 *
 * @throws NoSuchElementException
 *         if there's no such entry with the specified {@code baseName}
 * @throws IllegalArgumentException
 *         if there's an entry with the same name already in the pipeline
 * @throws NullPointerException
 *         if the specified baseName or handler is {@code null}
 */
ChannelPipeline addAfter(EventExecutorGroup group, String baseName, String name, ChannelHandler handler);

/**
 * Inserts {@link ChannelHandler}s at the first position of this pipeline.
 *
 * @param handlers  the handlers to insert first
 *
 */
ChannelPipeline addFirst(ChannelHandler... handlers);

/**
 * Inserts {@link ChannelHandler}s at the first position of this pipeline.
 *
 * @param group     the {@link EventExecutorGroup} which will be used to execute the {@link ChannelHandler}s
 *                  methods.
 * @param handlers  the handlers to insert first
 *
 */
ChannelPipeline addFirst(EventExecutorGroup group, ChannelHandler... handlers);

/**
 * Inserts {@link ChannelHandler}s at the last position of this pipeline.
 *
 * @param handlers  the handlers to insert last
 *
 */
ChannelPipeline addLast(ChannelHandler... handlers);

/**
 * Inserts {@link ChannelHandler}s at the last position of this pipeline.
 *
 * @param group     the {@link EventExecutorGroup} which will be used to execute the {@link ChannelHandler}s
 *                  methods.
 * @param handlers  the handlers to insert last
 *
 */
ChannelPipeline addLast(EventExecutorGroup group, ChannelHandler... handlers);

Remove

将 ChannelHandler 从 ChannelPipeline 中移除。

/**
 * Removes the specified {@link ChannelHandler} from this pipeline.
 *
 * @param  handler          the {@link ChannelHandler} to remove
 *
 * @throws NoSuchElementException
 *         if there's no such handler in this pipeline
 * @throws NullPointerException
 *         if the specified handler is {@code null}
 */
ChannelPipeline remove(ChannelHandler handler);

/**
 * Removes the {@link ChannelHandler} with the specified name from this pipeline.
 *
 * @param  name             the name under which the {@link ChannelHandler} was stored.
 *
 * @return the removed handler
 *
 * @throws NoSuchElementException
 *         if there's no such handler with the specified name in this pipeline
 * @throws NullPointerException
 *         if the specified name is {@code null}
 */
ChannelHandler remove(String name);

/**
 * Removes the {@link ChannelHandler} of the specified type from this pipeline.
 *
 * @param <T>           the type of the handler
 * @param handlerType   the type of the handler
 *
 * @return the removed handler
 *
 * @throws NoSuchElementException
 *         if there's no such handler of the specified type in this pipeline
 * @throws NullPointerException
 *         if the specified handler type is {@code null}
 */
<T extends ChannelHandler> T remove(Class<T> handlerType);

/**
 * Removes the first {@link ChannelHandler} in this pipeline.
 *
 * @return the removed handler
 *
 * @throws NoSuchElementException
 *         if this pipeline is empty
 */
ChannelHandler removeFirst();

/**
 * Removes the last {@link ChannelHandler} in this pipeline.
 *
 * @return the removed handler
 *
 * @throws NoSuchElementException
 *         if this pipeline is empty
 */
ChannelHandler removeLast();

Replace

将 ChannelPipeline 中的 ChannelHandler 替换为另一个。

/**
 * Replaces the specified {@link ChannelHandler} with a new handler in this pipeline.
 *
 * @param  oldHandler    the {@link ChannelHandler} to be replaced
 * @param  newName       the name under which the replacement should be added
 * @param  newHandler    the {@link ChannelHandler} which is used as replacement
 *
 * @return itself

 * @throws NoSuchElementException
 *         if the specified old handler does not exist in this pipeline
 * @throws IllegalArgumentException
 *         if a handler with the specified new name already exists in this
 *         pipeline, except for the handler to be replaced
 * @throws NullPointerException
 *         if the specified old handler or new handler is
 *         {@code null}
 */
ChannelPipeline replace(ChannelHandler oldHandler, String newName, ChannelHandler newHandler);

/**
 * Replaces the {@link ChannelHandler} of the specified name with a new handler in this pipeline.
 *
 * @param  oldName       the name of the {@link ChannelHandler} to be replaced
 * @param  newName       the name under which the replacement should be added
 * @param  newHandler    the {@link ChannelHandler} which is used as replacement
 *
 * @return the removed handler
 *
 * @throws NoSuchElementException
 *         if the handler with the specified old name does not exist in this pipeline
 * @throws IllegalArgumentException
 *         if a handler with the specified new name already exists in this
 *         pipeline, except for the handler to be replaced
 * @throws NullPointerException
 *         if the specified old handler or new handler is
 *         {@code null}
 */
ChannelHandler replace(String oldName, String newName, ChannelHandler newHandler);

/**
 * Replaces the {@link ChannelHandler} of the specified type with a new handler in this pipeline.
 *
 * @param  oldHandlerType   the type of the handler to be removed
 * @param  newName          the name under which the replacement should be added
 * @param  newHandler       the {@link ChannelHandler} which is used as replacement
 *
 * @return the removed handler
 *
 * @throws NoSuchElementException
 *         if the handler of the specified old handler type does not exist
 *         in this pipeline
 * @throws IllegalArgumentException
 *         if a handler with the specified new name already exists in this
 *         pipeline, except for the handler to be replaced
 * @throws NullPointerException
 *         if the specified old handler or new handler is
 *         {@code null}
 */
<T extends ChannelHandler> T replace(Class<T> oldHandlerType, String newName,
                                     ChannelHandler newHandler);

Get

从 ChannelPipeline 中取得 ChannelHandler (ChannelHandlerContext)。

/**
 * Returns the first {@link ChannelHandler} in this pipeline.
 *
 * @return the first handler.  {@code null} if this pipeline is empty.
 */
ChannelHandler first();

/**
 * Returns the context of the first {@link ChannelHandler} in this pipeline.
 *
 * @return the context of the first handler.  {@code null} if this pipeline is empty.
 */
ChannelHandlerContext firstContext();

/**
 * Returns the last {@link ChannelHandler} in this pipeline.
 *
 * @return the last handler.  {@code null} if this pipeline is empty.
 */
ChannelHandler last();

/**
 * Returns the context of the last {@link ChannelHandler} in this pipeline.
 *
 * @return the context of the last handler.  {@code null} if this pipeline is empty.
 */
ChannelHandlerContext lastContext();

/**
 * Returns the {@link ChannelHandler} with the specified name in this
 * pipeline.
 *
 * @return the handler with the specified name.
 *         {@code null} if there's no such handler in this pipeline.
 */
ChannelHandler get(String name);

/**
 * Returns the {@link ChannelHandler} of the specified type in this
 * pipeline.
 *
 * @return the handler of the specified handler type.
 *         {@code null} if there's no such handler in this pipeline.
 */
<T extends ChannelHandler> T get(Class<T> handlerType);

/**
 * Returns the context object of the specified {@link ChannelHandler} in
 * this pipeline.
 *
 * @return the context object of the specified handler.
 *         {@code null} if there's no such handler in this pipeline.
 */
ChannelHandlerContext context(ChannelHandler handler);

/**
 * Returns the context object of the {@link ChannelHandler} with the
 * specified name in this pipeline.
 *
 * @return the context object of the handler with the specified name.
 *         {@code null} if there's no such handler in this pipeline.
 */
ChannelHandlerContext context(String name);

/**
 * Returns the context object of the {@link ChannelHandler} of the
 * specified type in this pipeline.
 *
 * @return the context object of the handler of the specified type.
 *         {@code null} if there's no such handler in this pipeline.
 */
ChannelHandlerContext context(Class<? extends ChannelHandler> handlerType);

DefaultChannelPipeline

ChannelPipeline 接口的默认实现。

Definition

/**
 * The default {@link ChannelPipeline} implementation.  It is usually created
 * by a {@link Channel} implementation when the {@link Channel} is created.
 */
public class DefaultChannelPipeline implements ChannelPipeline {

}

Linked List

类内维护了 ChannelHandler 链表的头尾指针。

final AbstractChannelHandlerContext head;
final AbstractChannelHandlerContext tail;

在类的构造函数中,分别用当前对象 (ChannelPipeline) 构造了一个头结点和一个尾结点,并使这两个结点互相连接形成双向循环链表。

protected DefaultChannelPipeline(Channel channel) {
    this.channel = ObjectUtil.checkNotNull(channel, "channel");
    succeededFuture = new SucceededChannelFuture(channel, null);
    voidPromise =  new VoidChannelPromise(channel, true);

    tail = new TailContext(this);
    head = new HeadContext(this);

    head.next = tail;
    tail.prev = head;
}

New Context

由于用户会将 ChannelHandler 作为参数添加到 ChannelPipeline 中,而实际上是 ChannelHandlerContext 被添加到了 ChannelPipeline 中。因此,构造 ChannelHandlerContext 的工作将在该类中完成。由于 ChannelHandlerContext 的实例化需要提供一个 EventExecutor,而 ChannelPipeline 的 API 赋予用户决定 EventExecutor 来自哪一个 EventExecutorGroup 的自由 - 所以这里还会调用 EventExecutorGroup 的 next() 函数选择一个 EventExecutor,用于执行相应的 ChannelHandler。

private AbstractChannelHandlerContext newContext(EventExecutorGroup group, String name, ChannelHandler handler) {
    return new DefaultChannelHandlerContext(this, childExecutor(group), name, handler);
}

private EventExecutor childExecutor(EventExecutorGroup group) {
    if (group == null) {
        return null;
    }
    Boolean pinEventExecutor = channel.config().getOption(ChannelOption.SINGLE_EVENTEXECUTOR_PER_GROUP);
    if (pinEventExecutor != null && !pinEventExecutor) {
        return group.next();
    }
    Map<EventExecutorGroup, EventExecutor> childExecutors = this.childExecutors;
    if (childExecutors == null) {
        // Use size of 4 as most people only use one extra EventExecutor.
        childExecutors = this.childExecutors = new IdentityHashMap<EventExecutorGroup, EventExecutor>(4);
    }
    // Pin one of the child executors once and remember it so that the same child executor
    // is used to fire events for the same channel.
    EventExecutor childExecutor = childExecutors.get(group);
    if (childExecutor == null) {
        childExecutor = group.next();
        childExecutors.put(group, childExecutor);
    }
    return childExecutor;
}

Add

将 ChannelHandler 添加到 ChannelPipeline 中。根据传入的参数实例化 ChannelHandlerContext,然后通过修改链表头尾指针,将 ChannelHandlerContext 添加到 ChannelPipeline 中,然后调用 ChannelHandlerContext 的生命周期函数。在操作时,对当前对象进行了 synchronized 操作,因此 ChannelPipeline 是线程安全的。以 addFirst() 为例:

@Override
public final ChannelPipeline addFirst(String name, ChannelHandler handler) {
    return addFirst(null, name, handler);
}

@Override
public final ChannelPipeline addFirst(EventExecutorGroup group, String name, ChannelHandler handler) {
    final AbstractChannelHandlerContext newCtx;
    synchronized (this) {
        checkMultiplicity(handler);
        name = filterName(name, handler);

        newCtx = newContext(group, name, handler);

        addFirst0(newCtx);

        // If the registered is false it means that the channel was not registered on an eventLoop yet.
        // In this case we add the context to the pipeline and add a task that will call
        // ChannelHandler.handlerAdded(...) once the channel is registered.
        if (!registered) {
            newCtx.setAddPending();
            callHandlerCallbackLater(newCtx, true);
            return this;
        }

        EventExecutor executor = newCtx.executor();
        if (!executor.inEventLoop()) {
            callHandlerAddedInEventLoop(newCtx, executor);
            return this;
        }
    }
    callHandlerAdded0(newCtx);
    return this;
}

private void addFirst0(AbstractChannelHandlerContext newCtx) {
    AbstractChannelHandlerContext nextCtx = head.next;
    newCtx.prev = head;
    newCtx.next = nextCtx;
    head.next = newCtx;
    nextCtx.prev = newCtx;
}

private void callHandlerCallbackLater(AbstractChannelHandlerContext ctx, boolean added) {
    assert !registered;

    PendingHandlerCallback task = added ? new PendingHandlerAddedTask(ctx) : new PendingHandlerRemovedTask(ctx);
    PendingHandlerCallback pending = pendingHandlerCallbackHead;
    if (pending == null) {
        pendingHandlerCallbackHead = task;
    } else {
        // Find the tail of the linked-list.
        while (pending.next != null) {
            pending = pending.next;
        }
        pending.next = task;
    }
}

private void callHandlerAddedInEventLoop(final AbstractChannelHandlerContext newCtx, EventExecutor executor) {
    newCtx.setAddPending();
    executor.execute(new Runnable() {
        @Override
        public void run() {
            callHandlerAdded0(newCtx);
        }
    });
}

private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) {
    try {
        ctx.callHandlerAdded();
    } catch (Throwable t) {
        boolean removed = false;
        try {
            atomicRemoveFromHandlerList(ctx);
            ctx.callHandlerRemoved();
            removed = true;
        } catch (Throwable t2) {
            if (logger.isWarnEnabled()) {
                logger.warn("Failed to remove a handler: " + ctx.name(), t2);
            }
        }

        if (removed) {
            fireExceptionCaught(new ChannelPipelineException(
                ctx.handler().getClass().getName() +
                ".handlerAdded() has thrown an exception; removed.", t));
        } else {
            fireExceptionCaught(new ChannelPipelineException(
                ctx.handler().getClass().getName() +
                ".handlerAdded() has thrown an exception; also failed to remove.", t));
        }
    }
}

Remove

从 ChannelPipeline 中移除 ChannelHandler 也是类似。找到要移除的 ChannelHandlerContext,然后修改头尾指针。以 removeFirst() 为例:

@Override
public final ChannelHandler removeFirst() {
    if (head.next == tail) {
        throw new NoSuchElementException();
    }
    return remove(head.next).handler();
}

private AbstractChannelHandlerContext remove(final AbstractChannelHandlerContext ctx) {
    assert ctx != head && ctx != tail;

    synchronized (this) {
        atomicRemoveFromHandlerList(ctx);

        // If the registered is false it means that the channel was not registered on an eventloop yet.
        // In this case we remove the context from the pipeline and add a task that will call
        // ChannelHandler.handlerRemoved(...) once the channel is registered.
        if (!registered) {
            callHandlerCallbackLater(ctx, false);
            return ctx;
        }

        EventExecutor executor = ctx.executor();
        if (!executor.inEventLoop()) {
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    callHandlerRemoved0(ctx);
                }
            });
            return ctx;
        }
    }
    callHandlerRemoved0(ctx);
    return ctx;
}

/**
 * Method is synchronized to make the handler removal from the double linked list atomic.
 */
private synchronized void atomicRemoveFromHandlerList(AbstractChannelHandlerContext ctx) {
    AbstractChannelHandlerContext prev = ctx.prev;
    AbstractChannelHandlerContext next = ctx.next;
    prev.next = next;
    next.prev = prev;
}

Replace

替换操作是 add() 和 remove() 操作的综合。要为新的 ChannelHandler 实例化 ChannelHandlerContext,还要将旧的 ChannelHandlerContext 从链表中移除。最终还需要调用新旧两个 ChannelHandlerContext 的生命周期函数。

@Override
public final ChannelPipeline replace(ChannelHandler oldHandler, String newName, ChannelHandler newHandler) {
    replace(getContextOrDie(oldHandler), newName, newHandler);
    return this;
}

@Override
public final ChannelHandler replace(String oldName, String newName, ChannelHandler newHandler) {
    return replace(getContextOrDie(oldName), newName, newHandler);
}

@Override
@SuppressWarnings("unchecked")
public final <T extends ChannelHandler> T replace(
    Class<T> oldHandlerType, String newName, ChannelHandler newHandler) {
    return (T) replace(getContextOrDie(oldHandlerType), newName, newHandler);
}

private ChannelHandler replace(
    final AbstractChannelHandlerContext ctx, String newName, ChannelHandler newHandler) {
    assert ctx != head && ctx != tail;

    final AbstractChannelHandlerContext newCtx;
    synchronized (this) {
        checkMultiplicity(newHandler);
        if (newName == null) {
            newName = generateName(newHandler);
        } else {
            boolean sameName = ctx.name().equals(newName);
            if (!sameName) {
                checkDuplicateName(newName);
            }
        }

        newCtx = newContext(ctx.executor, newName, newHandler);

        replace0(ctx, newCtx);

        // If the registered is false it means that the channel was not registered on an eventloop yet.
        // In this case we replace the context in the pipeline
        // and add a task that will call ChannelHandler.handlerAdded(...) and
        // ChannelHandler.handlerRemoved(...) once the channel is registered.
        if (!registered) {
            callHandlerCallbackLater(newCtx, true);
            callHandlerCallbackLater(ctx, false);
            return ctx.handler();
        }
        EventExecutor executor = ctx.executor();
        if (!executor.inEventLoop()) {
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    // Invoke newHandler.handlerAdded() first (i.e. before oldHandler.handlerRemoved() is invoked)
                    // because callHandlerRemoved() will trigger channelRead() or flush() on newHandler and
                    // those event handlers must be called after handlerAdded().
                    callHandlerAdded0(newCtx);
                    callHandlerRemoved0(ctx);
                }
            });
            return ctx.handler();
        }
    }
    // Invoke newHandler.handlerAdded() first (i.e. before oldHandler.handlerRemoved() is invoked)
    // because callHandlerRemoved() will trigger channelRead() or flush() on newHandler and those
    // event handlers must be called after handlerAdded().
    callHandlerAdded0(newCtx);
    callHandlerRemoved0(ctx);
    return ctx.handler();
}

private static void replace0(AbstractChannelHandlerContext oldCtx, AbstractChannelHandlerContext newCtx) {
    AbstractChannelHandlerContext prev = oldCtx.prev;
    AbstractChannelHandlerContext next = oldCtx.next;
    newCtx.prev = prev;
    newCtx.next = next;

    // Finish the replacement of oldCtx with newCtx in the linked list.
    // Note that this doesn't mean events will be sent to the new handler immediately
    // because we are currently at the event handler thread and no more than one handler methods can be invoked
    // at the same time (we ensured that in replace().)
    prev.next = newCtx;
    next.prev = newCtx;

    // update the reference to the replacement so forward of buffered content will work correctly
    oldCtx.prev = newCtx;
    oldCtx.next = newCtx;
}

Get

根据头尾指针 (或参数内提供的 ChannelHandler 名称),返回 ChannelPipeline 中相应位置的 ChannelHandlerContext。

@Override
public final ChannelHandler first() {
    ChannelHandlerContext first = firstContext();
    if (first == null) {
        return null;
    }
    return first.handler();
}

@Override
public final ChannelHandlerContext firstContext() {
    AbstractChannelHandlerContext first = head.next;
    if (first == tail) {
        return null;
    }
    return head.next;
}

@Override
public final ChannelHandler last() {
    AbstractChannelHandlerContext last = tail.prev;
    if (last == head) {
        return null;
    }
    return last.handler();
}

@Override
public final ChannelHandlerContext lastContext() {
    AbstractChannelHandlerContext last = tail.prev;
    if (last == head) {
        return null;
    }
    return last;
}

@Override
public final ChannelHandler get(String name) {
    ChannelHandlerContext ctx = context(name);
    if (ctx == null) {
        return null;
    } else {
        return ctx.handler();
    }
}

@SuppressWarnings("unchecked")
@Override
public final <T extends ChannelHandler> T get(Class<T> handlerType) {
    ChannelHandlerContext ctx = context(handlerType);
    if (ctx == null) {
        return null;
    } else {
        return (T) ctx.handler();
    }
}

@Override
public final ChannelHandlerContext context(String name) {
    return context0(ObjectUtil.checkNotNull(name, "name"));
}

@Override
public final ChannelHandlerContext context(ChannelHandler handler) {
    ObjectUtil.checkNotNull(handler, "handler");

    AbstractChannelHandlerContext ctx = head.next;
    for (;;) {

        if (ctx == null) {
            return null;
        }

        if (ctx.handler() == handler) {
            return ctx;
        }

        ctx = ctx.next;
    }
}

@Override
public final ChannelHandlerContext context(Class<? extends ChannelHandler> handlerType) {
    ObjectUtil.checkNotNull(handlerType, "handlerType");

    AbstractChannelHandlerContext ctx = head.next;
    for (;;) {
        if (ctx == null) {
            return null;
        }
        if (handlerType.isAssignableFrom(ctx.handler().getClass())) {
            return ctx;
        }
        ctx = ctx.next;
    }
}

Propagation Function

从第一个 ChannelHandlerContext (头结点) 开始触发入站事件的传播,或从最后一个 ChannelHandlerContext (尾结点) 开始触发出站事件的传播。

@Override
public final ChannelPipeline fireChannelRegistered() {
    AbstractChannelHandlerContext.invokeChannelRegistered(head);
    return this;
}

@Override
public final ChannelPipeline fireChannelUnregistered() {
    AbstractChannelHandlerContext.invokeChannelUnregistered(head);
    return this;
}

@Override
public final ChannelPipeline fireChannelActive() {
    AbstractChannelHandlerContext.invokeChannelActive(head);
    return this;
}

@Override
public final ChannelPipeline fireChannelInactive() {
    AbstractChannelHandlerContext.invokeChannelInactive(head);
    return this;
}

@Override
public final ChannelPipeline fireExceptionCaught(Throwable cause) {
    AbstractChannelHandlerContext.invokeExceptionCaught(head, cause);
    return this;
}

@Override
public final ChannelPipeline fireUserEventTriggered(Object event) {
    AbstractChannelHandlerContext.invokeUserEventTriggered(head, event);
    return this;
}

@Override
public final ChannelPipeline fireChannelRead(Object msg) {
    AbstractChannelHandlerContext.invokeChannelRead(head, msg);
    return this;
}

@Override
public final ChannelPipeline fireChannelReadComplete() {
    AbstractChannelHandlerContext.invokeChannelReadComplete(head);
    return this;
}

@Override
public final ChannelPipeline fireChannelWritabilityChanged() {
    AbstractChannelHandlerContext.invokeChannelWritabilityChanged(head);
    return this;
}
@Override
public final ChannelFuture bind(SocketAddress localAddress) {
    return tail.bind(localAddress);
}

@Override
public final ChannelFuture connect(SocketAddress remoteAddress) {
    return tail.connect(remoteAddress);
}

@Override
public final ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) {
    return tail.connect(remoteAddress, localAddress);
}

@Override
public final ChannelFuture disconnect() {
    return tail.disconnect();
}

@Override
public final ChannelFuture close() {
    return tail.close();
}

@Override
public final ChannelFuture deregister() {
    return tail.deregister();
}

@Override
public final ChannelPipeline flush() {
    tail.flush();
    return this;
}

@Override
public final ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
    return tail.bind(localAddress, promise);
}

@Override
public final ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
    return tail.connect(remoteAddress, promise);
}

@Override
public final ChannelFuture connect(
    SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
    return tail.connect(remoteAddress, localAddress, promise);
}

@Override
public final ChannelFuture disconnect(ChannelPromise promise) {
    return tail.disconnect(promise);
}

@Override
public final ChannelFuture close(ChannelPromise promise) {
    return tail.close(promise);
}

@Override
public final ChannelFuture deregister(final ChannelPromise promise) {
    return tail.deregister(promise);
}

@Override
public final ChannelPipeline read() {
    tail.read();
    return this;
}

@Override
public final ChannelFuture write(Object msg) {
    return tail.write(msg);
}

@Override
public final ChannelFuture write(Object msg, ChannelPromise promise) {
    return tail.write(msg, promise);
}

@Override
public final ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
    return tail.writeAndFlush(msg, promise);
}

@Override
public final ChannelFuture writeAndFlush(Object msg) {
    return tail.writeAndFlush(msg);
}
Edit this page on GitHub
Prev
4.3 - ChannelHandlerContext
Next
5.1 - Future