3.4 - EventLoop
Created by : Mr Dk.
2021 / 02 / 18 23:01
Ningbo, Zhejiang, China
AbstractEventLoop
继承自 AbstractEventExecutor
的抽象类,没啥花头。
Definition
/**
* Skeletal implementation of {@link EventLoop}.
*/
public abstract class AbstractEventLoop extends AbstractEventExecutor implements EventLoop {
protected AbstractEventLoop() { }
protected AbstractEventLoop(EventLoopGroup parent) {
super(parent);
}
@Override
public EventLoopGroup parent() {
return (EventLoopGroup) super.parent();
}
@Override
public EventLoop next() {
return (EventLoop) super.next();
}
}
SingleThreadEventLoop
继承自 SingleThreadEventExecutor
的抽象类,实现了单个线程的 EventLoop
。
Definition
/**
* Abstract base class for {@link EventLoop}s that execute all its submitted tasks in a single thread.
*
*/
public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor implements EventLoop {
}
Tail Tasks
在该类中维护了一个队列,用于保存在当前 EventLoop 迭代或下一次 EventLoop 迭代时想要被运行一次的任务。
private final Queue<Runnable> tailTasks;
/**
* Adds a task to be run once at the end of next (or current) {@code eventloop} iteration.
*
* @param task to be added.
*/
@UnstableApi
public final void executeAfterEventLoopIteration(Runnable task) {
ObjectUtil.checkNotNull(task, "task");
if (isShutdown()) {
reject();
}
if (!tailTasks.offer(task)) {
reject(task);
}
if (!(task instanceof LazyRunnable) && wakesUpForTask(task)) {
wakeup(inEventLoop());
}
}
/**
* Removes a task that was added previously via {@link #executeAfterEventLoopIteration(Runnable)}.
*
* @param task to be removed.
*
* @return {@code true} if the task was removed as a result of this call.
*/
@UnstableApi
final boolean removeAfterEventLoopIterationTask(Runnable task) {
return tailTasks.remove(ObjectUtil.checkNotNull(task, "task"));
}
@Override
protected void afterRunningAllTasks() {
runAllTasksFrom(tailTasks);
}
ThreadPerChannelEventLoop
适用于 OIO 场景的 EventLoop
,一个 ThreadPerChannelEventLoop
服务一个 Channel
。
Definition
/**
* {@link SingleThreadEventLoop} which is used to handle OIO {@link Channel}'s. So in general there will be
* one {@link ThreadPerChannelEventLoop} per {@link Channel}.
*
* @deprecated this will be remove in the next-major release.
*/
@Deprecated
public class ThreadPerChannelEventLoop extends SingleThreadEventLoop {
}
Register
将 Channel
注册到当前 EventLoop
上绑定。在类内持有对 Channel
的引用。
private Channel ch;
@Override
public ChannelFuture register(ChannelPromise promise) {
return super.register(promise).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
ch = future.channel();
} else {
deregister();
}
}
});
}
@Deprecated
@Override
public ChannelFuture register(Channel channel, ChannelPromise promise) {
return super.register(channel, promise).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
ch = future.channel();
} else {
deregister();
}
}
});
}
Run
该类实现了 SingleThreadEventExecutor
中定义的抽象函数 run()
,即 EventLoop 的主循环:
- 执行任务队列中的任务
- 处理
Channel
关闭 - 处理
Channel
的解除注册
@Override
protected void run() {
for (;;) {
Runnable task = takeTask();
if (task != null) {
task.run();
updateLastExecutionTime();
}
Channel ch = this.ch;
if (isShuttingDown()) {
if (ch != null) {
ch.unsafe().close(ch.unsafe().voidPromise());
}
if (confirmShutdown()) {
break;
}
} else {
if (ch != null) {
// Handle deregistration
if (!ch.isRegistered()) {
runAllTasks();
deregister();
}
}
}
}
}
DefaultEventLoop
默认的单线程 EventLoop
实现。
Definition
public class DefaultEventLoop extends SingleThreadEventLoop {
}
Run
实现了 SingleThreadEventExecutor
中定义的抽象函数 run()
,即 EventLoop 的主循环:
@Override
protected void run() {
for (;;) {
Runnable task = takeTask();
if (task != null) {
task.run();
updateLastExecutionTime();
}
if (confirmShutdown()) {
break;
}
}
}