3.1 - EventLoopGroup
Created by : Mr Dk.
2021 / 02 / 17 18:12
Ningbo, Zhejiang, China
AbstractEventLoopGroup
该类继承自 AbstractEventExecutorGroup
,只是重写了抽象定义的 next()
。
/**
* Skeletal implementation of {@link EventLoopGroup}.
*/
public abstract class AbstractEventLoopGroup extends AbstractEventExecutorGroup implements EventLoopGroup {
@Override
public abstract EventLoop next();
}
MultithreadEventLoopGroup
该抽象类继承自 MultithreadEventExecutorGroup
,实现了维护多个 EventLoop
线程的 EventLoopGroup
。
Definition
/**
* Abstract base class for {@link EventLoopGroup} implementations that handles their tasks with multiple threads at
* the same time.
*/
public abstract class MultithreadEventLoopGroup extends MultithreadEventExecutorGroup implements EventLoopGroup {
}
Constructor
该类的构造函数主要传入了用于创建新线程的线程工厂,以及 EventLoop
线程个数等参数。线程个数可以通过属性配置,也可以通过函数参数指定。
private static final int DEFAULT_EVENT_LOOP_THREADS;
static {
DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
"io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));
if (logger.isDebugEnabled()) {
logger.debug("-Dio.netty.eventLoopThreads: {}", DEFAULT_EVENT_LOOP_THREADS);
}
}
/**
* @see MultithreadEventExecutorGroup#MultithreadEventExecutorGroup(int, Executor, Object...)
*/
protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}
/**
* @see MultithreadEventExecutorGroup#MultithreadEventExecutorGroup(int, ThreadFactory, Object...)
*/
protected MultithreadEventLoopGroup(int nThreads, ThreadFactory threadFactory, Object... args) {
super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, threadFactory, args);
}
Next
该类中的函数直接调用了父类 AbstractEventLoopGroup
的 next()
,而 AbstractEventLoopGroup
的 next()
又使用了专门的 EventExecutorChooser
来决定下一个返回的 EventExecutor
。在 Chooser
的工厂类具体实现中,根据向工厂类传入的所有 EventExecutor
的个数是否是 2 的幂,有着两种 Chooser
实现:
@Override
public EventExecutorChooser newChooser(EventExecutor[] executors) {
if (isPowerOfTwo(executors.length)) {
return new PowerOfTwoEventExecutorChooser(executors);
} else {
return new GenericEventExecutorChooser(executors);
}
}
private static boolean isPowerOfTwo(int val) {
return (val & -val) == val;
}
如果 EventExecutor
的个数是 2 的幂,那么维护一个 AtomicInteger
计数器,通过类似 hash 的方式,返回一个 EventExecutor
:
private static final class PowerOfTwoEventExecutorChooser implements EventExecutorChooser {
private final AtomicInteger idx = new AtomicInteger();
private final EventExecutor[] executors;
PowerOfTwoEventExecutorChooser(EventExecutor[] executors) {
this.executors = executors;
}
@Override
public EventExecutor next() {
return executors[idx.getAndIncrement() & executors.length - 1];
}
}
否则,则使用一个 AtomicLong
计数器,通过对所有 EventExecutor
的个数取模,返回一个 EventExecutor
:
private static final class GenericEventExecutorChooser implements EventExecutorChooser {
// Use a 'long' counter to avoid non-round-robin behaviour at the 32-bit overflow boundary.
// The 64-bit long solves this by placing the overflow so far into the future, that no system
// will encounter this in practice.
private final AtomicLong idx = new AtomicLong();
private final EventExecutor[] executors;
GenericEventExecutorChooser(EventExecutor[] executors) {
this.executors = executors;
}
@Override
public EventExecutor next() {
return executors[(int) Math.abs(idx.getAndIncrement() % executors.length)];
}
}
可能原因是与运算的效率更高,所以执行器的个数是 2 的整数幂应当会更好。
ThreadPerChannelEventLoopGroup
该实现类会为每一个 Channel
创建一个 EventLoop
,适用于传统的阻塞 I/O 模型 (比如代码内不得不阻塞当前线程)。
Definition
/**
* An {@link EventLoopGroup} that creates one {@link EventLoop} per {@link Channel}.
*
* @deprecated this will be remove in the next-major release.
*/
@Deprecated
public class ThreadPerChannelEventLoopGroup extends AbstractEventExecutorGroup implements EventLoopGroup {
}
Next
类内维护了一些提前创建好的空闲 EventLoop
,已经正在被 Channel
使用的 EventLoop
:
final Set<EventLoop> activeChildren =
Collections.newSetFromMap(PlatformDependent.<EventLoop, Boolean>newConcurrentHashMap());
final Queue<EventLoop> idleChildren = new ConcurrentLinkedQueue<EventLoop>();
由于每个 EventLoop
将会对应一个 Channel
,因此这个类不支持 next()
函数:
@Override
public EventLoop next() {
throw new UnsupportedOperationException();
}
Register
当一个新的 Channel
需要被当前 EventLoopGroup
处理时,进行注册即可。
@Override
public ChannelFuture register(Channel channel) {
ObjectUtil.checkNotNull(channel, "channel");
try {
EventLoop l = nextChild();
return l.register(new DefaultChannelPromise(channel, l));
} catch (Throwable t) {
return new FailedChannelFuture(channel, GlobalEventExecutor.INSTANCE, t);
}
}
@Override
public ChannelFuture register(ChannelPromise promise) {
try {
return nextChild().register(promise);
} catch (Throwable t) {
promise.setFailure(t);
return promise;
}
}
@Deprecated
@Override
public ChannelFuture register(Channel channel, ChannelPromise promise) {
ObjectUtil.checkNotNull(channel, "channel");
try {
return nextChild().register(channel, promise);
} catch (Throwable t) {
promise.setFailure(t);
return promise;
}
}
New Child
在注册时,EventLoopGroup
会调用 nextChild()
从空闲的 EventLoop
中拿出一个来服务 Channel
,该 EventLoop
会被加入到 activeChildren
集合中。当 EventLoop
的数量不够用时,则抛出异常。
private EventLoop nextChild() throws Exception {
if (shuttingDown) {
throw new RejectedExecutionException("shutting down");
}
EventLoop loop = idleChildren.poll();
if (loop == null) {
if (maxChannels > 0 && activeChildren.size() >= maxChannels) {
throw tooManyChannels;
}
loop = newChild(childArgs);
loop.terminationFuture().addListener(childTerminationListener);
}
activeChildren.add(loop);
return loop;
}
DefaultEventLoopGroup
默认的 EventLoopGroup
实现。
Definition
继承自 MultithreadEventLoopGroup
抽象类。
/**
* {@link MultithreadEventLoopGroup} which must be used for the local transport.
*/
public class DefaultEventLoopGroup extends MultithreadEventLoopGroup {
}
Constructor
默认将创建具有 0
个线程的 EventLoopGroup
。
/**
* Create a new instance with the default number of threads.
*/
public DefaultEventLoopGroup() {
this(0);
}
/**
* Create a new instance
*
* @param nThreads the number of threads to use
*/
public DefaultEventLoopGroup(int nThreads) {
this(nThreads, (ThreadFactory) null);
}
/**
* Create a new instance with the default number of threads and the given {@link ThreadFactory}.
*
* @param threadFactory the {@link ThreadFactory} or {@code null} to use the default
*/
public DefaultEventLoopGroup(ThreadFactory threadFactory) {
this(0, threadFactory);
}
/**
* Create a new instance
*
* @param nThreads the number of threads to use
* @param threadFactory the {@link ThreadFactory} or {@code null} to use the default
*/
public DefaultEventLoopGroup(int nThreads, ThreadFactory threadFactory) {
super(nThreads, threadFactory);
}
/**
* Create a new instance
*
* @param nThreads the number of threads to use
* @param executor the Executor to use, or {@code null} if the default should be used.
*/
public DefaultEventLoopGroup(int nThreads, Executor executor) {
super(nThreads, executor);
}
New Child
该类中实现了 MultithreadEventExecutorGroup
中定义的抽象函数 newChild()
,实现了建立新的 EventLoop
结点的行为。
@Override
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
return new DefaultEventLoop(this, executor);
}