likes
comments
collection
share

阻塞的处理和网络模型设计——深入思考Java IO和NIO(二)

作者站长头像
站长
· 阅读数 15

书接上文深入思考Java IO和NIO(一),本文将详述NIO在网络通信中是怎么解决阻塞的API这个问题的。

我们会一步步从一个文件服务器看到是怎么解决这个问题的。

初次实践

假设有一个文件服务器提供上传图片的服务,有海量的客户端的图片需要上传,我们应该怎么提供这个服务呢?

首先我打算使用TCP连接让客户端上传文件,每次上传文件建立一个连接,在一个连接里面客户端将一张图片发送完之后就断开连接。

有了这个想法后,看看我是怎么写这个文件服务器的。

文件服务server

可以看我在GitHub的代码

class BioServer{
  public static void main(String[] args) throws IOException {
    ServerSocketChannel acceptSocket = ServerSocketChannel.open();
    acceptSocket.bind(new InetSocketAddress(6666));
    while (true){
        SocketChannel tcpSocket = acceptSocket.accept();
        Task task = new Task(tcpSocket);
        task.start();
    }
 }
}
class Task extends Thread{
    SocketChannel tcpSocket;
    public Task(SocketChannel tcpSocket) {
        this.tcpSocket = tcpSocket;
    }
    @Override
    public void run() {
        try {
            handleAAccept(tcpSocket);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    // 将客户端传递过来的图片保存在本地中
    private static void handleAAccept(SocketChannel tcpSocket) throws IOException {
    FileChannel fileChannel = FileChannel.open(Paths.get("pic.png"), StandardOpenOption.WRITE, StandardOpenOption.CREATE);
    ByteBuffer buffer = ByteBuffer.allocate(1024);
    while (tcpSocket.read(buffer) != -1) {// 这种channel 只有在关闭的时候才会返回-1 
        buffer.flip();
        fileChannel.write(buffer);
        buffer.clear(); // 读完切换成写模式,能让管道继续读取文件的数据
    }

server端的代码就上这样

  • ServerSocketChannel.open()开启一个ServerSocketChannel用于监听6666端口的连接
  • while循环调用acceptSocket.accept()来监听是否有连接要建立。
    • accept是阻塞的
    • 连接建立成功,则将这个TCP连接交给Task线程去处理。
  • Task读取tcpSocket连接发过来的信息并存储到文件。 在这个实现里面, 一个连接需要一个线程处理。

分析阻塞的系统调用

在这个实现里面, 一个连接需要一个线程处理,主要的原因是因为read/write是阻塞的。

read的阻塞

read从Channel读取字节序列到给定缓冲区。

  • tcpSocket.read(buffer)和前面一样,也是系统调用read等,是阻塞的,
  • 如果缓冲区有可用的数据,则返回可用的数据,尝试从通道读取最多 r 个字节,其中 r 是缓冲区中剩余的字节数,即 dst.remaining()。 下图绿色部分上TCP上缓冲区应用程序可以读取的可用的数据:

阻塞的处理和网络模型设计——深入思考Java IO和NIO(二)

  • 当对方发起FIN包的时候,这个时候读完所有ACK的数据,read系统调用才会返回-1
    • tcpSocket.shutdownOutput();,shutdown()系统调用的功能是关闭一个套接字的指定方向上的通信
    • tcpSocket.close();
    • 当RCV_BUF没有数据可读,且收到FIN包的时候,tcpSocket.read(buffer)返回值就是-1

对于read系统调用来说,阻塞和非阻塞的返回是不同的,如下图所示 阻塞的处理和网络模型设计——深入思考Java IO和NIO(二)

write的阻塞

对于TCP来说如果这个可用窗口为0,则write操作会被阻塞。 阻塞的处理和网络模型设计——深入思考Java IO和NIO(二)

做了一个小实验,客户端不停的发数据,但是让server应用不接收数据。后面发送端的write操作就会被阻塞,,而且发现接收端的接收缓冲区可能是500K的大小。

大家也可以尝试一下这个实验。感受阻塞的write。

传输过程优化--使用零拷贝

就像前面分析的,NIO使用MappedByteBuffer大大优化了数据复制的过程。 阻塞的处理和网络模型设计——深入思考Java IO和NIO(二)

我们在这个文件服务器里,如果约定好要传输的数据的大小,那么也可以使用MappedByteBuffer。

 void handleAAccept(SocketChannel tcpSocket) throws IOException {
        FileChannel fileChannel = FileChannel.open(Paths.get("test.png"), StandardOpenOption.WRITE, StandardOpenOption.CREATE);
        fileChannel.transferFrom(tcpSocket,0,19300);
    }

将客户端传递过来的图片通过 fileChannel.transferFrom(tcpSocket,0,19300)保存在本地, 同样,客户端发送图片的逻辑也可以简化为fileChannel.transferTo(0, size,tcpSocket)

public class BioClient {
    public static void main(String[] args) throws IOException {
        SocketChannel tcpSocket = SocketChannel.open(new InetSocketAddress("127.0.0.1", 6666));
        FileChannel fileChannel = FileChannel.open(Paths.get("***/http.png"), StandardOpenOption.READ);
        long size = fileChannel.size();
        fileChannel.transferTo(0, size,tcpSocket);
        fileChannel.close();
        tcpSocket.close();
    }
}

transferTo和transferFrom的阻塞

transferTo和transferFrom操作在这里也是阻塞的。

transferFrom将字节从给定的可读字节通道传输到该通道的文件中。

  • 如果是从文件到文件,阻塞还是有限的
  • 从网络到文件到操作,可能因为等待数据阻塞很久 比如下图这种情况: 阻塞的处理和网络模型设计——深入思考Java IO和NIO(二) 客户端发了1000字节,transferFrom拿走可用的800字节就返回了。

而下面这种情况:

阻塞的处理和网络模型设计——深入思考Java IO和NIO(二) transferFrom会等待后面两个字节,除非客户端再发两个字节过来或者连接关闭。

初实现的总结-BIO模型

这个文件服务器的实现已经初具模型,能满足上传图片的要求,还使用零拷贝技术进行优化,这是一个BIO模型

BIO也就是说阻塞的IO。阻塞虽然不占用CPU时间,但是非常占用线程。 但是每一个连接对应一个新的线程,对于C10K的并发来说,这样的模型是支撑不了的。

所以我能不能使用非阻塞的呢?

非阻塞的服务模型

在JAVA NIO里,可以通过设置参数将Channel的读写设置为非阻塞的。比如 tcpSocket.configureBlocking(false);,那么read在没有数据的时候就可以返回0,而不是阻塞着。

reply服务器的非阻塞实现

先不考虑文件服务器,这里实现客户端和服务器的通话。如下图所示:

阻塞的处理和网络模型设计——深入思考Java IO和NIO(二)

服务器读取客户端发过来的一个小的文字,并回复收到了。具体的代码是:

public class MultiServer {
    public static void main(String[] args) throws IOException {
        ServerSocketChannel serverSocket = ServerSocketChannel.open();
        serverSocket.bind(new InetSocketAddress(6666));
        serverSocket.configureBlocking(false);
        Selector selector = Selector.open();
        serverSocket.register(selector, SelectionKey.OP_ACCEPT);
        while (selector.select() > 0) {//select返回ready events的个数
            Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
            while (iterator.hasNext()) {
                SelectionKey key = iterator.next();
                iterator.remove();
                if (key.isAcceptable()) {
                    SocketChannel tcpSocket = serverSocket.accept();
                    tcpSocket.configureBlocking(false);
                    tcpSocket.register(selector, SelectionKey.OP_READ);
                    System.out.println("和客户端建立连接");
                } else if (key.isReadable()) {
                    try {
                        handle(key);
                    } catch (IOException e) {
                        key.channel().close();
                    }
                }
            }
        }
    }
    private static void handle( SelectionKey key) throws IOException {
        SocketChannel tcpSocket = (SocketChannel) key.channel();
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        String received="";
        int i=0;
        while ((i=tcpSocket.read(buffer) )> 0) {//>0就表示没有阻塞
            buffer.flip();
            received = new String(buffer.array(), 0, buffer.remaining());
            System.out.println("client说" + received);
            buffer.clear();
        }
        System.out.println("i = " + i);
        buffer.put(("我收到了" + received).getBytes(StandardCharsets.UTF_8));
        buffer.flip();
        tcpSocket.write(buffer);
        buffer.clear();
    }
}

可以看到效果,多个client和server在通话,而且服务器只使用了一个线程阻塞的处理和网络模型设计——深入思考Java IO和NIO(二) 使用了selector的多路复用技术, 底层原理是epoll,而且java nio实现的是水平触发的epoll模式,需要使用 SelectionKey key = iterator.next(); iterator.remove();移除event,表示这个事件会处理完毕。

  • 注册了 serverSocket.configureBlocking(false); serverSocket.register(selector, SelectionKey.OP_ACCEPT),
  • 对于建立的连接,又注册tcpSocket.configureBlocking(false);tcpSocket.register(selector, SelectionKey.OP_READ);
  • Selector会负责件监听这些channel的这些注册的类型的事件
    • key.isAcceptable()表示有可连接事件,使用SocketChannel tcpSocket = serverSocket.accept()则不会阻塞,可建立连接

    • key.isReadable()表示这个key有可读事件,使用read也不会阻塞,直接能读到数据,当然读完了就没有数据,再读时读到的数据长度变为0。

    • 同理write也是非阻塞的。

注册OP_WRITE事件

int write(ByteBuffer src)返回写的字节长度,在非阻塞模式下,即使没有将准备的buf的数据写完也会返回,比如返回0. 因为缓冲区没有可用空间,那么在写较大数据的时候需要注册OP_WRITE事件,当发送窗口可用时select会通知应用程序。

if (writed<remaining){
    tcpSocket.register(selector,SelectionKey.OP_WRITE);
    break;
}

在被通知时,记得处理:

...
if (selectionKey.isWritable()){
    //将剩下的没有写完的数据继续写完
    continueWriteFileToSocket(selectionKey,channel, fileChannel, buffer, selector);
}

private static void continueWriteFileToSocket(SelectionKey key,SocketChannel tcpSocket, FileChannel fileChannel, ByteBuffer buffer, Selector selector) throws IOException {
    //buffer在之前kennel被先get了一部分,position的位置已经发生了改变
    int writed = tcpSocket.write(buffer);
    if (writed<buffer.remaining()){
       return;
    }
    buffer.clear();
    //如果能写完,就可以继续往buffer里面put数据并继续发送到网上了
    while (fileChannel.read(buffer) != -1) {
        buffer.flip();
        int remaining = buffer.remaining();
        writed = tcpSocket.write(buffer);
        System.out.println("write = " + writed);
        if (writed<remaining){
            return;
        }else {
            buffer.clear();
        }
    }
    key.cancel();
}
  • 写完了之后要cancel。

非阻塞场景下的零拷贝

在非阻塞场景下,同样可以使用transferTo函数减少拷贝次数,但是也要注意上面的写的问题。

     long position;
    void transferFileToSocket(SocketChannel tcpSocket, FileChannel fileChannel, Selector selector) throws IOException {
        long l=fileChannel.transferTo(position, fileChannel.size(), tcpSocket);
        position+=l;
        System.out.println("position = " + position);
        if (position<fileChannel.size()){
            //没写完
            tcpSocket.register(selector,SelectionKey.OP_WRITE|SelectionKey.OP_READ);
            System.out.println("selector.keys() = " + selector.keys());
        }
    }
    void continueTransferFileToSocket(SelectionKey key,SocketChannel tcpSocket, FileChannel fileChannel,Selector selector ) throws IOException {
        System.out.println("continueTransferFileToSocket");
        long l=fileChannel.transferTo(position, fileChannel.size(), tcpSocket);
        position+=l;
        System.out.println("position = " + position);
        if (position<fileChannel.size()){
            //没写完
            return;
        }
      
       tcpSocket.register(selector,SelectionKey.OP_READ);
  
    }

这里这个Channel还注册了读的事件,所以不用cancel取消这个key.

  • 注册写的时候 重新注册SelectionKey.OP_WRITE|SelectionKey.OP_READ
  • 后面取消写的时候重新注册SelectionKey.OP_READ

至此,NIO不仅解决了多次数据复制的问题,还解决了阻塞的API的问题。

阻塞的处理和网络模型设计——深入思考Java IO和NIO(二)

正确的关闭连接

在服务器处理的时候,一定要注意连接的处理。因为客户端可以随时关闭连接。如果因为一个客户端的关闭导致服务器宕机,那还会影响其他的服务。 比如,此时强制关闭一个client: 阻塞的处理和网络模型设计——深入思考Java IO和NIO(二) 此时服务端显示: 阻塞的处理和网络模型设计——深入思考Java IO和NIO(二) 造成服务器的IOException异常退出,不过整个处理过程可以catch异常。如果没有catch,server将异常退出,这影响了服务器的服务。

抓包看服务器的6666端口:

# tcpdump -i any port 6666
localhost.49775 > localhost.6666: Flags [S], seq 3053657778, win 65535, options [mss 16324,nop,wscale 6,nop,nop,TS val 1370645599 ecr 0,sackOK,eol], length 0
localhost.6666 > localhost.49775: Flags [S.], seq 3892119155, ack 3053657779, win 65535, options [mss 16324,nop,wscale 6,nop,nop,TS val 1370645599 ecr 1370645599,sackOK,eol], length 0
localhost.49775 > localhost.6666: Flags [.], ack 1, win 6371, options [nop,nop,TS val 1370645599 ecr 1370645599], length 0
localhost.6666 > localhost.49775: Flags [.], ack 1, win 6371, options [nop,nop,TS val 1370645599 ecr 1370645599], length 0
localhost.49775 > localhost.6666: Flags [P.], seq 1:4, ack 1, win 6371, options [nop,nop,TS val 1370647611 ecr 1370645599], length 3
localhost.6666 > localhost.49775: Flags [.], ack 4, win 6371, options [nop,nop,TS val 1370647611 ecr 1370647611], length 0
localhost.6666 > localhost.49775: Flags [P.], seq 1:16, ack 4, win 6371, options [nop,nop,TS val 1370647612 ecr 1370647611], length 15
localhost.49775 > localhost.6666: Flags [.], ack 16, win 6371, options [nop,nop,TS val 1370647612 ecr 1370647612], length 0

localhost.49775 > localhost.6666: Flags [F.], seq 4, ack 16, win 6371, options [nop,nop,TS val 1371319563 ecr 1370647612], length 0
 localhost.6666 > localhost.49775: Flags [.], ack 5, win 6371, options [nop,nop,TS val 1371319563 ecr 1371319563], length 0
 localhost.6666 > localhost.49775: Flags [P.], seq 16:28, ack 5, win 6371, options [nop,nop,TS val 1371319573 ecr 1371319563], length 12
 localhost.49775 > localhost.6666: Flags [R], seq 3053657783, win 0, length 0

首先是三次握手完成:

  • [S]client 发起syn包,seq=3053657778,win=65535
  • [S.] server ack=3053657779,seq=3892119155,win=65535
  • [.] client ack=1 win 6371 然后发送数据:一发一收; 最后客户端关闭连接:
  • 客户端发送FIN
  • 服务器内核回复ACK
  • 服务器应用程序还在继续发数据,并没有也关闭连接
  • 客户端内核发Reset包,client强制关闭了连接。

服务端应该正确的处理连接的关闭,比如像下面这样:

 SocketChannel tcpSocket = (SocketChannel) key.channel();
    ByteBuffer buffer = ByteBuffer.allocate(1024);
    int read = tcpSocket.read(buffer);
    if (read ==-1){
        key.channel().close();
        return;
    }
}

假设tcpSocket.read返回-1 ,则表示收到了一个FIN包,则服务端也关闭连接。 这就是完美的四次挥手过程。如下显示

 # tcpdump -i any -nn -s0 -v --absolute-tcp-sequence-numbers port 6666
1.51461 > ::1.6666: Flags [F.], cksum 0x0028 (incorrect -> 0x5f54), seq 220187003, ack 2240511794, win 6371, options [nop,nop,TS val 1372377293 ecr 1372375988], length 0
1.6666 > ::1.51461: Flags [.], cksum 0x0028 (incorrect -> 0x5a3b), ack 220187004, win 6371, options [nop,nop,TS val 1372377293 ecr 1372377293], length 0
1.6666 > ::1.51461: Flags [F.], cksum 0x0028 (incorrect -> 0x5a3a), seq 2240511794, ack 220187004, win 6371, options [nop,nop,TS val 1372377293 ecr 1372377293], length 0
1.51461 > ::1.6666: Flags [.], cksum 0x0028 (incorrect -> 0x5a3a), ack 2240511795, win 6371, options [nop,nop,TS val 1372377293 ecr 1372377293], length 0

然后那条连接就处于TIME_WAIT状态了。

tcp6       0      0  localhost.51947      localhost.6666     TIME_WAIT

TIME_WAIT只需要等待两个MSL,看起来大概是2S左右。

之前是客户端发Reset强制关闭连接的,那连接就直接释放了,此时服务端再发数据可能就有些不能预料的异常了。

IO复用模型的总结-reactor单线程模型

通过引入Selector,一个线程就能完成所有的连接和客户端通信。 但是在我们这样的实现下,还是有一些缺点:

  • 客户端之间相互影响,如果有一个客户端卡太久,会影响新的连接
  • 但是不能很好地利用多核cpu

其实,上面举的这个例子就是单线程reactor模型,像redis就是使用的这种模型,因为redis使用的场景get/set都是内存的操作,速度非常快,使用单线程reactor模型能够达到高性能。

server Reactor 单线程处理模型图如下 阻塞的处理和网络模型设计——深入思考Java IO和NIO(二)

文件服务器:reactor多线程模型

假设我们要用Reactor单线程模型来实现文件的传输,我们并不知道文件的大小,接收数据的时候并不知道什么时候接收完一张图片了。 客户端和服务器之间需要做一些约定:

  • 发送端先约定图片的大小
  • 接收端的连接需要在接收到这么大的一个文件之后,才将这个全部写到文件里面,才算这次图片传输真的结束了。
  • 另外接收完图片之后,验证压缩水印处理可能要花很多的时间,我们需要有一个线程池来帮助处理这些,不能让业务处理占用连接和网络读写的时间,使得服务之间相互影响。

带有Process线程池的实现

    static ExecutorService executorService = Executors.newFixedThreadPool(8);
    //需要有个关于连接的数组来就记录size,acc
    static ConcurrentHashMap<SocketChannel, HashMap<String,Object>> channelMap = new ConcurrentHashMap<>();
      static ConcurrentHashMap<Future<?>,SocketChannel> tasks=new ConcurrentHashMap<>();
    
       if (key.isReadable()){
                    SocketChannel tcpSocket =(SocketChannel) key.channel();
                    HashMap<String,Object> prop = channelMap.get(tcpSocket);
                    ByteBuffer buffer = ByteBuffer.allocate(1024*4);
                    while (true){
                        int read = tcpSocket.read(buffer);
                        if (!(read >0)) break;
                        buffer.flip();
                        if (prop.get("size").equals(0L)){
                            long size = 0;
                            try {
                                size = buffer.getLong();
                            } catch (Exception e) {
                                System.out.println("需要按照指定的协议上传文件: size+文件");
                                tcpSocket.close();
                                break;
                            }
                            prop.put("size",size);
                            prop.put("fileChannel",FileChannel.open(Paths.get(size+".png"), StandardOpenOption.WRITE, StandardOpenOption.CREATE));
                        }
                        if ((Long)prop.get("size")>0L) {
                            FileChannel fileChannel =(FileChannel) prop.get("fileChannel");
                            fileChannel.write(buffer);
                            buffer.clear();
                            Long size = (Long)prop.get("size");
                            Long acc = (Long)prop.get("acc");
                            prop.put("acc",acc+read);
                            System.out.println("进度 = " + ((acc - 8) * 1.0 / size * 1.0) * 100);
                        }
                    }
                    completed((Long)prop.get("size"), (Long)prop.get("acc"), tcpSocket,channelMap);
                }

  

    private static void completed(Long size,Long acc,SocketChannel tcpSocket,ConcurrentHashMap<SocketChannel,HashMap<String,Object> > channelMap) throws IOException, ExecutionException, InterruptedException {
        if (acc == size +8) {
            HashMap<String,Object> prop = channelMap.get(tcpSocket);
            Future<String> future = executorService.submit(new Processor(prop));
            //再使用一个数组,将future和channelMap放进去,
            tasks.put(future,tcpSocket);
            handleTasks();
        }
    }
    private static void handleTasks() throws InterruptedException, ExecutionException, IOException {
        //遍历task
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        for (Future<?> future1 : tasks.keySet()) {
            if (future1.isDone()){
                SocketChannel tcpSocket = tasks.get(future1);
                String o = future1.get().toString();
                buffer.put(o.getBytes());
                buffer.flip();
                tcpSocket.write(buffer);
                buffer.clear();
                //关闭文件 关闭 释放资源。。。
                tasks.remove(future1);
            }
        }
    }
}
class Processor implements Callable<String> {
    public String call() throws Exception {
          // 假设在处理业务
          int millis = new Random().nextInt(3000);
            System.out.println("sleep millis " + millis);
            Thread.sleep(millis);
        return "完成收图" ;
    }
}

要做到IO复用,需要一个数组保存文件的基本属性(比如大小、名字和上传进度等)和连接的对应关系; 使用了线程池之后,任务的执行会变得非常复杂

  • 等待业务逻辑执行完毕才返回数据给客户端
Future<String> future = executorService.submit(new Processor(prop));
            //再使用一个数组,将future和channelMap放进去,
            tasks.put(future,tcpSocket);
  • 需要统一处理task,不知道任务什么时候会处理完,需要select超时阻塞也要再调用task,比如
while (selector.select(1000)>=0){//假设一直阻塞,则任务可能得不到执行
    handleTasks();

select的超时时间是多少可能需要根据业务场景来具体指定,比如10ms或者1ms。

更优雅的封装

上面的代码实现很复杂,单一职责原则将代码拆分为Reactor类Acceptor类Handler类,这就是下面的模型了,就是Reactor 多线程处理模型。

阻塞的处理和网络模型设计——深入思考Java IO和NIO(二)

总结

本文通过一个具体的例子,一步一步阐述了NIO在网络通信中是怎么解决阻塞的API这个问题的, 解决阻塞的问题带来了很多性能的好处,但也带来了编程和理解上的一些复杂性。

并且为了提高文件服务器的鲁棒性和高可用性,也使用了Reactor 多线程处理模型。甚至为了更好的提高网络服务器的鲁棒性和高可用性,以及更好高效实现喝更完备的功能,还有像netty服务器这样的网络框架,可以看我写的Netty究竟是怎么运行的-连接流程的深入剖析

查漏补缺

关于Java NIO的内容,通过这两篇文章已经剖析完毕。是否已经了解到这些并形成记忆?

  • Java 流是BIO

  • 多次拷贝且阻塞

  • ByteBuffer是NIO

  • 内核用户态上下文切换

  • 来减少拷贝次数

  • 利用系统底层epoll机制

  • 将阻塞的语义变为非阻塞的语义

  • 感受阻塞和非阻塞的过程

  • 记住零拷贝的几个api

  • transferTo/From

  • 连接处理很复杂

  • while循环的套路

  • Rector模型来帮忙