阻塞的处理和网络模型设计——深入思考Java IO和NIO(二)
书接上文深入思考Java IO和NIO(一),本文将详述NIO在网络通信中是怎么解决阻塞的API这个问题的。
我们会一步步从一个文件服务器看到是怎么解决这个问题的。
初次实践
假设有一个文件服务器提供上传图片的服务,有海量的客户端的图片需要上传,我们应该怎么提供这个服务呢?
首先我打算使用TCP连接让客户端上传文件,每次上传文件建立一个连接,在一个连接里面客户端将一张图片发送完之后就断开连接。
有了这个想法后,看看我是怎么写这个文件服务器的。
文件服务server
可以看我在GitHub的代码
class BioServer{
  public static void main(String[] args) throws IOException {
    ServerSocketChannel acceptSocket = ServerSocketChannel.open();
    acceptSocket.bind(new InetSocketAddress(6666));
    while (true){
        SocketChannel tcpSocket = acceptSocket.accept();
        Task task = new Task(tcpSocket);
        task.start();
    }
 }
}
class Task extends Thread{
    SocketChannel tcpSocket;
    public Task(SocketChannel tcpSocket) {
        this.tcpSocket = tcpSocket;
    }
    @Override
    public void run() {
        try {
            handleAAccept(tcpSocket);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    // 将客户端传递过来的图片保存在本地中
    private static void handleAAccept(SocketChannel tcpSocket) throws IOException {
    FileChannel fileChannel = FileChannel.open(Paths.get("pic.png"), StandardOpenOption.WRITE, StandardOpenOption.CREATE);
    ByteBuffer buffer = ByteBuffer.allocate(1024);
    while (tcpSocket.read(buffer) != -1) {// 这种channel 只有在关闭的时候才会返回-1 
        buffer.flip();
        fileChannel.write(buffer);
        buffer.clear(); // 读完切换成写模式,能让管道继续读取文件的数据
    }
server端的代码就上这样
ServerSocketChannel.open()开启一个ServerSocketChannel用于监听6666端口的连接- while循环调用
acceptSocket.accept()来监听是否有连接要建立。- accept是阻塞的
 - 连接建立成功,则将这个TCP连接交给Task线程去处理。
 
 - Task读取tcpSocket连接发过来的信息并存储到文件。 在这个实现里面, 一个连接需要一个线程处理。
 
分析阻塞的系统调用
在这个实现里面, 一个连接需要一个线程处理,主要的原因是因为read/write是阻塞的。
read的阻塞
read从Channel读取字节序列到给定缓冲区。
- tcpSocket.read(buffer)和前面一样,也是系统调用read等,是阻塞的,
 - 如果缓冲区有可用的数据,则返回可用的数据,尝试从通道读取最多 r 个字节,其中 r 是缓冲区中剩余的字节数,即 dst.remaining()。 下图绿色部分上TCP上缓冲区应用程序可以读取的可用的数据:
 

- 当对方发起FIN包的时候,这个时候读完所有ACK的数据,read系统调用才会返回-1
tcpSocket.shutdownOutput();,shutdown()系统调用的功能是关闭一个套接字的指定方向上的通信tcpSocket.close();- 当RCV_BUF没有数据可读,且收到FIN包的时候,tcpSocket.read(buffer)返回值就是-1
 
 
对于read系统调用来说,阻塞和非阻塞的返回是不同的,如下图所示

write的阻塞
对于TCP来说如果这个可用窗口为0,则write操作会被阻塞。

做了一个小实验,客户端不停的发数据,但是让server应用不接收数据。后面发送端的write操作就会被阻塞,,而且发现接收端的接收缓冲区可能是500K的大小。
大家也可以尝试一下这个实验。感受阻塞的write。
传输过程优化--使用零拷贝
就像前面分析的,NIO使用MappedByteBuffer大大优化了数据复制的过程。

我们在这个文件服务器里,如果约定好要传输的数据的大小,那么也可以使用MappedByteBuffer。
 void handleAAccept(SocketChannel tcpSocket) throws IOException {
        FileChannel fileChannel = FileChannel.open(Paths.get("test.png"), StandardOpenOption.WRITE, StandardOpenOption.CREATE);
        fileChannel.transferFrom(tcpSocket,0,19300);
    }
将客户端传递过来的图片通过 fileChannel.transferFrom(tcpSocket,0,19300)保存在本地,
同样,客户端发送图片的逻辑也可以简化为fileChannel.transferTo(0, size,tcpSocket)。
public class BioClient {
    public static void main(String[] args) throws IOException {
        SocketChannel tcpSocket = SocketChannel.open(new InetSocketAddress("127.0.0.1", 6666));
        FileChannel fileChannel = FileChannel.open(Paths.get("***/http.png"), StandardOpenOption.READ);
        long size = fileChannel.size();
        fileChannel.transferTo(0, size,tcpSocket);
        fileChannel.close();
        tcpSocket.close();
    }
}
transferTo和transferFrom的阻塞
transferTo和transferFrom操作在这里也是阻塞的。
transferFrom将字节从给定的可读字节通道传输到该通道的文件中。
- 如果是从文件到文件,阻塞还是有限的
 - 从网络到文件到操作,可能因为等待数据阻塞很久
比如下图这种情况:
客户端发了1000字节,transferFrom拿走可用的800字节就返回了。 
而下面这种情况:
transferFrom会等待后面两个字节,除非客户端再发两个字节过来或者连接关闭。
初实现的总结-BIO模型
这个文件服务器的实现已经初具模型,能满足上传图片的要求,还使用零拷贝技术进行优化,这是一个BIO模型。
BIO也就是说阻塞的IO。阻塞虽然不占用CPU时间,但是非常占用线程。 但是每一个连接对应一个新的线程,对于C10K的并发来说,这样的模型是支撑不了的。
所以我能不能使用非阻塞的呢?
非阻塞的服务模型
在JAVA NIO里,可以通过设置参数将Channel的读写设置为非阻塞的。比如 tcpSocket.configureBlocking(false);,那么read在没有数据的时候就可以返回0,而不是阻塞着。
reply服务器的非阻塞实现
先不考虑文件服务器,这里实现客户端和服务器的通话。如下图所示:

服务器读取客户端发过来的一个小的文字,并回复收到了。具体的代码是:
public class MultiServer {
    public static void main(String[] args) throws IOException {
        ServerSocketChannel serverSocket = ServerSocketChannel.open();
        serverSocket.bind(new InetSocketAddress(6666));
        serverSocket.configureBlocking(false);
        Selector selector = Selector.open();
        serverSocket.register(selector, SelectionKey.OP_ACCEPT);
        while (selector.select() > 0) {//select返回ready events的个数
            Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
            while (iterator.hasNext()) {
                SelectionKey key = iterator.next();
                iterator.remove();
                if (key.isAcceptable()) {
                    SocketChannel tcpSocket = serverSocket.accept();
                    tcpSocket.configureBlocking(false);
                    tcpSocket.register(selector, SelectionKey.OP_READ);
                    System.out.println("和客户端建立连接");
                } else if (key.isReadable()) {
                    try {
                        handle(key);
                    } catch (IOException e) {
                        key.channel().close();
                    }
                }
            }
        }
    }
    private static void handle( SelectionKey key) throws IOException {
        SocketChannel tcpSocket = (SocketChannel) key.channel();
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        String received="";
        int i=0;
        while ((i=tcpSocket.read(buffer) )> 0) {//>0就表示没有阻塞
            buffer.flip();
            received = new String(buffer.array(), 0, buffer.remaining());
            System.out.println("client说" + received);
            buffer.clear();
        }
        System.out.println("i = " + i);
        buffer.put(("我收到了" + received).getBytes(StandardCharsets.UTF_8));
        buffer.flip();
        tcpSocket.write(buffer);
        buffer.clear();
    }
}
可以看到效果,多个client和server在通话,而且服务器只使用了一个线程!
使用了selector的多路复用技术, 底层原理是epoll,而且java nio实现的是水平触发的epoll模式,需要使用
SelectionKey key = iterator.next();                 iterator.remove();移除event,表示这个事件会处理完毕。
- 注册了
serverSocket.configureBlocking(false); serverSocket.register(selector, SelectionKey.OP_ACCEPT), - 对于建立的连接,又注册
tcpSocket.configureBlocking(false);tcpSocket.register(selector, SelectionKey.OP_READ); - Selector会负责件监听这些channel的这些注册的类型的事件
- 
key.isAcceptable()表示有可连接事件,使用SocketChannel tcpSocket = serverSocket.accept()则不会阻塞,可建立连接
 - 
key.isReadable()表示这个key有可读事件,使用read也不会阻塞,直接能读到数据,当然读完了就没有数据,再读时读到的数据长度变为0。
 - 
同理write也是非阻塞的。
 
 - 
 
注册OP_WRITE事件
 int write(ByteBuffer src)返回写的字节长度,在非阻塞模式下,即使没有将准备的buf的数据写完也会返回,比如返回0.
因为缓冲区没有可用空间,那么在写较大数据的时候需要注册OP_WRITE事件,当发送窗口可用时select会通知应用程序。
if (writed<remaining){
    tcpSocket.register(selector,SelectionKey.OP_WRITE);
    break;
}
在被通知时,记得处理:
...
if (selectionKey.isWritable()){
    //将剩下的没有写完的数据继续写完
    continueWriteFileToSocket(selectionKey,channel, fileChannel, buffer, selector);
}
private static void continueWriteFileToSocket(SelectionKey key,SocketChannel tcpSocket, FileChannel fileChannel, ByteBuffer buffer, Selector selector) throws IOException {
    //buffer在之前kennel被先get了一部分,position的位置已经发生了改变
    int writed = tcpSocket.write(buffer);
    if (writed<buffer.remaining()){
       return;
    }
    buffer.clear();
    //如果能写完,就可以继续往buffer里面put数据并继续发送到网上了
    while (fileChannel.read(buffer) != -1) {
        buffer.flip();
        int remaining = buffer.remaining();
        writed = tcpSocket.write(buffer);
        System.out.println("write = " + writed);
        if (writed<remaining){
            return;
        }else {
            buffer.clear();
        }
    }
    key.cancel();
}
- 写完了之后要cancel。
 
非阻塞场景下的零拷贝
在非阻塞场景下,同样可以使用transferTo函数减少拷贝次数,但是也要注意上面的写的问题。
     long position;
    void transferFileToSocket(SocketChannel tcpSocket, FileChannel fileChannel, Selector selector) throws IOException {
        long l=fileChannel.transferTo(position, fileChannel.size(), tcpSocket);
        position+=l;
        System.out.println("position = " + position);
        if (position<fileChannel.size()){
            //没写完
            tcpSocket.register(selector,SelectionKey.OP_WRITE|SelectionKey.OP_READ);
            System.out.println("selector.keys() = " + selector.keys());
        }
    }
    void continueTransferFileToSocket(SelectionKey key,SocketChannel tcpSocket, FileChannel fileChannel,Selector selector ) throws IOException {
        System.out.println("continueTransferFileToSocket");
        long l=fileChannel.transferTo(position, fileChannel.size(), tcpSocket);
        position+=l;
        System.out.println("position = " + position);
        if (position<fileChannel.size()){
            //没写完
            return;
        }
      
       tcpSocket.register(selector,SelectionKey.OP_READ);
  
    }
这里这个Channel还注册了读的事件,所以不用cancel取消这个key.
- 注册写的时候 重新注册SelectionKey.OP_WRITE|SelectionKey.OP_READ
 - 后面取消写的时候重新注册SelectionKey.OP_READ
 
至此,NIO不仅解决了多次数据复制的问题,还解决了阻塞的API的问题。

正确的关闭连接
在服务器处理的时候,一定要注意连接的处理。因为客户端可以随时关闭连接。如果因为一个客户端的关闭导致服务器宕机,那还会影响其他的服务。
比如,此时强制关闭一个client:
此时服务端显示:
造成服务器的IOException异常退出,不过整个处理过程可以catch异常。如果没有catch,server将异常退出,这影响了服务器的服务。
抓包看服务器的6666端口:
# tcpdump -i any port 6666
localhost.49775 > localhost.6666: Flags [S], seq 3053657778, win 65535, options [mss 16324,nop,wscale 6,nop,nop,TS val 1370645599 ecr 0,sackOK,eol], length 0
localhost.6666 > localhost.49775: Flags [S.], seq 3892119155, ack 3053657779, win 65535, options [mss 16324,nop,wscale 6,nop,nop,TS val 1370645599 ecr 1370645599,sackOK,eol], length 0
localhost.49775 > localhost.6666: Flags [.], ack 1, win 6371, options [nop,nop,TS val 1370645599 ecr 1370645599], length 0
localhost.6666 > localhost.49775: Flags [.], ack 1, win 6371, options [nop,nop,TS val 1370645599 ecr 1370645599], length 0
localhost.49775 > localhost.6666: Flags [P.], seq 1:4, ack 1, win 6371, options [nop,nop,TS val 1370647611 ecr 1370645599], length 3
localhost.6666 > localhost.49775: Flags [.], ack 4, win 6371, options [nop,nop,TS val 1370647611 ecr 1370647611], length 0
localhost.6666 > localhost.49775: Flags [P.], seq 1:16, ack 4, win 6371, options [nop,nop,TS val 1370647612 ecr 1370647611], length 15
localhost.49775 > localhost.6666: Flags [.], ack 16, win 6371, options [nop,nop,TS val 1370647612 ecr 1370647612], length 0
localhost.49775 > localhost.6666: Flags [F.], seq 4, ack 16, win 6371, options [nop,nop,TS val 1371319563 ecr 1370647612], length 0
 localhost.6666 > localhost.49775: Flags [.], ack 5, win 6371, options [nop,nop,TS val 1371319563 ecr 1371319563], length 0
 localhost.6666 > localhost.49775: Flags [P.], seq 16:28, ack 5, win 6371, options [nop,nop,TS val 1371319573 ecr 1371319563], length 12
 localhost.49775 > localhost.6666: Flags [R], seq 3053657783, win 0, length 0
首先是三次握手完成:
- [S]client 发起syn包,seq=3053657778,win=65535
 - [S.] server ack=3053657779,seq=3892119155,win=65535
 - [.] client ack=1 win 6371 然后发送数据:一发一收; 最后客户端关闭连接:
 - 客户端发送FIN
 - 服务器内核回复ACK
 - 服务器应用程序还在继续发数据,并没有也关闭连接
 - 客户端内核发Reset包,client强制关闭了连接。
 
服务端应该正确的处理连接的关闭,比如像下面这样:
 SocketChannel tcpSocket = (SocketChannel) key.channel();
    ByteBuffer buffer = ByteBuffer.allocate(1024);
    int read = tcpSocket.read(buffer);
    if (read ==-1){
        key.channel().close();
        return;
    }
}
假设tcpSocket.read返回-1 ,则表示收到了一个FIN包,则服务端也关闭连接。 这就是完美的四次挥手过程。如下显示
 # tcpdump -i any -nn -s0 -v --absolute-tcp-sequence-numbers port 6666
1.51461 > ::1.6666: Flags [F.], cksum 0x0028 (incorrect -> 0x5f54), seq 220187003, ack 2240511794, win 6371, options [nop,nop,TS val 1372377293 ecr 1372375988], length 0
1.6666 > ::1.51461: Flags [.], cksum 0x0028 (incorrect -> 0x5a3b), ack 220187004, win 6371, options [nop,nop,TS val 1372377293 ecr 1372377293], length 0
1.6666 > ::1.51461: Flags [F.], cksum 0x0028 (incorrect -> 0x5a3a), seq 2240511794, ack 220187004, win 6371, options [nop,nop,TS val 1372377293 ecr 1372377293], length 0
1.51461 > ::1.6666: Flags [.], cksum 0x0028 (incorrect -> 0x5a3a), ack 2240511795, win 6371, options [nop,nop,TS val 1372377293 ecr 1372377293], length 0
然后那条连接就处于TIME_WAIT状态了。
tcp6       0      0  localhost.51947      localhost.6666     TIME_WAIT
TIME_WAIT只需要等待两个MSL,看起来大概是2S左右。
之前是客户端发Reset强制关闭连接的,那连接就直接释放了,此时服务端再发数据可能就有些不能预料的异常了。
IO复用模型的总结-reactor单线程模型
通过引入Selector,一个线程就能完成所有的连接和客户端通信。 但是在我们这样的实现下,还是有一些缺点:
- 客户端之间相互影响,如果有一个客户端卡太久,会影响新的连接
 - 但是不能很好地利用多核cpu
 
其实,上面举的这个例子就是单线程reactor模型,像redis就是使用的这种模型,因为redis使用的场景get/set都是内存的操作,速度非常快,使用单线程reactor模型能够达到高性能。
server Reactor 单线程处理模型图如下

文件服务器:reactor多线程模型
假设我们要用Reactor单线程模型来实现文件的传输,我们并不知道文件的大小,接收数据的时候并不知道什么时候接收完一张图片了。 客户端和服务器之间需要做一些约定:
- 发送端先约定图片的大小
 - 接收端的连接需要在接收到这么大的一个文件之后,才将这个全部写到文件里面,才算这次图片传输真的结束了。
 - 另外接收完图片之后,验证压缩水印处理可能要花很多的时间,我们需要有一个线程池来帮助处理这些,不能让业务处理占用连接和网络读写的时间,使得服务之间相互影响。
 
带有Process线程池的实现
    static ExecutorService executorService = Executors.newFixedThreadPool(8);
    //需要有个关于连接的数组来就记录size,acc
    static ConcurrentHashMap<SocketChannel, HashMap<String,Object>> channelMap = new ConcurrentHashMap<>();
      static ConcurrentHashMap<Future<?>,SocketChannel> tasks=new ConcurrentHashMap<>();
    
       if (key.isReadable()){
                    SocketChannel tcpSocket =(SocketChannel) key.channel();
                    HashMap<String,Object> prop = channelMap.get(tcpSocket);
                    ByteBuffer buffer = ByteBuffer.allocate(1024*4);
                    while (true){
                        int read = tcpSocket.read(buffer);
                        if (!(read >0)) break;
                        buffer.flip();
                        if (prop.get("size").equals(0L)){
                            long size = 0;
                            try {
                                size = buffer.getLong();
                            } catch (Exception e) {
                                System.out.println("需要按照指定的协议上传文件: size+文件");
                                tcpSocket.close();
                                break;
                            }
                            prop.put("size",size);
                            prop.put("fileChannel",FileChannel.open(Paths.get(size+".png"), StandardOpenOption.WRITE, StandardOpenOption.CREATE));
                        }
                        if ((Long)prop.get("size")>0L) {
                            FileChannel fileChannel =(FileChannel) prop.get("fileChannel");
                            fileChannel.write(buffer);
                            buffer.clear();
                            Long size = (Long)prop.get("size");
                            Long acc = (Long)prop.get("acc");
                            prop.put("acc",acc+read);
                            System.out.println("进度 = " + ((acc - 8) * 1.0 / size * 1.0) * 100);
                        }
                    }
                    completed((Long)prop.get("size"), (Long)prop.get("acc"), tcpSocket,channelMap);
                }
  
    private static void completed(Long size,Long acc,SocketChannel tcpSocket,ConcurrentHashMap<SocketChannel,HashMap<String,Object> > channelMap) throws IOException, ExecutionException, InterruptedException {
        if (acc == size +8) {
            HashMap<String,Object> prop = channelMap.get(tcpSocket);
            Future<String> future = executorService.submit(new Processor(prop));
            //再使用一个数组,将future和channelMap放进去,
            tasks.put(future,tcpSocket);
            handleTasks();
        }
    }
    private static void handleTasks() throws InterruptedException, ExecutionException, IOException {
        //遍历task
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        for (Future<?> future1 : tasks.keySet()) {
            if (future1.isDone()){
                SocketChannel tcpSocket = tasks.get(future1);
                String o = future1.get().toString();
                buffer.put(o.getBytes());
                buffer.flip();
                tcpSocket.write(buffer);
                buffer.clear();
                //关闭文件 关闭 释放资源。。。
                tasks.remove(future1);
            }
        }
    }
}
class Processor implements Callable<String> {
    public String call() throws Exception {
          // 假设在处理业务
          int millis = new Random().nextInt(3000);
            System.out.println("sleep millis " + millis);
            Thread.sleep(millis);
        return "完成收图" ;
    }
}
要做到IO复用,需要一个数组保存文件的基本属性(比如大小、名字和上传进度等)和连接的对应关系; 使用了线程池之后,任务的执行会变得非常复杂
- 等待业务逻辑执行完毕才返回数据给客户端
 
Future<String> future = executorService.submit(new Processor(prop));
            //再使用一个数组,将future和channelMap放进去,
            tasks.put(future,tcpSocket);
- 需要统一处理task,不知道任务什么时候会处理完,需要select超时阻塞也要再调用task,比如
 
while (selector.select(1000)>=0){//假设一直阻塞,则任务可能得不到执行
    handleTasks();
select的超时时间是多少可能需要根据业务场景来具体指定,比如10ms或者1ms。
更优雅的封装
上面的代码实现很复杂,单一职责原则将代码拆分为Reactor类、Acceptor类和Handler类,这就是下面的模型了,就是Reactor 多线程处理模型。

总结
本文通过一个具体的例子,一步一步阐述了NIO在网络通信中是怎么解决阻塞的API这个问题的, 解决阻塞的问题带来了很多性能的好处,但也带来了编程和理解上的一些复杂性。
并且为了提高文件服务器的鲁棒性和高可用性,也使用了Reactor 多线程处理模型。甚至为了更好的提高网络服务器的鲁棒性和高可用性,以及更好高效实现喝更完备的功能,还有像netty服务器这样的网络框架,可以看我写的Netty究竟是怎么运行的-连接流程的深入剖析
查漏补缺
关于Java NIO的内容,通过这两篇文章已经剖析完毕。是否已经了解到这些并形成记忆?
- 
Java 流是BIO
 - 
多次拷贝且阻塞
 - 
ByteBuffer是NIO
 - 
能减内核用户态上下文切换
 - 
来减少拷贝次数
 - 
利用系统底层epoll机制
 - 
将阻塞的语义变为非阻塞的语义
 - 
感受阻塞和非阻塞的过程
 - 
记住零拷贝的几个api
 - 
transferTo/From
 - 
连接处理很复杂
 - 
while循环的套路
 - 
Rector模型来帮忙
 
转载自:https://juejin.cn/post/7045672219866988551