Single

约 507 字大约 2 分钟

Single

单线程版本open in new window

An image

如图所示,Reactoracceptor(listenfd)read decode compute encode send 在一个线程中,也就是 listenfdconnectionfd 在一个线程, redis server 其底层也是单线程 Reactor ,所有我们需要研究下这个模式。

安装

Reactor(int port) throws IOException {
    selector = Selector.open();
    serverSocket = ServerSocketChannel.open();
    serverSocket.socket().bind(new InetSocketAddress(port));
    serverSocket.configureBlocking(false);
    SelectionKey sk =serverSocket.register(selector,SelectionKey.OP_ACCEPT);
    sk.attach(new Acceptor());
}

rocketmq line 147open in new window

 public void beginAccept() throws Exception {
      this.serverSocketChannel = ServerSocketChannel.open();
      this.selector = RemotingUtil.openSelector();
      this.serverSocketChannel.socket().setReuseAddress(true);
      this.serverSocketChannel.socket().bind(this.socketAddressListen);
      this.serverSocketChannel.configureBlocking(false);
      this.serverSocketChannel.register(this.selector, SelectionKey.OP_ACCEPT);
}

多线程还是单线程都使用的都是同一个 listenfd 所以代码基本一致,而且 ServerSocketChannel 只关注 accept 事件。

线程循环

  public void run() {  // normally in a new Thread
      try {
        // 如果当前线程没有被中断
        while (!Thread.interrupted()) {
          // 使用选择器查找准备好的事件
          selector.select();
          // 这里做处理的时候时间可能是ServerSocketChannle或者SocketChannle
          Set selected = selector.selectedKeys();
          Iterator it = selected.iterator();
          while (it.hasNext())
             // 只有要事件发生就进行处理
             dispatch((SelectionKey)(it.next());
          selected.clear();
          }
  } catch (IOException ex) { /* ... */ }
  }
void dispatch(SelectionKey k) {
  Runnable r = (Runnable)(k.attachment()); if (r != null)
}
  1. 当前线程进行循环,查看是否有事件发生
  2. 有事件发生则处理事件
  3. 如果attachment = Acceptor产生一个新的SocketSocket
  4. 如果attachment = Handler进行读写处理。

连接器

  class Acceptor implements Runnable { // inner
      public void run() {
        try {
          SocketChannel c = serverSocket.accept();
          if (c != null)
            new Handler(selector, c);
        catch(IOException ex) { /* ... */ }
      }
    }
  }
}

SelectKeychannelServerSocketChannel 时,因为 attachment=Acceptor ,每次被都会产生一个新的 SockectChannel ,其会被包装成 Handler

An image

处理器

Handler(Selector sel, SocketChannel c) throws IOException {
    socket = c; c.configureBlocking(false);
    // Optionally try first read now
    sk = socket.register(sel, 0);
    sk.attach(this);
    sk.interestOps(SelectionKey.OP_READ);
    sel.wakeup();
}

Handler初始化时,会向 Selector 注册自己,并将 Handler attach 到 SelectKey 上。

处理器处理请求

SelectKeychannelSocketChannel 时,因为 attachment=Handler ,根据事件进行数据处理。

An image

总结

Reactor模型核心在于 attach 上,会根据 attch 的不同对象类型,进行不同处理。

  • SelectKey = Selector + ServerSocketChannel + Acceptor
  • SelectKey = Selector + SocketChannel + Handler