likes
comments
collection
share

NIO详解

作者站长头像
站长
· 阅读数 61

NIO是JDK1.4新加入的API,很多人把它称为Not Blocking IO,意为非阻塞式IO,它是相较于传统的IO方式而言的,NIO在数据打包和传输方式上进行了较大的改良,增加了缓冲区、通道等等内容,使得NIO能够借助一个线程处理多个资源,读写效率也大大提升。

缓冲区Buffer

先来介绍NIO中的一个基本概念,缓冲区。在NIO中,任何数据的读写都需要借助缓冲区,你可以把缓冲区理解成一个数组,当要写入数据时就向数组中存值,当要读取数据时就从数组中取值。

创建缓冲区

对于缓冲区的创建,JDK提供了两种方式(以字节缓冲区ByteBuffer为例):

  1. allocate
  2. wrap

其中allocate用于创建一个指定大小的缓冲区:

@Test
public void buffer(){
    // 指定长度的缓冲区
    ByteBuffer byteBuffer = ByteBuffer.allocate(5);
    for (int i = 0; i < 5; i++) {
        // 从缓冲区中获取数据
        System.out.print(byteBuffer.get() + "\t");
    }
}

运行结果:

0	0	0	0	0

通过缓冲区的get方法可以获取缓冲区中的数据,初始为数据类型的零值

而wrap方法可以创建一个指定内容的缓冲区:

@Test
public void buffer(){
    // 指定内容的缓冲区
    ByteBuffer wrap = ByteBuffer.wrap("test".getBytes());
    for (int i = 0; i < 4; i++) {
        System.out.print((char) wrap.get() + "\t");
    }
}

运行结果:

t	e	s	t	

那么缓冲区内部的具体结构是如何的呢?数据的存取是怎样进行的呢?你需要了解缓冲区中的几个标记:

  1. position:当前索引位置
  2. limit:最大索引位置
  3. capacity:缓冲区的总容量
  4. remaining:缓冲区的剩余容量

来创建一个容量为10的缓冲区,然后分别输出这些标记的值:

@Test
public void buffer(){
    ByteBuffer allocate = ByteBuffer.allocate(10);
    System.out.print(allocate.position() + "\t"); // 当前索引位置
    System.out.print(allocate.limit() + "\t"); // 最大索引位置,初始等于缓冲区大小
    System.out.print(allocate.capacity() + "\t"); // 返回缓冲区的总长度
    System.out.print(allocate.remaining() + "\t"); // 剩余能操作的容量(limit - position)
}

运行结果:

0	10	10	10

每个标记的位置如下图所示:

NIO详解

position指向的是当前索引位置,当向缓冲区中添加数据时,position便会随之移动,而limit指向的是最大索引位置(初始等于capacity),即position最大不会等于limit,remaining为缓冲区的剩余容量,remaining = limit - position

向缓冲区添加数据

现在向缓冲区添加一个数据:

// 向缓冲区添加一个字节
allocate.put((byte) 97);

此时缓冲区标记会如何变化呢?首先position会右移一位,然后remaining变为9,其它的不影响,如下图所示:

NIO详解

我们可以试验一下是不是这样:

@Test
public void buffer(){
    ByteBuffer allocate = ByteBuffer.allocate(10);
    // 向缓冲区添加一个字节
    allocate.put((byte) 97);
    System.out.print(allocate.position() + "\t");
    System.out.print(allocate.limit() + "\t");
    System.out.print(allocate.capacity() + "\t");
    System.out.print(allocate.remaining() + "\t");
}

运行结果:

1	10	10	9

结果正如我们所料。

缓冲区的put方法还能够传递一个数组,将一串数据进行添加:

// 向缓冲区添加一个字节
allocate.put("0123456789".getBytes());

若是当前缓冲区已经满了,则再向一个满的缓冲区添加数据会抛出异常:

@Test
public void buffer(){
    ByteBuffer allocate = ByteBuffer.allocate(10);
    allocate.put("0123456789".getBytes());
    allocate.put((byte) 1);
}

运行结果:

java.nio.BufferOverflowException
	at java.nio.Buffer.nextPutIndex(Buffer.java:521)
    at java.nio.HeapByteBuffer.put(HeapByteBuffer.java:169)
    at com.wwj.nio.BufferDemo.buffer(BufferDemo.java:144)

通过缓冲区的hasRemaining方法可以判断当前缓冲区是否还能够继续添加数据:

@Test
public void buffer(){
    ByteBuffer allocate = ByteBuffer.allocate(10);
    System.out.println(allocate.hasRemaining());
    allocate.put("0123456789".getBytes());
    System.out.println(allocate.hasRemaining());
}

运行结果:

true
false

缓冲区支持动态修改标记位置,以达到重新写入的需求:

@Test
public void buffer(){
    ByteBuffer allocate = ByteBuffer.allocate(10);
    allocate.put("0123456789".getBytes());
    // 修改当前索引位置
    allocate.position(0);
    allocate.put((byte) 1);
    System.out.print(allocate.position() + "\t");
    System.out.print(allocate.limit() + "\t");
    System.out.print(allocate.capacity() + "\t");
    System.out.print(allocate.remaining() + "\t");
}

运行结果:

1	10	10	9

把position位置修改为0之后,又相当于对一个空的缓冲区进行操作了。

读取缓冲区数据

接下来介绍一下缓冲区数据的读取,在最开始我们已经使用过get方法来读取缓冲区的数据了,如下:

@Test
public void buffer() {
    ByteBuffer allocate = ByteBuffer.allocate(10);
    allocate.put("0123".getBytes());
    for (int i = 0; i < 4; i++) {
        System.out.print(allocate.get() + "\t");
    }
}

大家可以猜一猜运行结果是什么呢:

0	0	0	0

也许有同学很奇怪,为什么添加的数据没有被读取出来,其实,如果你掌握了缓冲区中的标记,就能明白是为什么。

在创建了一个容量为10的缓冲区之后,标记如下图所示:

NIO详解

当向缓冲区添加了一个字节数组后,标记发生了变化:

NIO详解

此时我们调用get方法进行读取,它将从position位置也就是索引4位置开始往后读取,这样读取到的数据当然就是0了,若是想读取添加到缓冲区中的数据,则需要将position移动到索引0位置才行,不过JDK已经提供了这样的方法给我们:

@Test
public void buffer() {
    ByteBuffer allocate = ByteBuffer.allocate(10);
    allocate.put("0123".getBytes());
    // 切换为读模式
    allocate.flip();
    for (int i = 0; i < allocate.limit(); i++) {
        System.out.print((char) allocate.get() + "\t");
    }
}

运行结果:

0	1	2	3

查看一下flip方法的源码:

public final Buffer flip() {
    limit = position;
    position = 0;
    mark = -1;
    return this;
}

关键就在于limit = positionposition = 0,通过改变这两个标记后:

NIO详解

position重新回到了索引0的位置,这样就可以进行正常的读取了,而limit也修改为了写入数据的末尾位置,可以通过判断limit来终止读取条件。

与写入数据一样,缓冲区在读取数据的时候,也会不停地移动position,当所有数据都被读取后,再次读取数据将会抛出异常,因为position必须小于等于limit:

@Test
public void buffer() {
    ByteBuffer allocate = ByteBuffer.allocate(10);
    allocate.put("0123".getBytes());
    // 切换为读模式
    allocate.flip();
    for (int i = 0; i < allocate.limit(); i++) {
        System.out.print((char) allocate.get() + "\t");
    }
    allocate.get();
}

运行结果:

0	1	2	3	
java.nio.BufferUnderflowException
	at java.nio.Buffer.nextGetIndex(Buffer.java:500)
	at java.nio.HeapByteBuffer.get(HeapByteBuffer.java:135)
	at com.wwj.nio.BufferDemo.buffer(BufferDemo.java:148)

但是通过索引读取数据将不会判断position是否小于等于limit,也不会移动position:

@Test
public void buffer() {
    ByteBuffer allocate = ByteBuffer.allocate(10);
    allocate.put("0123".getBytes());
    // 切换为读模式
    allocate.flip();
    for (int i = 0; i < allocate.limit(); i++) {
        System.out.print((char) allocate.get() + "\t");
    }
    // 通过索引读取数据
    System.out.println((char) allocate.get(1));
}

运行结果:

0	1	2	3	1

数据读取完毕后,若是想重新对该缓冲区进行读取,则可以将position手动置为0,也可以调用JDK提供的方法:

// 调用rewind方法可以将当前索引重置为0
allocate.rewind();

rewind方法内部也是对position进行赋值为0的操作:

public final Buffer rewind() {
    position = 0;
    mark = -1;
    return this;
}

若是想重新对缓冲区进行写入,则调用clear方法:

// 切换写模式,此时会将当前索引置为0,将最大索引置为缓冲区容量
allocate.clear();

注意rewind方法和clear方法的区别,它们虽然都会将position置为0,但是clear方法还会将limit置为capacity的值,所以当想要再次读取缓冲区中的数据时,则可以调用rewind方法;当想要再次写入数据到缓冲区时,则可以调用clear方法。

来验证一下:

@Test
public void buffer() {
    ByteBuffer allocate = ByteBuffer.allocate(10);
    allocate.put("0123".getBytes());
    // 切换为读模式
    allocate.flip();
    for(int i = 0;i <allocate.limit();++i){
        allocate.get();
    }
    System.out.print("position:" + allocate.position() + "\t");
    System.out.print("limit:" + allocate.limit() + "\t");
    System.out.print("capacity:" + allocate.capacity() + "\t");
    System.out.print("remaining:" + allocate.remaining() + "\t");
    System.out.println();

    allocate.rewind();
    System.out.print("position:" + allocate.position() + "\t");
    System.out.print("limit:" + allocate.limit() + "\t");
    System.out.print("capacity:" + allocate.capacity() + "\t");
    System.out.print("remaining:" + allocate.remaining() + "\t");
    System.out.println();

    allocate.clear();
    System.out.print("position:" + allocate.position() + "\t");
    System.out.print("limit:" + allocate.limit() + "\t");
    System.out.print("capacity:" + allocate.capacity() + "\t");
    System.out.print("remaining:" + allocate.remaining() + "\t");
}

运行结果:

position:4		limit:4		capacity:10		remaining:0	
position:0		limit:4		capacity:10		remaining:4	
position:0		limit:10	capacity:10		remaining:10	

通道Channel

NIO借助通道能够实现双向的数据传递,在传统的IO中,流分为输入流和输出流,输入流只能实现数据的读取,输出流只能实现数据的写入,比如下面的这段代码:

public static void main(String[] args) throws Exception {
    while (true) {
        Socket socket = new Socket("127.0.0.1", 9999);
        // 向服务端发送消息
        OutputStream output = socket.getOutputStream();
        System.out.println("请输入内容:");
        Scanner scanner = new Scanner(System.in);
        String s = scanner.nextLine();
        output.write(s.getBytes());
        // 接收服务端发送过来的消息
        InputStream input = socket.getInputStream();
        byte[] buffer = new byte[1024];
        int read = input.read(buffer);
        System.out.println(new String(buffer, 0, read).trim());
        socket.close();
    }
}

这是一段Socket的客户端代码,对于数据的写入和读取需要分别获取输出流和输入流,再来看Socket的服务端:

public static void main(String[] args) throws Exception {
    ExecutorService executorService = Executors.newCachedThreadPool();
    // 创建socket
    ServerSocket socket = new ServerSocket(9999);
    while (true) {
        // 等待客户端接入
        Socket accept = socket.accept();
        // 处理接入
        executorService.execute(new Runnable() {
            @Override
            public void run() {
                handler(accept); // 此情况会导致只能处理一个客户端连接,若是想处理多个客户端,需使用线程
            }
        });
    }
}

private static void handler(Socket socket) {
    try {
        // 获取输入流
        byte[] buffer = new byte[1024];
        InputStream inputStream = socket.getInputStream();
        // 接收数据
        int len = inputStream.read(buffer);
        System.out.println(new String(buffer, 0, len));
        // 发送消息
        OutputStream outputStream = socket.getOutputStream();
        outputStream.write("好的!".getBytes());
    } catch (IOException e) {
        e.printStackTrace();
    }
}

其中accept方法是阻塞方法,在客户端未接入服务端之前,这个方法会一直被阻塞,而handler中的read方法也会因为等待客户端写入数据而一直阻塞,这将导致一个线程只能为一个客户端服务。

再来看看NIO是如何实现Socket通信的,首先是服务端:

public static void main(String[] args) throws Exception {
    // 打开一个服务端通道
    ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
    // 绑定端口
    serverSocketChannel.bind(new InetSocketAddress(9999));
    // 通道默认是阻塞的,需要设置为非阻塞
    serverSocketChannel.configureBlocking(false);
    System.out.println("服务端启动......");
    while (true) {
        // 检查是否有客户端连接,若有客户端连接则会返回连接通道
        SocketChannel channel = serverSocketChannel.accept();
        if (channel == null) {
            System.out.println("没有客户端连接......");
            Thread.sleep(1000);
            continue;
        }
        // 获取客户端发送过来的数据,并把数据放入缓冲区
        ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
        int read = channel.read(byteBuffer);
        System.out.println("客户端消息:" + new String(byteBuffer.array(), 0, read));
        // 给客户端回写数据
        channel.write(ByteBuffer.wrap("这是服务端回写的数据".getBytes()));
        // 释放资源
        channel.close();
    }
}

服务端会开启一个通道,在通道内进行数据的传递,这个通道是双向的,可进行读取和写入数据两种操作,并且accept方法也不会阻塞,而是立马返回一个通道,若是返回的通道为空,则没有客户端连接;若是有通道,则有客户端连接。

客户端代码如下:

public static void main(String[] args) throws Exception {
    // 打开通道
    SocketChannel socketChannel = SocketChannel.open();
    // 设置服务端IP和端口
    socketChannel.connect(new InetSocketAddress("127.0.0.1", 9999));
    // 写入数据
    socketChannel.write(ByteBuffer.wrap("这是客户端发送的数据".getBytes()));
    // 获取服务端响应的数据
    ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
    int read = socketChannel.read(byteBuffer);
    System.out.println("服务端消息:" + new String(byteBuffer.array(), 0, read));
    // 释放资源
    socketChannel.close();
}

打开通道写入数据即可,我们运行起来看看结果:

没有客户端连接......
没有客户端连接......
没有客户端连接......
客户端消息:这是客户端发送的数据
没有客户端连接......
没有客户端连接......
没有客户端连接......

选择器Selector

使用通道后的Socket在连接时虽然不会产生阻塞,但是若想在发送数据之前进行一些耗时的处理,则服务端会停在read方法上一直阻塞。NIO提供了Selector选择器,通过选择器可以实现在真正发生读写数据的情况下才进行读写,并且可以使用一个线程处理多个通道,如下所示:

NIO详解

Selector能够监听多个Channel上的事件,当发生读写事件时,才会真正去处理读写数据,避免了资源的浪费。

Selector通过调用selectedKeys方法可以获取一个SelectKey的集合,在SelectKey中便定义了一些事件类型,分别如下:

  1. OP_ACCEPT:连接继续事件,表示服务端监听到了客户端的连接,此时还未建立连接
  2. OP_CONNECT:连接就绪事件,此时服务端已经与客户端成功建立连接
  3. OP_READ:读就绪事件,表示通道中有待读取的数据,可以执行读操作
  4. OP_WRITE:写就绪事件,表示通道可以执行写操作

加上了Selector选择器之后,Socket服务端的实现就发生了一些变化:

public static void main(String[] args) throws Exception {
    // 打开通道
    ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
    // 绑定端口
    serverSocketChannel.bind(new InetSocketAddress(9999));
    // 设置通道为非阻塞
    serverSocketChannel.configureBlocking(false);
    // 创建选择器
    Selector selector = Selector.open();
    // 将通道注册到选择器上,并监听ACCEPT事件
    serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
    while (true) {
        // 检查选择器是否有事件
        int select = selector.select(2000);
        if (select == 0) {
            System.out.println("暂未监听到事件......");
            continue;
        }
        // 获取事件集合
        Set<SelectionKey> selectionKeys = selector.selectedKeys();
        // 遍历事件集合
        Iterator<SelectionKey> iterator = selectionKeys.iterator();
        while (iterator.hasNext()) {
            SelectionKey selectionKey = iterator.next();
            // 判断事件是否为ACCEPT事件
            if (selectionKey.isAcceptable()) {
                // 得到客户端通道,将通道注册到选择器上,并监听READ事件
                SocketChannel socketChannel = serverSocketChannel.accept();
                System.out.println("客户端已连接......");
                // 必须将通道设置为非阻塞
                socketChannel.configureBlocking(false);
                socketChannel.register(selector, SelectionKey.OP_READ);
            }
            // 判断事件是否为READ事件
            if (selectionKey.isReadable()) {
                // 得到客户端通道,读取数据
                SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
                ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
                int read = socketChannel.read(byteBuffer);
                System.out.println("接收到客户端的消息:" + new String(byteBuffer.array(), 0, read));
                // 向客户端回写数据
                socketChannel.write(ByteBuffer.wrap("这是一条服务端发送的消息".getBytes()));
                // 释放资源
                socketChannel.close();
            }
            // 从事件集合中删除对应的事件,防止重复处理
            selectionKeys.remove(selectionKey);
        }
    }
}

具体流程是这样:

  1. 首先打开一个服务端的通道,绑定端口,并设置通道为非阻塞的
  2. 创建选择器,并将通道注册到选择器上,并监听连接继续事件(OP_ACCEPT)
  3. 此时循环检查选择器中是否有事件,若select方法的返回值为0,则没有事件发生;若不为0,则处理事件
  4. 从选择器上获取所有事件的集合,并开始遍历事件集合
  5. 若是当前事件为连接继续事件,则调用accept方法得到客户端的通道,设置客户端通道为非阻塞的,并将其也注册到选择器上,监听读就绪事件(OP_READ)
  6. 若是当前事件为读就绪事件,则得到客户端通道,并借助缓冲区读取数据
  7. 最后将处理完成的事件从事件集合中删除,防止重复处理

客户端实现无需变化:

public static void main(String[] args) throws Exception {
    // 打开通道
    SocketChannel socketChannel = SocketChannel.open();
    // 设置服务端IP和端口
    socketChannel.connect(new InetSocketAddress("127.0.0.1", 9999));
    // 写入数据
    socketChannel.write(ByteBuffer.wrap("这是客户端发送的数据".getBytes()));
    // 获取服务端响应的数据
    ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
    int read = socketChannel.read(byteBuffer);
    System.out.println("服务端消息:" + new String(byteBuffer.array(), 0, read));
    // 释放资源
    socketChannel.close();
}

当客户端连接上服务端时,就会触发连接继续事件,服务端会对该事件进行处理,并注册读就绪事件,此时当客户端发送数据时,又会触发读就绪事件,服务端处理得到数据,这就是Selector选择器的运行过程。

Selector选择器的优势在于一个线程能够处理多个客户端通道,当有多个通道待处理时,选择器会通过轮询的方式获取每个通道上的事件并进行处理。

转载自:https://juejin.cn/post/7243780412546957368
评论
请登录