3.3 - EventExecutor
Created by : Mr Dk.
2021 / 02 / 18 19:37
Ningbo, Zhejiang, China
AbstractEventExecutor
EventExecutor
接口的第一层抽象类实现,其中大部分方法仍然保持抽象,留给具体的实现类来实现。
Definition
/**
 * Abstract base class for {@link EventExecutor} implementations.
 * <p>
 * Extends {@link AbstractExecutorService} and implements {@link EventExecutor}. The class body is
 * elided in this excerpt; its members are shown in the sections that follow.
 */
public abstract class AbstractEventExecutor extends AbstractExecutorService implements EventExecutor {
}
Parent
EventExecutor
接口要实现的主要函数为 parent()
以获取 EventExecutorGroup
的引用。因此该类内保存了该引用。
// The group this executor belongs to; null when the no-arg constructor is used.
private final EventExecutorGroup parent;
// Immutable single-element collection that contains only this executor.
private final Collection<EventExecutor> selfCollection = Collections.<EventExecutor>singleton(this);
/**
 * Creates an executor without a parent group (delegates with a {@code null} parent).
 */
protected AbstractEventExecutor() {
this(null);
}
/**
 * Creates an executor that belongs to the given group.
 *
 * @param parent the owning {@link EventExecutorGroup}; stored as-is, no null check is performed here
 */
protected AbstractEventExecutor(EventExecutorGroup parent) {
this.parent = parent;
}
/**
 * Returns the {@link EventExecutorGroup} supplied at construction time.
 *
 * @return the parent group, or {@code null} if this executor was created without one
 */
@Override
public EventExecutorGroup parent() {
return parent;
}
/**
 * Returns this executor itself.
 *
 * @return {@code this}
 */
@Override
public EventExecutor next() {
return this;
}
Shutdown
定义了优雅关闭的 quiet period
和超时时间。
// Default quiet period for graceful shutdown; passed with TimeUnit.SECONDS by shutdownGracefully().
static final long DEFAULT_SHUTDOWN_QUIET_PERIOD = 2;
// Default graceful-shutdown timeout; passed with TimeUnit.SECONDS by shutdownGracefully().
static final long DEFAULT_SHUTDOWN_TIMEOUT = 15;
/**
 * Starts a graceful shutdown using the default quiet period
 * ({@code DEFAULT_SHUTDOWN_QUIET_PERIOD}) and timeout ({@code DEFAULT_SHUTDOWN_TIMEOUT}),
 * both expressed in seconds.
 *
 * @return the future returned by the three-argument {@code shutdownGracefully} overload
 */
@Override
public Future<?> shutdownGracefully() {
return shutdownGracefully(DEFAULT_SHUTDOWN_QUIET_PERIOD, DEFAULT_SHUTDOWN_TIMEOUT, TimeUnit.SECONDS);
}
AbstractScheduledEventExecutor
继承自 AbstractEventExecutor
的抽象类,定义了 支持调度执行 的抽象 EventExecutor
。
Definition
/**
 * Abstract base class for {@link EventExecutor}s that want to support scheduling.
 * <p>
 * Extends {@link AbstractEventExecutor}. The class body is elided in this excerpt; its members are
 * shown in the sections that follow.
 */
public abstract class AbstractScheduledEventExecutor extends AbstractEventExecutor {
}
Task Queue
该类内的核心是一个维护所有任务执行状态 / 结果的 ScheduledFutureTask
优先队列。
/** Queue of scheduled tasks; stays {@code null} until {@link #scheduledTaskQueue()} is first called. */
PriorityQueue<ScheduledFutureTask<?>> scheduledTaskQueue;

/**
 * Returns the scheduled task queue, creating it on first use.
 *
 * @return the (possibly just created) queue; never {@code null}
 */
PriorityQueue<ScheduledFutureTask<?>> scheduledTaskQueue() {
    if (scheduledTaskQueue != null) {
        return scheduledTaskQueue;
    }
    // Use same initial capacity as java.util.PriorityQueue
    final int initialCapacity = 11;
    scheduledTaskQueue = new DefaultPriorityQueue<ScheduledFutureTask<?>>(
            SCHEDULED_FUTURE_TASK_COMPARATOR, initialCapacity);
    return scheduledTaskQueue;
}
Cancel All Scheduled Tasks
以下函数对优先队列中的所有任务调用一次 cancel(false)
,但不中断正在执行的任务。注意,该函数只有当当前线程是 EventLoop
线程时才会继续。
/**
 * Cancel all scheduled tasks.
 * <p>
 * Every task currently in the scheduled task queue is cancelled via
 * {@code cancelWithoutRemove(false)}, then the queue is emptied in one step. Nothing happens
 * when the queue is absent or empty.
 *
 * This method MUST be called only when {@link #inEventLoop()} is {@code true}.
 */
protected void cancelScheduledTasks() {
    assert inEventLoop();

    final PriorityQueue<ScheduledFutureTask<?>> queue = this.scheduledTaskQueue;
    if (isNullOrEmpty(queue)) {
        return;
    }

    // Iterate over a snapshot array rather than over the live queue.
    final ScheduledFutureTask<?>[] snapshot = queue.toArray(new ScheduledFutureTask<?>[0]);
    for (int i = 0; i < snapshot.length; i++) {
        snapshot[i].cancelWithoutRemove(false);
    }

    queue.clearIgnoringIndexes();
}
Poll Scheduled Task
从优先队列中取出下一个在给定时刻 (nanoTime) 已经到期、可以被执行的任务,将其从队列中移除后返回;如果没有这样的任务,则返回 null。
/**
 * Polls the next ready scheduled task, using the current {@link #nanoTime()} as reference time.
 *
 * @see #pollScheduledTask(long)
 */
protected final Runnable pollScheduledTask() {
    final long now = nanoTime();
    return pollScheduledTask(now);
}
/**
 * Return the {@link Runnable} which is ready to be executed with the given {@code nanoTime}.
 * You should use {@link #nanoTime()} to retrieve the correct {@code nanoTime}.
 *
 * @param nanoTime the reference time against which the head task's deadline is compared
 * @return the head task, removed from the queue and marked as consumed, or {@code null} if the
 *         queue has no head or the head's deadline lies after {@code nanoTime}
 */
protected final Runnable pollScheduledTask(long nanoTime) {
    assert inEventLoop();

    final ScheduledFutureTask<?> head = peekScheduledTask();
    if (head == null) {
        return null;
    }
    // Compare through subtraction, exactly as the deadline check was originally written.
    if (head.deadlineNanos() - nanoTime > 0) {
        return null;
    }

    scheduledTaskQueue.remove();
    head.setConsumed();
    return head;
}
/**
 * Returns the head of the scheduled task queue without removing it.
 *
 * @return the head task, or {@code null} if the queue was never created or is empty
 */
final ScheduledFutureTask<?> peekScheduledTask() {
    final Queue<ScheduledFutureTask<?>> queue = this.scheduledTaskQueue;
    if (queue == null) {
        return null;
    }
    return queue.peek();
}
以下两个函数分别返回距离下一个被调度任务就绪还剩余的纳秒数,以及该任务的 deadline;如果没有被调度的任务,则返回 -1。
/**
 * Return the nanoseconds until the next scheduled task is ready to be run or {@code -1} if no task is scheduled.
 *
 * @return the head task's {@code delayNanos()}, or {@code -1} when there is no head task
 */
protected final long nextScheduledTaskNano() {
    final ScheduledFutureTask<?> head = peekScheduledTask();
    if (head == null) {
        return -1;
    }
    return head.delayNanos();
}
/**
 * Return the deadline (in nanoseconds) when the next scheduled task is ready to be run or {@code -1}
 * if no task is scheduled.
 *
 * @return the head task's {@code deadlineNanos()}, or {@code -1} when there is no head task
 */
protected final long nextScheduledTaskDeadlineNanos() {
    final ScheduledFutureTask<?> head = peekScheduledTask();
    if (head == null) {
        return -1;
    }
    return head.deadlineNanos();
}
返回优先队列中是否存在已经到期、可以被处理的任务。
/**
 * Returns {@code true} if a scheduled task is ready for processing.
 *
 * @return {@code true} when a head task exists and its deadline is not later than the current
 *         {@code nanoTime()}; {@code false} otherwise
 */
protected final boolean hasScheduledTasks() {
    final ScheduledFutureTask<?> head = peekScheduledTask();
    if (head == null) {
        return false;
    }
    return head.deadlineNanos() <= nanoTime();
}
Schedule
多个调度函数都用到的核心调度函数。调度的本质实际上是先将任务加入到优先队列中,然后择机执行任务。下面的函数首先判断当前线程是否是 EventLoop
线程,如果是,就直接将任务加入优先队列中;如果不是,则通过 execute()
或 lazyExecute() 将任务提交到任务队列 (execute() 在必要时会启动 EventLoop
线程),任务在被运行时如果尚未到期,会把自己加入到优先队列中。
/**
 * Core scheduling routine shared by the public {@code schedule*} methods.
 * <p>
 * On the event-loop thread the task is put into the scheduled task queue directly. From any other
 * thread it is handed over through {@code execute(...)} or {@code lazyExecute(...)}, depending on
 * what the {@code beforeScheduledTaskSubmitted} hook returns.
 *
 * @param task the task to schedule
 * @return the same {@code task} instance
 */
private <V> ScheduledFuture<V> schedule(final ScheduledFutureTask<V> task) {
    if (inEventLoop()) {
        scheduleFromEventLoop(task);
        return task;
    }

    final long deadlineNanos = task.deadlineNanos();
    // task will add itself to scheduled task queue when run if not expired
    if (beforeScheduledTaskSubmitted(deadlineNanos)) {
        execute(task);
        return task;
    }

    lazyExecute(task);
    // Second hook after scheduling to facilitate race-avoidance
    if (afterScheduledTaskSubmitted(deadlineNanos)) {
        execute(WAKEUP_TASK);
    }
    return task;
}
/**
 * Assigns the next task id to {@code task} and adds it to the scheduled task queue.
 *
 * @param task the task to enqueue
 */
final void scheduleFromEventLoop(final ScheduledFutureTask<?> task) {
    final PriorityQueue<ScheduledFutureTask<?>> queue = scheduledTaskQueue();
    // nextTaskId a long and so there is no chance it will overflow back to 0
    queue.add(task.setId(++nextTaskId));
}
根据接口定义,多个调度函数的实现都用到了上面的核心调度函数。对于不同的调度函数,要做的额外工作是计算并检查 初始延时、延时 或 间隔 的合法性。
/**
 * Schedules a one-shot {@link Runnable}.
 *
 * @param command the action to run; must not be {@code null}
 * @param delay   the delay before execution; negative values are treated as {@code 0}
 * @param unit    the unit of {@code delay}; must not be {@code null}
 * @return the future representing the scheduled task
 */
@Override
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
    ObjectUtil.checkNotNull(command, "command");
    ObjectUtil.checkNotNull(unit, "unit");

    // Clamp negative delays to zero.
    delay = Math.max(0, delay);
    validateScheduled0(delay, unit);

    final long deadline = deadlineNanos(unit.toNanos(delay));
    return schedule(new ScheduledFutureTask<Void>(this, command, deadline));
}
/**
 * Schedules a one-shot {@link Callable}.
 *
 * @param callable the computation to run; must not be {@code null}
 * @param delay    the delay before execution; negative values are treated as {@code 0}
 * @param unit     the unit of {@code delay}; must not be {@code null}
 * @return the future representing the scheduled task
 */
@Override
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
    ObjectUtil.checkNotNull(callable, "callable");
    ObjectUtil.checkNotNull(unit, "unit");

    // Clamp negative delays to zero.
    delay = Math.max(0, delay);
    validateScheduled0(delay, unit);

    final long deadline = deadlineNanos(unit.toNanos(delay));
    return schedule(new ScheduledFutureTask<V>(this, callable, deadline));
}
/**
 * Schedules a periodic {@link Runnable} at a fixed rate.
 *
 * @param command      the action to run; must not be {@code null}
 * @param initialDelay the delay before the first execution; must be {@code >= 0}
 * @param period       the period between executions; must be {@code > 0}
 * @param unit         the unit of {@code initialDelay} and {@code period}; must not be {@code null}
 * @return the future representing the scheduled task
 * @throws IllegalArgumentException if {@code initialDelay < 0} or {@code period <= 0}
 */
@Override
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
    ObjectUtil.checkNotNull(command, "command");
    ObjectUtil.checkNotNull(unit, "unit");

    if (initialDelay < 0) {
        final String message = String.format("initialDelay: %d (expected: >= 0)", initialDelay);
        throw new IllegalArgumentException(message);
    }
    if (period <= 0) {
        final String message = String.format("period: %d (expected: > 0)", period);
        throw new IllegalArgumentException(message);
    }

    validateScheduled0(initialDelay, unit);
    validateScheduled0(period, unit);

    final long firstDeadline = deadlineNanos(unit.toNanos(initialDelay));
    final long periodNanos = unit.toNanos(period);
    return schedule(new ScheduledFutureTask<Void>(this, command, firstDeadline, periodNanos));
}
/**
 * Schedules a periodic {@link Runnable} with a fixed delay between executions.
 *
 * @param command      the action to run; must not be {@code null}
 * @param initialDelay the delay before the first execution; must be {@code >= 0}
 * @param delay        the delay between executions; must be {@code > 0}
 * @param unit         the unit of {@code initialDelay} and {@code delay}; must not be {@code null}
 * @return the future representing the scheduled task
 * @throws IllegalArgumentException if {@code initialDelay < 0} or {@code delay <= 0}
 */
@Override
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
    ObjectUtil.checkNotNull(command, "command");
    ObjectUtil.checkNotNull(unit, "unit");

    if (initialDelay < 0) {
        final String message = String.format("initialDelay: %d (expected: >= 0)", initialDelay);
        throw new IllegalArgumentException(message);
    }
    if (delay <= 0) {
        final String message = String.format("delay: %d (expected: > 0)", delay);
        throw new IllegalArgumentException(message);
    }

    validateScheduled0(initialDelay, unit);
    validateScheduled0(delay, unit);

    final long firstDeadline = deadlineNanos(unit.toNanos(initialDelay));
    // The delay is passed negated; presumably ScheduledFutureTask uses the sign to tell
    // fixed-delay from fixed-rate scheduling -- TODO confirm against ScheduledFutureTask.
    final long negatedDelayNanos = -unit.toNanos(delay);
    return schedule(new ScheduledFutureTask<Void>(this, command, firstDeadline, negatedDelayNanos));
}
SingleThreadEventExecutor
基于 AbstractScheduledEventExecutor
抽象类的进一步实现,也是一个抽象类。顾名思义,该类使用一个单独的线程执行所有提交的任务。
Definition
/**
 * Abstract base class for {@link OrderedEventExecutor}s that execute all their submitted tasks in a single thread.
 * <p>
 * Extends {@link AbstractScheduledEventExecutor}. The class body is elided in this excerpt; its
 * members are shown in the sections that follow.
 */
public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {
}
Constructor
从最复杂的构造函数可以看出类内维护的成员变量及作用。
/**
 * Create a new instance whose task queue is created through {@link #newTaskQueue(int)}.
 *
 * @param parent the {@link EventExecutorGroup} which is the parent of this instance and belongs to it
 * @param executor the {@link Executor} which will be used for executing
 * @param addTaskWakesUp {@code true} if and only if invocation of {@link #addTask(Runnable)} will wake up the
 * executor thread
 * @param maxPendingTasks the maximum number of pending tasks before new tasks will be rejected;
 * values below 16 are raised to 16.
 * @param rejectedHandler the {@link RejectedExecutionHandler} to use; must not be {@code null}.
 */
protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
boolean addTaskWakesUp, int maxPendingTasks,
RejectedExecutionHandler rejectedHandler) {
super(parent);
this.addTaskWakesUp = addTaskWakesUp;
// Enforce a lower bound of 16 on the queue capacity.
this.maxPendingTasks = Math.max(16, maxPendingTasks);
this.executor = ThreadExecutorMap.apply(executor, this);
// Note: newTaskQueue is overridable and is invoked here, during construction.
taskQueue = newTaskQueue(this.maxPendingTasks);
rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
}
/**
 * Create a new instance that uses a caller-supplied task queue.
 *
 * @param parent the {@link EventExecutorGroup} which is the parent of this instance
 * @param executor the {@link Executor} which will be used for executing
 * @param addTaskWakesUp stored as-is; consulted by {@code execute} to decide whether an explicit
 * wakeup is needed after adding a task
 * @param taskQueue the queue that holds pending tasks; must not be {@code null}
 * @param rejectedHandler the {@link RejectedExecutionHandler} to use; must not be {@code null}
 */
protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
boolean addTaskWakesUp, Queue<Runnable> taskQueue,
RejectedExecutionHandler rejectedHandler) {
super(parent);
this.addTaskWakesUp = addTaskWakesUp;
// No explicit limit was given, so the default is recorded.
this.maxPendingTasks = DEFAULT_MAX_PENDING_EXECUTOR_TASKS;
this.executor = ThreadExecutorMap.apply(executor, this);
this.taskQueue = ObjectUtil.checkNotNull(taskQueue, "taskQueue");
this.rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
}
其中:
- addTaskWakesUp 指示调用 addTask() 时是否唤醒执行器线程
- maxPendingTasks 指示拒绝接收新任务前,能够容纳的最大任务数量
- taskQueue 是保存所有等待任务的队列
- rejectedExecutionHandler 是当任务数量饱和时,所采取的拒绝策略
// Queue holding the tasks waiting to be executed by this executor.
private final Queue<Runnable> taskQueue;
/**
 * Create a new {@link Queue} which will hold the tasks to execute. This default implementation will return a
 * {@link LinkedBlockingQueue} but if your sub-class of {@link SingleThreadEventExecutor} will not do any blocking
 * calls on this {@link Queue} it may make sense to {@code @Override} this and return some more performant
 * implementation that does not support blocking operations at all.
 *
 * @param maxPendingTasks the capacity given to the returned {@link LinkedBlockingQueue}
 * @return a new, empty task queue
 */
protected Queue<Runnable> newTaskQueue(int maxPendingTasks) {
return new LinkedBlockingQueue<Runnable>(maxPendingTasks);
}
剩下的主要操作即从 taskQueue
中取出任务 (并执行),或将任务放入 taskQueue
中。
Poll
要求当前线程是 EventLoop
线程。不断从任务队列中 poll()
(非阻塞),跳过仅用于唤醒的 WAKEUP_TASK,返回第一个取到的任务;如果队列已空,则返回 null
。
/**
 * Polls the next task from this executor's task queue without blocking.
 *
 * @return the next task, or {@code null} if none is available
 * @see Queue#poll()
 */
protected Runnable pollTask() {
    assert inEventLoop();
    final Runnable next = pollTaskFrom(taskQueue);
    return next;
}
/**
 * Polls {@code taskQueue} until something other than {@code WAKEUP_TASK} comes out.
 *
 * @param taskQueue the queue to poll
 * @return the first polled element that is not {@code WAKEUP_TASK}; {@code null} once the queue is empty
 */
protected static Runnable pollTaskFrom(Queue<Runnable> taskQueue) {
    Runnable polled;
    do {
        polled = taskQueue.poll();
    } while (polled == WAKEUP_TASK);
    return polled;
}
Take
同样需要执行线程是 EventLoop
线程。在一个死循环内,不断 peek 调度优先队列的队头。如果没有被调度的任务,那么调用 take()
阻塞在任务队列上,直到队列中有新任务;
如果有被调度的任务且尚未到期,则最多在任务队列上等待到该任务到期,随后把已到期的调度任务转移到任务队列中,再从任务队列中取出任务。
/**
 * Take the next {@link Runnable} from the task queue and so will block if no task is currently present.
 * <p>
 * Be aware that this method will throw an {@link UnsupportedOperationException} if the task queue, which was
 * created via {@link #newTaskQueue()}, does not implement {@link BlockingQueue}.
 * </p>
 * <p>
 * When a scheduled task exists, the wait on the task queue is bounded by that task's remaining
 * delay; afterwards expired scheduled tasks are moved into the task queue and polled from there.
 * </p>
 *
 * @return {@code null} if the executor thread has been interrupted or waken up.
 * @throws UnsupportedOperationException if the task queue is not a {@link BlockingQueue}
 */
protected Runnable takeTask() {
assert inEventLoop();
if (!(taskQueue instanceof BlockingQueue)) {
throw new UnsupportedOperationException();
}
BlockingQueue<Runnable> taskQueue = (BlockingQueue<Runnable>) this.taskQueue;
for (;;) {
ScheduledFutureTask<?> scheduledTask = peekScheduledTask();
if (scheduledTask == null) {
// Nothing is scheduled: block on the task queue until something arrives.
Runnable task = null;
try {
task = taskQueue.take();
// A wakeup marker is reported to the caller as null.
if (task == WAKEUP_TASK) {
task = null;
}
} catch (InterruptedException e) {
// Ignore
}
return task;
} else {
// Something is scheduled: wait on the task queue no longer than its remaining delay.
long delayNanos = scheduledTask.delayNanos();
Runnable task = null;
if (delayNanos > 0) {
try {
task = taskQueue.poll(delayNanos, TimeUnit.NANOSECONDS);
} catch (InterruptedException e) {
// Waken up.
return null;
}
}
if (task == null) {
// We need to fetch the scheduled tasks now as otherwise there may be a chance that
// scheduled tasks are never executed if there is always one task in the taskQueue.
// This is for example true for the read task of OIO Transport
// See https://github.com/netty/netty/issues/1614
fetchFromScheduledTaskQueue();
task = taskQueue.poll();
}
// Still nothing to run: loop and re-evaluate the scheduled task queue.
if (task != null) {
return task;
}
}
}
}
/**
 * Moves every scheduled task that is ready at the current {@code nanoTime()} into the task queue.
 *
 * @return {@code true} if all ready scheduled tasks were transferred (or there were none);
 *         {@code false} if the task queue refused a task, in which case that task is put back
 *         into the scheduled task queue
 */
private boolean fetchFromScheduledTaskQueue() {
    if (scheduledTaskQueue == null || scheduledTaskQueue.isEmpty()) {
        return true;
    }

    final long now = AbstractScheduledEventExecutor.nanoTime();
    Runnable ready;
    while ((ready = pollScheduledTask(now)) != null) {
        if (taskQueue.offer(ready)) {
            continue;
        }
        // No space left in the task queue add it back to the scheduledTaskQueue so we pick it up again.
        scheduledTaskQueue.add((ScheduledFutureTask<?>) ready);
        return false;
    }
    return true;
}
Add
/**
 * Add a task to the task queue, or throws a {@link RejectedExecutionException} if this instance was shutdown
 * before.
 * <p>
 * If the queue does not accept the task, it is handed to {@code reject(Runnable)}.
 *
 * @param task the task to add; must not be {@code null}
 */
protected void addTask(Runnable task) {
    ObjectUtil.checkNotNull(task, "task");
    final boolean accepted = offerTask(task);
    if (accepted) {
        return;
    }
    reject(task);
}
/**
 * Offers {@code task} to the task queue, calling {@code reject()} first when this executor is
 * already shut down.
 *
 * @param task the task to enqueue
 * @return the result of {@code taskQueue.offer(task)}
 */
final boolean offerTask(Runnable task) {
    final boolean alreadyShutdown = isShutdown();
    if (alreadyShutdown) {
        // NOTE(review): reject() presumably throws, so the offer below would not be reached -- confirm.
        reject();
    }
    return taskQueue.offer(task);
}
Remove
/**
 * Removes the given task from the task queue.
 *
 * @param task the task to remove; must not be {@code null}
 * @return the result of the queue's {@code remove} call
 * @see Queue#remove(Object)
 */
protected boolean removeTask(Runnable task) {
    final Runnable target = ObjectUtil.checkNotNull(task, "task");
    return taskQueue.remove(target);
}
Run
将所有任务从任务队列中 poll()
出来,然后执行任务。需要确保当前线程是 EventLoop
线程。
/**
 * Poll all tasks from the task queue and run them via {@link Runnable#run()} method.
 * <p>
 * Ready scheduled tasks are first moved into the task queue; the cycle repeats for as long as
 * that transfer reports it could not move all of them.
 *
 * @return {@code true} if and only if at least one task was run
 */
protected boolean runAllTasks() {
    assert inEventLoop();

    boolean ranAny = false;
    for (;;) {
        final boolean fetchedAllScheduled = fetchFromScheduledTaskQueue();
        ranAny |= runAllTasksFrom(taskQueue);
        // keep on processing until we fetched all scheduled tasks.
        if (fetchedAllScheduled) {
            break;
        }
    }

    if (ranAny) {
        lastExecutionTime = ScheduledFutureTask.nanoTime();
    }
    afterRunningAllTasks();
    return ranAny;
}
/**
 * Poll all tasks from the task queue and run them via {@link Runnable#run()} method. This method stops running
 * the tasks in the task queue and returns if it ran longer than {@code timeoutNanos}.
 *
 * @param timeoutNanos the time budget in nanoseconds; when it is not positive a deadline of {@code 0} is used
 * @return {@code false} if there was no task to run, {@code true} otherwise
 */
protected boolean runAllTasks(long timeoutNanos) {
// Move ready scheduled tasks into the task queue before polling it.
fetchFromScheduledTaskQueue();
Runnable task = pollTask();
if (task == null) {
afterRunningAllTasks();
return false;
}
final long deadline = timeoutNanos > 0 ? ScheduledFutureTask.nanoTime() + timeoutNanos : 0;
long runTasks = 0;
// Local that shadows the field; it is assigned on every exit path of the loop below.
long lastExecutionTime;
for (;;) {
safeExecute(task);
runTasks ++;
// Check timeout every 64 tasks because nanoTime() is relatively expensive.
// XXX: Hard-coded value - will make it configurable if it is really a problem.
if ((runTasks & 0x3F) == 0) {
lastExecutionTime = ScheduledFutureTask.nanoTime();
if (lastExecutionTime >= deadline) {
break;
}
}
task = pollTask();
if (task == null) {
lastExecutionTime = ScheduledFutureTask.nanoTime();
break;
}
}
afterRunningAllTasks();
// Publish the timestamp taken when the loop ended to the field.
this.lastExecutionTime = lastExecutionTime;
return true;
}
由于该类派生自 AbstractScheduledEventExecutor
,因此,除了 taskQueue
中会被立刻执行的任务以外,还有被计划调度执行的优先队列中的任务。以下函数先执行 taskQueue
中的所有任务,然后再执行优先队列中的所有到期任务。
/**
 * Execute all expired scheduled tasks and all current tasks in the executor queue until both queues are empty,
 * or {@code maxDrainAttempts} has been exceeded.
 * @param maxDrainAttempts The maximum amount of times this method attempts to drain from queues. This is to prevent
 *                         continuous task execution and scheduling from preventing the EventExecutor thread to
 *                         make progress and return to the selector mechanism to process inbound I/O events.
 * @return {@code true} if at least one task was run.
 */
protected final boolean runScheduledAndExecutorTasks(final int maxDrainAttempts) {
    assert inEventLoop();

    int attempts = 0;
    boolean ranInThisPass;
    do {
        // We must run the taskQueue tasks first, because the scheduled tasks from outside the EventLoop are queued
        // here because the taskQueue is thread safe and the scheduledTaskQueue is not thread safe.
        final boolean ranQueued = runExistingTasksFrom(taskQueue);
        // Always evaluated, even when queued tasks already ran in this pass.
        final boolean ranScheduled = executeExpiredScheduledTasks();
        ranInThisPass = ranQueued || ranScheduled;
    } while (ranInThisPass && ++attempts < maxDrainAttempts);

    final boolean ranAny = attempts > 0;
    if (ranAny) {
        lastExecutionTime = ScheduledFutureTask.nanoTime();
    }
    afterRunningAllTasks();
    return ranAny;
}
/**
 * What ever tasks are present in {@code taskQueue} when this method is invoked will be {@link Runnable#run()}.
 * <p>
 * The number of follow-up tasks is capped at {@code min(maxPendingTasks, taskQueue.size())},
 * measured right after the first task was polled.
 *
 * @param taskQueue the task queue to drain.
 * @return {@code true} if at least {@link Runnable#run()} was called.
 */
private boolean runExistingTasksFrom(Queue<Runnable> taskQueue) {
    final Runnable first = pollTaskFrom(taskQueue);
    if (first == null) {
        return false;
    }

    final int budget = Math.min(maxPendingTasks, taskQueue.size());
    safeExecute(first);

    // Use taskQueue.poll() directly rather than pollTaskFrom() since the latter may
    // silently consume more than one item from the queue (skips over WAKEUP_TASK instances)
    for (int executed = 0; executed < budget; executed++) {
        final Runnable next = taskQueue.poll();
        if (next == null) {
            break;
        }
        safeExecute(next);
    }
    return true;
}
/**
 * Runs every scheduled task that is ready relative to a single timestamp taken on entry.
 *
 * @return {@code true} if at least one scheduled task was executed.
 */
private boolean executeExpiredScheduledTasks() {
    if (scheduledTaskQueue == null || scheduledTaskQueue.isEmpty()) {
        return false;
    }

    final long now = AbstractScheduledEventExecutor.nanoTime();
    boolean ranAny = false;
    for (Runnable ready = pollScheduledTask(now); ready != null; ready = pollScheduledTask(now)) {
        safeExecute(ready);
        ranAny = true;
    }
    return ranAny;
}
In Event Loop
确定当前线程是否是 EventLoop
线程。该类内维护了一个 Thread
的引用,指向 EventLoop
线程。判断的具体方式是,比较参数传入的线程是否与类内维护的线程是同一个。
// The event-loop thread; assigned inside the Runnable submitted by doStartThread().
private volatile Thread thread;
/**
 * Tells whether the given thread is this executor's event-loop thread.
 *
 * @param thread the thread to test
 * @return {@code true} if {@code thread} is the same instance as the recorded event-loop thread
 */
@Override
public boolean inEventLoop(Thread thread) {
return thread == this.thread;
}
那么参数中给定的线程是哪一个呢?在该类的父类中找到了答案:
/**
 * Tells whether the calling thread is the event-loop thread.
 *
 * @return the result of {@link #inEventLoop(Thread)} for {@link Thread#currentThread()}
 */
@Override
public boolean inEventLoop() {
    final Thread caller = Thread.currentThread();
    return inEventLoop(caller);
}
即,如果当前执行代码的线程与类内维护的 EventLoop
线程是同一个,那么当前就是 EventLoop
线程正在执行代码。
Shutdown Hook
类内维护了一个 Runnable
集合,集合内的回调会在关闭时被调用。该集合需要被维护,可以向其中加入回调,也可以从其中移除回调。
// Callbacks to run on shutdown, kept in insertion order (LinkedHashSet).
private final Set<Runnable> shutdownHooks = new LinkedHashSet<Runnable>();
从下面的代码中,可以很清晰地看出:
- 如果当前线程是 EventLoop 线程,那么由它负责将回调加入到集合中
- 如果当前线程不是 EventLoop 线程,那么将加入回调的逻辑作为一个任务添加到任务队列中,之后由 EventLoop 线程真正添加到集合中
/**
 * Add a {@link Runnable} which will be executed on shutdown of this instance
 * <p>
 * When called from another thread, the registration itself is submitted as a task so that the
 * hook set is only modified on the event-loop thread.
 *
 * @param task the hook to register
 */
public void addShutdownHook(final Runnable task) {
    if (!inEventLoop()) {
        final Runnable deferredAdd = new Runnable() {
            @Override
            public void run() {
                shutdownHooks.add(task);
            }
        };
        execute(deferredAdd);
        return;
    }
    shutdownHooks.add(task);
}
/**
 * Remove a previous added {@link Runnable} as a shutdown hook
 * <p>
 * When called from another thread, the removal itself is submitted as a task so that the hook
 * set is only modified on the event-loop thread.
 *
 * @param task the hook to unregister
 */
public void removeShutdownHook(final Runnable task) {
    if (!inEventLoop()) {
        final Runnable deferredRemove = new Runnable() {
            @Override
            public void run() {
                shutdownHooks.remove(task);
            }
        };
        execute(deferredRemove);
        return;
    }
    shutdownHooks.remove(task);
}
/**
 * Runs all registered shutdown hooks, including hooks that get registered while others run.
 * Anything a hook throws is logged as a warning and does not stop the remaining hooks.
 *
 * @return {@code true} if at least one hook was invoked
 */
private boolean runShutdownHooks() {
    boolean ranAny = false;
    // Note shutdown hooks can add / remove shutdown hooks.
    while (!shutdownHooks.isEmpty()) {
        // Run a private snapshot; the live set is cleared before any hook executes.
        final List<Runnable> batch = new ArrayList<Runnable>(shutdownHooks);
        shutdownHooks.clear();
        for (int i = 0; i < batch.size(); i++) {
            final Runnable hook = batch.get(i);
            try {
                hook.run();
            } catch (Throwable t) {
                logger.warn("Shutdown hook raised an exception.", t);
            } finally {
                ranAny = true;
            }
        }
    }

    if (ranAny) {
        lastExecutionTime = ScheduledFutureTask.nanoTime();
    }
    return ranAny;
}
Execute
将任务加入到执行队列中。如果 EventLoop
线程还没被设置,则先启动该线程。
/**
 * Adds {@code task} to the task queue and, when called from outside the event loop, makes sure
 * the event-loop thread has been started.
 *
 * @param task the task to enqueue
 * @param immediate when {@code true} and {@code addTaskWakesUp} is {@code false},
 * {@code wakeup(inEventLoop)} is called after the task has been added
 */
private void execute(Runnable task, boolean immediate) {
boolean inEventLoop = inEventLoop();
addTask(task);
if (!inEventLoop) {
startThread();
// The executor may have shut down in the meantime: try to take the task back and reject it.
if (isShutdown()) {
boolean reject = false;
try {
if (removeTask(task)) {
reject = true;
}
} catch (UnsupportedOperationException e) {
// The task queue does not support removal so the best thing we can do is to just move on and
// hope we will be able to pick-up the task before its completely terminated.
// In worst case we will log on termination.
}
// reject() is only called when the task was actually removed from the queue.
if (reject) {
reject();
}
}
}
if (!addTaskWakesUp && immediate) {
wakeup(inEventLoop);
}
}
Start Up
类内维护了一个变量来表示 EventLoop
线程的状态,并通过原子更新来维护它。
// Lifecycle states. The numeric order is significant: doStartThread() compares states with >=.
private static final int ST_NOT_STARTED = 1;
private static final int ST_STARTED = 2;
private static final int ST_SHUTTING_DOWN = 3;
private static final int ST_SHUTDOWN = 4;
private static final int ST_TERMINATED = 5;
// Atomic updater for the volatile "state" field declared below.
private static final AtomicIntegerFieldUpdater<SingleThreadEventExecutor> STATE_UPDATER =
AtomicIntegerFieldUpdater.newUpdater(SingleThreadEventExecutor.class, "state");
// Current lifecycle state; starts as ST_NOT_STARTED.
private volatile int state = ST_NOT_STARTED;
线程的启动本身也作为一个任务提交给 executor 执行。执行这个任务的线程会被设置为 EventLoop
线程。
/**
 * Submits the event-loop body to {@code executor}.
 * <p>
 * The submitted {@link Runnable} records the thread that runs it as the event-loop thread, invokes
 * {@code run()}, and afterwards performs the shutdown sequence: move the state to at least
 * {@code ST_SHUTTING_DOWN}, loop on {@code confirmShutdown()}, move the state to at least
 * {@code ST_SHUTDOWN}, call {@code confirmShutdown()} once more, then {@code cleanup()}, and finally
 * set {@code ST_TERMINATED} and complete {@code terminationFuture}.
 */
private void doStartThread() {
assert thread == null;
executor.execute(new Runnable() {
@Override
public void run() {
// Whichever thread runs this Runnable becomes the event-loop thread.
thread = Thread.currentThread();
if (interrupted) {
thread.interrupt();
}
boolean success = false;
updateLastExecutionTime();
try {
SingleThreadEventExecutor.this.run();
success = true;
} catch (Throwable t) {
logger.warn("Unexpected exception from an event executor: ", t);
} finally {
// Advance the state to ST_SHUTTING_DOWN unless it is already at or beyond it.
for (;;) {
int oldState = state;
if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet(
SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) {
break;
}
}
// Check if confirmShutdown() was called at the end of the loop.
if (success && gracefulShutdownStartTime == 0) {
if (logger.isErrorEnabled()) {
logger.error("Buggy " + EventExecutor.class.getSimpleName() + " implementation; " +
SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must " +
"be called before run() implementation terminates.");
}
}
try {
// Run all remaining tasks and shutdown hooks. At this point the event loop
// is in ST_SHUTTING_DOWN state still accepting tasks which is needed for
// graceful shutdown with quietPeriod.
for (;;) {
if (confirmShutdown()) {
break;
}
}
// Now we want to make sure no more tasks can be added from this point. This is
// achieved by switching the state. Any new tasks beyond this point will be rejected.
for (;;) {
int oldState = state;
if (oldState >= ST_SHUTDOWN || STATE_UPDATER.compareAndSet(
SingleThreadEventExecutor.this, oldState, ST_SHUTDOWN)) {
break;
}
}
// We have the final set of tasks in the queue now, no more can be added, run all remaining.
// No need to loop here, this is the final pass.
confirmShutdown();
} finally {
try {
cleanup();
} finally {
// Lets remove all FastThreadLocals for the Thread as we are about to terminate and notify
// the future. The user may block on the future and once it unblocks the JVM may terminate
// and start unloading classes.
// See https://github.com/netty/netty/issues/6596.
FastThreadLocal.removeAll();
STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED);
threadLock.countDown();
// Anything still queued at this point is only counted and reported in the warning below.
int numUserTasks = drainTasks();
if (numUserTasks > 0 && logger.isWarnEnabled()) {
logger.warn("An event executor terminated with " +
"non-empty task queue (" + numUserTasks + ')');
}
terminationFuture.setSuccess(null);
}
}
}
}
});
}