阻塞的处理和网络模型设计——深入思考Java IO和NIO(二)
书接上文深入思考Java IO和NIO(一),本文将详述NIO在网络通信中是怎么解决阻塞的API这个问题的。
我们会一步步从一个文件服务器看到是怎么解决这个问题的。
初次实践
假设有一个文件服务器提供上传图片的服务,有海量的客户端的图片需要上传,我们应该怎么提供这个服务呢?
首先我打算使用TCP连接让客户端上传文件,每次上传文件建立一个连接,在一个连接里面客户端将一张图片发送完之后就断开连接。
有了这个想法后,看看我是怎么写这个文件服务器的。
文件服务server
可以看我在GitHub的代码
class BioServer{
public static void main(String[] args) throws IOException {
ServerSocketChannel acceptSocket = ServerSocketChannel.open();
acceptSocket.bind(new InetSocketAddress(6666));
while (true){
SocketChannel tcpSocket = acceptSocket.accept();
Task task = new Task(tcpSocket);
task.start();
}
}
}
class Task extends Thread{
SocketChannel tcpSocket;
public Task(SocketChannel tcpSocket) {
this.tcpSocket = tcpSocket;
}
@Override
public void run() {
try {
handleAAccept(tcpSocket);
} catch (IOException e) {
e.printStackTrace();
}
}
// 将客户端传递过来的图片保存在本地中
private static void handleAAccept(SocketChannel tcpSocket) throws IOException {
FileChannel fileChannel = FileChannel.open(Paths.get("pic.png"), StandardOpenOption.WRITE, StandardOpenOption.CREATE);
ByteBuffer buffer = ByteBuffer.allocate(1024);
while (tcpSocket.read(buffer) != -1) {// 这种channel 只有在关闭的时候才会返回-1
buffer.flip();
fileChannel.write(buffer);
buffer.clear(); // 读完切换成写模式,能让管道继续读取文件的数据
}
server端的代码就上这样
ServerSocketChannel.open()
开启一个ServerSocketChannel用于监听6666端口的连接- while循环调用
acceptSocket.accept()
来监听是否有连接要建立。- accept是阻塞的
- 连接建立成功,则将这个TCP连接交给Task线程去处理。
- Task读取tcpSocket连接发过来的信息并存储到文件。 在这个实现里面, 一个连接需要一个线程处理。
分析阻塞的系统调用
在这个实现里面, 一个连接需要一个线程处理,主要的原因是因为read/write是阻塞的。
read的阻塞
read从Channel读取字节序列到给定缓冲区。
- tcpSocket.read(buffer)和前面一样,也是系统调用read等,是阻塞的,
- 如果缓冲区有可用的数据,则返回可用的数据,尝试从通道读取最多 r 个字节,其中 r 是缓冲区中剩余的字节数,即 dst.remaining()。 下图绿色部分上TCP上缓冲区应用程序可以读取的可用的数据:
- 当对方发起FIN包的时候,这个时候读完所有ACK的数据,read系统调用才会返回-1
tcpSocket.shutdownOutput();
,shutdown()系统调用的功能是关闭一个套接字的指定方向上的通信tcpSocket.close();
- 当RCV_BUF没有数据可读,且收到FIN包的时候,tcpSocket.read(buffer)返回值就是-1
对于read系统调用来说,阻塞和非阻塞的返回是不同的,如下图所示
write的阻塞
对于TCP来说如果这个可用窗口为0,则write操作会被阻塞。
做了一个小实验,客户端不停的发数据,但是让server应用不接收数据。后面发送端的write操作就会被阻塞,,而且发现接收端的接收缓冲区可能是500K的大小。
大家也可以尝试一下这个实验。感受阻塞的write。
传输过程优化--使用零拷贝
就像前面分析的,NIO使用MappedByteBuffer大大优化了数据复制的过程。
我们在这个文件服务器里,如果约定好要传输的数据的大小,那么也可以使用MappedByteBuffer。
void handleAAccept(SocketChannel tcpSocket) throws IOException {
FileChannel fileChannel = FileChannel.open(Paths.get("test.png"), StandardOpenOption.WRITE, StandardOpenOption.CREATE);
fileChannel.transferFrom(tcpSocket,0,19300);
}
将客户端传递过来的图片通过 fileChannel.transferFrom(tcpSocket,0,19300)
保存在本地,
同样,客户端发送图片的逻辑也可以简化为fileChannel.transferTo(0, size,tcpSocket)
。
public class BioClient {
public static void main(String[] args) throws IOException {
SocketChannel tcpSocket = SocketChannel.open(new InetSocketAddress("127.0.0.1", 6666));
FileChannel fileChannel = FileChannel.open(Paths.get("***/http.png"), StandardOpenOption.READ);
long size = fileChannel.size();
fileChannel.transferTo(0, size,tcpSocket);
fileChannel.close();
tcpSocket.close();
}
}
transferTo和transferFrom的阻塞
transferTo和transferFrom操作在这里也是阻塞的。
transferFrom将字节从给定的可读字节通道传输到该通道的文件中。
- 如果是从文件到文件,阻塞还是有限的
- 从网络到文件到操作,可能因为等待数据阻塞很久
比如下图这种情况:
客户端发了1000字节,transferFrom拿走可用的800字节就返回了。
而下面这种情况:
transferFrom会等待后面两个字节,除非客户端再发两个字节过来或者连接关闭。
初实现的总结-BIO模型
这个文件服务器的实现已经初具模型,能满足上传图片的要求,还使用零拷贝技术进行优化,这是一个BIO模型。
BIO也就是说阻塞的IO。阻塞虽然不占用CPU时间,但是非常占用线程。 但是每一个连接对应一个新的线程,对于C10K的并发来说,这样的模型是支撑不了的。
所以我能不能使用非阻塞的呢?
非阻塞的服务模型
在JAVA NIO里,可以通过设置参数将Channel的读写设置为非阻塞的。比如 tcpSocket.configureBlocking(false);
,那么read在没有数据的时候就可以返回0,而不是阻塞着。
reply服务器的非阻塞实现
先不考虑文件服务器,这里实现客户端和服务器的通话。如下图所示:
服务器读取客户端发过来的一个小的文字,并回复收到了。具体的代码是:
public class MultiServer {
public static void main(String[] args) throws IOException {
ServerSocketChannel serverSocket = ServerSocketChannel.open();
serverSocket.bind(new InetSocketAddress(6666));
serverSocket.configureBlocking(false);
Selector selector = Selector.open();
serverSocket.register(selector, SelectionKey.OP_ACCEPT);
while (selector.select() > 0) {//select返回ready events的个数
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
iterator.remove();
if (key.isAcceptable()) {
SocketChannel tcpSocket = serverSocket.accept();
tcpSocket.configureBlocking(false);
tcpSocket.register(selector, SelectionKey.OP_READ);
System.out.println("和客户端建立连接");
} else if (key.isReadable()) {
try {
handle(key);
} catch (IOException e) {
key.channel().close();
}
}
}
}
}
private static void handle( SelectionKey key) throws IOException {
SocketChannel tcpSocket = (SocketChannel) key.channel();
ByteBuffer buffer = ByteBuffer.allocate(1024);
String received="";
int i=0;
while ((i=tcpSocket.read(buffer) )> 0) {//>0就表示没有阻塞
buffer.flip();
received = new String(buffer.array(), 0, buffer.remaining());
System.out.println("client说" + received);
buffer.clear();
}
System.out.println("i = " + i);
buffer.put(("我收到了" + received).getBytes(StandardCharsets.UTF_8));
buffer.flip();
tcpSocket.write(buffer);
buffer.clear();
}
}
可以看到效果,多个client和server在通话,而且服务器只使用了一个线程!
使用了selector的多路复用技术, 底层原理是epoll,而且java nio实现的是水平触发的epoll模式,需要使用
SelectionKey key = iterator.next(); iterator.remove();
移除event,表示这个事件会处理完毕。
- 注册了
serverSocket.configureBlocking(false); serverSocket.register(selector, SelectionKey.OP_ACCEPT)
, - 对于建立的连接,又注册
tcpSocket.configureBlocking(false);tcpSocket.register(selector, SelectionKey.OP_READ);
- Selector会负责件监听这些channel的这些注册的类型的事件
-
key.isAcceptable()表示有可连接事件,使用SocketChannel tcpSocket = serverSocket.accept()则不会阻塞,可建立连接
-
key.isReadable()表示这个key有可读事件,使用read也不会阻塞,直接能读到数据,当然读完了就没有数据,再读时读到的数据长度变为0。
-
同理write也是非阻塞的。
-
注册OP_WRITE事件
int write(ByteBuffer src)
返回写的字节长度,在非阻塞模式下,即使没有将准备的buf的数据写完也会返回,比如返回0.
因为缓冲区没有可用空间,那么在写较大数据的时候需要注册OP_WRITE事件,当发送窗口可用时select会通知应用程序。
if (writed<remaining){
tcpSocket.register(selector,SelectionKey.OP_WRITE);
break;
}
在被通知时,记得处理:
...
if (selectionKey.isWritable()){
//将剩下的没有写完的数据继续写完
continueWriteFileToSocket(selectionKey,channel, fileChannel, buffer, selector);
}
private static void continueWriteFileToSocket(SelectionKey key,SocketChannel tcpSocket, FileChannel fileChannel, ByteBuffer buffer, Selector selector) throws IOException {
//buffer在之前kennel被先get了一部分,position的位置已经发生了改变
int writed = tcpSocket.write(buffer);
if (writed<buffer.remaining()){
return;
}
buffer.clear();
//如果能写完,就可以继续往buffer里面put数据并继续发送到网上了
while (fileChannel.read(buffer) != -1) {
buffer.flip();
int remaining = buffer.remaining();
writed = tcpSocket.write(buffer);
System.out.println("write = " + writed);
if (writed<remaining){
return;
}else {
buffer.clear();
}
}
key.cancel();
}
- 写完了之后要cancel。
非阻塞场景下的零拷贝
在非阻塞场景下,同样可以使用transferTo函数减少拷贝次数,但是也要注意上面的写的问题。
long position;
void transferFileToSocket(SocketChannel tcpSocket, FileChannel fileChannel, Selector selector) throws IOException {
long l=fileChannel.transferTo(position, fileChannel.size(), tcpSocket);
position+=l;
System.out.println("position = " + position);
if (position<fileChannel.size()){
//没写完
tcpSocket.register(selector,SelectionKey.OP_WRITE|SelectionKey.OP_READ);
System.out.println("selector.keys() = " + selector.keys());
}
}
void continueTransferFileToSocket(SelectionKey key,SocketChannel tcpSocket, FileChannel fileChannel,Selector selector ) throws IOException {
System.out.println("continueTransferFileToSocket");
long l=fileChannel.transferTo(position, fileChannel.size(), tcpSocket);
position+=l;
System.out.println("position = " + position);
if (position<fileChannel.size()){
//没写完
return;
}
tcpSocket.register(selector,SelectionKey.OP_READ);
}
这里这个Channel还注册了读的事件,所以不用cancel取消这个key.
- 注册写的时候 重新注册SelectionKey.OP_WRITE|SelectionKey.OP_READ
- 后面取消写的时候重新注册SelectionKey.OP_READ
至此,NIO不仅解决了多次数据复制的问题,还解决了阻塞的API的问题。
正确的关闭连接
在服务器处理的时候,一定要注意连接的处理。因为客户端可以随时关闭连接。如果因为一个客户端的关闭导致服务器宕机,那还会影响其他的服务。
比如,此时强制关闭一个client:
此时服务端显示:
造成服务器的IOException异常退出,不过整个处理过程可以catch异常。如果没有catch,server将异常退出,这影响了服务器的服务。
抓包看服务器的6666端口:
# tcpdump -i any port 6666
localhost.49775 > localhost.6666: Flags [S], seq 3053657778, win 65535, options [mss 16324,nop,wscale 6,nop,nop,TS val 1370645599 ecr 0,sackOK,eol], length 0
localhost.6666 > localhost.49775: Flags [S.], seq 3892119155, ack 3053657779, win 65535, options [mss 16324,nop,wscale 6,nop,nop,TS val 1370645599 ecr 1370645599,sackOK,eol], length 0
localhost.49775 > localhost.6666: Flags [.], ack 1, win 6371, options [nop,nop,TS val 1370645599 ecr 1370645599], length 0
localhost.6666 > localhost.49775: Flags [.], ack 1, win 6371, options [nop,nop,TS val 1370645599 ecr 1370645599], length 0
localhost.49775 > localhost.6666: Flags [P.], seq 1:4, ack 1, win 6371, options [nop,nop,TS val 1370647611 ecr 1370645599], length 3
localhost.6666 > localhost.49775: Flags [.], ack 4, win 6371, options [nop,nop,TS val 1370647611 ecr 1370647611], length 0
localhost.6666 > localhost.49775: Flags [P.], seq 1:16, ack 4, win 6371, options [nop,nop,TS val 1370647612 ecr 1370647611], length 15
localhost.49775 > localhost.6666: Flags [.], ack 16, win 6371, options [nop,nop,TS val 1370647612 ecr 1370647612], length 0
localhost.49775 > localhost.6666: Flags [F.], seq 4, ack 16, win 6371, options [nop,nop,TS val 1371319563 ecr 1370647612], length 0
localhost.6666 > localhost.49775: Flags [.], ack 5, win 6371, options [nop,nop,TS val 1371319563 ecr 1371319563], length 0
localhost.6666 > localhost.49775: Flags [P.], seq 16:28, ack 5, win 6371, options [nop,nop,TS val 1371319573 ecr 1371319563], length 12
localhost.49775 > localhost.6666: Flags [R], seq 3053657783, win 0, length 0
首先是三次握手完成:
- [S]client 发起syn包,seq=3053657778,win=65535
- [S.] server ack=3053657779,seq=3892119155,win=65535
- [.] client ack=1 win 6371 然后发送数据:一发一收; 最后客户端关闭连接:
- 客户端发送FIN
- 服务器内核回复ACK
- 服务器应用程序还在继续发数据,并没有也关闭连接
- 客户端内核发Reset包,client强制关闭了连接。
服务端应该正确的处理连接的关闭,比如像下面这样:
SocketChannel tcpSocket = (SocketChannel) key.channel();
ByteBuffer buffer = ByteBuffer.allocate(1024);
int read = tcpSocket.read(buffer);
if (read ==-1){
key.channel().close();
return;
}
}
假设tcpSocket.read返回-1 ,则表示收到了一个FIN包,则服务端也关闭连接。 这就是完美的四次挥手过程。如下显示
# tcpdump -i any -nn -s0 -v --absolute-tcp-sequence-numbers port 6666
1.51461 > ::1.6666: Flags [F.], cksum 0x0028 (incorrect -> 0x5f54), seq 220187003, ack 2240511794, win 6371, options [nop,nop,TS val 1372377293 ecr 1372375988], length 0
1.6666 > ::1.51461: Flags [.], cksum 0x0028 (incorrect -> 0x5a3b), ack 220187004, win 6371, options [nop,nop,TS val 1372377293 ecr 1372377293], length 0
1.6666 > ::1.51461: Flags [F.], cksum 0x0028 (incorrect -> 0x5a3a), seq 2240511794, ack 220187004, win 6371, options [nop,nop,TS val 1372377293 ecr 1372377293], length 0
1.51461 > ::1.6666: Flags [.], cksum 0x0028 (incorrect -> 0x5a3a), ack 2240511795, win 6371, options [nop,nop,TS val 1372377293 ecr 1372377293], length 0
然后那条连接就处于TIME_WAIT状态了。
tcp6 0 0 localhost.51947 localhost.6666 TIME_WAIT
TIME_WAIT只需要等待两个MSL,看起来大概是2S左右。
之前是客户端发Reset强制关闭连接的,那连接就直接释放了,此时服务端再发数据可能就有些不能预料的异常了。
IO复用模型的总结-reactor单线程模型
通过引入Selector,一个线程就能完成所有的连接和客户端通信。 但是在我们这样的实现下,还是有一些缺点:
- 客户端之间相互影响,如果有一个客户端卡太久,会影响新的连接
- 但是不能很好地利用多核cpu
其实,上面举的这个例子就是单线程reactor模型,像redis就是使用的这种模型,因为redis使用的场景get/set都是内存的操作,速度非常快,使用单线程reactor模型能够达到高性能。
server Reactor 单线程处理模型图如下
文件服务器:reactor多线程模型
假设我们要用Reactor单线程模型来实现文件的传输,我们并不知道文件的大小,接收数据的时候并不知道什么时候接收完一张图片了。 客户端和服务器之间需要做一些约定:
- 发送端先约定图片的大小
- 接收端的连接需要在接收到这么大的一个文件之后,才将这个全部写到文件里面,才算这次图片传输真的结束了。
- 另外接收完图片之后,验证压缩水印处理可能要花很多的时间,我们需要有一个线程池来帮助处理这些,不能让业务处理占用连接和网络读写的时间,使得服务之间相互影响。
带有Process线程池的实现
static ExecutorService executorService = Executors.newFixedThreadPool(8);
//需要有个关于连接的数组来就记录size,acc
static ConcurrentHashMap<SocketChannel, HashMap<String,Object>> channelMap = new ConcurrentHashMap<>();
static ConcurrentHashMap<Future<?>,SocketChannel> tasks=new ConcurrentHashMap<>();
if (key.isReadable()){
SocketChannel tcpSocket =(SocketChannel) key.channel();
HashMap<String,Object> prop = channelMap.get(tcpSocket);
ByteBuffer buffer = ByteBuffer.allocate(1024*4);
while (true){
int read = tcpSocket.read(buffer);
if (!(read >0)) break;
buffer.flip();
if (prop.get("size").equals(0L)){
long size = 0;
try {
size = buffer.getLong();
} catch (Exception e) {
System.out.println("需要按照指定的协议上传文件: size+文件");
tcpSocket.close();
break;
}
prop.put("size",size);
prop.put("fileChannel",FileChannel.open(Paths.get(size+".png"), StandardOpenOption.WRITE, StandardOpenOption.CREATE));
}
if ((Long)prop.get("size")>0L) {
FileChannel fileChannel =(FileChannel) prop.get("fileChannel");
fileChannel.write(buffer);
buffer.clear();
Long size = (Long)prop.get("size");
Long acc = (Long)prop.get("acc");
prop.put("acc",acc+read);
System.out.println("进度 = " + ((acc - 8) * 1.0 / size * 1.0) * 100);
}
}
completed((Long)prop.get("size"), (Long)prop.get("acc"), tcpSocket,channelMap);
}
private static void completed(Long size,Long acc,SocketChannel tcpSocket,ConcurrentHashMap<SocketChannel,HashMap<String,Object> > channelMap) throws IOException, ExecutionException, InterruptedException {
if (acc == size +8) {
HashMap<String,Object> prop = channelMap.get(tcpSocket);
Future<String> future = executorService.submit(new Processor(prop));
//再使用一个数组,将future和channelMap放进去,
tasks.put(future,tcpSocket);
handleTasks();
}
}
private static void handleTasks() throws InterruptedException, ExecutionException, IOException {
//遍历task
ByteBuffer buffer = ByteBuffer.allocate(1024);
for (Future<?> future1 : tasks.keySet()) {
if (future1.isDone()){
SocketChannel tcpSocket = tasks.get(future1);
String o = future1.get().toString();
buffer.put(o.getBytes());
buffer.flip();
tcpSocket.write(buffer);
buffer.clear();
//关闭文件 关闭 释放资源。。。
tasks.remove(future1);
}
}
}
}
class Processor implements Callable<String> {
public String call() throws Exception {
// 假设在处理业务
int millis = new Random().nextInt(3000);
System.out.println("sleep millis " + millis);
Thread.sleep(millis);
return "完成收图" ;
}
}
要做到IO复用,需要一个数组保存文件的基本属性(比如大小、名字和上传进度等)和连接的对应关系; 使用了线程池之后,任务的执行会变得非常复杂
- 等待业务逻辑执行完毕才返回数据给客户端
Future<String> future = executorService.submit(new Processor(prop));
//再使用一个数组,将future和channelMap放进去,
tasks.put(future,tcpSocket);
- 需要统一处理task,不知道任务什么时候会处理完,需要select超时阻塞也要再调用task,比如
while (selector.select(1000)>=0){//假设一直阻塞,则任务可能得不到执行
handleTasks();
select的超时时间是多少可能需要根据业务场景来具体指定,比如10ms或者1ms。
更优雅的封装
上面的代码实现很复杂,单一职责原则将代码拆分为Reactor类、Acceptor类和Handler类,这就是下面的模型了,就是Reactor 多线程处理模型。
总结
本文通过一个具体的例子,一步一步阐述了NIO在网络通信中是怎么解决阻塞的API这个问题的, 解决阻塞的问题带来了很多性能的好处,但也带来了编程和理解上的一些复杂性。
并且为了提高文件服务器的鲁棒性和高可用性,也使用了Reactor 多线程处理模型。甚至为了更好的提高网络服务器的鲁棒性和高可用性,以及更好高效实现喝更完备的功能,还有像netty服务器这样的网络框架,可以看我写的Netty究竟是怎么运行的-连接流程的深入剖析
查漏补缺
关于Java NIO的内容,通过这两篇文章已经剖析完毕。是否已经了解到这些并形成记忆?
-
Java 流是BIO
-
多次拷贝且阻塞
-
ByteBuffer是NIO
-
能减内核用户态上下文切换
-
来减少拷贝次数
-
利用系统底层epoll机制
-
将阻塞的语义变为非阻塞的语义
-
感受阻塞和非阻塞的过程
-
记住零拷贝的几个api
-
transferTo/From
-
连接处理很复杂
-
while循环的套路
-
Rector模型来帮忙
转载自:https://juejin.cn/post/7045672219866988551