3.1 - EventLoopGroup

Created by : Mr Dk.

2021 / 02 / 17 18:12

Ningbo, Zhejiang, China


AbstractEventLoopGroup

This class extends AbstractEventExecutorGroup and only redeclares the abstract next(), narrowing its return type to EventLoop.

/**
 * Skeletal implementation of {@link EventLoopGroup}.
 */
public abstract class AbstractEventLoopGroup extends AbstractEventExecutorGroup implements EventLoopGroup {
    @Override
    public abstract EventLoop next();
}

MultithreadEventLoopGroup

This abstract class extends MultithreadEventExecutorGroup and implements an EventLoopGroup that maintains multiple EventLoop threads.

Definition

/**
 * Abstract base class for {@link EventLoopGroup} implementations that handles their tasks with multiple threads at
 * the same time.
 */
public abstract class MultithreadEventLoopGroup extends MultithreadEventExecutorGroup implements EventLoopGroup {

}

Constructor

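The constructors mainly take the thread factory (or Executor) used to create new threads, and the number of EventLoop threads. The thread count can be configured through the io.netty.eventLoopThreads system property or passed as an argument. Passing 0 selects the default, which is twice the number of available processors unless the property overrides it.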

private static final int DEFAULT_EVENT_LOOP_THREADS;

static {
    DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
        "io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));

    if (logger.isDebugEnabled()) {
        logger.debug("-Dio.netty.eventLoopThreads: {}", DEFAULT_EVENT_LOOP_THREADS);
    }
}

/**
 * @see MultithreadEventExecutorGroup#MultithreadEventExecutorGroup(int, Executor, Object...)
 */
protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
    super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}

/**
 * @see MultithreadEventExecutorGroup#MultithreadEventExecutorGroup(int, ThreadFactory, Object...)
 */
protected MultithreadEventLoopGroup(int nThreads, ThreadFactory threadFactory, Object... args) {
    super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, threadFactory, args);
}
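The thread-count resolution above can be reproduced with the JDK alone. This is a minimal sketch, not Netty code: the class name EventLoopThreadDefaults and its methods are made up for illustration, Runtime.availableProcessors() stands in for NettyRuntime.availableProcessors(), and Integer.getInteger() stands in for SystemPropertyUtil.getInt().

```java
final class EventLoopThreadDefaults {
    // Mirrors the static initializer: read the system property, fall back to
    // 2 * available processors, and never go below 1.
    static int defaultThreads() {
        int fallback = Runtime.getRuntime().availableProcessors() * 2;
        int configured = Integer.getInteger("io.netty.eventLoopThreads", fallback);
        return Math.max(1, configured);
    }

    // Mirrors the constructors: 0 means "use the default",
    // any other value is passed through unchanged.
    static int resolve(int nThreads) {
        return nThreads == 0 ? defaultThreads() : nThreads;
    }

    public static void main(String[] args) {
        System.out.println("resolve(0) = " + resolve(0));
        System.out.println("resolve(4) = " + resolve(4));
    }
}
```

Running it with -Dio.netty.eventLoopThreads=3 should make resolve(0) return 3, while resolve(4) is unaffected by the property.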

Next

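next() in this class simply calls next() of its parent class MultithreadEventExecutorGroup, which in turn uses a dedicated EventExecutorChooser to decide which EventExecutor to return next. The chooser factory provides two Chooser implementations, depending on whether the number of EventExecutors passed to the factory is a power of 2: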

@Override
public EventExecutorChooser newChooser(EventExecutor[] executors) {
    if (isPowerOfTwo(executors.length)) {
        return new PowerOfTwoEventExecutorChooser(executors);
    } else {
        return new GenericEventExecutorChooser(executors);
    }
}

private static boolean isPowerOfTwo(int val) {
    return (val & -val) == val;
}
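The power-of-two test relies on two's complement arithmetic: val & -val isolates the lowest set bit of val, so the result equals val only when val has a single bit set. A small standalone check (the class name PowerOfTwoCheck is only for illustration):

```java
final class PowerOfTwoCheck {
    // Same expression as in the chooser factory above.
    static boolean isPowerOfTwo(int val) {
        return (val & -val) == val;
    }

    public static void main(String[] args) {
        // Print the result for a few typical thread counts.
        for (int n : new int[] {1, 2, 3, 6, 8, 12, 16}) {
            System.out.println(n + " -> " + isPowerOfTwo(n));
        }
    }
}
```

Note that 0 also passes this check; the sketch does not guard against it.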

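If the number of EventExecutors is a power of 2, the chooser keeps an AtomicInteger counter and picks an EventExecutor by masking the counter with length - 1, much like indexing a hash table: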

private static final class PowerOfTwoEventExecutorChooser implements EventExecutorChooser {
    private final AtomicInteger idx = new AtomicInteger();
    private final EventExecutor[] executors;

    PowerOfTwoEventExecutorChooser(EventExecutor[] executors) {
        this.executors = executors;
    }

    @Override
    public EventExecutor next() {
        return executors[idx.getAndIncrement() & executors.length - 1];
    }
}

Otherwise, the chooser keeps an AtomicLong counter and picks an EventExecutor by taking the counter modulo the number of EventExecutors:

private static final class GenericEventExecutorChooser implements EventExecutorChooser {
    // Use a 'long' counter to avoid non-round-robin behaviour at the 32-bit overflow boundary.
    // The 64-bit long solves this by placing the overflow so far into the future, that no system
    // will encounter this in practice.
    private final AtomicLong idx = new AtomicLong();
    private final EventExecutor[] executors;

    GenericEventExecutorChooser(EventExecutor[] executors) {
        this.executors = executors;
    }

    @Override
    public EventExecutor next() {
        return executors[(int) Math.abs(idx.getAndIncrement() % executors.length)];
    }
}

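The likely reason is that a bitwise AND is cheaper than a modulo operation, so a power-of-2 number of executors should perform slightly better.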
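To see that the two index computations agree when the length is a power of 2, and that the mask keeps cycling across the int overflow boundary, here is a minimal sketch using plain integers instead of EventExecutors. The class and method names (ChooserIndexDemo, maskIndex, moduloIndex) are hypothetical.

```java
final class ChooserIndexDemo {
    // Index computed the way the power-of-two chooser does it:
    // a bit mask applied to an int counter.
    static int maskIndex(int counter, int length) {
        return counter & length - 1;
    }

    // Index computed the way the generic chooser does it:
    // modulo applied to a long counter.
    static int moduloIndex(long counter, int length) {
        return (int) Math.abs(counter % length);
    }

    public static void main(String[] args) {
        // For non-negative counters and a power-of-two length, both agree.
        for (int c = 0; c < 8; c++) {
            System.out.println(c + ": mask=" + maskIndex(c, 4) + " modulo=" + moduloIndex(c, 4));
        }
        // The mask still advances in order when the int counter wraps around.
        System.out.println(maskIndex(Integer.MAX_VALUE, 4)); // 3
        System.out.println(maskIndex(Integer.MIN_VALUE, 4)); // 0
    }
}
```

The mask only works for power-of-two lengths, which is why the generic chooser needs the modulo and, per the comment in its source, a long counter to push the overflow out of practical reach.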

ThreadPerChannelEventLoopGroup

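This implementation creates one EventLoop for each Channel. It suits the traditional blocking I/O model (for example, when the code has no choice but to block the current thread).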

Definition

/**
 * An {@link EventLoopGroup} that creates one {@link EventLoop} per {@link Channel}.
 *
 * @deprecated this will be remove in the next-major release.
 */
@Deprecated
public class ThreadPerChannelEventLoopGroup extends AbstractEventExecutorGroup implements EventLoopGroup {

}

Next

The class maintains a set of EventLoops currently in use by Channels, as well as a queue of idle EventLoops available for reuse:

final Set<EventLoop> activeChildren =
        Collections.newSetFromMap(PlatformDependent.<EventLoop, Boolean>newConcurrentHashMap());
final Queue<EventLoop> idleChildren = new ConcurrentLinkedQueue<EventLoop>();

Since each EventLoop corresponds to exactly one Channel, this class does not support next():

@Override
public EventLoop next() {
    throw new UnsupportedOperationException();
}

Register

When a new Channel needs to be handled by this EventLoopGroup, it only has to be registered.

@Override
public ChannelFuture register(Channel channel) {
    ObjectUtil.checkNotNull(channel, "channel");
    try {
        EventLoop l = nextChild();
        return l.register(new DefaultChannelPromise(channel, l));
    } catch (Throwable t) {
        return new FailedChannelFuture(channel, GlobalEventExecutor.INSTANCE, t);
    }
}

@Override
public ChannelFuture register(ChannelPromise promise) {
    try {
        return nextChild().register(promise);
    } catch (Throwable t) {
        promise.setFailure(t);
        return promise;
    }
}

@Deprecated
@Override
public ChannelFuture register(Channel channel, ChannelPromise promise) {
    ObjectUtil.checkNotNull(channel, "channel");
    try {
        return nextChild().register(channel, promise);
    } catch (Throwable t) {
        promise.setFailure(t);
        return promise;
    }
}
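All three register() overloads share one pattern: a failure while obtaining or using the child EventLoop is not thrown to the caller but reported through the returned future. A rough JDK-only sketch of that pattern, with CompletableFuture standing in for ChannelFuture and a hypothetical loopSupplier standing in for nextChild():

```java
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

final class RegisterSketch {
    // loopSupplier is a made-up stand-in for nextChild(); it may throw.
    static CompletableFuture<String> register(String channel, Supplier<String> loopSupplier) {
        // As in register(Channel), the null check happens outside the try block.
        Objects.requireNonNull(channel, "channel");
        try {
            String loop = loopSupplier.get();
            return CompletableFuture.completedFuture(channel + " -> " + loop);
        } catch (Throwable t) {
            // Report the failure through the future instead of rethrowing it.
            CompletableFuture<String> failed = new CompletableFuture<>();
            failed.completeExceptionally(t);
            return failed;
        }
    }

    public static void main(String[] args) {
        System.out.println(register("ch-1", () -> "loop-1").join());
        CompletableFuture<String> f = register("ch-2", () -> {
            throw new IllegalStateException("too many channels");
        });
        System.out.println("failed: " + f.isCompletedExceptionally());
    }
}
```

The caller therefore handles success and failure uniformly by inspecting or listening on the returned future.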

Next Child

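During registration, the EventLoopGroup calls nextChild() to take an idle EventLoop to serve the Channel, and adds that EventLoop to the activeChildren set. If no idle EventLoop is available, a new one is created, unless maxChannels is positive and that many EventLoops are already active, in which case an exception is thrown.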

private EventLoop nextChild() throws Exception {
    if (shuttingDown) {
        throw new RejectedExecutionException("shutting down");
    }

    EventLoop loop = idleChildren.poll();
    if (loop == null) {
        if (maxChannels > 0 && activeChildren.size() >= maxChannels) {
            throw tooManyChannels;
        }
        loop = newChild(childArgs);
        loop.terminationFuture().addListener(childTerminationListener);
    }
    activeChildren.add(loop);
    return loop;
}
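The idle/active bookkeeping in nextChild() can be sketched with plain strings in place of EventLoops. Everything here is illustrative: WorkerPoolSketch, acquire() and release() are made-up names, and release() is only an assumption about how a child returns to the idle queue, since that path is not shown in the listing above.

```java
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

final class WorkerPoolSketch {
    private final Set<String> active = ConcurrentHashMap.newKeySet();
    private final Queue<String> idle = new ConcurrentLinkedQueue<>();
    private final AtomicInteger created = new AtomicInteger();
    private final int maxWorkers; // 0 or less means "no limit", like maxChannels

    WorkerPoolSketch(int maxWorkers) {
        this.maxWorkers = maxWorkers;
    }

    // Same shape as nextChild(): reuse an idle worker if there is one,
    // otherwise create a new one unless the limit has been reached.
    String acquire() {
        String worker = idle.poll();
        if (worker == null) {
            if (maxWorkers > 0 && active.size() >= maxWorkers) {
                throw new IllegalStateException("too many workers");
            }
            worker = "worker-" + created.incrementAndGet();
        }
        active.add(worker);
        return worker;
    }

    // Hypothetical counterpart: move a worker from active back to idle.
    void release(String worker) {
        if (active.remove(worker)) {
            idle.add(worker);
        }
    }

    public static void main(String[] args) {
        WorkerPoolSketch pool = new WorkerPoolSketch(2);
        String a = pool.acquire();
        String b = pool.acquire();
        System.out.println(a + ", " + b);
        pool.release(a);
        System.out.println("reused: " + pool.acquire());
    }
}
```

With a limit of 2, a third acquire() fails until one of the two workers is released, at which point the released worker is handed out again instead of a new one being created.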

DefaultEventLoopGroup

The default EventLoopGroup implementation.

Definition

Extends the abstract class MultithreadEventLoopGroup.

/**
 * {@link MultithreadEventLoopGroup} which must be used for the local transport.
 */
public class DefaultEventLoopGroup extends MultithreadEventLoopGroup {

}

Constructor

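By default, nThreads is passed as 0, which MultithreadEventLoopGroup replaces with DEFAULT_EVENT_LOOP_THREADS, so the group is created with the default number of threads rather than with zero threads.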

/**
 * Create a new instance with the default number of threads.
 */
public DefaultEventLoopGroup() {
    this(0);
}

/**
 * Create a new instance
 *
 * @param nThreads          the number of threads to use
 */
public DefaultEventLoopGroup(int nThreads) {
    this(nThreads, (ThreadFactory) null);
}

/**
 * Create a new instance with the default number of threads and the given {@link ThreadFactory}.
 *
 * @param threadFactory     the {@link ThreadFactory} or {@code null} to use the default
 */
public DefaultEventLoopGroup(ThreadFactory threadFactory) {
    this(0, threadFactory);
}

/**
 * Create a new instance
 *
 * @param nThreads          the number of threads to use
 * @param threadFactory     the {@link ThreadFactory} or {@code null} to use the default
 */
public DefaultEventLoopGroup(int nThreads, ThreadFactory threadFactory) {
    super(nThreads, threadFactory);
}

/**
 * Create a new instance
 *
 * @param nThreads          the number of threads to use
 * @param executor          the Executor to use, or {@code null} if the default should be used.
 */
public DefaultEventLoopGroup(int nThreads, Executor executor) {
    super(nThreads, executor);
}

New Child

This class implements the abstract method newChild() declared in MultithreadEventExecutorGroup, which defines how a new EventLoop child is created.

@Override
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
    return new DefaultEventLoop(this, executor);
}
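newChild() is a template-method hook: the multithreaded base class decides how many children to create, and the concrete subclass decides what each child is. The following is a simplified sketch of that division of labor, not the Netty implementation. All names (NewChildSketch, Group, DefaultGroup) are hypothetical, and strings stand in for EventLoops.

```java
import java.util.ArrayList;
import java.util.List;

final class NewChildSketch {
    // Plays the role of the multithreaded base class: it owns the children
    // and asks the subclass to create each one.
    abstract static class Group {
        private final List<String> children = new ArrayList<>();

        Group(int nThreads) {
            for (int i = 0; i < nThreads; i++) {
                children.add(newChild(i));
            }
        }

        // The hook that concrete groups implement.
        protected abstract String newChild(int index);

        List<String> children() {
            return children;
        }
    }

    // Plays the role of DefaultEventLoopGroup: it only supplies newChild().
    static final class DefaultGroup extends Group {
        DefaultGroup(int nThreads) {
            super(nThreads);
        }

        @Override
        protected String newChild(int index) {
            return "default-loop-" + index;
        }
    }

    public static void main(String[] args) {
        System.out.println(new DefaultGroup(3).children());
    }
}
```

A different transport would supply its own group subclass whose newChild() returns its own kind of child, without touching the base class.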