高性能网络通讯框架Netty预热—NIO模型
Netty是一款高性能的网络通信框架,其应用也很广泛,比如常用的消息队列RocketMQ,RPC框架Dubbo在底层都有使用到Netty。在学习Netty之前,我们需要对IO模型要有一定的了解,其中最重要的就是NIO,所以今天打算先对NIO进行一些简单的梳理。
IO模型
常见IO模型分为几种:
- BIO :Blocking IO, 即同步阻塞式IO。Client和Server的每建立一次连接,都会创建一个线程,在Client等待Server响应的期间,会处于阻塞状态。
- NIO :Non-Blocking IO,即同步非阻塞式IO。NIO是基于Reactor模式,面向缓冲区并结合通道的IO模型。客户端发送的连接请求都会注册到多路复用器上,多路复用器轮询到连接有IO请求就进行处理。
- AIO : Asynchronous IO,即异步非阻塞,采用了 Proactor 模式,特点是先由操作系统完成后才通知服务端程序启动线程去处理,一般适用于连接数较多且连接时间较长的应用。
既然BIO和NIO都是以同步的方式工作的,那么这里就先拿BIO与NIO做个简单的对比,比较两者的差异具体在哪些地方。
BIO
上面提到了Client和Server的每建立一次连接,都会创建一个线程并且会发生阻塞,那么我们就来简单的验证一下。验证方式也比较简单,在编辑器中创建一个ServerSocket作为服务端并给定一个端口号用于客户端连接,使用telnet作为客户端来连接服务端并实现消息发送和接收,通过代码来分析BIO会在那些地方会阻塞。
public static void main(String[] args) throws IOException {
// 1. 创建一个BIO服务端 端口号为9999
ServerSocket serverSocket = new ServerSocket(9998);
System.out.println("=====等待客户端连接......");
// 2. 等待客户端连接 , 会阻塞
Socket socket = serverSocket.accept();
System.out.println("=====客户端已连接......");
// 3. 获取客户端发送的内容,如果客户端没有发送内容,也会阻塞
System.out.println("=====等待客户端发送数据......");
InputStream inputStream = socket.getInputStream();
while (true) {
byte[] bytes = new byte[2048];
int read = inputStream.read(bytes);
if (read != -1) {
System.out.println((Thread.currentThread().getName() + " " + new String(bytes, 0, read)));
} else {
break;
}
}
inputStream.close();
socket.close();
}
简单编码完成后,启动服务,如果在没有客户端连接的情况下,accept()
方法会阻塞,直到有客户端进行了连接。
现在可以打开cmd使用telnet命令来进行连接。连接成功后结合'Ctrl + ]'快捷键进去Telent Client,使用send命令发送数据内容。
telnet 127.0.0.1 9998
这里可以看到,客户端虽然连接成功了,但是在调用getInputStream()
方法时,线程又被阻塞了,那么进行Telnet Client来发送数据。
一切OK,服务端收到了消息。既然说了,BIO一次连接就是一个线程,那么再发起一个客户端连接,来看看main线程到底还能不能获取到消息。
经过验证,第二个连接发送的消息,控制台确实没有收到,那就证实了一次连接就是一个线程。那有些人就会有疑问,既然控制台没打印消息,那怎么确保这条消息就被服务端接收了呢?这岂不是很简单,加个线程池,搞成伪异步不就搞定了。那么就基于上面的代码,按照下图流程方式来简单的改造改造。
// 创建线程池
ExecutorService executorService = Executors.newFixedThreadPool(5);
// 1. 创建一个BIO服务端 端口号为9998
ServerSocket serverSocket = new ServerSocket(9998);
while(true){
// 2. 等待客户端连接 , 会阻塞
Socket socket = serverSocket.accept();
executorService.execute(() -> {
try {
System.out.println(Thread.currentThread().getName() + " 客户端已连接...");
// 3. 获取客户端的ip信息
InetAddress address = socket.getInetAddress();
System.out.println(Thread.currentThread().getName() + " " + address.getHostName() + " , " + address.getHostAddress());
// 4. 获取客户端发送的内容,如果客户端没有发送内容,也会阻塞
InputStream inputStream = socket.getInputStream();
while (true) {
byte[] bytes = new byte[2048];
int read = inputStream.read(bytes);
if (read != -1) {
System.out.println((Thread.currentThread().getName() + " " + new String(bytes, 0, read)));
} else {
break;
}
}
inputStream.close();
} catch (Exception e) {
} finally {
try {
socket.close();
System.out.println(" socket 关闭连接 ");
} catch (IOException e) {
e.printStackTrace();
}
}
});
}
经过一顿乱敲,代码改造完成,运行一下看看效果,尝试多个客户端连接,结果符合预期。(一次连接一个线程是不是更明显了🤭)
NIO
经过对BIO的测试发现,BIO对并发支持不好,如果有大批量的客户端连接的服务端,那么服务端就会不断的创建线程,直到撑爆服务器。所以,在JDK1.4+版本,官方提供了另一种IO模型-NIO模型。NIO即非阻塞式IO,目前应用在很多框架或中间件的底层,它基于Reactor模式,面向缓冲区,不面向流。其核心组件有三种,分别为Buffer缓冲区,Channel通道和Selector选择器。
当程序需要与服务端进行数据交互时,并不会向BIO那样,直接发送到服务端,而是将数据发送到Buffer缓冲区,而Buffer缓冲区与Channel通道之间会进行数据交互,Selector会对应一个线程并且会根据事件驱动(Event)来选择哪一个Channel进行处理。当某个Channel上某个请求的事件完全就绪的时候,选择器Selector才会将该任务分配给服务端的一个或多个线程,其他情况服务器的线程可以做其他事情。
Buffer
Buffer可以简单的理解为一个数组,程序可以向Buffer中写入或者读取数据,数据存储依赖于缓存,在NIO中主要应用在和通道之间进行数据交互。Buffer的实现子类有很多,比如IntBuffer,CharBuffer,DoubleBuffer等,不过使用经常使用的还是ByteBuffer。写一个简单的小demo,来体现一下Buffer如何进行存储和读取的。
public static void main(String[] args) {
// 1. 创建一个容量为100的ByteBuffer
ByteBuffer buffer = ByteBuffer.allocate(100);
// 2. 向Buffer中写入数据
buffer.put("byte".getBytes());
buffer.put("byte02".getBytes());
System.out.println("limit = "+buffer.limit()+ " , position = "+buffer.position());
// 3. 切换为读模式
buffer.flip();
System.out.println("limit = "+buffer.limit()+ " , position = "+buffer.position());
// 4. 判断是否还有元素,有则读取
while(buffer.hasRemaining()){
System.out.println(new String(new byte[]{buffer.get()}));
}
}
需要注意的是,如果需要读取缓冲区的数据时,一定要先调用flip()
方法,这是因为在源码中有三个重要参数
// 表示当前写入或读取的位置,每当写入或读取时,该值会进行+1操作
private int position = 0;
// 缓冲区里的数据的总数,代表了当前缓冲区中一共有多少数据
private int limit;
// 缓冲区能够容纳的数据元素的最大数量
private int capacity;
在创建缓冲区时,会指定capacity的大小,此时limit等于capacity,随着不断的写入数据position的值不断的增加,如果position大于capacity时,则会抛出异常。在不调用flip()
方法,进行读取数据时,源码中会根据当前的position位置继续向下读,那么读出的数据就会是一个空值。
/**
* 读取数据时,获取索引下标
*/
final int nextGetIndex() { // package-private
int p = position;
if (p >= limit)
throw new BufferUnderflowException();
position = p + 1;
return p;
}
flip方法中所做的事情就是将position赋值给limit,并将自身清零。那么在读取数据时,position就会从0开始读,一直读到limit为止
所以demo程序的输出结果为
limit = 100 , position = 10
limit = 10 , position = 0
缓冲区的数据是存储在内存中的,这个内存可以是JVM的堆内存,也可以是堆外的内存(堆外内存)。堆外内存的方式可以通过allcateDirect方法进行创建,返回的是DirectByteBuffer对象(直接缓冲区),不受GC影响,使用的是操作系统的物理内存,适合大文件传输等操作。堆内存的方式可以通过allocate方法创建,返回的是HeapByteBuffer对象(非直接缓冲区),会受GC影响。
Channel
Channel是源程序和目标程序之间数据传输的通道,可以通过这个通道进行数据读取或写入,当然数据的读取和写入需要配合Buffer来一起完成。与普通的流相比,Channel是一个双向的通道,而流只能进行单向传输。在NIO中,Channel的实现分为四种,分别为FileChannel, SocketChannel, ServerSocketChannel, DatagramChannel。常用的方法有read(Buffer buffer)
,write(Buffer buffer)
,transferFrom(Channel channel,long position,long count)
分别表示将Channel中的数据读取到Buffer中,将Buffer中的数据写入到Channel中以及从通道中拷贝数据。
- FileChannel
在实现类中FileChannel常被使用,FileChannel即文件通道,用于文件读取,其主要的实现类是FileChannelImpl,但是在使用的过程中是无法直接通过new来创建,可以通过输入流InputStream,输出流OutputStream,RandomAccessFile或者FileChannel提供的open()
方法中来获取实例,那么就通过FileChannel来写个文件拷贝的例子。
public static void main(String[] args) {
// 1. 需要复制的文件
File file = new File("file-channel.txt");
// 2. 创建输入流
FileInputStream inputStream = new FileInputStream(file);
FileChannel inputStreamChannel = inputStream.getChannel();
// 3. 创建输出流
FileOutputStream outputStream = new FileOutputStream("file-channel-copy.txt");
FileChannel outputStreamChannel = outputStream.getChannel();
// 4. 创建buffer缓冲
ByteBuffer buffer = ByteBuffer.allocate((int) file.length());
// 5. 将通道数据读取到缓冲区
inputStreamChannel.read(buffer);
buffer.flip();
// 6. 将缓冲区写入到通道
outputStreamChannel.write(buffer);
/**
* 除了上面使用的read和write方法,也可以使用transferFrom方法直接copy通道中的数据
* outputStreamChannel.transferFrom(inputStreamChannel,0,inputStreamChannel.size());
*/
outputStreamChannel.close();
inputStreamChannel.close();
}
- ServerSocketChannel
ServerSocketChannel与BIO中的ServerSocket类似,可以绑定端口并监听TCP连接。在等待客户端连接的过程中,可以通过使用configureBlocking()
方法来设置阻塞或非阻塞,如果设置了非阻塞,那么在调用accept()
方法时可能会出现NULL值,所以需要注意一下。那么同样搞个demo,来实现双端通信的效果。
public static void main(String[] args) {
// 1. 打开通道
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open():
// 2.绑定9999端口
serverSocketChannel.bind(new InetSocketAddress(9999));
// 3.设置非阻塞
serverSocketChannel.configureBlocking(false) ;
while (true) {
// 4.客户端连接,因为设置了非阻塞,所以这里可能会为空
SocketChannel channel = serverSocketChannel.accept():
if (channel == null) {
System.out.println("没有客户端连接......");
continue:
}
//5.创建缓冲区
ByteBuffer buffer = ByteBuffer.allocate(1024) ;
// 6.将客户端发来的消息,读取到缓冲区
int read = channel.read(buffer) ;
System.out.println(" client message : " + new String(buffer.array() , 0, read, StandardCharsets.UTF_8):
// 7.回复消息给客户端
channel.write(ByteBuffer.wrap(" server received message".getBytes(StandardCharsets.UTF_8)));
channel.close();
break;
}
}
- SocketChannel
上面服务端代码已经搞定,现在可以编写客户端代码。客户端使用SocketChannel去连接服务端,类似BIO中的Socket。一顿乱敲后,先启动Server,在启动Client就可以实现双端通信了。
public static void main(String[] args) throws IOException{
// 1. 创建通道
SocketChannel channel = SocketChannel.open();
// 2. 通过ip和端口连接server
channel.connect(new InetSocketAddress("127.0.0.1",9999));
// 3. 像server发送数据
channel.write(ByteBuffer.wrap("hello server".getBytes(StandardCharsets.UTF_8)));
// 4. 创建buffer 用于接收server消息
ByteBuffer buffer = ByteBuffer.allocate(1024);
// 5. 将server消息读入buffer
int read = channel.read(buffer);
System.out.println(" server message : " + new String(buffer.array() , 0,read, StandardCharsets.UTF_8));
channel.close();
}
Selector
Selector是NIO中的选择器,主要工作就是通道注册,事件监听,事件选择切换,一个选择器可以注册多个通道。 ServerSocketChannel和SocketChannel都可以注册到选择器中,选择器中通过调用select方法获取通道中所发生的事件,并且根据不同的事件切换到不同的通道。选择器的事件有四种,分别为OP_READ,OP_WRITE,OP_ACCEPT以及OP_CONNECT。
NIO中,一般是一个单线程处理一个选择器,一个选择器可以监控很多通道。所以,通过选择器,一个单线程可以处理上百个、上千个甚至更多的通道,这样可以减少线程之间上下文的切换。通道和选择器之间通过使用register()方法进行注册,通过 selectedKeys()方法获取通道发生的事件。那先现在就来改造一下上面的代码,通过注册选择器的方式实现双端通信。
- 服务端
public static void main(String[] args) throws Exception {
// 1. 创建ServerSocketChannel
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
// 2. 创建选择器Selector
Selector selector = Selector.open();
// 3. 绑定9999端口
serverSocketChannel.bind(new InetSocketAddress(9999));
// 4. 设置非阻塞
serverSocketChannel.configureBlocking(false);
// 5. ServerSocketChannel注册到选择器,并监听连接事件
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
while (true) {
// 6. 没有监听到任何事件
if (selector.selectNow() == 0) {
continue;
}
// 7. 监听到事件
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> keyIterator = selectionKeys.iterator();
while (keyIterator.hasNext()) {
SelectionKey key = keyIterator.next();
// 8. 如果是客户端连接事件
if (key.isAcceptable()) {
// 8.1 创建SocketChannel,注册到选择器并监听读事件
SocketChannel socketChannel = serverSocketChannel.accept();
socketChannel.configureBlocking(false);
socketChannel.register(selector, SelectionKey.OP_READ, ByteBuffer.allocate(1024));
serverSendMsg(socketChannel, ByteBuffer.wrap("hello client".getBytes(StandardCharsets.UTF_8)));
} else if (key.isReadable()) { // 9. 如果客户端是读事件
// 9.1 获取事件通道,并读取通道数据
SocketChannel socketChannel = (SocketChannel) key.channel();
ByteBuffer byteBuffer = (ByteBuffer) key.attachment();
int read = socketChannel.read(byteBuffer);
System.out.println("[ client message ] : " + new String(byteBuffer.array(), 0, read, StandardCharsets.UTF_8));
serverSendMsg(socketChannel, ByteBuffer.wrap("receive client message".getBytes(StandardCharsets.UTF_8)));
}
keyIterator.remove();
}
}
}
/**
* 发送消息给客户端
*/
public static void serverSendMsg(SocketChannel socketChannel, ByteBuffer byteBuffer) throws Exception {
socketChannel.write(byteBuffer);
}
- 客户端
public static void main(String[] args) throws Exception {
SocketChannel channel = SocketChannel.open();
channel.configureBlocking(false);
boolean connect = channel.connect(new InetSocketAddress("127.0.0.1", 9999));
if (!connect) {
while (!channel.finishConnect()) {
System.out.println("服务连接中.....");
}
}
receiveServerMsg(channel);
// 发送消息给服务端
channel.write(ByteBuffer.wrap("hello server".getBytes(StandardCharsets.UTF_8)));
receiveServerMsg(channel);
new CountDownLatch(1).await();
}
/**
* 接收服务响应的消息
*/
public static void receiveServerMsg(SocketChannel channel) throws Exception {
ByteBuffer buffer = ByteBuffer.allocate(1024);
int read = channel.read(buffer);
System.out.println("[ server message ]: " + new String(buffer.array(), 0, read, StandardCharsets.UTF_8));
}
- 运行结果
总结
以上部分简单的介绍了BIO与NIO,其中着重描述了NIO的特性(为了后续的Netty)。BIO基于字节流和字符流进行操作的,而NIO基于Channel(通道)和Buffer(缓冲区)进行操作的,数据总是从通道读取到缓冲区中,或者从缓冲区写入到通道中。Selector(选择器)用于监听多个通道事件,因此使用单个线程就可以监听多个客户端通道。下面表格是两种IO方式式的对比。
阻塞状态 | 实现方式程度 | 效率 | 数据处理 | |
---|---|---|---|---|
BIO | 阻塞 | 简单 | 低 | 面向流 |
NIO | 非阻塞 | 相对复杂 | 高 | 面向缓冲区 |
转载自:https://juejin.cn/post/7245567741691445285