likes
comments
collection
share

高性能网络通讯框架Netty预热—NIO模型

作者站长头像
站长
· 阅读数 11

Netty是一款高性能的网络通信框架,其应用也很广泛,比如常用的消息队列RocketMQ,RPC框架Dubbo在底层都有使用到Netty。在学习Netty之前,我们需要对IO模型要有一定的了解,其中最重要的就是NIO,所以今天打算先对NIO进行一些简单的梳理。

IO模型

常见IO模型分为几种:

  • BIO :Blocking IO, 即同步阻塞式IO。Client和Server的每建立一次连接,都会创建一个线程,在Client等待Server响应的期间,会处于阻塞状态。
  • NIO :Non-Blocking IO,即同步非阻塞式IO。NIO是基于Reactor模式,面向缓冲区并结合通道的IO模型。客户端发送的连接请求都会注册到多路复用器上,多路复用器轮询到连接有IO请求就进行处理。
  • AIO : Asynchronous IO,即异步非阻塞,采用了 Proactor 模式,特点是先由操作系统完成后才通知服务端程序启动线程去处理,一般适用于连接数较多且连接时间较长的应用。

既然BIO和NIO都是以同步的方式工作的,那么这里就先拿BIO与NIO做个简单的对比,比较两者的差异具体在哪些地方。

BIO

上面提到了Client和Server的每建立一次连接,都会创建一个线程并且会发生阻塞,那么我们就来简单的验证一下。验证方式也比较简单,在编辑器中创建一个ServerSocket作为服务端并给定一个端口号用于客户端连接,使用telnet作为客户端来连接服务端并实现消息发送和接收,通过代码来分析BIO会在那些地方会阻塞。

public static void main(String[] args) throws IOException {

    // 1. 创建一个BIO服务端 端口号为9999
    ServerSocket serverSocket = new ServerSocket(9998);
    System.out.println("=====等待客户端连接......");

    // 2. 等待客户端连接 , 会阻塞
    Socket socket = serverSocket.accept();
    System.out.println("=====客户端已连接......");

    // 3. 获取客户端发送的内容,如果客户端没有发送内容,也会阻塞
    System.out.println("=====等待客户端发送数据......");
    InputStream inputStream = socket.getInputStream();

    while (true) {
        byte[] bytes = new byte[2048];
        int read = inputStream.read(bytes);
        if (read != -1) {
            System.out.println((Thread.currentThread().getName() + " " + new String(bytes, 0, read)));
        } else {
            break;
        }
    }
    inputStream.close();
    socket.close();

}

简单编码完成后,启动服务,如果在没有客户端连接的情况下,accept()方法会阻塞,直到有客户端进行了连接。

高性能网络通讯框架Netty预热—NIO模型 现在可以打开cmd使用telnet命令来进行连接。连接成功后结合'Ctrl + ]'快捷键进去Telent Client,使用send命令发送数据内容。

telnet 127.0.0.1 9998

高性能网络通讯框架Netty预热—NIO模型

这里可以看到,客户端虽然连接成功了,但是在调用getInputStream()方法时,线程又被阻塞了,那么进行Telnet Client来发送数据。

高性能网络通讯框架Netty预热—NIO模型

一切OK,服务端收到了消息。既然说了,BIO一次连接就是一个线程,那么再发起一个客户端连接,来看看main线程到底还能不能获取到消息。

高性能网络通讯框架Netty预热—NIO模型

经过验证,第二个连接发送的消息,控制台确实没有收到,那就证实了一次连接就是一个线程。那有些人就会有疑问,既然控制台没打印消息,那怎么确保这条消息就被服务端接收了呢?这岂不是很简单,加个线程池,搞成伪异步不就搞定了。那么就基于上面的代码,按照下图流程方式来简单的改造改造。

高性能网络通讯框架Netty预热—NIO模型

// 创建线程池
ExecutorService executorService = Executors.newFixedThreadPool(5);

// 1. 创建一个BIO服务端 端口号为9998
ServerSocket serverSocket = new ServerSocket(9998);

while(true){
    // 2. 等待客户端连接 , 会阻塞
    Socket socket = serverSocket.accept();
    executorService.execute(() -> {

        try {
            System.out.println(Thread.currentThread().getName() + " 客户端已连接...");
            // 3. 获取客户端的ip信息
            InetAddress address = socket.getInetAddress();

            System.out.println(Thread.currentThread().getName() + " " + address.getHostName() + " , " + address.getHostAddress());

            // 4. 获取客户端发送的内容,如果客户端没有发送内容,也会阻塞
            InputStream inputStream = socket.getInputStream();

            while (true) {
                byte[] bytes = new byte[2048];
                int read = inputStream.read(bytes);
                if (read != -1) {
                    System.out.println((Thread.currentThread().getName() + " " + new String(bytes, 0, read)));
                } else {
                    break;
                }
            }
            inputStream.close();
        } catch (Exception e) {

        } finally {
            try {
                socket.close();
                System.out.println(" socket 关闭连接 ");
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

    });
}

经过一顿乱敲,代码改造完成,运行一下看看效果,尝试多个客户端连接,结果符合预期。(一次连接一个线程是不是更明显了🤭) 高性能网络通讯框架Netty预热—NIO模型

NIO

经过对BIO的测试发现,BIO对并发支持不好,如果有大批量的客户端连接的服务端,那么服务端就会不断的创建线程,直到撑爆服务器。所以,在JDK1.4+版本,官方提供了另一种IO模型-NIO模型。NIO即非阻塞式IO,目前应用在很多框架或中间件的底层,它基于Reactor模式,面向缓冲区,不面向流。其核心组件有三种,分别为Buffer缓冲区Channel通道Selector选择器

当程序需要与服务端进行数据交互时,并不会向BIO那样,直接发送到服务端,而是将数据发送到Buffer缓冲区,而Buffer缓冲区与Channel通道之间会进行数据交互,Selector会对应一个线程并且会根据事件驱动(Event)来选择哪一个Channel进行处理。当某个Channel上某个请求的事件完全就绪的时候,选择器Selector才会将该任务分配给服务端的一个或多个线程,其他情况服务器的线程可以做其他事情。

高性能网络通讯框架Netty预热—NIO模型

Buffer

Buffer可以简单的理解为一个数组,程序可以向Buffer中写入或者读取数据,数据存储依赖于缓存,在NIO中主要应用在和通道之间进行数据交互。Buffer的实现子类有很多,比如IntBuffer,CharBuffer,DoubleBuffer等,不过使用经常使用的还是ByteBuffer。写一个简单的小demo,来体现一下Buffer如何进行存储和读取的。

public static void main(String[] args) {

    // 1. 创建一个容量为100的ByteBuffer
    ByteBuffer buffer = ByteBuffer.allocate(100);
    // 2. 向Buffer中写入数据
    buffer.put("byte".getBytes());
    buffer.put("byte02".getBytes());

    System.out.println("limit = "+buffer.limit()+ " , position = "+buffer.position());
    // 3. 切换为读模式
    buffer.flip();
    System.out.println("limit = "+buffer.limit()+ " , position = "+buffer.position());

    // 4. 判断是否还有元素,有则读取
    while(buffer.hasRemaining()){
        System.out.println(new String(new byte[]{buffer.get()}));
    }
}

需要注意的是,如果需要读取缓冲区的数据时,一定要先调用flip()方法,这是因为在源码中有三个重要参数

// 表示当前写入或读取的位置,每当写入或读取时,该值会进行+1操作
private int position = 0;  
// 缓冲区里的数据的总数,代表了当前缓冲区中一共有多少数据
private int limit;
// 缓冲区能够容纳的数据元素的最大数量
private int capacity;

在创建缓冲区时,会指定capacity的大小,此时limit等于capacity,随着不断的写入数据position的值不断的增加,如果position大于capacity时,则会抛出异常。在不调用flip()方法,进行读取数据时,源码中会根据当前的position位置继续向下读,那么读出的数据就会是一个空值。

/**
* 读取数据时,获取索引下标
*/
final int nextGetIndex() {  // package-private
    int p = position;
    if (p >= limit)
        throw new BufferUnderflowException();
    position = p + 1;
    return p;
}

flip方法中所做的事情就是将position赋值给limit,并将自身清零。那么在读取数据时,position就会从0开始读,一直读到limit为止 高性能网络通讯框架Netty预热—NIO模型 所以demo程序的输出结果为

limit = 100 , position = 10
limit = 10 , position = 0

缓冲区的数据是存储在内存中的,这个内存可以是JVM的堆内存,也可以是堆外的内存(堆外内存)。堆外内存的方式可以通过allcateDirect方法进行创建,返回的是DirectByteBuffer对象(直接缓冲区),不受GC影响,使用的是操作系统的物理内存,适合大文件传输等操作。堆内存的方式可以通过allocate方法创建,返回的是HeapByteBuffer对象(非直接缓冲区),会受GC影响。

Channel

Channel是源程序和目标程序之间数据传输的通道,可以通过这个通道进行数据读取或写入,当然数据的读取和写入需要配合Buffer来一起完成。与普通的流相比,Channel是一个双向的通道,而流只能进行单向传输。在NIO中,Channel的实现分为四种,分别为FileChannel, SocketChannel, ServerSocketChannel, DatagramChannel。常用的方法有read(Buffer buffer)write(Buffer buffer),transferFrom(Channel channel,long position,long count)分别表示将Channel中的数据读取到Buffer中,将Buffer中的数据写入到Channel中以及从通道中拷贝数据。

高性能网络通讯框架Netty预热—NIO模型

  • FileChannel

在实现类中FileChannel常被使用,FileChannel即文件通道,用于文件读取,其主要的实现类是FileChannelImpl,但是在使用的过程中是无法直接通过new来创建,可以通过输入流InputStream,输出流OutputStream,RandomAccessFile或者FileChannel提供的open()方法中来获取实例,那么就通过FileChannel来写个文件拷贝的例子。

public static void main(String[] args) {
    // 1. 需要复制的文件
    File file = new File("file-channel.txt");
    // 2. 创建输入流
    FileInputStream inputStream = new FileInputStream(file);
    FileChannel inputStreamChannel = inputStream.getChannel();

    // 3. 创建输出流
    FileOutputStream outputStream = new FileOutputStream("file-channel-copy.txt");
    FileChannel outputStreamChannel = outputStream.getChannel();

    // 4. 创建buffer缓冲
    ByteBuffer buffer = ByteBuffer.allocate((int) file.length());

    // 5. 将通道数据读取到缓冲区
    inputStreamChannel.read(buffer);

    buffer.flip();

    // 6. 将缓冲区写入到通道
    outputStreamChannel.write(buffer);

    /**
    * 除了上面使用的read和write方法,也可以使用transferFrom方法直接copy通道中的数据
    * outputStreamChannel.transferFrom(inputStreamChannel,0,inputStreamChannel.size());
    */

    outputStreamChannel.close();
    inputStreamChannel.close();
}
  • ServerSocketChannel

ServerSocketChannel与BIO中的ServerSocket类似,可以绑定端口并监听TCP连接。在等待客户端连接的过程中,可以通过使用configureBlocking()方法来设置阻塞或非阻塞,如果设置了非阻塞,那么在调用accept()方法时可能会出现NULL值,所以需要注意一下。那么同样搞个demo,来实现双端通信的效果。

public static void main(String[] args) {
    // 1. 打开通道
    ServerSocketChannel serverSocketChannel = ServerSocketChannel.open():
    // 2.绑定9999端口
    serverSocketChannel.bind(new InetSocketAddress(9999));
    // 3.设置非阻塞
    serverSocketChannel.configureBlocking(false) ;
    while (true) {
    // 4.客户端连接,因为设置了非阻塞,所以这里可能会为空
    SocketChannel channel = serverSocketChannel.accept():
    if (channel == null) {
        System.out.println("没有客户端连接......");
        continue:
    }
    //5.创建缓冲区
    ByteBuffer buffer = ByteBuffer.allocate(1024) ;
    // 6.将客户端发来的消息,读取到缓冲区
    int read = channel.read(buffer) ;
    System.out.println(" client message : " + new String(buffer.array() ,  0, read, StandardCharsets.UTF_8):
    // 7.回复消息给客户端
    channel.write(ByteBuffer.wrap(" server received message".getBytes(StandardCharsets.UTF_8)));
    channel.close();
    break;
    }
}
  • SocketChannel

上面服务端代码已经搞定,现在可以编写客户端代码。客户端使用SocketChannel去连接服务端,类似BIO中的Socket。一顿乱敲后,先启动Server,在启动Client就可以实现双端通信了。

public static void main(String[] args) throws IOException{
    // 1. 创建通道
    SocketChannel channel = SocketChannel.open();
    // 2. 通过ip和端口连接server
    channel.connect(new InetSocketAddress("127.0.0.1",9999));
    // 3. 像server发送数据
    channel.write(ByteBuffer.wrap("hello server".getBytes(StandardCharsets.UTF_8)));
    // 4. 创建buffer 用于接收server消息
    ByteBuffer buffer = ByteBuffer.allocate(1024);
    // 5. 将server消息读入buffer
    int read = channel.read(buffer);
    System.out.println(" server message : " + new String(buffer.array() , 0,read, StandardCharsets.UTF_8));
    channel.close();
}

Selector

Selector是NIO中的选择器,主要工作就是通道注册,事件监听,事件选择切换,一个选择器可以注册多个通道。 ServerSocketChannel和SocketChannel都可以注册到选择器中,选择器中通过调用select方法获取通道中所发生的事件,并且根据不同的事件切换到不同的通道。选择器的事件有四种,分别为OP_READ,OP_WRITE,OP_ACCEPT以及OP_CONNECT。

高性能网络通讯框架Netty预热—NIO模型 NIO中,一般是一个单线程处理一个选择器,一个选择器可以监控很多通道。所以,通过选择器,一个单线程可以处理上百个、上千个甚至更多的通道,这样可以减少线程之间上下文的切换。通道和选择器之间通过使用register()方法进行注册,通过 selectedKeys()方法获取通道发生的事件。那先现在就来改造一下上面的代码,通过注册选择器的方式实现双端通信。

  • 服务端
public static void main(String[] args) throws Exception {

    // 1. 创建ServerSocketChannel
    ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
    // 2. 创建选择器Selector
    Selector selector = Selector.open();
    // 3. 绑定9999端口
    serverSocketChannel.bind(new InetSocketAddress(9999));
    // 4. 设置非阻塞
    serverSocketChannel.configureBlocking(false);
    // 5. ServerSocketChannel注册到选择器,并监听连接事件
    serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

    while (true) {
        // 6. 没有监听到任何事件
        if (selector.selectNow() == 0) {
            continue;
        }
        // 7. 监听到事件
        Set<SelectionKey> selectionKeys = selector.selectedKeys();
        Iterator<SelectionKey> keyIterator = selectionKeys.iterator();
        while (keyIterator.hasNext()) {
            SelectionKey key = keyIterator.next();
            // 8. 如果是客户端连接事件
            if (key.isAcceptable()) {
                // 8.1 创建SocketChannel,注册到选择器并监听读事件
                SocketChannel socketChannel = serverSocketChannel.accept();
                socketChannel.configureBlocking(false);
                socketChannel.register(selector, SelectionKey.OP_READ, ByteBuffer.allocate(1024));
                serverSendMsg(socketChannel, ByteBuffer.wrap("hello client".getBytes(StandardCharsets.UTF_8)));
            } else if (key.isReadable()) { // 9. 如果客户端是读事件
                // 9.1 获取事件通道,并读取通道数据
                SocketChannel socketChannel = (SocketChannel) key.channel();
                ByteBuffer byteBuffer = (ByteBuffer) key.attachment();
                int read = socketChannel.read(byteBuffer);
                System.out.println("[ client message ] : " + new String(byteBuffer.array(), 0, read, StandardCharsets.UTF_8));
                serverSendMsg(socketChannel, ByteBuffer.wrap("receive client message".getBytes(StandardCharsets.UTF_8)));
            }
            keyIterator.remove();
        }
    }
}
/** 
* 发送消息给客户端
*/
public static void serverSendMsg(SocketChannel socketChannel, ByteBuffer byteBuffer) throws Exception {
    socketChannel.write(byteBuffer);
}
  • 客户端
public static void main(String[] args) throws Exception {
    SocketChannel channel = SocketChannel.open();
    channel.configureBlocking(false);
    boolean connect = channel.connect(new InetSocketAddress("127.0.0.1", 9999));
    if (!connect) {
        while (!channel.finishConnect()) {
            System.out.println("服务连接中.....");
        }
    }
    receiveServerMsg(channel);
    // 发送消息给服务端
    channel.write(ByteBuffer.wrap("hello server".getBytes(StandardCharsets.UTF_8)));
    receiveServerMsg(channel);
    new CountDownLatch(1).await();

}
/**
* 接收服务响应的消息
*/
public static void receiveServerMsg(SocketChannel channel) throws Exception {
    ByteBuffer buffer = ByteBuffer.allocate(1024);
    int read = channel.read(buffer);
    System.out.println("[ server message ]: " + new String(buffer.array(), 0, read, StandardCharsets.UTF_8));
}
  • 运行结果 高性能网络通讯框架Netty预热—NIO模型

总结

以上部分简单的介绍了BIO与NIO,其中着重描述了NIO的特性(为了后续的Netty)。BIO基于字节流和字符流进行操作的,而NIO基于Channel(通道)和Buffer(缓冲区)进行操作的,数据总是从通道读取到缓冲区中,或者从缓冲区写入到通道中。Selector(选择器)用于监听多个通道事件,因此使用单个线程就可以监听多个客户端通道。下面表格是两种IO方式式的对比。

阻塞状态实现方式程度效率数据处理
BIO阻塞简单面向流
NIO非阻塞相对复杂面向缓冲区