3.1 - EventLoopGroup
Created by : Mr Dk.
2021 / 02 / 17 18:12
Ningbo, Zhejiang, China
该类继承自 AbstractEventExecutorGroup
,只是重写了抽象定义的 next()
* Skeletal implementation of {@link EventLoopGroup}.
public abstract class AbstractEventLoopGroup extends AbstractEventExecutorGroup implements EventLoopGroup {
public abstract EventLoop next();
该抽象类继承自 MultithreadEventExecutorGroup
,实现了维护多个 EventLoop
线程的 EventLoopGroup
* Abstract base class for {@link EventLoopGroup} implementations that handles their tasks with multiple threads at
* the same time.
public abstract class MultithreadEventLoopGroup extends MultithreadEventExecutorGroup implements EventLoopGroup {
该类的构造函数主要传入了用于创建新线程的线程工厂,以及 EventLoop
private static final int DEFAULT_EVENT_LOOP_THREADS;
static {
DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
"io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));
if (logger.isDebugEnabled()) {
logger.debug("-Dio.netty.eventLoopThreads: {}", DEFAULT_EVENT_LOOP_THREADS);
* @see MultithreadEventExecutorGroup#MultithreadEventExecutorGroup(int, Executor, Object...)
protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
* @see MultithreadEventExecutorGroup#MultithreadEventExecutorGroup(int, ThreadFactory, Object...)
protected MultithreadEventLoopGroup(int nThreads, ThreadFactory threadFactory, Object... args) {
super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, threadFactory, args);
该类中的函数直接调用了父类 AbstractEventLoopGroup
的 next()
,而 AbstractEventLoopGroup
的 next()
又使用了专门的 EventExecutorChooser
来决定下一个返回的 EventExecutor
。在 Chooser
的工厂类具体实现中,根据向工厂类传入的所有 EventExecutor
的个数是否是 2 的幂,有着两种 Chooser
public EventExecutorChooser newChooser(EventExecutor[] executors) {
if (isPowerOfTwo(executors.length)) {
return new PowerOfTwoEventExecutorChooser(executors);
} else {
return new GenericEventExecutorChooser(executors);
private static boolean isPowerOfTwo(int val) {
return (val & -val) == val;
如果 EventExecutor
的个数是 2 的幂,那么维护一个 AtomicInteger
计数器,通过类似 hash 的方式,返回一个 EventExecutor
private static final class PowerOfTwoEventExecutorChooser implements EventExecutorChooser {
private final AtomicInteger idx = new AtomicInteger();
private final EventExecutor[] executors;
PowerOfTwoEventExecutorChooser(EventExecutor[] executors) {
this.executors = executors;
public EventExecutor next() {
return executors[idx.getAndIncrement() & executors.length - 1];
否则,则使用一个 AtomicLong
计数器,通过对所有 EventExecutor
的个数取模,返回一个 EventExecutor
private static final class GenericEventExecutorChooser implements EventExecutorChooser {
// Use a 'long' counter to avoid non-round-robin behaviour at the 32-bit overflow boundary.
// The 64-bit long solves this by placing the overflow so far into the future, that no system
// will encounter this in practice.
private final AtomicLong idx = new AtomicLong();
private final EventExecutor[] executors;
GenericEventExecutorChooser(EventExecutor[] executors) {
this.executors = executors;
public EventExecutor next() {
return executors[(int) Math.abs(idx.getAndIncrement() % executors.length)];
可能原因是与运算的效率更高,所以执行器的个数是 2 的整数幂应当会更好。
该实现类会为每一个 Channel
创建一个 EventLoop
,适用于传统的阻塞 I/O 模型 (比如代码内不得不阻塞当前线程)。
* An {@link EventLoopGroup} that creates one {@link EventLoop} per {@link Channel}.
* @deprecated this will be remove in the next-major release.
public class ThreadPerChannelEventLoopGroup extends AbstractEventExecutorGroup implements EventLoopGroup {
类内维护了一些提前创建好的空闲 EventLoop
,已经正在被 Channel
使用的 EventLoop
final Set<EventLoop> activeChildren =
Collections.newSetFromMap(PlatformDependent.<EventLoop, Boolean>newConcurrentHashMap());
final Queue<EventLoop> idleChildren = new ConcurrentLinkedQueue<EventLoop>();
由于每个 EventLoop
将会对应一个 Channel
,因此这个类不支持 next()
public EventLoop next() {
throw new UnsupportedOperationException();
当一个新的 Channel
需要被当前 EventLoopGroup
public ChannelFuture register(Channel channel) {
ObjectUtil.checkNotNull(channel, "channel");
try {
EventLoop l = nextChild();
return l.register(new DefaultChannelPromise(channel, l));
} catch (Throwable t) {
return new FailedChannelFuture(channel, GlobalEventExecutor.INSTANCE, t);
public ChannelFuture register(ChannelPromise promise) {
try {
return nextChild().register(promise);
} catch (Throwable t) {
return promise;
public ChannelFuture register(Channel channel, ChannelPromise promise) {
ObjectUtil.checkNotNull(channel, "channel");
try {
return nextChild().register(channel, promise);
} catch (Throwable t) {
return promise;
New Child
会调用 nextChild()
从空闲的 EventLoop
中拿出一个来服务 Channel
,该 EventLoop
会被加入到 activeChildren
集合中。当 EventLoop
private EventLoop nextChild() throws Exception {
if (shuttingDown) {
throw new RejectedExecutionException("shutting down");
EventLoop loop = idleChildren.poll();
if (loop == null) {
if (maxChannels > 0 && activeChildren.size() >= maxChannels) {
throw tooManyChannels;
loop = newChild(childArgs);
return loop;
默认的 EventLoopGroup
继承自 MultithreadEventLoopGroup
* {@link MultithreadEventLoopGroup} which must be used for the local transport.
public class DefaultEventLoopGroup extends MultithreadEventLoopGroup {
默认将创建具有 0
个线程的 EventLoopGroup
* Create a new instance with the default number of threads.
public DefaultEventLoopGroup() {
* Create a new instance
* @param nThreads the number of threads to use
public DefaultEventLoopGroup(int nThreads) {
this(nThreads, (ThreadFactory) null);
* Create a new instance with the default number of threads and the given {@link ThreadFactory}.
* @param threadFactory the {@link ThreadFactory} or {@code null} to use the default
public DefaultEventLoopGroup(ThreadFactory threadFactory) {
this(0, threadFactory);
* Create a new instance
* @param nThreads the number of threads to use
* @param threadFactory the {@link ThreadFactory} or {@code null} to use the default
public DefaultEventLoopGroup(int nThreads, ThreadFactory threadFactory) {
super(nThreads, threadFactory);
* Create a new instance
* @param nThreads the number of threads to use
* @param executor the Executor to use, or {@code null} if the default should be used.
public DefaultEventLoopGroup(int nThreads, Executor executor) {
super(nThreads, executor);
New Child
该类中实现了 MultithreadEventExecutorGroup
中定义的抽象函数 newChild()
,实现了建立新的 EventLoop
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
return new DefaultEventLoop(this, executor);