likes
comments
collection
share

Reactor模式

作者站长头像
站长
· 阅读数 52

主要论述来自于Doug Lea的Scalable IO in Java

  • 原文档《Doug Lea的Scalable IO in Java 》.pdf
  • 还有一篇作者为Douglas C. Schmidt的论文论文
  • "Reactor" 这个单词的词源来自于 "react",意思是"反应"、"回应"。在 Reactor 模式中,应用程序会对外部事件进行反应(react),例如接收到网络数据、文件可读、定时器到期等事件。因此,"Reactor" 这个单词被用来表示这种事件驱动的模式。

传统的网络编程:一客户一线程

  • 为每个客户端创建一个线程 Reactor模式
    public class Server {  
        private ServerSocket serverSocket;  
        public Server(int port) throws Exception {  
           this.serverSocket = new ServerSocket(port);  
        }  
        public void runServer(){  
           while (true) {  
             // 等待客户端连接  
             Socket clientSocket = this.serverSocket.accept();  
             // 创建一个新的线程来处理客户端请求  
             new Thread(() -> {  
                // 处理客户端请求  
                handleClientRequest(clientSocket);  
             }).start();  
          }  
        }  
        private void handleClientRequest(Socket clientSocket) throws Exception {  
             // TODO: 处理客户端请求  
             read();  
             decode();  
             compute();  
             encode();  
             send();  
             // 关闭客户端连接  
             clientSocket.close();  
         }  
         public static void main(String[] args) throws Exception {  
             Server server = new Server(8888);  
             server.runServer();  
         }  
    }
    
  • 这种方式显然不好用,因为每个线程都有自己的数据结构,而且会占用cpu的时间,当线程多的时候,程序都忙于线程上下文切换,导致客户端的请求响应慢。
  • 也可以用线程池,但是如果没有其他措施,必然也有性能瓶颈,上面的handleClientRequest方法显然是个耗时的方法,如果客户端过多,任务必然会排队,导致响应延迟。

Reactor模式的目标

网络编程中的可扩展性目标和分而治之的方法。

  • 其主要目标是在不断增加的负载下保持优雅的降级,随着资源的不断增加实现持续改进,并且同时满足可用性和性能目标,如短延迟、满足峰值需求、可调整的服务质量等。针对这些目标,提出了将处理过程划分为小任务并分别执行,通过非阻塞读写和I/O事件分派来实现。
  • 这种基于事件驱动的设计通常比其他替代方案更高效,可以减少资源和开销,需要手动绑定事件和动作,因此通常更难编程,需要将程序分解为简单的非阻塞操作,并且必须跟踪服务的逻辑状态

单线程的Reactor模式

  • 示例图 Reactor模式
  • 模拟代码如下:
public class Reactor implements Runnable {
    private final Selector selector;
    private final ServerSocketChannel serverSocket;
    //构造方法
    public Reactor(int port) throws IOException {
        selector = Selector.open();
        serverSocket = ServerSocketChannel.open();
        serverSocket.socket().bind(new InetSocketAddress(port));
        serverSocket.configureBlocking(false);
        SelectionKey sk = serverSocket.register(selector, SelectionKey.OP_ACCEPT);
        sk.attach(new Acceptor());//附加一个对象,绑定
    }

    @Override
    public void run() {
        try {
            while (!Thread.interrupted()) {
                selector.select();//事件就绪会返回,可能是新连接,可能是读或写
                Set<SelectionKey> selectedKeys = selector.selectedKeys();
                for (SelectionKey key : new HashSet<>(selectedKeys)) {
                    selectedKeys.remove(key);
                    dispatch(key);//分发事件,所有的run()都是通过dispatch去调用的
                }
            }
        } catch (IOException ex) {
            ex.printStackTrace();
        }
    }

    private void dispatch(SelectionKey key) {
        Runnable handler = (Runnable) key.attachment();//返回绑定的对象
        if (handler != null) {
            handler.run();//处理绑定到该事件的SelectionKey
        }
    }

    private class Acceptor implements Runnable {
        @Override
        public void run() {
            try {
                SocketChannel socketChannel = serverSocket.accept();//接收请求
                if (socketChannel != null) {
                    new Handler(selector, socketChannel);//新请求到了,分配给Handler
                    SelectionKey sk = serverSocket.register(selector, SelectionKey.OP_ACCEPT);  
                    sk.attach(this);//附加一个对象,绑定,新连接到了,之前的Acceptor会失效,需重新绑定
                }
            } catch (IOException ex) {
                ex.printStackTrace();
            }
        }
    }

    private static class Handler implements Runnable {
        private final SocketChannel socketChannel;
        private final SelectionKey selectionKey;
        private final ByteBuffer input = ByteBuffer.allocate(1024);
        private final ByteBuffer output = ByteBuffer.allocate(1024);

        public Handler(Selector selector, SocketChannel socketChannel) throws IOException {
            this.socketChannel = socketChannel;
            this.socketChannel.configureBlocking(false);
            selectionKey = this.socketChannel.register(selector, SelectionKey.OP_READ);
            selectionKey.attach(this);//
            selector.wakeup();//立即返回,就绪后Reactor类的select()方法就会返回,就会调用dispatch方法,执行run方法
        }

        @Override
        public void run() {
            try {
                if (selectionKey.isReadable()) {
                    read();
                } else if (selectionKey.isWritable()) {
                    send();
                }
            } catch (IOException ex) {
                ex.printStackTrace();
            }
        }

        private void read() throws IOException {
            socketChannel.read(input);
            if (inputIsComplete()) {
                process();
                selectionKey.interestOps(SelectionKey.OP_WRITE);
            }
        }

        private boolean inputIsComplete() {
            // implementation omitted
            return true;
        }

        private void process() {
            // implementation omitted
            output.put("Hello from Reactor".getBytes());
            output.flip();
        }

        private void send() throws IOException {
            socketChannel.write(output);
            if (outputIsComplete()) {
                selectionKey.cancel();
            } else {
                selectionKey.interestOps(SelectionKey.OP_READ);
            }
        }

        private boolean outputIsComplete() {
            // implementation omitted
            return true;
        }
    }

    public static void main(String[] args) throws IOException {
        new Thread(new Reactor(8899)).start();//全局一个线程
    }
}
  • Reactor类中有一个循环,不断的从Selector中获取已经就绪的事件,可以是新的连接到了,可以是处理已经存在的客户端Channel,对其进行操作。
  • 刚起服时,会先绑定Acceptor,当有连接到来时,就会将客户端的Channel注册到Selector中,最先从channel的读事件开始。
  • 总的来说,就是充分利用操作系统的IO多路复用技术,将尽可能将操作细分,分别注册进Selector,对应的事件就绪后再交由程序继续处理,通过dispatch,将就绪的任务分配到对应的处理器。
  • 整个单线程的程序,只有select()是阻塞的。它还可以处理蛮蛮多的客户端请求。
  • 该例子是单线程的,理论上Reactor的内部类不需要继承Runnable接口的。
    • 实现Runnable接口只是为了更好地表达事件处理程序应该具有可运行的特性,可以在线程中执行。

单线程Reactor的缺点

  • 执行子步骤的时候会拖慢Reactor的速度,会导致程序无法快速回到select() 方法,Selector中可能已经有很多事件已经就绪了。
     public void run() {
      try {
          if (selectionKey.isReadable()) {
              read();//如果它很慢
          } else if (selectionKey.isWritable()) {
              send();
          }
      } catch (IOException ex) {
          ex.printStackTrace();
      }
    }
    
  • IO操作和处理数据速率上可能会不匹配。
  • 没法充分利用计算机的多核特性。

多线程Reactor模式

  • 在计算机系统中,I/O(输入/输出)的速度主要由磁盘、网络、内存等硬件设备的性能和操作系统的I/O子系统的实现方式决定。因此,程序对I/O操作的速度影响相对较小,而操作系统的I/O子系统和硬件设备的性能则对I/O速度有更大的影响。
  • 通常情况下,对于磁盘或网络I/O,由于数据的读写是在物理设备上进行的,因此需要进行一定的等待时间。在这段等待时间内,CPU可以进行其他操作,否则会导致CPU的浪费
  • 因此,使用异步IO或多线程技术可以最大限度地减少CPU的空闲时间,提高程序的效率。而Reactor模式正是一种基于异步IO的编程模式,可以高效地处理大量的并发I/O请求。
    private void read() throws IOException {
      socketChannel.read(input);//这句代码的快慢,由操作底层决定。和程序代码关系不大
      if (inputIsComplete()) {
          process();//但是这个的快慢是和程序代码相关的
          selectionKey.interestOps(SelectionKey.OP_WRITE);
      }
    }
    

常见的策略和方法如下

  • 可以使用工作线程来快速触发处理程序,避免处理程序降低Reactor的效率;
  • 可以将非 IO 处理转移到其他线程中,以减轻处理程序的压力;
  • 可以使用多个Reactor线程来分散 IO 的负载;
  • 可以使用负载均衡来匹配 CPU 和 IO 的速率;
  • 可以使用线程池来控制和调整工作线程数量,通常需要的线程比客户端数量少得多。

将非IO操作转移到其他线程

Reactor模式

  • 大致代码如下:
    class Handler implements Runnable {
      // uses util.concurrent thread pool
      static PooledExecutor pool = new PooledExecutor(...);
      static final int PROCESSING = 3;
      // ...
      synchronized void read() { // ...
          socket.read(input);//IO操作
          if (inputIsComplete()) {
              state = PROCESSING;
              pool.execute(new Processer());//非IO操作交由线程池处理
          }
      }
      synchronized void processAndHandOff() {
          process();
          state = SENDING; // or rebind attachment
          sk.interest(SelectionKey.OP_WRITE);
      }
      class Processer implements Runnable {
          public void run() { processAndHandOff(); }
      }
    }
    
    • 毕竟要将数据准备好才能执行后续的操作,后续的非IO操作要尽可能的快,才能提高性能。
  • 这只是其中一个优化点,可以用多个Reactor,同时执行I/O操作,这样就可以并发的接收连接请求了。Reactor模式
    public Reactor(int port, int nSubReactors) throws IOException {//构造方法
      // Create the server socket channel and bind to port
      serverChannel = ServerSocketChannel.open();
      serverChannel.bind(new InetSocketAddress(port));
      serverChannel.configureBlocking(false);
    
      // Create the sub-reactors
      subReactors = new ArrayList<>(nSubReactors);
      for (int i = 0; i < nSubReactors; i++) {
          Selector selector = Selector.open();
          SubReactor subReactor = new SubReactor(selector);
          subReactors.add(subReactor);
    
          // Start the sub-reactor's thread
          new Thread(subReactor).start();
      }
    
      // Assign the main acceptor to a random sub-reactor
      mainReactor = new Acceptor(serverChannel, this, nextSubReactor());//Acceptor选一个Reactor
      new Thread(mainReactor).start();
    }
    class Acceptor { // ...
      public synchronized void run() { ...
          Socket connection = serverSocket.accept();
          if (connection != null) {
              new Handler(nextSubReactor(), connection);
          }
      }
    }
    
    public SubReactor nextSubReactor() {//随机返回一个子Reactor
        // Round-robin
        return subReactors.get(ThreadLocalRandom.current().nextInt(subReactors.size()));
    }
    

本篇小结

  • Reactor模式的主要内容是构建高性能、可扩展的网络服务。该模式通过将网络服务分解为多个组件,实现了网络服务的解耦和重用,其主要组件如下:

    1. Reactor:负责监听并接受所有连接请求,管理所有连接,并分派事件给相应的Handler处理。
    2. Acceptor:负责接受客户端连接请求,创建连接,并将新连接注册到Reactor中。
    3. Dispatch:负责将不同类型的事件分发给不同的Handler处理。
    4. Handler:负责处理特定类型的事件。在处理过程中可能需要调用其他的系统服务,如数据库等。
    5. Event:描述了一个事件,可以是网络连接建立、数据可读或可写等。
  • Reactor模式的工作流程如下:

    1. 初始化:在应用程序启动时创建Reactor,并将Acceptor注册到Reactor中,以监听新连接请求。
    2. 连接:当一个客户端请求连接时,Acceptor接受请求并创建一个新的连接。
    3. 注册:将新的连接注册到Reactor中,以便Reactor可以监控其事件。每个连接会有对应的Handler,Reactor会将事件分发给相应的Handler。
    4. 事件分发:当一个连接上发生事件时,Reactor会使用Dispatch将事件分发给相应的Handler进行处理。
    5. 处理:Handler根据接收到的事件类型来处理连接的数据读写,或者执行其他系统服务的调用。在处理过程中,可能会出现阻塞等耗时操作。
    6. 回调:当Handler处理完事件后,会向Reactor注册事件回调函数,以便在需要时通知Reactor已经处理完该事件。
  • Reactor模式的工作流程也可以换一种描述:Reactor模式

    1. 当应用向Initiation Dispatcher注册具体的事件处理器时,应用会标识出该事件处理器希望Initiation Dispatcher在某个事件发生时向其通知的该事件,该事件与Handle关联。

    2. Initiation Dispatcher会要求每个事件处理器向其传递内部的Handle。该Handle向操作系统标识了事件处理器

    3. 当所有的事件处理器注册完毕后,应用会调用handle_events方法来启动Initiation Dispatcher的事件循环。这时,Initiation Dispatcher会将每个注册的事件管理器的Handle合并起来,并使用同步事件分离器等待这些事件的发生。比如说,TCP协议层会使用select同步事件分离器操作来等待客户端发送的数据到达连接的socket handle上。

    4. 当与某个事件源对应的Handle变为ready状态时(比如说,TCP socket变为等待读状态时),同步事件分离器就会通知Initiation Dispatcher。

    5. Initiation Dispatcher会触发事件处理器的回调方法,从而响应这个处于ready状态的Handle。当事件发生时,Initiation Dispatcher会将被事件源激活的Handle作为『key』来寻找并分发恰当的事件处理器回调方法。

    6. Initiation Dispatcher会回调事件处理器的handle_events回调方法来执行特定于应用的功能(开发者自己所编写的功能),从而响应这个事件。所发生的事件类型可以作为该方法参数并被该方法内部使用来执行额外的特定于服务的分离与分发。

  • Reactor模式是一种经典的事件驱动设计模式,主要用于实现高性能的IO操作。其核心思想是将事件的响应和处理分离开来,将IO操作交给操作系统内核,通过异步通知的方式来进行事件的处理。

    • 多想想selector.select();,操作系统提供的异步通知功能。
转载自:https://juejin.cn/post/7228235387356119095
评论
请登录