
3.3 - EventExecutor

Created by : Mr Dk.

2021 / 02 / 18 19:37

Ningbo, Zhejiang, China


AbstractEventExecutor

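The first-level abstract class implementing the EventExecutor interface. It still leaves most methods abstract; they are implemented by the concrete subclasses.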

Definition

/**
 * Abstract base class for {@link EventExecutor} implementations.
 */
public abstract class AbstractEventExecutor extends AbstractExecutorService implements EventExecutor {

}

Parent

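The main method the EventExecutor interface requires is parent(), which returns a reference to the owning EventExecutorGroup, so this class stores that reference. next() simply returns the executor itself.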

private final EventExecutorGroup parent;
private final Collection<EventExecutor> selfCollection = Collections.<EventExecutor>singleton(this);

protected AbstractEventExecutor() {
    this(null);
}

protected AbstractEventExecutor(EventExecutorGroup parent) {
    this.parent = parent;
}

@Override
public EventExecutorGroup parent() {
    return parent;
}

@Override
public EventExecutor next() {
    return this;
}

Shutdown

Defines the default quiet period and timeout for graceful shutdown, both in seconds.

static final long DEFAULT_SHUTDOWN_QUIET_PERIOD = 2;
static final long DEFAULT_SHUTDOWN_TIMEOUT = 15;

@Override
public Future<?> shutdownGracefully() {
    return shutdownGracefully(DEFAULT_SHUTDOWN_QUIET_PERIOD, DEFAULT_SHUTDOWN_TIMEOUT, TimeUnit.SECONDS);
}

AbstractScheduledEventExecutor

An abstract class extending AbstractEventExecutor. It defines an abstract EventExecutor that supports scheduled execution.

Definition

/**
 * Abstract base class for {@link EventExecutor}s that want to support scheduling.
 */
public abstract class AbstractScheduledEventExecutor extends AbstractEventExecutor {

}

Task Queue

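The core of this class is a priority queue of ScheduledFutureTask objects, which tracks the execution state and result of every scheduled task. The queue is created lazily on first access.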

PriorityQueue<ScheduledFutureTask<?>> scheduledTaskQueue;

PriorityQueue<ScheduledFutureTask<?>> scheduledTaskQueue() {
    if (scheduledTaskQueue == null) {
        scheduledTaskQueue = new DefaultPriorityQueue<ScheduledFutureTask<?>>(
            SCHEDULED_FUTURE_TASK_COMPARATOR,
            // Use same initial capacity as java.util.PriorityQueue
            11);
    }
    return scheduledTaskQueue;
}
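To make the ordering idea concrete, here is a minimal stdlib-only sketch of a lazily created, deadline-ordered queue. The Task class, the tie-break on id, and the use of java.util.PriorityQueue are assumptions of this sketch; Netty's DefaultPriorityQueue and SCHEDULED_FUTURE_TASK_COMPARATOR are not reproduced here.

```java
import java.util.Comparator;
import java.util.PriorityQueue;

// Hypothetical stand-in for the scheduled task queue; not Netty code.
final class DeadlineQueueSketch {
    // Only the fields needed for ordering.
    static final class Task {
        final long id;
        final long deadlineNanos;

        Task(long id, long deadlineNanos) {
            this.id = id;
            this.deadlineNanos = deadlineNanos;
        }
    }

    // Earliest deadline first; ties broken by id (an assumption of this sketch).
    static final Comparator<Task> BY_DEADLINE =
            Comparator.<Task>comparingLong(t -> t.deadlineNanos).thenComparingLong(t -> t.id);

    private PriorityQueue<Task> queue; // stays null until first use

    PriorityQueue<Task> queue() {
        if (queue == null) {
            // Same initial capacity as the listing above.
            queue = new PriorityQueue<>(11, BY_DEADLINE);
        }
        return queue;
    }
}
```

Polling this queue always yields the task that is due soonest, which is what lets the executor look only at the head to decide whether anything is ready.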

Cancel All Scheduled Tasks

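The following method calls cancelWithoutRemove(false) on every task in the priority queue, which cancels the tasks without interrupting one that is already running, and then clears the queue. Note that it must only be called from the EventLoop thread; the assert at the top checks this.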

/**
 * Cancel all scheduled tasks.
 *
 * This method MUST be called only when {@link #inEventLoop()} is {@code true}.
 */
protected void cancelScheduledTasks() {
    assert inEventLoop();
    PriorityQueue<ScheduledFutureTask<?>> scheduledTaskQueue = this.scheduledTaskQueue;
    if (isNullOrEmpty(scheduledTaskQueue)) {
        return;
    }

    final ScheduledFutureTask<?>[] scheduledTasks =
        scheduledTaskQueue.toArray(new ScheduledFutureTask<?>[0]);

    for (ScheduledFutureTask<?> task: scheduledTasks) {
        task.cancelWithoutRemove(false);
    }

    scheduledTaskQueue.clearIgnoringIndexes();
}

Poll Scheduled Task

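Takes the next scheduled task from the priority queue if its deadline is at or before the given nanoTime. The task is removed from the queue and returned. If the queue is empty or the head task is not yet due, null is returned.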

/**
 * @see #pollScheduledTask(long)
 */
protected final Runnable pollScheduledTask() {
    return pollScheduledTask(nanoTime());
}

/**
 * Return the {@link Runnable} which is ready to be executed with the given {@code nanoTime}.
 * You should use {@link #nanoTime()} to retrieve the correct {@code nanoTime}.
 */
protected final Runnable pollScheduledTask(long nanoTime) {
    assert inEventLoop();

    ScheduledFutureTask<?> scheduledTask = peekScheduledTask();
    if (scheduledTask == null || scheduledTask.deadlineNanos() - nanoTime > 0) {
        return null;
    }
    scheduledTaskQueue.remove();
    scheduledTask.setConsumed();
    return scheduledTask;
}

final ScheduledFutureTask<?> peekScheduledTask() {
    Queue<ScheduledFutureTask<?>> scheduledTaskQueue = this.scheduledTaskQueue;
    return scheduledTaskQueue != null ? scheduledTaskQueue.peek() : null;
}
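The "peek, check the deadline, then remove" step can be sketched with plain deadlines instead of tasks. This is only an illustration under that simplification; the class and method names are hypothetical.

```java
import java.util.PriorityQueue;

// Hypothetical sketch: the queue holds bare deadlines (nanoseconds) instead of tasks.
final class PollDueSketch {
    final PriorityQueue<Long> deadlines = new PriorityQueue<>();

    // Returns the head deadline if it is due at nanoTime, otherwise null.
    // A head that is not yet due stays in the queue.
    Long pollDue(long nanoTime) {
        Long head = deadlines.peek();
        // Same comparison shape as the listing above: deadline - now > 0 means "not due yet".
        if (head == null || head - nanoTime > 0) {
            return null;
        }
        return deadlines.poll();
    }
}
```

Because the queue is ordered by deadline, a head that is not due means nothing behind it is due either, so a single peek is enough.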

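The next two methods return the remaining delay until the next scheduled task is due, and that task's deadline. Both return -1 when no task is scheduled.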

/**
 * Return the nanoseconds until the next scheduled task is ready to be run or {@code -1} if no task is scheduled.
 */
protected final long nextScheduledTaskNano() {
    ScheduledFutureTask<?> scheduledTask = peekScheduledTask();
    return scheduledTask != null ? scheduledTask.delayNanos() : -1;
}

/**
 * Return the deadline (in nanoseconds) when the next scheduled task is ready to be run or {@code -1}
 * if no task is scheduled.
 */
protected final long nextScheduledTaskDeadlineNanos() {
    ScheduledFutureTask<?> scheduledTask = peekScheduledTask();
    return scheduledTask != null ? scheduledTask.deadlineNanos() : -1;
}

Returns whether the task at the head of the priority queue is already due.

/**
 * Returns {@code true} if a scheduled task is ready for processing.
 */
protected final boolean hasScheduledTasks() {
    ScheduledFutureTask<?> scheduledTask = peekScheduledTask();
    return scheduledTask != null && scheduledTask.deadlineNanos() <= nanoTime();
}

Schedule

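The core scheduling method shared by all the public schedule methods. Scheduling essentially means putting the task into the priority queue and running it once it is due. The method first checks whether the calling thread is the EventLoop thread. If it is, the task is added to the priority queue directly. If it is not, the task is handed over through execute() or lazyExecute(); when the EventLoop thread later runs it, the task adds itself to the priority queue (if it has not expired yet). execute() also starts the EventLoop thread if necessary.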

private <V> ScheduledFuture<V> schedule(final ScheduledFutureTask<V> task) {
    if (inEventLoop()) {
        scheduleFromEventLoop(task);
    } else {
        final long deadlineNanos = task.deadlineNanos();
        // task will add itself to scheduled task queue when run if not expired
        if (beforeScheduledTaskSubmitted(deadlineNanos)) {
            execute(task);
        } else {
            lazyExecute(task);
            // Second hook after scheduling to facilitate race-avoidance
            if (afterScheduledTaskSubmitted(deadlineNanos)) {
                execute(WAKEUP_TASK);
            }
        }
    }

    return task;
}

final void scheduleFromEventLoop(final ScheduledFutureTask<?> task) {
    // nextTaskId a long and so there is no chance it will overflow back to 0
    scheduledTaskQueue().add(task.setId(++nextTaskId));
}
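The reason for the two paths is that the priority queue is not thread safe, so only the owning thread may touch it. Below is a minimal sketch of that hand-off, assuming a ConcurrentLinkedQueue as the thread-safe inbox and bare deadlines instead of tasks. All names are hypothetical, and Netty's hooks (beforeScheduledTaskSubmitted, lazyExecute, WAKEUP_TASK) are left out.

```java
import java.util.PriorityQueue;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical sketch of "add directly on the owner thread, hand off otherwise".
final class HandOffSketch {
    private final Thread owner = Thread.currentThread();                 // plays the EventLoop thread
    private final PriorityQueue<Long> scheduled = new PriorityQueue<>(); // owner thread only
    private final Queue<Runnable> inbox = new ConcurrentLinkedQueue<>(); // any thread

    void schedule(long deadlineNanos) {
        if (Thread.currentThread() == owner) {
            scheduled.add(deadlineNanos);
        } else {
            // The hand-off task touches the priority queue only when the owner runs it.
            inbox.add(() -> scheduled.add(deadlineNanos));
        }
    }

    // Owner thread only: run the handed-off tasks so they queue themselves.
    void drainInbox() {
        Runnable r;
        while ((r = inbox.poll()) != null) {
            r.run();
        }
    }

    int scheduledCount() {
        return scheduled.size();
    }

    // Demo helper: call schedule() from a different thread and wait for it to finish.
    void scheduleFromOtherThread(long deadlineNanos) {
        Thread t = new Thread(() -> schedule(deadlineNanos));
        t.start();
        try {
            t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

A deadline scheduled from another thread only shows up in the priority queue after the owner has drained the inbox, which mirrors "the task will add itself to the scheduled task queue when run".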

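As the interface requires, the public scheduling methods are all implemented on top of the core method above. The extra work each of them does is to validate the initial delay, delay, or period, and to compute the deadline. schedule() clamps a negative delay to 0, while the periodic variants throw IllegalArgumentException for invalid values. scheduleAtFixedRate passes the period as a positive value, whereas scheduleWithFixedDelay passes the negated delay.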

@Override
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
    ObjectUtil.checkNotNull(command, "command");
    ObjectUtil.checkNotNull(unit, "unit");
    if (delay < 0) {
        delay = 0;
    }
    validateScheduled0(delay, unit);

    return schedule(new ScheduledFutureTask<Void>(
        this,
        command,
        deadlineNanos(unit.toNanos(delay))));
}

@Override
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
    ObjectUtil.checkNotNull(callable, "callable");
    ObjectUtil.checkNotNull(unit, "unit");
    if (delay < 0) {
        delay = 0;
    }
    validateScheduled0(delay, unit);

    return schedule(new ScheduledFutureTask<V>(this, callable, deadlineNanos(unit.toNanos(delay))));
}

@Override
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
    ObjectUtil.checkNotNull(command, "command");
    ObjectUtil.checkNotNull(unit, "unit");
    if (initialDelay < 0) {
        throw new IllegalArgumentException(
            String.format("initialDelay: %d (expected: >= 0)", initialDelay));
    }
    if (period <= 0) {
        throw new IllegalArgumentException(
            String.format("period: %d (expected: > 0)", period));
    }
    validateScheduled0(initialDelay, unit);
    validateScheduled0(period, unit);

    return schedule(new ScheduledFutureTask<Void>(
        this, command, deadlineNanos(unit.toNanos(initialDelay)), unit.toNanos(period)));
}

@Override
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
    ObjectUtil.checkNotNull(command, "command");
    ObjectUtil.checkNotNull(unit, "unit");
    if (initialDelay < 0) {
        throw new IllegalArgumentException(
            String.format("initialDelay: %d (expected: >= 0)", initialDelay));
    }
    if (delay <= 0) {
        throw new IllegalArgumentException(
            String.format("delay: %d (expected: > 0)", delay));
    }

    validateScheduled0(initialDelay, unit);
    validateScheduled0(delay, unit);

    return schedule(new ScheduledFutureTask<Void>(
        this, command, deadlineNanos(unit.toNanos(initialDelay)), -unit.toNanos(delay)));
}
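The sign of the last constructor argument is what tells the two periodic modes apart. How the next deadline is then computed is not shown in these notes, so the sketch below is an assumption based on the contract of java.util.concurrent.ScheduledExecutorService: fixed rate counts from the previous deadline, fixed delay counts from the end of the previous run. The class and method names are hypothetical.

```java
// Hypothetical sketch of how a signed period could drive re-scheduling.
final class PeriodSketch {
    // periodNanos > 0: fixed rate. periodNanos < 0: fixed delay, stored negated.
    static long nextDeadline(long previousDeadlineNanos, long nowNanos, long periodNanos) {
        if (periodNanos > 0) {
            // Fixed rate: independent of how long the run took.
            return previousDeadlineNanos + periodNanos;
        }
        // Fixed delay: now + delay, written as a subtraction of the negative value.
        return nowNanos - periodNanos;
    }
}
```

For a task due at 1000 that finishes at 1300, a fixed rate of 500 gives the next deadline 1500, while a fixed delay of 500 gives 1800.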

SingleThreadEventExecutor

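A further implementation on top of the AbstractScheduledEventExecutor abstract class, and itself still abstract. As the name suggests, this class runs all submitted tasks on one single thread.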

Definition

/**
 * Abstract base class for {@link OrderedEventExecutor}'s that execute all its submitted tasks in a single thread.
 *
 */
public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {

}

Constructor

The most elaborate constructors show which member fields the class maintains and what they are for.

/**
 * Create a new instance
 *
 * @param parent            the {@link EventExecutorGroup} which is the parent of this instance and belongs to it
 * @param executor          the {@link Executor} which will be used for executing
 * @param addTaskWakesUp    {@code true} if and only if invocation of {@link #addTask(Runnable)} will wake up the
 *                          executor thread
 * @param maxPendingTasks   the maximum number of pending tasks before new tasks will be rejected.
 * @param rejectedHandler   the {@link RejectedExecutionHandler} to use.
 */
protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
                                    boolean addTaskWakesUp, int maxPendingTasks,
                                    RejectedExecutionHandler rejectedHandler) {
    super(parent);
    this.addTaskWakesUp = addTaskWakesUp;
    this.maxPendingTasks = Math.max(16, maxPendingTasks);
    this.executor = ThreadExecutorMap.apply(executor, this);
    taskQueue = newTaskQueue(this.maxPendingTasks);
    rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
}

protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
                                    boolean addTaskWakesUp, Queue<Runnable> taskQueue,
                                    RejectedExecutionHandler rejectedHandler) {
    super(parent);
    this.addTaskWakesUp = addTaskWakesUp;
    this.maxPendingTasks = DEFAULT_MAX_PENDING_EXECUTOR_TASKS;
    this.executor = ThreadExecutorMap.apply(executor, this);
    this.taskQueue = ObjectUtil.checkNotNull(taskQueue, "taskQueue");
    this.rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
}

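Where:

  • addTaskWakesUp indicates whether calling addTask() wakes up the executor thread
  • maxPendingTasks is the maximum number of pending tasks that can be held before new tasks are rejected (the first constructor raises it to at least 16)
  • taskQueue is the queue holding all waiting tasks
  • rejectedExecutionHandler is the rejection policy applied once the queue is saturated

private final Queue<Runnable> taskQueue;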

/**
 * Create a new {@link Queue} which will holds the tasks to execute. This default implementation will return a
 * {@link LinkedBlockingQueue} but if your sub-class of {@link SingleThreadEventExecutor} will not do any blocking
 * calls on the this {@link Queue} it may make sense to {@code @Override} this and return some more performant
 * implementation that does not support blocking operations at all.
 */
protected Queue<Runnable> newTaskQueue(int maxPendingTasks) {
    return new LinkedBlockingQueue<Runnable>(maxPendingTasks);
}

The remaining core operations either take tasks out of taskQueue (and run them) or put tasks into taskQueue.

Poll

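Must be called from the EventLoop thread. It polls the task queue in a loop without blocking, skips every WAKEUP_TASK marker, and returns the first real task. If the queue is empty, it returns null.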

/**
 * @see Queue#poll()
 */
protected Runnable pollTask() {
    assert inEventLoop();
    return pollTaskFrom(taskQueue);
}

protected static Runnable pollTaskFrom(Queue<Runnable> taskQueue) {
    for (;;) {
        Runnable task = taskQueue.poll();
        if (task != WAKEUP_TASK) {
            return task;
        }
    }
}
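The sentinel idea can be sketched without Netty. The marker is compared by identity and never returned to the caller. WAKEUP below is a hypothetical stand-in for WAKEUP_TASK.

```java
import java.util.Queue;

// Hypothetical sketch of polling past a wake-up marker.
final class SentinelPollSketch {
    // A marker that carries no work; an anonymous class guarantees a unique identity.
    static final Runnable WAKEUP = new Runnable() {
        @Override
        public void run() {
            // intentionally empty
        }
    };

    static Runnable pollSkippingWakeups(Queue<Runnable> queue) {
        for (;;) {
            Runnable task = queue.poll();
            if (task != WAKEUP) {
                return task; // a real task, or null when the queue is empty
            }
        }
    }
}
```

The loop always terminates: every iteration removes one element, and an empty queue yields null, which is not the marker.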

Take

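This also has to run on the EventLoop thread. In an endless loop it peeks at the head of the scheduled-task priority queue. If there is no scheduled task, it calls take() and blocks on the task queue until a new task arrives; a WAKEUP_TASK or an interrupt makes it return null. If there is a scheduled task, it waits on the task queue with a timed poll() for at most that task's remaining delay. When nothing arrives in that time, fetchFromScheduledTaskQueue() moves the due scheduled tasks into the task queue and a task is polled from there.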

/**
 * Take the next {@link Runnable} from the task queue and so will block if no task is currently present.
 * <p>
 * Be aware that this method will throw an {@link UnsupportedOperationException} if the task queue, which was
 * created via {@link #newTaskQueue()}, does not implement {@link BlockingQueue}.
 * </p>
 *
 * @return {@code null} if the executor thread has been interrupted or waken up.
 */
protected Runnable takeTask() {
    assert inEventLoop();
    if (!(taskQueue instanceof BlockingQueue)) {
        throw new UnsupportedOperationException();
    }

    BlockingQueue<Runnable> taskQueue = (BlockingQueue<Runnable>) this.taskQueue;
    for (;;) {
        ScheduledFutureTask<?> scheduledTask = peekScheduledTask();
        if (scheduledTask == null) {
            Runnable task = null;
            try {
                task = taskQueue.take();
                if (task == WAKEUP_TASK) {
                    task = null;
                }
            } catch (InterruptedException e) {
                // Ignore
            }
            return task;
        } else {
            long delayNanos = scheduledTask.delayNanos();
            Runnable task = null;
            if (delayNanos > 0) {
                try {
                    task = taskQueue.poll(delayNanos, TimeUnit.NANOSECONDS);
                } catch (InterruptedException e) {
                    // Waken up.
                    return null;
                }
            }
            if (task == null) {
                // We need to fetch the scheduled tasks now as otherwise there may be a chance that
                // scheduled tasks are never executed if there is always one task in the taskQueue.
                // This is for example true for the read task of OIO Transport
                // See https://github.com/netty/netty/issues/1614
                fetchFromScheduledTaskQueue();
                task = taskQueue.poll();
            }

            if (task != null) {
                return task;
            }
        }
    }
}

private boolean fetchFromScheduledTaskQueue() {
    if (scheduledTaskQueue == null || scheduledTaskQueue.isEmpty()) {
        return true;
    }
    long nanoTime = AbstractScheduledEventExecutor.nanoTime();
    for (;;) {
        Runnable scheduledTask = pollScheduledTask(nanoTime);
        if (scheduledTask == null) {
            return true;
        }
        if (!taskQueue.offer(scheduledTask)) {
            // No space left in the task queue add it back to the scheduledTaskQueue so we pick it up again.
            scheduledTaskQueue.add((ScheduledFutureTask<?>) scheduledTask);
            return false;
        }
    }
}
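The transfer step, including the "put it back when the target is full" case, can be sketched with bare deadlines and a bounded queue. The names are hypothetical and the queues hold Long values instead of tasks.

```java
import java.util.PriorityQueue;
import java.util.Queue;

// Hypothetical sketch of moving due entries from a deadline queue into a ready queue.
final class FetchDueSketch {
    // Moves every deadline that is due at nowNanos from 'scheduled' into 'ready'.
    // Returns false if 'ready' refused an entry; that entry is put back into 'scheduled'.
    static boolean fetchDue(PriorityQueue<Long> scheduled, Queue<Long> ready, long nowNanos) {
        for (;;) {
            Long head = scheduled.peek();
            if (head == null || head - nowNanos > 0) {
                return true; // nothing (more) is due
            }
            scheduled.poll();
            if (!ready.offer(head)) {
                scheduled.add(head); // no space left, pick it up again on the next call
                return false;
            }
        }
    }
}
```

A false return tells the caller that due work is still waiting, which is why runAllTasks() keeps looping until the fetch reports that everything was moved.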

Add

/**
 * Add a task to the task queue, or throws a {@link RejectedExecutionException} if this instance was shutdown
 * before.
 */
protected void addTask(Runnable task) {
    ObjectUtil.checkNotNull(task, "task");
    if (!offerTask(task)) {
        reject(task);
    }
}

final boolean offerTask(Runnable task) {
    if (isShutdown()) {
        reject();
    }
    return taskQueue.offer(task);
}

Remove

/**
 * @see Queue#remove(Object)
 */
protected boolean removeTask(Runnable task) {
    return taskQueue.remove(ObjectUtil.checkNotNull(task, "task"));
}

Run

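Polls all tasks out of the task queue and runs them. This must happen on the EventLoop thread. Before polling, the scheduled tasks that are due are moved into the task queue. The overload taking timeoutNanos stops once the time budget is used up, but it only checks the clock after every 64 tasks.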

/**
 * Poll all tasks from the task queue and run them via {@link Runnable#run()} method.
 *
 * @return {@code true} if and only if at least one task was run
 */
protected boolean runAllTasks() {
    assert inEventLoop();
    boolean fetchedAll;
    boolean ranAtLeastOne = false;

    do {
        fetchedAll = fetchFromScheduledTaskQueue();
        if (runAllTasksFrom(taskQueue)) {
            ranAtLeastOne = true;
        }
    } while (!fetchedAll); // keep on processing until we fetched all scheduled tasks.

    if (ranAtLeastOne) {
        lastExecutionTime = ScheduledFutureTask.nanoTime();
    }
    afterRunningAllTasks();
    return ranAtLeastOne;
}

/**
 * Poll all tasks from the task queue and run them via {@link Runnable#run()} method.  This method stops running
 * the tasks in the task queue and returns if it ran longer than {@code timeoutNanos}.
 */
protected boolean runAllTasks(long timeoutNanos) {
    fetchFromScheduledTaskQueue();
    Runnable task = pollTask();
    if (task == null) {
        afterRunningAllTasks();
        return false;
    }

    final long deadline = timeoutNanos > 0 ? ScheduledFutureTask.nanoTime() + timeoutNanos : 0;
    long runTasks = 0;
    long lastExecutionTime;
    for (;;) {
        safeExecute(task);

        runTasks ++;

        // Check timeout every 64 tasks because nanoTime() is relatively expensive.
        // XXX: Hard-coded value - will make it configurable if it is really a problem.
        if ((runTasks & 0x3F) == 0) {
            lastExecutionTime = ScheduledFutureTask.nanoTime();
            if (lastExecutionTime >= deadline) {
                break;
            }
        }

        task = pollTask();
        if (task == null) {
            lastExecutionTime = ScheduledFutureTask.nanoTime();
            break;
        }
    }

    afterRunningAllTasks();
    this.lastExecutionTime = lastExecutionTime;
    return true;
}
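The "check the clock only every 64 tasks" trick relies on (n & 0x3F) == 0 being true exactly when n is a multiple of 64. A minimal sketch, with an injectable clock so the behaviour is deterministic; the class, method, and parameter names are assumptions of this sketch.

```java
import java.util.Queue;
import java.util.function.LongSupplier;

// Hypothetical sketch of running tasks under a time budget with sparse clock reads.
final class BudgetedRunSketch {
    // Runs tasks until the queue is empty or the deadline has passed.
    // Returns the number of tasks that were run.
    static int runWithBudget(Queue<Runnable> tasks, long deadlineNanos, LongSupplier clock) {
        int ran = 0;
        Runnable task;
        while ((task = tasks.poll()) != null) {
            task.run();
            ran++;
            // The low six bits are zero only for multiples of 64, so the clock is read rarely.
            if ((ran & 0x3F) == 0 && clock.getAsLong() >= deadlineNanos) {
                break;
            }
        }
        return ran;
    }
}
```

One consequence of this design is that the budget is not exact: even with a deadline that has already passed, up to 64 tasks run before the first check.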

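Because this class derives from AbstractScheduledEventExecutor, besides the tasks in taskQueue that run right away there are also tasks in the priority queue that are scheduled for later. The following method first runs the tasks currently in taskQueue and then all expired tasks from the priority queue, and repeats until neither step ran anything or maxDrainAttempts is reached.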

/**
 * Execute all expired scheduled tasks and all current tasks in the executor queue until both queues are empty,
 * or {@code maxDrainAttempts} has been exceeded.
 * @param maxDrainAttempts The maximum amount of times this method attempts to drain from queues. This is to prevent
 *                         continuous task execution and scheduling from preventing the EventExecutor thread to
 *                         make progress and return to the selector mechanism to process inbound I/O events.
 * @return {@code true} if at least one task was run.
 */
protected final boolean runScheduledAndExecutorTasks(final int maxDrainAttempts) {
    assert inEventLoop();
    boolean ranAtLeastOneTask;
    int drainAttempt = 0;
    do {
        // We must run the taskQueue tasks first, because the scheduled tasks from outside the EventLoop are queued
        // here because the taskQueue is thread safe and the scheduledTaskQueue is not thread safe.
        ranAtLeastOneTask = runExistingTasksFrom(taskQueue) | executeExpiredScheduledTasks();
    } while (ranAtLeastOneTask && ++drainAttempt < maxDrainAttempts);

    if (drainAttempt > 0) {
        lastExecutionTime = ScheduledFutureTask.nanoTime();
    }
    afterRunningAllTasks();

    return drainAttempt > 0;
}

/**
 * What ever tasks are present in {@code taskQueue} when this method is invoked will be {@link Runnable#run()}.
 * @param taskQueue the task queue to drain.
 * @return {@code true} if at least {@link Runnable#run()} was called.
 */
private boolean runExistingTasksFrom(Queue<Runnable> taskQueue) {
    Runnable task = pollTaskFrom(taskQueue);
    if (task == null) {
        return false;
    }
    int remaining = Math.min(maxPendingTasks, taskQueue.size());
    safeExecute(task);
    // Use taskQueue.poll() directly rather than pollTaskFrom() since the latter may
    // silently consume more than one item from the queue (skips over WAKEUP_TASK instances)
    while (remaining-- > 0 && (task = taskQueue.poll()) != null) {
        safeExecute(task);
    }
    return true;
}

/**
 * @return {@code true} if at least one scheduled task was executed.
 */
private boolean executeExpiredScheduledTasks() {
    if (scheduledTaskQueue == null || scheduledTaskQueue.isEmpty()) {
        return false;
    }
    long nanoTime = AbstractScheduledEventExecutor.nanoTime();
    Runnable scheduledTask = pollScheduledTask(nanoTime);
    if (scheduledTask == null) {
        return false;
    }
    do {
        safeExecute(scheduledTask);
    } while ((scheduledTask = pollScheduledTask(nanoTime)) != null);
    return true;
}
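One detail in runScheduledAndExecutorTasks() is easy to miss: the two drain steps are combined with the non-short-circuit | operator, so the expired scheduled tasks are executed even when the task queue already ran something. A small sketch of the difference to ||, with hypothetical names:

```java
// Hypothetical sketch contrasting | and || on boolean operands with side effects.
final class NonShortCircuitSketch {
    private int calls;

    // Stands in for a drain step: counts the call and reports whether it ran anything.
    private boolean drain(boolean ranSomething) {
        calls++;
        return ranSomething;
    }

    // Returns how many drain steps were executed when the first one reports true.
    static int stepsRun(boolean shortCircuit) {
        NonShortCircuitSketch s = new NonShortCircuitSketch();
        boolean ran = shortCircuit
                ? s.drain(true) || s.drain(false)  // second step skipped
                : s.drain(true) | s.drain(false);  // both steps always run
        return ran ? s.calls : -1;
    }
}
```

With || the second queue would be starved whenever the first one keeps producing work; with | both queues are served on every pass.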

In Event Loop

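Determines whether the current thread is the EventLoop thread. The class keeps a Thread reference that points to the EventLoop thread, and the check simply compares the thread passed as the argument with that reference.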

private volatile Thread thread;

@Override
public boolean inEventLoop(Thread thread) {
    return thread == this.thread;
}

So which thread is passed as the argument? The answer is in a parent class:

@Override
public boolean inEventLoop() {
    return inEventLoop(Thread.currentThread());
}

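In other words, if the thread currently executing the code is the very EventLoop thread the class keeps a reference to, then the code is running on the EventLoop thread.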
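The same identity check can be tried out with a plain single-thread executor from the JDK. This is only a sketch: the class, its methods, and the daemon worker thread are assumptions made for the demo, not Netty's implementation.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical sketch: a single worker thread that remembers its own identity.
final class LoopThreadSketch {
    private volatile Thread thread; // written once, by the worker itself
    private final ExecutorService worker = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "loop-sketch");
        t.setDaemon(true); // do not keep the JVM alive for the demo
        return t;
    });

    LoopThreadSketch() {
        // The worker records itself from inside a task; wait until that has happened.
        callOnWorker(() -> {
            thread = Thread.currentThread();
            return true;
        });
    }

    boolean inLoop() {
        return Thread.currentThread() == thread;
    }

    // Runs the same check on the worker thread and returns its answer.
    boolean inLoopOnWorker() {
        return callOnWorker(this::inLoop);
    }

    private boolean callOnWorker(Callable<Boolean> call) {
        try {
            return worker.submit(call).get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    void shutdown() {
        worker.shutdown();
    }
}
```

Calling inLoop() from the constructing thread answers false, while the same call executed as a task on the worker answers true.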

Shutdown Hook

The class maintains a set of Runnable callbacks that are invoked at shutdown. Callbacks can be added to the set and removed from it.

private final Set<Runnable> shutdownHooks = new LinkedHashSet<Runnable>();

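The code below makes this clear:

  • If the current thread is the EventLoop thread, it adds the callback to the set itself
  • If the current thread is not the EventLoop thread, the add is wrapped in a task and put on the task queue, so that the EventLoop thread later performs the actual add to the set

/**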
 * Add a {@link Runnable} which will be executed on shutdown of this instance
 */
public void addShutdownHook(final Runnable task) {
    if (inEventLoop()) {
        shutdownHooks.add(task);
    } else {
        execute(new Runnable() {
            @Override
            public void run() {
                shutdownHooks.add(task);
            }
        });
    }
}

/**
 * Remove a previous added {@link Runnable} as a shutdown hook
 */
public void removeShutdownHook(final Runnable task) {
    if (inEventLoop()) {
        shutdownHooks.remove(task);
    } else {
        execute(new Runnable() {
            @Override
            public void run() {
                shutdownHooks.remove(task);
            }
        });
    }
}

private boolean runShutdownHooks() {
    boolean ran = false;
    // Note shutdown hooks can add / remove shutdown hooks.
    while (!shutdownHooks.isEmpty()) {
        List<Runnable> copy = new ArrayList<Runnable>(shutdownHooks);
        shutdownHooks.clear();
        for (Runnable task: copy) {
            try {
                task.run();
            } catch (Throwable t) {
                logger.warn("Shutdown hook raised an exception.", t);
            } finally {
                ran = true;
            }
        }
    }

    if (ran) {
        lastExecutionTime = ScheduledFutureTask.nanoTime();
    }

    return ran;
}
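runShutdownHooks() copies the set and clears it before running anything, and repeats while the set is non-empty. That is what allows a hook to register further hooks without a ConcurrentModificationException. A minimal sketch of the loop, with a hypothetical log list added so the order can be observed:

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical sketch of the copy-and-clear loop for shutdown hooks.
final class HookDrainSketch {
    final Set<Runnable> hooks = new LinkedHashSet<>();
    final List<String> log = new ArrayList<>(); // demo only

    boolean runHooks() {
        boolean ran = false;
        // Hooks may add hooks, so keep going until a pass leaves the set empty.
        while (!hooks.isEmpty()) {
            List<Runnable> copy = new ArrayList<>(hooks); // iterate over a snapshot
            hooks.clear();
            for (Runnable hook : copy) {
                try {
                    hook.run();
                } catch (RuntimeException e) {
                    log.add("failed"); // a failing hook must not stop the others
                } finally {
                    ran = true;
                }
            }
        }
        return ran;
    }
}
```

A hook added during the first pass is picked up by the second pass, after all hooks of the first snapshot have run.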

Execute

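Adds the task to the task queue. If the caller is not the EventLoop thread, startThread() is then called to make sure the EventLoop thread is running. If the executor turns out to be shut down, the task is removed from the queue again and rejected. Finally, the EventLoop thread is woken up when addTaskWakesUp is false and the task is immediate.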

private void execute(Runnable task, boolean immediate) {
    boolean inEventLoop = inEventLoop();
    addTask(task);
    if (!inEventLoop) {
        startThread();
        if (isShutdown()) {
            boolean reject = false;
            try {
                if (removeTask(task)) {
                    reject = true;
                }
            } catch (UnsupportedOperationException e) {
                // The task queue does not support removal so the best thing we can do is to just move on and
                // hope we will be able to pick-up the task before its completely terminated.
                // In worst case we will log on termination.
            }
            if (reject) {
                reject();
            }
        }
    }

    if (!addTaskWakesUp && immediate) {
        wakeup(inEventLoop);
    }
}

Start Up

The class keeps a variable that represents the state of the EventLoop thread and updates it atomically.

private static final int ST_NOT_STARTED = 1;
private static final int ST_STARTED = 2;
private static final int ST_SHUTTING_DOWN = 3;
private static final int ST_SHUTDOWN = 4;
private static final int ST_TERMINATED = 5;

private static final AtomicIntegerFieldUpdater<SingleThreadEventExecutor> STATE_UPDATER =
        AtomicIntegerFieldUpdater.newUpdater(SingleThreadEventExecutor.class, "state");

private volatile int state = ST_NOT_STARTED;

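Starting the thread is itself submitted as a task to the underlying executor. Whichever thread runs that task records itself as the EventLoop thread and then enters run(). When run() returns, the finally block walks through the shutdown states, runs the remaining tasks and shutdown hooks, and cleans up.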

private void doStartThread() {
    assert thread == null;
    executor.execute(new Runnable() {
        @Override
        public void run() {
            thread = Thread.currentThread();
            if (interrupted) {
                thread.interrupt();
            }

            boolean success = false;
            updateLastExecutionTime();
            try {
                SingleThreadEventExecutor.this.run();
                success = true;
            } catch (Throwable t) {
                logger.warn("Unexpected exception from an event executor: ", t);
            } finally {
                for (;;) {
                    int oldState = state;
                    if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet(
                        SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) {
                        break;
                    }
                }

                // Check if confirmShutdown() was called at the end of the loop.
                if (success && gracefulShutdownStartTime == 0) {
                    if (logger.isErrorEnabled()) {
                        logger.error("Buggy " + EventExecutor.class.getSimpleName() + " implementation; " +
                                     SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must " +
                                     "be called before run() implementation terminates.");
                    }
                }

                try {
                    // Run all remaining tasks and shutdown hooks. At this point the event loop
                    // is in ST_SHUTTING_DOWN state still accepting tasks which is needed for
                    // graceful shutdown with quietPeriod.
                    for (;;) {
                        if (confirmShutdown()) {
                            break;
                        }
                    }

                    // Now we want to make sure no more tasks can be added from this point. This is
                    // achieved by switching the state. Any new tasks beyond this point will be rejected.
                    for (;;) {
                        int oldState = state;
                        if (oldState >= ST_SHUTDOWN || STATE_UPDATER.compareAndSet(
                            SingleThreadEventExecutor.this, oldState, ST_SHUTDOWN)) {
                            break;
                        }
                    }

                    // We have the final set of tasks in the queue now, no more can be added, run all remaining.
                    // No need to loop here, this is the final pass.
                    confirmShutdown();
                } finally {
                    try {
                        cleanup();
                    } finally {
                        // Lets remove all FastThreadLocals for the Thread as we are about to terminate and notify
                        // the future. The user may block on the future and once it unblocks the JVM may terminate
                        // and start unloading classes.
                        // See https://github.com/netty/netty/issues/6596.
                        FastThreadLocal.removeAll();

                        STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED);
                        threadLock.countDown();
                        int numUserTasks = drainTasks();
                        if (numUserTasks > 0 && logger.isWarnEnabled()) {
                            logger.warn("An event executor terminated with " +
                                        "non-empty task queue (" + numUserTasks + ')');
                        }
                        terminationFuture.setSuccess(null);
                    }
                }
            }
        }
    });
}
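The two CAS loops in the finally block share one pattern: move the state forward to a target unless it is already at or past that target. A minimal sketch of the pattern with the same updater type follows. The tryStart() method is an assumption of this sketch, since startThread() is not shown in these notes; all names are hypothetical.

```java
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

// Hypothetical sketch of a forward-only lifecycle state driven by CAS.
final class LifecycleSketch {
    static final int NOT_STARTED = 1;
    static final int STARTED = 2;
    static final int SHUTTING_DOWN = 3;
    static final int SHUTDOWN = 4;
    static final int TERMINATED = 5;

    private static final AtomicIntegerFieldUpdater<LifecycleSketch> STATE =
            AtomicIntegerFieldUpdater.newUpdater(LifecycleSketch.class, "state");

    private volatile int state = NOT_STARTED;

    int state() {
        return state;
    }

    // Assumption: only the first caller wins the NOT_STARTED -> STARTED transition.
    boolean tryStart() {
        return STATE.compareAndSet(this, NOT_STARTED, STARTED);
    }

    // Moves forward to 'target'; never moves the state backwards.
    void advanceTo(int target) {
        for (;;) {
            int old = state;
            if (old >= target || STATE.compareAndSet(this, old, target)) {
                return;
            }
            // CAS lost a race with another thread: re-read the state and retry.
        }
    }
}
```

Since the states are ordered integers, the old >= target test is enough to make concurrent shutdown requests harmless: a later state is never overwritten by an earlier one.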