Channel-register

约 1071 字大约 4 分钟

Channel-register

Channel 注册

  1. 因为EventLoop对于ServerChannel还是普通channel是无差异的,所以其注册流程如图所示。

An image

提示

有差异的是分组不同,一般 Boss EventLoopGroup 负责接收,Work EventLoopGroup 负责业务处理。

ServerChannel 注册

  1. 注册还是在initAndRegister()进行的,从配置中获取到workGroup,然后将channel注册进去。
// 其他代码省略
final ChannelFuture initAndRegister() {
    // 异步进行注册
    // config = ServerBootstrapConfig
    // group 获取的是work的group
    ChannelFuture regFuture = config().group().register(channel);
    return regFuture;
}

@Override
public ChannelFuture register(Channel channel) {
    // 获取一个EventLoop进行注册
    return next().register(channel);
}
// 用EventExecutorChooserFactory.EventExecutorChooser 选择一个EventLoop
public EventExecutor next() {
    return chooser.next();
}
  1. EventLoop注册channel,先产生一个DefaultChannelPromise,其包含chanelnioEventGroup,然后就包含了channel,selector的所有信息了。
// SingleThreadEventLoop
@Override
public ChannelFuture register(Channel channel) {
    return register(new DefaultChannelPromise(channel, this));
}

// channel绑定一个EventLoop
public DefaultChannelPromise(Channel channel, EventExecutor executor) {
    super(executor);
    this.channel = checkNotNull(channel, "channel");
}

// 这里已经委托给EventLoop进行处理
public ChannelFuture register(final ChannelPromise promise) {
      ObjectUtil.checkNotNull(promise, "promise");
      // AbstractUnsafe
      promise.channel().unsafe().register(this, promise);
      return promise;
}

public final void register(EventLoop eventLoop, final ChannelPromise promise) {
  // 省略校验部分
  AbstractChannel.this.eventLoop = eventLoop;
  // 判断当前线程与eventLoop里面的线程是否为同一个线程
  if (eventLoop.inEventLoop()) {
      // 如果是同一个线程则进行注册,如果不是同一个线程,委托给eventLoop进行处理
      register0(promise);
  } else {
      try {
          // 将任务提交给eventLoop进行处理
          eventLoop.execute(new Runnable() {
              @Override
              public void run() {
                  register0(promise);
              }
          });
      } catch (Throwable t) {
         // 省略
      }
  }
}
  1. 处理任务,先增加任务到队列里面,然后判断是否需要开启线程。
// SingleThreadEventExecutor#execute
public void execute(Runnable task) {
  // 省略
  boolean inEventLoop = inEventLoop();
  addTask(task);
  // 如果SingleThreadEventExecutor没有初始化过线程,则上面为false
  if (!inEventLoop) {
      startThread();
      if (isShutdown() && removeTask(task)) {
          reject();
      }
  }
  if (!addTaskWakesUp && wakesUpForTask(task)) {
      wakeup(inEventLoop);
  }
}
  1. 开启线程
private void startThread() {
    // 对状态进行判断,判断是否没开启过
  if (state == ST_NOT_STARTED) {
      if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
          try {
              // 设置线程已经初始化过了
              doStartThread();
          } catch (Throwable cause) {
              STATE_UPDATER.set(this, ST_NOT_STARTED);
              PlatformDependent.throwException(cause);
          }
      }
  }
}

private void doStartThread() {
    // 线程为空
    assert thread == null;
    // io.netty.util.concurrent.ThreadPerTaskExecutor
    executor.execute(new Runnable() {
        @Override
        public void run() {
            // 将创建的线程绑定到event
            thread = Thread.currentThread();
            if (interrupted) {
                thread.interrupt();
            }
            boolean success = false;
            updateLastExecutionTime();
            try {
                // 这个时候listenfd的线程已经启动了
                SingleThreadEventExecutor.this.run();
                success = true;
            } catch (Throwable t) {
                logger.warn("Unexpected exception from an event executor: ", t);
            } finally {
               // 省略...
            }
        }
    });
}

NioEventLoop 线程

  1. 这里已经切换线程了,使用NioEventLoop中的线程。
 // 创建一个线程去执行
public final class ThreadPerTaskExecutor implements Executor {
    private final ThreadFactory threadFactory;

    public ThreadPerTaskExecutor(ThreadFactory threadFactory) {
        if (threadFactory == null) {
            throw new NullPointerException("threadFactory");
        }
        this.threadFactory = threadFactory;
    }

    // 因为其线程工厂为DefaultThreadFactory,所以创建出来的线程为FastThreadLocalThread
    @Override
    public void execute(Runnable command) {
        threadFactory.newThread(command).start();
    }
}
  1. 新线程中的任务SingleThreadEventExecutor.this.run();
  2. 处理任务和事件,由于线程还没有绑定端口,所以这时候是没有SelectedKeys要处理,仅仅只有Queue<Runnable> taskQueue中的任务需要处理,这个task也就是前面的register0(promise);
@Override
protected void run() {
  for (;;) {
    try {
      // 结果为0,处理default
      switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
        case SelectStrategy.CONTINUE:
          continue;
        case SelectStrategy.SELECT:
          select(wakenUp.getAndSet(false));
          if (wakenUp.get()) {
            selector.wakeup();
          }
        default:
      }

      cancelledKeys = 0;
      needsToSelectAgain = false;
      final int ioRatio = this.ioRatio;
      if (ioRatio == 100) {
        try {
          processSelectedKeys();
        } finally {
          runAllTasks();
        }
      } else {
        final long ioStartTime = System.nanoTime();
        try {
          // 因为没有事件处理
          processSelectedKeys();
        } finally {
          final long ioTime = System.nanoTime() - ioStartTime;
          // 开始处理当前线程所有事情
          runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
        }
      }
    } catch (Throwable t) {
      handleLoopException(t);
    }
    try {
      if (isShuttingDown()) {
        closeAll();
        if (confirmShutdown()) {
          return;
        }
      }
    } catch (Throwable t) {
      handleLoopException(t);
    }
  }
}

protected boolean runAllTasks(long timeoutNanos) {
   // 没有调度任务
    fetchFromScheduledTaskQueue();
    // 队列中拉取任务
    Runnable task = pollTask();
    if (task == null) {
        afterRunningAllTasks();
        return false;
    }

    final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;
    long runTasks = 0;
    long lastExecutionTime;
    for (;;) {
         // 第一个任务是register0
        safeExecute(task);

        runTasks ++;
        if ((runTasks & 0x3F) == 0) {
            lastExecutionTime = ScheduledFutureTask.nanoTime();
            if (lastExecutionTime >= deadline) {
                break;
            }
        }
        task = pollTask();
        if (task == null) {
            lastExecutionTime = ScheduledFutureTask.nanoTime();
            break;
        }
    }

    afterRunningAllTasks();
    this.lastExecutionTime = lastExecutionTime;
    return true;
}

第一个任务

  1. 从队列中拉取的第一个任务register0
// 之前提交的任务
  eventLoop.execute(new Runnable() {
                @Override
                public void run() {
                    register0(promise);
                }
  });
// 队列中的任务开始处理
private void register0(ChannelPromise promise) {
    try {
        if (!promise.setUncancellable() || !ensureOpen(promise)) {
            return;
        }
        boolean firstRegistration = neverRegistered;
        // 开始注册
        doRegister();
        neverRegistered = false;
        registered = true;
        // 这里开始要处理handler了
        pipeline.invokeHandlerAddedIfNeeded();

        // 处理完所有任务后,这里设置成功
        safeSetSuccess(promise);
        pipeline.fireChannelRegistered();
        if (isActive()) {
            if (firstRegistration) {
                pipeline.fireChannelActive();
            } else if (config().isAutoRead()) {
                beginRead();
            }
        }
    } catch (Throwable t) {
        closeForcibly();
        closeFuture.setClosed();
        safeSetFailure(promise, t);
    }
}

核心代码

  1. Selector中注册java.nio.ServerSocketChannel
// 真实的注册任务
protected void doRegister() throws Exception {
  boolean selected = false;
  for (;;) {
      try {
          // 将channel注册到到Selector中
          selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
          return;
      } catch (CancelledKeyException e) {
          if (!selected) {
              eventLoop().selectNow();
              selected = true;
          } else {
              throw e;
          }
      }
}

总结

核心是异步处理线程的注册,由对应的 NioEventLoop 进行处理,也处理了线程初始化问题。