3.1 - EventExecutorGroup
Created by : Mr Dk.
2021 / 02 / 17 18:12
Ningbo, Zhejiang, China
AbstractEventExecutorGroup
该类初步实现了 EventExecutorGroup
接口内定义的函数。根据 EventExecutorGroup
的接口定义,该类的目的是为了管理多个 EventExecutor
。接口中定义了 next()
来返回其中的一个实例。因此该类的实现基本上都是调用 next()
获取到 EventExecutor
实例,然后调用实例相应的函数。
Definition
/**
* Abstract base class for {@link EventExecutorGroup} implementations.
*/
public abstract class AbstractEventExecutorGroup implements EventExecutorGroup {
}
Submit
@Override
public Future<?> submit(Runnable task) {
return next().submit(task);
}
@Override
public <T> Future<T> submit(Runnable task, T result) {
return next().submit(task, result);
}
@Override
public <T> Future<T> submit(Callable<T> task) {
return next().submit(task);
}
其余函数也类似,都是通过调用 next()
获取到 EventExecutor
实例,然后调用 EventExecutor
实例的函数。
MultithreadEventExecutorGroup
该类进一步实现了使用 多个线程 同时处理任务的 EventExecutorGroup
。
Definition
/**
* Abstract base class for {@link EventExecutorGroup} implementations that handles their tasks with multiple threads at
* the same time.
*/
public abstract class MultithreadEventExecutorGroup extends AbstractEventExecutorGroup {
}
Fields
- 类内使用一个
EventExecutor
数组children
维护组内所有的EventExecutor
readonlyChildren
用于向类外提供 不可修改 的EventExecutor
迭代器视角。- 原子变量
terminatedChildren
记录已经执行完毕的EventExecutor
个数 terminationFuture
变量用于在 所有EventExecutor
都结束后触发通知。chooser
用于为当前EventExecutorGroup
选择出一个EventExecutor
实例
private final EventExecutor[] children;
private final Set<EventExecutor> readonlyChildren;
private final AtomicInteger terminatedChildren = new AtomicInteger();
private final Promise<?> terminationFuture = new DefaultPromise(GlobalEventExecutor.INSTANCE);
private final EventExecutorChooserFactory.EventExecutorChooser chooser;
Constructor
构造函数依次创建每一个 EventExecutor
,每个线程对应一个 EventExecutor
。每个 EventExecutor
结束后,将会使原子变量 terminatedChildren
自增,当该变量等于 EventExecutor
的数量时,该 group 的 terminationFuture
就应当被设为结束状态 - 每一个 EventExecutor
也会监听所属 group 的 terminationFuture
。
/**
* Create a new instance.
*
* @param nThreads the number of threads that will be used by this instance.
* @param executor the Executor to use, or {@code null} if the default should be used.
* @param chooserFactory the {@link EventExecutorChooserFactory} to use.
* @param args arguments which will passed to each {@link #newChild(Executor, Object...)} call
*/
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
EventExecutorChooserFactory chooserFactory, Object... args) {
if (nThreads <= 0) {
throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
}
if (executor == null) {
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
}
children = new EventExecutor[nThreads];
for (int i = 0; i < nThreads; i ++) {
boolean success = false;
try {
children[i] = newChild(executor, args);
success = true;
} catch (Exception e) {
// TODO: Think about if this is a good exception type
throw new IllegalStateException("failed to create a child event loop", e);
} finally {
if (!success) {
for (int j = 0; j < i; j ++) {
children[j].shutdownGracefully();
}
for (int j = 0; j < i; j ++) {
EventExecutor e = children[j];
try {
while (!e.isTerminated()) {
e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
}
} catch (InterruptedException interrupted) {
// Let the caller handle the interruption.
Thread.currentThread().interrupt();
break;
}
}
}
}
}
chooser = chooserFactory.newChooser(children);
final FutureListener<Object> terminationListener = new FutureListener<Object>() {
@Override
public void operationComplete(Future<Object> future) throws Exception {
if (terminatedChildren.incrementAndGet() == children.length) {
terminationFuture.setSuccess(null);
}
}
};
for (EventExecutor e: children) {
e.terminationFuture().addListener(terminationListener);
}
Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
Collections.addAll(childrenSet, children);
readonlyChildren = Collections.unmodifiableSet(childrenSet);
}
其中,使用了抽象的 newChild()
函数来建立每一个 EventExecutor
。
/**
* Create a new EventExecutor which will later then accessible via the {@link #next()} method. This method will be
* called for each thread that will serve this {@link MultithreadEventExecutorGroup}.
*
*/
protected abstract EventExecutor newChild(Executor executor, Object... args) throws Exception;
其它相关的所有操作都是通过对 children
数组进行迭代,依次操作每一个 EventExecutor
实例完成的。比如:
@Override
public Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) {
for (EventExecutor l: children) {
l.shutdownGracefully(quietPeriod, timeout, unit);
}
return terminationFuture();
}
DefaultEventExecutorGroup
默认的 EventExecutorGroup
实现。
Definition
/**
* Default implementation of {@link MultithreadEventExecutorGroup} which will use {@link DefaultEventExecutor} instances
* to handle the tasks.
*/
public class DefaultEventExecutorGroup extends MultithreadEventExecutorGroup {
}
构造函数通过传入 ThreadFactory
来建立组内维护的 EventExecutor
。另外,还需要指定线程数、拒绝策略等。
/**
* @see #DefaultEventExecutorGroup(int, ThreadFactory)
*/
public DefaultEventExecutorGroup(int nThreads) {
this(nThreads, null);
}
/**
* Create a new instance.
*
* @param nThreads the number of threads that will be used by this instance.
* @param threadFactory the ThreadFactory to use, or {@code null} if the default should be used.
*/
public DefaultEventExecutorGroup(int nThreads, ThreadFactory threadFactory) {
this(nThreads, threadFactory, SingleThreadEventExecutor.DEFAULT_MAX_PENDING_EXECUTOR_TASKS,
RejectedExecutionHandlers.reject());
}
/**
* Create a new instance.
*
* @param nThreads the number of threads that will be used by this instance.
* @param threadFactory the ThreadFactory to use, or {@code null} if the default should be used.
* @param maxPendingTasks the maximum number of pending tasks before new tasks will be rejected.
* @param rejectedHandler the {@link RejectedExecutionHandler} to use.
*/
public DefaultEventExecutorGroup(int nThreads, ThreadFactory threadFactory, int maxPendingTasks,
RejectedExecutionHandler rejectedHandler) {
super(nThreads, threadFactory, maxPendingTasks, rejectedHandler);
}
另外,对于抽象函数 newChild
,在该类中的实现是,创建一个 DefaultEventExecutor
:
@Override
protected EventExecutor newChild(Executor executor, Object... args) throws Exception {
return new DefaultEventExecutor(this, executor, (Integer) args[0], (RejectedExecutionHandler) args[1]);
}