NIO详解
NIO是JDK1.4新加入的API,很多人把它称为Not Blocking IO,意为非阻塞式IO,它是相较于传统的IO方式而言的,NIO在数据打包和传输方式上进行了较大的改良,增加了缓冲区、通道等等内容,使得NIO能够借助一个线程处理多个资源,读写效率也大大提升。
缓冲区Buffer
先来介绍NIO中的一个基本概念,缓冲区。在NIO中,任何数据的读写都需要借助缓冲区,你可以把缓冲区理解成一个数组,当要写入数据时就向数组中存值,当要读取数据时就从数组中取值。
创建缓冲区
对于缓冲区的创建,JDK提供了两种方式(以字节缓冲区ByteBuffer为例):
- allocate
- wrap
其中allocate用于创建一个指定大小的缓冲区:
@Test
public void buffer(){
// 指定长度的缓冲区
ByteBuffer byteBuffer = ByteBuffer.allocate(5);
for (int i = 0; i < 5; i++) {
// 从缓冲区中获取数据
System.out.print(byteBuffer.get() + "\t");
}
}
运行结果:
0 0 0 0 0
通过缓冲区的get方法可以获取缓冲区中的数据,初始为数据类型的零值
。
而wrap方法可以创建一个指定内容的缓冲区:
@Test
public void buffer(){
// 指定内容的缓冲区
ByteBuffer wrap = ByteBuffer.wrap("test".getBytes());
for (int i = 0; i < 4; i++) {
System.out.print((char) wrap.get() + "\t");
}
}
运行结果:
t e s t
那么缓冲区内部的具体结构是如何的呢?数据的存取是怎样进行的呢?你需要了解缓冲区中的几个标记:
- position:当前索引位置
- limit:最大索引位置
- capacity:缓冲区的总容量
- remaining:缓冲区的剩余容量
来创建一个容量为10的缓冲区,然后分别输出这些标记的值:
@Test
public void buffer(){
ByteBuffer allocate = ByteBuffer.allocate(10);
System.out.print(allocate.position() + "\t"); // 当前索引位置
System.out.print(allocate.limit() + "\t"); // 最大索引位置,初始等于缓冲区大小
System.out.print(allocate.capacity() + "\t"); // 返回缓冲区的总长度
System.out.print(allocate.remaining() + "\t"); // 剩余能操作的容量(limit - position)
}
运行结果:
0 10 10 10
每个标记的位置如下图所示:
position指向的是当前索引位置,当向缓冲区中添加数据时,position便会随之移动,而limit指向的是最大索引位置(初始等于capacity),即position最大不会等于limit,remaining为缓冲区的剩余容量,remaining = limit - position
。
向缓冲区添加数据
现在向缓冲区添加一个数据:
// 向缓冲区添加一个字节
allocate.put((byte) 97);
此时缓冲区标记会如何变化呢?首先position会右移一位,然后remaining变为9,其它的不影响,如下图所示:
我们可以试验一下是不是这样:
@Test
public void buffer(){
ByteBuffer allocate = ByteBuffer.allocate(10);
// 向缓冲区添加一个字节
allocate.put((byte) 97);
System.out.print(allocate.position() + "\t");
System.out.print(allocate.limit() + "\t");
System.out.print(allocate.capacity() + "\t");
System.out.print(allocate.remaining() + "\t");
}
运行结果:
1 10 10 9
结果正如我们所料。
缓冲区的put方法还能够传递一个数组,将一串数据进行添加:
// 向缓冲区添加一个字节
allocate.put("0123456789".getBytes());
若是当前缓冲区已经满了,则再向一个满的缓冲区添加数据会抛出异常:
@Test
public void buffer(){
ByteBuffer allocate = ByteBuffer.allocate(10);
allocate.put("0123456789".getBytes());
allocate.put((byte) 1);
}
运行结果:
java.nio.BufferOverflowException
at java.nio.Buffer.nextPutIndex(Buffer.java:521)
at java.nio.HeapByteBuffer.put(HeapByteBuffer.java:169)
at com.wwj.nio.BufferDemo.buffer(BufferDemo.java:144)
通过缓冲区的hasRemaining方法可以判断当前缓冲区是否还能够继续添加数据:
@Test
public void buffer(){
ByteBuffer allocate = ByteBuffer.allocate(10);
System.out.println(allocate.hasRemaining());
allocate.put("0123456789".getBytes());
System.out.println(allocate.hasRemaining());
}
运行结果:
true
false
缓冲区支持动态修改标记位置,以达到重新写入的需求:
@Test
public void buffer(){
ByteBuffer allocate = ByteBuffer.allocate(10);
allocate.put("0123456789".getBytes());
// 修改当前索引位置
allocate.position(0);
allocate.put((byte) 1);
System.out.print(allocate.position() + "\t");
System.out.print(allocate.limit() + "\t");
System.out.print(allocate.capacity() + "\t");
System.out.print(allocate.remaining() + "\t");
}
运行结果:
1 10 10 9
把position位置修改为0之后,又相当于对一个空的缓冲区进行操作了。
读取缓冲区数据
接下来介绍一下缓冲区数据的读取,在最开始我们已经使用过get方法来读取缓冲区的数据了,如下:
@Test
public void buffer() {
ByteBuffer allocate = ByteBuffer.allocate(10);
allocate.put("0123".getBytes());
for (int i = 0; i < 4; i++) {
System.out.print(allocate.get() + "\t");
}
}
大家可以猜一猜运行结果是什么呢:
0 0 0 0
也许有同学很奇怪,为什么添加的数据没有被读取出来,其实,如果你掌握了缓冲区中的标记,就能明白是为什么。
在创建了一个容量为10的缓冲区之后,标记如下图所示:
当向缓冲区添加了一个字节数组后,标记发生了变化:
此时我们调用get方法进行读取,它将从position位置也就是索引4位置开始往后读取,这样读取到的数据当然就是0了,若是想读取添加到缓冲区中的数据,则需要将position移动到索引0位置才行,不过JDK已经提供了这样的方法给我们:
@Test
public void buffer() {
ByteBuffer allocate = ByteBuffer.allocate(10);
allocate.put("0123".getBytes());
// 切换为读模式
allocate.flip();
for (int i = 0; i < allocate.limit(); i++) {
System.out.print((char) allocate.get() + "\t");
}
}
运行结果:
0 1 2 3
查看一下flip方法的源码:
public final Buffer flip() {
limit = position;
position = 0;
mark = -1;
return this;
}
关键就在于limit = position
和position = 0
,通过改变这两个标记后:
position重新回到了索引0的位置,这样就可以进行正常的读取了,而limit也修改为了写入数据的末尾位置,可以通过判断limit来终止读取条件。
与写入数据一样,缓冲区在读取数据的时候,也会不停地移动position,当所有数据都被读取后,再次读取数据将会抛出异常,因为position必须小于等于limit:
@Test
public void buffer() {
ByteBuffer allocate = ByteBuffer.allocate(10);
allocate.put("0123".getBytes());
// 切换为读模式
allocate.flip();
for (int i = 0; i < allocate.limit(); i++) {
System.out.print((char) allocate.get() + "\t");
}
allocate.get();
}
运行结果:
0 1 2 3
java.nio.BufferUnderflowException
at java.nio.Buffer.nextGetIndex(Buffer.java:500)
at java.nio.HeapByteBuffer.get(HeapByteBuffer.java:135)
at com.wwj.nio.BufferDemo.buffer(BufferDemo.java:148)
但是通过索引读取数据将不会判断position是否小于等于limit,也不会移动position:
@Test
public void buffer() {
ByteBuffer allocate = ByteBuffer.allocate(10);
allocate.put("0123".getBytes());
// 切换为读模式
allocate.flip();
for (int i = 0; i < allocate.limit(); i++) {
System.out.print((char) allocate.get() + "\t");
}
// 通过索引读取数据
System.out.println((char) allocate.get(1));
}
运行结果:
0 1 2 3 1
数据读取完毕后,若是想重新对该缓冲区进行读取,则可以将position手动置为0,也可以调用JDK提供的方法:
// 调用rewind方法可以将当前索引重置为0
allocate.rewind();
rewind方法内部也是对position进行赋值为0的操作:
public final Buffer rewind() {
position = 0;
mark = -1;
return this;
}
若是想重新对缓冲区进行写入,则调用clear方法:
// 切换写模式,此时会将当前索引置为0,将最大索引置为缓冲区容量
allocate.clear();
注意rewind方法和clear方法的区别,它们虽然都会将position置为0,但是clear方法还会将limit置为capacity的值,所以当想要再次读取缓冲区中的数据时,则可以调用rewind方法;当想要再次写入数据到缓冲区时,则可以调用clear方法。
来验证一下:
@Test
public void buffer() {
ByteBuffer allocate = ByteBuffer.allocate(10);
allocate.put("0123".getBytes());
// 切换为读模式
allocate.flip();
for(int i = 0;i <allocate.limit();++i){
allocate.get();
}
System.out.print("position:" + allocate.position() + "\t");
System.out.print("limit:" + allocate.limit() + "\t");
System.out.print("capacity:" + allocate.capacity() + "\t");
System.out.print("remaining:" + allocate.remaining() + "\t");
System.out.println();
allocate.rewind();
System.out.print("position:" + allocate.position() + "\t");
System.out.print("limit:" + allocate.limit() + "\t");
System.out.print("capacity:" + allocate.capacity() + "\t");
System.out.print("remaining:" + allocate.remaining() + "\t");
System.out.println();
allocate.clear();
System.out.print("position:" + allocate.position() + "\t");
System.out.print("limit:" + allocate.limit() + "\t");
System.out.print("capacity:" + allocate.capacity() + "\t");
System.out.print("remaining:" + allocate.remaining() + "\t");
}
运行结果:
position:4 limit:4 capacity:10 remaining:0
position:0 limit:4 capacity:10 remaining:4
position:0 limit:10 capacity:10 remaining:10
通道Channel
NIO借助通道能够实现双向的数据传递,在传统的IO中,流分为输入流和输出流,输入流只能实现数据的读取,输出流只能实现数据的写入,比如下面的这段代码:
public static void main(String[] args) throws Exception {
while (true) {
Socket socket = new Socket("127.0.0.1", 9999);
// 向服务端发送消息
OutputStream output = socket.getOutputStream();
System.out.println("请输入内容:");
Scanner scanner = new Scanner(System.in);
String s = scanner.nextLine();
output.write(s.getBytes());
// 接收服务端发送过来的消息
InputStream input = socket.getInputStream();
byte[] buffer = new byte[1024];
int read = input.read(buffer);
System.out.println(new String(buffer, 0, read).trim());
socket.close();
}
}
这是一段Socket的客户端代码,对于数据的写入和读取需要分别获取输出流和输入流,再来看Socket的服务端:
public static void main(String[] args) throws Exception {
ExecutorService executorService = Executors.newCachedThreadPool();
// 创建socket
ServerSocket socket = new ServerSocket(9999);
while (true) {
// 等待客户端接入
Socket accept = socket.accept();
// 处理接入
executorService.execute(new Runnable() {
@Override
public void run() {
handler(accept); // 此情况会导致只能处理一个客户端连接,若是想处理多个客户端,需使用线程
}
});
}
}
private static void handler(Socket socket) {
try {
// 获取输入流
byte[] buffer = new byte[1024];
InputStream inputStream = socket.getInputStream();
// 接收数据
int len = inputStream.read(buffer);
System.out.println(new String(buffer, 0, len));
// 发送消息
OutputStream outputStream = socket.getOutputStream();
outputStream.write("好的!".getBytes());
} catch (IOException e) {
e.printStackTrace();
}
}
其中accept方法是阻塞方法,在客户端未接入服务端之前,这个方法会一直被阻塞,而handler中的read方法也会因为等待客户端写入数据而一直阻塞,这将导致一个线程只能为一个客户端服务。
再来看看NIO是如何实现Socket通信的,首先是服务端:
public static void main(String[] args) throws Exception {
// 打开一个服务端通道
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
// 绑定端口
serverSocketChannel.bind(new InetSocketAddress(9999));
// 通道默认是阻塞的,需要设置为非阻塞
serverSocketChannel.configureBlocking(false);
System.out.println("服务端启动......");
while (true) {
// 检查是否有客户端连接,若有客户端连接则会返回连接通道
SocketChannel channel = serverSocketChannel.accept();
if (channel == null) {
System.out.println("没有客户端连接......");
Thread.sleep(1000);
continue;
}
// 获取客户端发送过来的数据,并把数据放入缓冲区
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
int read = channel.read(byteBuffer);
System.out.println("客户端消息:" + new String(byteBuffer.array(), 0, read));
// 给客户端回写数据
channel.write(ByteBuffer.wrap("这是服务端回写的数据".getBytes()));
// 释放资源
channel.close();
}
}
服务端会开启一个通道,在通道内进行数据的传递,这个通道是双向的,可进行读取和写入数据两种操作,并且accept方法也不会阻塞,而是立马返回一个通道,若是返回的通道为空,则没有客户端连接;若是有通道,则有客户端连接。
客户端代码如下:
public static void main(String[] args) throws Exception {
// 打开通道
SocketChannel socketChannel = SocketChannel.open();
// 设置服务端IP和端口
socketChannel.connect(new InetSocketAddress("127.0.0.1", 9999));
// 写入数据
socketChannel.write(ByteBuffer.wrap("这是客户端发送的数据".getBytes()));
// 获取服务端响应的数据
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
int read = socketChannel.read(byteBuffer);
System.out.println("服务端消息:" + new String(byteBuffer.array(), 0, read));
// 释放资源
socketChannel.close();
}
打开通道写入数据即可,我们运行起来看看结果:
没有客户端连接......
没有客户端连接......
没有客户端连接......
客户端消息:这是客户端发送的数据
没有客户端连接......
没有客户端连接......
没有客户端连接......
选择器Selector
使用通道后的Socket在连接时虽然不会产生阻塞,但是若想在发送数据之前进行一些耗时的处理,则服务端会停在read方法上一直阻塞。NIO提供了Selector选择器,通过选择器可以实现在真正发生读写数据的情况下才进行读写,并且可以使用一个线程处理多个通道,如下所示:
Selector能够监听多个Channel上的事件,当发生读写事件时,才会真正去处理读写数据,避免了资源的浪费。
Selector通过调用selectedKeys方法可以获取一个SelectKey的集合,在SelectKey中便定义了一些事件类型,分别如下:
- OP_ACCEPT:连接继续事件,表示服务端监听到了客户端的连接,此时还未建立连接
- OP_CONNECT:连接就绪事件,此时服务端已经与客户端成功建立连接
- OP_READ:读就绪事件,表示通道中有待读取的数据,可以执行读操作
- OP_WRITE:写就绪事件,表示通道可以执行写操作
加上了Selector选择器之后,Socket服务端的实现就发生了一些变化:
public static void main(String[] args) throws Exception {
// 打开通道
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
// 绑定端口
serverSocketChannel.bind(new InetSocketAddress(9999));
// 设置通道为非阻塞
serverSocketChannel.configureBlocking(false);
// 创建选择器
Selector selector = Selector.open();
// 将通道注册到选择器上,并监听ACCEPT事件
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
while (true) {
// 检查选择器是否有事件
int select = selector.select(2000);
if (select == 0) {
System.out.println("暂未监听到事件......");
continue;
}
// 获取事件集合
Set<SelectionKey> selectionKeys = selector.selectedKeys();
// 遍历事件集合
Iterator<SelectionKey> iterator = selectionKeys.iterator();
while (iterator.hasNext()) {
SelectionKey selectionKey = iterator.next();
// 判断事件是否为ACCEPT事件
if (selectionKey.isAcceptable()) {
// 得到客户端通道,将通道注册到选择器上,并监听READ事件
SocketChannel socketChannel = serverSocketChannel.accept();
System.out.println("客户端已连接......");
// 必须将通道设置为非阻塞
socketChannel.configureBlocking(false);
socketChannel.register(selector, SelectionKey.OP_READ);
}
// 判断事件是否为READ事件
if (selectionKey.isReadable()) {
// 得到客户端通道,读取数据
SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
int read = socketChannel.read(byteBuffer);
System.out.println("接收到客户端的消息:" + new String(byteBuffer.array(), 0, read));
// 向客户端回写数据
socketChannel.write(ByteBuffer.wrap("这是一条服务端发送的消息".getBytes()));
// 释放资源
socketChannel.close();
}
// 从事件集合中删除对应的事件,防止重复处理
selectionKeys.remove(selectionKey);
}
}
}
具体流程是这样:
- 首先打开一个服务端的通道,绑定端口,并设置通道为非阻塞的
- 创建选择器,并将通道注册到选择器上,并监听连接继续事件(OP_ACCEPT)
- 此时循环检查选择器中是否有事件,若select方法的返回值为0,则没有事件发生;若不为0,则处理事件
- 从选择器上获取所有事件的集合,并开始遍历事件集合
- 若是当前事件为连接继续事件,则调用accept方法得到客户端的通道,设置客户端通道为非阻塞的,并将其也注册到选择器上,监听读就绪事件(OP_READ)
- 若是当前事件为读就绪事件,则得到客户端通道,并借助缓冲区读取数据
- 最后将处理完成的事件从事件集合中删除,防止重复处理
客户端实现无需变化:
public static void main(String[] args) throws Exception {
// 打开通道
SocketChannel socketChannel = SocketChannel.open();
// 设置服务端IP和端口
socketChannel.connect(new InetSocketAddress("127.0.0.1", 9999));
// 写入数据
socketChannel.write(ByteBuffer.wrap("这是客户端发送的数据".getBytes()));
// 获取服务端响应的数据
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
int read = socketChannel.read(byteBuffer);
System.out.println("服务端消息:" + new String(byteBuffer.array(), 0, read));
// 释放资源
socketChannel.close();
}
当客户端连接上服务端时,就会触发连接继续事件,服务端会对该事件进行处理,并注册读就绪事件,此时当客户端发送数据时,又会触发读就绪事件,服务端处理得到数据,这就是Selector选择器的运行过程。
Selector选择器的优势在于一个线程能够处理多个客户端通道,当有多个通道待处理时,选择器会通过轮询的方式获取每个通道上的事件并进行处理。
转载自:https://juejin.cn/post/7243780412546957368