Mr Dk.'s Blog

3.4 - EventLoop

Created by : Mr Dk.

2021 / 02 / 18 23:01

Ningbo, Zhejiang, China


AbstractEventLoop

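An abstract class that extends AbstractEventExecutor. Nothing fancy here: it only narrows the return types of parent() and next() to EventLoopGroup and EventLoop.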

Definition

/**
 * Skeletal implementation of {@link EventLoop}.
 */
public abstract class AbstractEventLoop extends AbstractEventExecutor implements EventLoop {

    protected AbstractEventLoop() { }

    protected AbstractEventLoop(EventLoopGroup parent) {
        super(parent);
    }

    @Override
    public EventLoopGroup parent() {
        return (EventLoopGroup) super.parent();
    }

    @Override
    public EventLoop next() {
        return (EventLoop) super.next();
    }
}
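The narrowing of return types above relies on Java's covariant return types. Below is a minimal sketch of that pattern; the types `Executor`, `Loop`, `AbstractExecutor`, `AbstractLoop` and `SimpleLoop` are hypothetical stand-ins invented for illustration and are not Netty classes.

```java
/** Hypothetical stand-ins for the executor/loop hierarchy; none of these are Netty types. */
final class CovariantReturnSketch {

    interface Executor {
        Executor next();
    }

    interface Loop extends Executor {
        @Override
        Loop next();                       // return type narrowed from Executor to Loop

        String register(String channel);   // an operation only a Loop offers
    }

    abstract static class AbstractExecutor implements Executor {
        @Override
        public Executor next() {
            return this;                   // a single executor simply returns itself
        }
    }

    abstract static class AbstractLoop extends AbstractExecutor implements Loop {
        @Override
        public Loop next() {
            // Same object as super.next(), only the static type is narrowed.
            return (Loop) super.next();
        }
    }

    static final class SimpleLoop extends AbstractLoop {
        @Override
        public String register(String channel) {
            return channel + " registered";
        }
    }

    static String demo() {
        Loop loop = new SimpleLoop();
        // No cast is needed at the call site because next() already returns Loop.
        return loop.next().register("ch-1");
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

With this narrowing, callers can chain `next().register(...)` directly instead of casting the result of `next()` themselves.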

SingleThreadEventLoop

An abstract class that extends SingleThreadEventExecutor and implements an EventLoop backed by a single thread.

Definition

/**
 * Abstract base class for {@link EventLoop}s that execute all its submitted tasks in a single thread.
 *
 */
public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor implements EventLoop {

}

Tail Tasks

This class maintains a queue of tasks that should each run once at the end of the current or the next EventLoop iteration.

private final Queue<Runnable> tailTasks;

/**
 * Adds a task to be run once at the end of next (or current) {@code eventloop} iteration.
 *
 * @param task to be added.
 */
@UnstableApi
public final void executeAfterEventLoopIteration(Runnable task) {
    ObjectUtil.checkNotNull(task, "task");
    if (isShutdown()) {
        reject();
    }

    if (!tailTasks.offer(task)) {
        reject(task);
    }

    if (!(task instanceof LazyRunnable) && wakesUpForTask(task)) {
        wakeup(inEventLoop());
    }
}

/**
 * Removes a task that was added previously via {@link #executeAfterEventLoopIteration(Runnable)}.
 *
 * @param task to be removed.
 *
 * @return {@code true} if the task was removed as a result of this call.
 */
@UnstableApi
final boolean removeAfterEventLoopIterationTask(Runnable task) {
    return tailTasks.remove(ObjectUtil.checkNotNull(task, "task"));
}

@Override
protected void afterRunningAllTasks() {
    runAllTasksFrom(tailTasks);
}
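To make the ordering concrete, here is a simplified, single-threaded sketch of one loop iteration: ordinary tasks are drained first, then the tail tasks. The class `TailTaskLoopSketch` and its `execute` and `runOneIteration` methods are hypothetical names for illustration. The sketch uses plain `ArrayDeque`s and ignores rejection, wake-up and thread safety, all of which the real class has to deal with.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/** Hypothetical, simplified model of one event loop iteration; not Netty's implementation. */
final class TailTaskLoopSketch {
    private final Queue<Runnable> taskQueue = new ArrayDeque<>();
    private final Queue<Runnable> tailTasks = new ArrayDeque<>();

    void execute(Runnable task) {
        taskQueue.add(task);
    }

    void executeAfterEventLoopIteration(Runnable task) {
        tailTasks.add(task);
    }

    boolean removeAfterEventLoopIterationTask(Runnable task) {
        return tailTasks.remove(task);
    }

    /** One iteration: ordinary tasks first, then the tail tasks (the afterRunningAllTasks() step). */
    void runOneIteration() {
        drain(taskQueue);
        drain(tailTasks);
    }

    private static void drain(Queue<Runnable> queue) {
        Runnable task;
        while ((task = queue.poll()) != null) {
            task.run();
        }
    }

    static List<String> demo() {
        List<String> log = new ArrayList<>();
        TailTaskLoopSketch loop = new TailTaskLoopSketch();

        // The tail task is submitted first but still runs last.
        loop.executeAfterEventLoopIteration(() -> log.add("tail"));
        loop.execute(() -> log.add("task-1"));
        loop.execute(() -> log.add("task-2"));

        // A tail task that is removed before the iteration never runs.
        Runnable removed = () -> log.add("removed");
        loop.executeAfterEventLoopIteration(removed);
        loop.removeAfterEventLoopIterationTask(removed);

        loop.runOneIteration();
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo());   // prints [task-1, task-2, tail]
    }
}
```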

ThreadPerChannelEventLoop

An EventLoop for OIO (blocking I/O) scenarios: each ThreadPerChannelEventLoop serves exactly one Channel.

Definition

/**
 * {@link SingleThreadEventLoop} which is used to handle OIO {@link Channel}'s. So in general there will be
 * one {@link ThreadPerChannelEventLoop} per {@link Channel}.
 *
 * @deprecated this will be remove in the next-major release.
 */
@Deprecated
public class ThreadPerChannelEventLoop extends SingleThreadEventLoop {

}

Register

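Registers a Channel with this EventLoop and binds it there. The class holds a reference to the Channel, which is set once registration succeeds. If registration fails, deregister() is called instead.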

private Channel ch;

@Override
public ChannelFuture register(ChannelPromise promise) {
    return super.register(promise).addListener(new ChannelFutureListener() {
        @Override
        public void operationComplete(ChannelFuture future) throws Exception {
            if (future.isSuccess()) {
                ch = future.channel();
            } else {
                deregister();
            }
        }
    });
}

@Deprecated
@Override
public ChannelFuture register(Channel channel, ChannelPromise promise) {
    return super.register(channel, promise).addListener(new ChannelFutureListener() {
        @Override
        public void operationComplete(ChannelFuture future) throws Exception {
            if (future.isSuccess()) {
                ch = future.channel();
            } else {
                deregister();
            }
        }
    });
}
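The listener pattern above (remember the channel on success, undo the binding on failure) can be sketched with the JDK's `CompletableFuture` standing in for `ChannelFuture`. Everything here is an assumption for illustration: the class `RegisterListenerSketch`, the use of a `String` in place of a `Channel`, and a `deregister()` that only clears the reference and sets a flag.

```java
import java.util.concurrent.CompletableFuture;

/** Hypothetical model of the register-then-listen pattern; not Netty's implementation. */
final class RegisterListenerSketch {
    private String channel;         // stands in for the Channel reference held by the loop
    private boolean deregistered;   // records whether the failure path was taken

    /** Attaches a completion callback, like addListener(...) in the original code. */
    CompletableFuture<String> register(CompletableFuture<String> registration) {
        return registration.whenComplete((ch, cause) -> {
            if (cause == null) {
                channel = ch;       // success: remember the bound channel
            } else {
                deregister();       // failure: release this loop again
            }
        });
    }

    // Assumed behaviour for this sketch: just drop the reference and note it.
    void deregister() {
        channel = null;
        deregistered = true;
    }

    static String demo(boolean succeed) {
        RegisterListenerSketch loop = new RegisterListenerSketch();
        CompletableFuture<String> registration = new CompletableFuture<>();
        loop.register(registration);

        // Completing the future runs the callback on the completing thread.
        if (succeed) {
            registration.complete("channel-1");
        } else {
            registration.completeExceptionally(new IllegalStateException("registration failed"));
        }
        return loop.channel + "/" + loop.deregistered;
    }

    public static void main(String[] args) {
        System.out.println(demo(true));    // prints channel-1/false
        System.out.println(demo(false));   // prints null/true
    }
}
```

The point of doing this in a listener rather than inline is that registration is asynchronous: the loop only learns later whether it actually owns a channel.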

Run

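This class implements the abstract method run() defined in SingleThreadEventExecutor, which is the main loop of the EventLoop. Each iteration:

  • Runs a task from the task queue
  • Handles closing the Channel when shutting down
  • Handles deregistration of the Channel
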
@Override
protected void run() {
    for (;;) {
        Runnable task = takeTask();
        if (task != null) {
            task.run();
            updateLastExecutionTime();
        }

        Channel ch = this.ch;
        if (isShuttingDown()) {
            if (ch != null) {
                ch.unsafe().close(ch.unsafe().voidPromise());
            }
            if (confirmShutdown()) {
                break;
            }
        } else {
            if (ch != null) {
                // Handle deregistration
                if (!ch.isRegistered()) {
                    runAllTasks();
                    deregister();
                }
            }
        }
    }
}

DefaultEventLoop

The default single-threaded EventLoop implementation.

Definition

public class DefaultEventLoop extends SingleThreadEventLoop {

}

Run

Implements the abstract method run() defined in SingleThreadEventExecutor, which is the main loop of the EventLoop:

@Override
protected void run() {
    for (;;) {
        Runnable task = takeTask();
        if (task != null) {
            task.run();
            updateLastExecutionTime();
        }

        if (confirmShutdown()) {
            break;
        }
    }
}
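The same "take a task, run it, check for shutdown" shape can be reproduced with JDK classes only. The sketch below is a rough model under stated assumptions, not Netty's code: `BlockingLoopSketch` is a hypothetical class, a no-op `WAKEUP` task is used to unblock `take()` on shutdown, and the shutdown check is reduced to "flag set and queue empty", which is much simpler than what confirmShutdown() does.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/** Hypothetical single-threaded loop built on a blocking queue; not Netty's implementation. */
final class BlockingLoopSketch {
    private static final Runnable WAKEUP = () -> { };   // no-op used only to unblock take()

    private final BlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<>();
    private volatile boolean shuttingDown;
    private final Thread thread = new Thread(this::run, "loop-sketch");

    void start() {
        thread.start();
    }

    void execute(Runnable task) {
        taskQueue.add(task);
    }

    void shutdown() {
        shuttingDown = true;
        taskQueue.add(WAKEUP);   // make sure a blocked take() returns
    }

    void awaitTermination() throws InterruptedException {
        thread.join();
    }

    private void run() {
        for (;;) {
            Runnable task;
            try {
                task = taskQueue.take();   // blocks until a task is available
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            task.run();

            // Simplified shutdown check: stop once shutdown was requested and nothing is pending.
            if (shuttingDown && taskQueue.isEmpty()) {
                break;
            }
        }
    }

    static List<String> demo() {
        List<String> log = Collections.synchronizedList(new ArrayList<>());
        BlockingLoopSketch loop = new BlockingLoopSketch();
        loop.start();
        loop.execute(() -> log.add("a"));
        loop.execute(() -> log.add("b"));
        loop.execute(() -> log.add("c"));
        loop.shutdown();
        try {
            loop.awaitTermination();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new ArrayList<>(log);
    }

    public static void main(String[] args) {
        System.out.println(demo());   // prints [a, b, c]
    }
}
```

Because all three tasks are queued before `shutdown()` sets the flag, the loop always runs them in order before it exits.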