Mr Dk.'s Blog

3.1 - EventExecutorGroup

Created by : Mr Dk.

2021 / 02 / 17 18:12

Ningbo, Zhejiang, China


AbstractEventExecutorGroup

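This class provides a first, partial implementation of the functions declared in the EventExecutorGroup interface. According to the interface definition, an EventExecutorGroup exists to manage multiple EventExecutor instances, and the interface declares next() to return one of them. Most of this class is therefore implemented the same way: call next() to obtain an EventExecutor instance, then call the corresponding function on that instance.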

Definition

/**
 * Abstract base class for {@link EventExecutorGroup} implementations.
 */
public abstract class AbstractEventExecutorGroup implements EventExecutorGroup {

}

Submit

@Override
public Future<?> submit(Runnable task) {
    return next().submit(task);
}

@Override
public <T> Future<T> submit(Runnable task, T result) {
    return next().submit(task, result);
}

@Override
public <T> Future<T> submit(Callable<T> task) {
    return next().submit(task);
}

The remaining functions follow the same pattern: call next() to obtain an EventExecutor instance, then call the matching function on it.
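The delegation pattern above can be tried without Netty on the classpath. The following is a minimal sketch built on java.util.concurrent only: DelegatingGroupSketch and TwoChildGroupSketch are hypothetical names, ExecutorService stands in for EventExecutor, and the round-robin next() is an assumption made for the demo, not something the class above prescribes.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for an executor group; this is not Netty's class.
abstract class DelegatingGroupSketch {
    // Subclasses decide which child executor handles the next task.
    abstract ExecutorService next();

    // Every task-related method simply forwards to the chosen child.
    Future<?> submit(Runnable task) {
        return next().submit(task);
    }

    <T> Future<T> submit(Callable<T> task) {
        return next().submit(task);
    }
}

// A trivial group over two single-threaded children, picked in turn
// (the selection strategy is an assumption for this demo).
class TwoChildGroupSketch extends DelegatingGroupSketch {
    private final ExecutorService[] children = {
        Executors.newSingleThreadExecutor(),
        Executors.newSingleThreadExecutor()
    };
    private final AtomicInteger idx = new AtomicInteger();

    @Override
    ExecutorService next() {
        // floorMod keeps the index non-negative if the counter overflows.
        return children[Math.floorMod(idx.getAndIncrement(), children.length)];
    }

    // Stops accepting tasks on every child so the worker threads can exit.
    void shutdown() {
        for (ExecutorService child : children) {
            child.shutdown();
        }
    }
}
```

A caller only ever talks to the group; which child actually runs a task is hidden behind next().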

MultithreadEventExecutorGroup

This class goes one step further and implements an EventExecutorGroup that handles its tasks with multiple threads at the same time.

Definition

/**
 * Abstract base class for {@link EventExecutorGroup} implementations that handles their tasks with multiple threads at
 * the same time.
 */
public abstract class MultithreadEventExecutorGroup extends AbstractEventExecutorGroup {

}

Fields

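  • The class keeps all EventExecutor instances of the group in the EventExecutor array children.
  • readonlyChildren exposes an unmodifiable view of those EventExecutor instances to code outside the class, for iteration.
  • The atomic variable terminatedChildren counts how many EventExecutor instances have already terminated.
  • terminationFuture is used to send a notification once all EventExecutor instances have terminated.
  • chooser selects one EventExecutor instance on behalf of the current EventExecutorGroup.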
private final EventExecutor[] children;
private final Set<EventExecutor> readonlyChildren;
private final AtomicInteger terminatedChildren = new AtomicInteger();
private final Promise<?> terminationFuture = new DefaultPromise(GlobalEventExecutor.INSTANCE);
private final EventExecutorChooserFactory.EventExecutorChooser chooser;
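To make the role of readonlyChildren concrete, here is a small sketch of the same "array inside, unmodifiable view outside" idea using only the JDK collections. ReadonlyChildrenSketch is a hypothetical class, and String stands in for EventExecutor so that the example runs on its own.

```java
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical holder; String stands in for EventExecutor.
final class ReadonlyChildrenSketch implements Iterable<String> {
    private final String[] children;
    private final Set<String> readonlyChildren;

    ReadonlyChildrenSketch(String... names) {
        children = names.clone();
        // LinkedHashSet preserves the array order; the unmodifiable wrapper
        // rejects every attempt to add or remove elements.
        Set<String> childrenSet = new LinkedHashSet<>(children.length);
        Collections.addAll(childrenSet, children);
        readonlyChildren = Collections.unmodifiableSet(childrenSet);
    }

    // Callers can walk the children but cannot change the group through this iterator.
    @Override
    public Iterator<String> iterator() {
        return readonlyChildren.iterator();
    }
}
```

Calling remove() on the returned iterator throws UnsupportedOperationException, so the internal array stays the single source of truth.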

Constructor

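The constructor creates the EventExecutor instances one by one, one per thread. Whenever an EventExecutor terminates, the atomic variable terminatedChildren is incremented. Once it equals the number of EventExecutor instances, the group's terminationFuture is set to its completed state. To make this work, the group registers the same listener on the terminationFuture of every EventExecutor.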

/**
 * Create a new instance.
 *
 * @param nThreads          the number of threads that will be used by this instance.
 * @param executor          the Executor to use, or {@code null} if the default should be used.
 * @param chooserFactory    the {@link EventExecutorChooserFactory} to use.
 * @param args              arguments which will passed to each {@link #newChild(Executor, Object...)} call
 */
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                        EventExecutorChooserFactory chooserFactory, Object... args) {
    if (nThreads <= 0) {
        throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
    }

    if (executor == null) {
        executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
    }

    children = new EventExecutor[nThreads];

    for (int i = 0; i < nThreads; i ++) {
        boolean success = false;
        try {
            children[i] = newChild(executor, args);
            success = true;
        } catch (Exception e) {
            // TODO: Think about if this is a good exception type
            throw new IllegalStateException("failed to create a child event loop", e);
        } finally {
            if (!success) {
                for (int j = 0; j < i; j ++) {
                    children[j].shutdownGracefully();
                }

                for (int j = 0; j < i; j ++) {
                    EventExecutor e = children[j];
                    try {
                        while (!e.isTerminated()) {
                            e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
                        }
                    } catch (InterruptedException interrupted) {
                        // Let the caller handle the interruption.
                        Thread.currentThread().interrupt();
                        break;
                    }
                }
            }
        }
    }

    chooser = chooserFactory.newChooser(children);

    final FutureListener<Object> terminationListener = new FutureListener<Object>() {
        @Override
        public void operationComplete(Future<Object> future) throws Exception {
            if (terminatedChildren.incrementAndGet() == children.length) {
                terminationFuture.setSuccess(null);
            }
        }
    };

    for (EventExecutor e: children) {
        e.terminationFuture().addListener(terminationListener);
    }

    Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
    Collections.addAll(childrenSet, children);
    readonlyChildren = Collections.unmodifiableSet(childrenSet);
}
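The termination bookkeeping at the end of the constructor can be isolated into a few lines. The sketch below is an approximation under stated assumptions: TerminationCounterSketch is a hypothetical class, CompletableFuture stands in for Netty's Promise and Future, and childTerminated() simulates a child finishing.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: CompletableFuture stands in for Netty's Promise/Future.
final class TerminationCounterSketch {
    private final List<CompletableFuture<Void>> childTermination = new ArrayList<>();
    private final AtomicInteger terminatedChildren = new AtomicInteger();
    private final CompletableFuture<Void> terminationFuture = new CompletableFuture<>();

    TerminationCounterSketch(int nChildren) {
        for (int i = 0; i < nChildren; i++) {
            CompletableFuture<Void> child = new CompletableFuture<>();
            // The same callback logic is attached to every child...
            child.whenComplete((result, error) -> {
                // ...and only the last child to finish completes the group future.
                if (terminatedChildren.incrementAndGet() == nChildren) {
                    terminationFuture.complete(null);
                }
            });
            childTermination.add(child);
        }
    }

    // Simulates the child at the given index reaching its terminated state.
    void childTerminated(int index) {
        childTermination.get(index).complete(null);
    }

    CompletableFuture<Void> terminationFuture() {
        return terminationFuture;
    }
}
```

Because the counter is atomic, it does not matter which threads the children terminate on or in which order; exactly one increment observes the final value.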

The constructor relies on the abstract newChild() function to create each EventExecutor.

/**
 * Create a new EventExecutor which will later then accessible via the {@link #next()}  method. This method will be
 * called for each thread that will serve this {@link MultithreadEventExecutorGroup}.
 *
 */
protected abstract EventExecutor newChild(Executor executor, Object... args) throws Exception;
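The constructor's try/finally block also guarantees that a failure while creating one child does not leak the children created before it. A reduced sketch of that "all or nothing" loop follows. ChildFactorySketch, its nested Child class, and the IntFunction parameter are hypothetical stand-ins, and the blocking wait for termination in the original is left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.IntFunction;

// Hypothetical sketch of "create all children or clean up"; not Netty code.
final class ChildFactorySketch {
    // A stand-in child that only records whether it was shut down.
    static final class Child {
        volatile boolean shutDown;

        void shutdown() {
            shutDown = true;
        }
    }

    // Creates n children; if any creation fails, shuts down the ones already built.
    static List<Child> createAll(int n, IntFunction<Child> newChild) {
        List<Child> children = new ArrayList<>(n);
        for (int i = 0; i < n; i++) {
            boolean success = false;
            try {
                children.add(newChild.apply(i));
                success = true;
            } catch (RuntimeException e) {
                throw new IllegalStateException("failed to create a child", e);
            } finally {
                if (!success) {
                    // Only the children created so far are in the list.
                    for (Child created : children) {
                        created.shutdown();
                    }
                }
            }
        }
        return children;
    }
}
```

The success flag is what lets the finally block distinguish a normal iteration from one that is unwinding with an exception.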

All other related operations are implemented by iterating over the children array and applying the operation to each EventExecutor instance in turn. For example:

@Override
public Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) {
    for (EventExecutor l: children) {
        l.shutdownGracefully(quietPeriod, timeout, unit);
    }
    return terminationFuture();
}

DefaultEventExecutorGroup

The default EventExecutorGroup implementation.

Definition

/**
 * Default implementation of {@link MultithreadEventExecutorGroup} which will use {@link DefaultEventExecutor} instances
 * to handle the tasks.
 */
public class DefaultEventExecutorGroup extends MultithreadEventExecutorGroup {

}

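The constructors take a ThreadFactory that is used to set up the EventExecutor instances maintained by the group. The number of threads must be given as well, and the most complete overload also takes the maximum number of pending tasks and the rejection policy.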

/**
 * @see #DefaultEventExecutorGroup(int, ThreadFactory)
 */
public DefaultEventExecutorGroup(int nThreads) {
    this(nThreads, null);
}

/**
 * Create a new instance.
 *
 * @param nThreads          the number of threads that will be used by this instance.
 * @param threadFactory     the ThreadFactory to use, or {@code null} if the default should be used.
 */
public DefaultEventExecutorGroup(int nThreads, ThreadFactory threadFactory) {
    this(nThreads, threadFactory, SingleThreadEventExecutor.DEFAULT_MAX_PENDING_EXECUTOR_TASKS,
         RejectedExecutionHandlers.reject());
}

/**
 * Create a new instance.
 *
 * @param nThreads          the number of threads that will be used by this instance.
 * @param threadFactory     the ThreadFactory to use, or {@code null} if the default should be used.
 * @param maxPendingTasks   the maximum number of pending tasks before new tasks will be rejected.
 * @param rejectedHandler   the {@link RejectedExecutionHandler} to use.
 */
public DefaultEventExecutorGroup(int nThreads, ThreadFactory threadFactory, int maxPendingTasks,
                                 RejectedExecutionHandler rejectedHandler) {
    super(nThreads, threadFactory, maxPendingTasks, rejectedHandler);
}

In addition, this class implements the abstract function newChild by creating a DefaultEventExecutor:

@Override
protected EventExecutor newChild(Executor executor, Object... args) throws Exception {
    return new DefaultEventExecutor(this, executor, (Integer) args[0], (RejectedExecutionHandler) args[1]);
}
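The casts on args[0] and args[1] work because the subclass constructor passed exactly those values up through the Object... parameter of the superclass constructor. The sketch below reproduces that hand-off with plain Java. VarargsGroupSketch and BoundedGroupSketch are hypothetical names, and a String takes the place of both the rejection handler and the created child.

```java
// Hypothetical base class: forwards opaque constructor arguments to newChild.
abstract class VarargsGroupSketch {
    final Object[] children;

    VarargsGroupSketch(int nChildren, Object... args) {
        children = new Object[nChildren];
        for (int i = 0; i < nChildren; i++) {
            // This runs before the subclass constructor body, so the subclass
            // cannot rely on its own fields here; what it needs arrives via args.
            children[i] = newChild(args);
        }
    }

    abstract Object newChild(Object... args);
}

// Hypothetical subclass: it knows the real types behind the Object... array.
final class BoundedGroupSketch extends VarargsGroupSketch {
    BoundedGroupSketch(int nChildren, int maxPendingTasks, String rejectPolicy) {
        super(nChildren, maxPendingTasks, rejectPolicy); // the int is boxed to Integer
    }

    @Override
    Object newChild(Object... args) {
        // The casts mirror the order in which the constructor passed the values.
        int maxPendingTasks = (Integer) args[0];
        String rejectPolicy = (String) args[1];
        return "child(maxPending=" + maxPendingTasks + ", policy=" + rejectPolicy + ")";
    }
}
```

The price of this flexibility is that the argument order and types are checked only at run time, so the constructor and newChild have to be kept in sync by hand.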