深入理解NIO & Netty学习netty一直走了不少弯路,究其原因就是Netty牵扯很多计算机底层原理,在没有这些底
学习netty一直走了不少弯路,究其原因就是Netty牵扯很多计算机底层原理,在没有这些底层知识做基础的情况下,妄图从使用出发了解Netty,结局只能像我一样 重头来过学习NIO & Netty。
下面会先把一些前置的知识做详细说明,设计操作系统,计算机网络等
-
IO模型
对IO模型的讨论可以分为两个维度,第一个是如何分配线程去执行IO,第二个是这个线程具体是如何执行IO的。
-
计算机如何分配线程去执行IO?
最直观的想法肯定是分配一个线程处理IO任务,但仔细想这其中还有些问题——谁来处理分配线程的任务?每个IO任务占用一个线程必然是效率不高的,能否多个IO任务在一个线程管理下完成?
要梳理清楚这些问题的来龙去脉,就需要从最简单的模型出发,一步步了解现在的结论是如何得出的。
-
最基本的Socket模型
Socket可以说是编码过程中离底层通信最近的一层了,再往下就是IPv4,IPv6,TCP,UDP的选择。
什么是Socket? 本质上是一个抽象层,不同操作系统可以通过不同方式实现Socket API 在Unix/Linux中一切皆文件,都可以通过打开,读写,关闭模式来完成操作;同理一个Socket本质就是一个文件,创建网络连接就是打开某个socket文件;通过Socket向外传输数据就是向文件中write。 通信的基本步骤:
- 创建Socket:
int main() { int sockfd; // Socket 描述符 // 创建 TCP Socket if ((sockfd = socket(AF_INET, SOCK_STREAM, 0)) == -1) { // AF_INET表示IPv4地址族,SOCK_STREAM表示基于流的TCP... perror("socket creation failed"); exit(EXIT_FAILURE); } printf("Socket created successfully with descriptor: %d\n", sockfd); close(sockfd); // 关闭 Socket return 0; }
- 绑定Socket: 使用bind()将socket和本地地址+端口绑定,便于客户端连接到这个Socket
- 监听Socket: 使用listen()将Socket设置为监听状态,以等待客户端的连接请求
- 接受连接:使用accept()函数 接受客户端的连接请求,并创建一个新Socket用于与客户端通信,将这个socket的文件描述符返回给客户端
- 发送与接收数据;API分别为send() & recv()
- 关闭Socket,使用close()关闭
对于基于IPv4 & TCP的socket连接而言,连接的过程简单来说就是服务器分配内部资源(新的Socket = 一个文件)并将描述符通知客户端;但是后续对这个socket文件的IO是谁来处理?
-
fork()出子进程,专门用于新socket的读写
显而易见,进程的调度成本较大
-
fork()出子线程
即使使用线程池复用创建的线程,当客户端数量激增时也会无法解决
-
一个进程来维护多个Socket —— IO多路复用
基本思想是:既然用一个进程/线程管理一个Socket大材小用了,总会出现Socket任务未完成,进程被阻塞,当使用轮询解决阻塞时。无论频率如何都是对CPU的空转;假如能够当socket任务完成时,线程就能感知到并建立连接用来read,那就是对线程最大程度的利用(也就是事件驱动);
如果让一个进程监听多个Socket,监听到IO任务后再由其他进程/线程完成后续IO;这种操作使得负责监听的进程/线程被充分利用,其余负责IO的进程/线程也可以由任务驱动,只关注IO过程。
后续Reactor & Proactor就是对IO多路复用的更高级实践
操作系统内核提供给用户态三种多路复用的系统调用:select/poll/epoll;接下来我们仔细看看
- select:全量传递 将已经连接的Socket放到一个文件描述符集合(使用位图优化)中,调用select函数将这个集合拷贝到内核里;内核遍历文件描述符集合检查有无时间产生,将这个socket标记为可读或者可写;接着把整个集合拷贝回用户态,用户态再遍历找出可读可写的socket
- poll:不用位图用链表
- epoll:增量传递
-
epoll在内核中维护有待检测的socket,就不需要用户态每次全量的复制了;内核中文件描述符用红黑树组织,使得增删改的时间复杂度都是logn。
-
epoll强化了事前驱动机制 —— 传统poll的过程需要用户态遍历整个集合才知道哪些socket可以执行IO,但epoll尝试将内核返回给用户态的所有socket都可以IO 具体做法是:内核中维护一个就绪事件列表,用来记录红黑树上就绪的socket。
-
- 创建Socket:
-
-
线程具体是如何执行IO的?
典中典之五种IO模型:
-
BIO
-
非阻塞IO
-
IO多路复用
从这个角度看,多路复用就是非阻塞IO的增强版,它将多个非阻塞IO重叠起来,交给一个进程/线程管理状态。而从上面关于“如何分配线程执行IO”的讨论中,多路复用是”线程与socket1:1“的升级版
感觉这就是传说中的“银弹”,一个技术的出现解决各种各样的问题
-
异步IO
当进程发起一个IO操作,进程直接返回,当内核处理完IO操作后,由内核将数据复制到用户态;一切完毕后,通知进程结果
-
信号驱动IO
信号驱动IO使用信号机制来实现部分异步IO,应用程序通过向内核注册信号处理函数(回调)来处理IO事件。当IO操作完成后,内核发送一个信号 通知应用程序,应用程序触发相应的函数,由应用程序将准备好的数据复制到用户态。
和异步IO的区别就在于copy的过程是否是应用程序线程来完成的,也就是说是否会在copy的过程中被阻塞;AIO中,内核完成了所有工作,应用程序线程全程不被阻塞,而信号驱动IO会在copy的过程中短暂阻塞。
IO操作的两步(等待数据准备完成 & 将数据从内核态拷贝到用户态)中,等待的时间一般比copy的时间大很多,所以一般IO优化的思路都是降低等待时间(即使是号称non-blocking IO,在多路复用中select到有IO事件后,也会阻塞应用程序完成copy操作,所以并非纯粹的NIO)
-
除此之外,javaer对于IO模型可能更熟悉的叫法有:BIO、NIO、AIO;因为这三个是基于以上IO模型封装的Java库,开发者直接接触的。我们分别看下
- BIO: 我们在学网络通信基础时,通过ServerSocket & Socket封装TCP,UDP数据包的代码就是最基本的BIO,示例如下:
服务器:
public class BIOServer { public static void main(String[] args) throws IOException { //创建服务端套接字 & 绑定host:port & 监听client ServerSocket serverSocket = new ServerSocket(9999); //等待客户端连接到来 Socket socket = serverSocket.accept(); //拿到输入流 -- client write to server InputStream in = socket.getInputStream(); //拿到输出流 -- server write to client OutputStream out = socket.getOutputStream(); //以下读写任务可以交给fork的子线程 while (true){ //将数据读到buf中 byte[] buf = new byte[32]; //server read from client int len = in.read(buf); //如果len == 1,说明client已经断开连接 if(len == -1){ throw new RuntimeException("连接已断开"); } System.out.println("recv:" + new String(buf, 0, len)); //将读出来的数据写回给client //如果不使用偏移量,可能会将buf中的无效数据也写回给client out.write(buf, 0, len); } } }
客户端:
public static void main(String[] args) throws IOException, InterruptedException { //创建客户端套接字 & 连接服务器 Socket socket = new Socket("127.0.0.1", 9999); //拿到输入流 -- server write to client, client read from server InputStream in = socket.getInputStream(); //拿到输出流 -- client write to server OutputStream out = socket.getOutputStream(); byte[] send = "hello".getBytes(); while (true){ //client write to server out.write(send); byte[] buf = new byte[32]; //read from server int len = in.read(buf, 0 ,send.length); //如果len == 1,说明server已经断开连接 if(len == -1){ throw new RuntimeException("连接已断开"); } System.out.println("recv:" + new String(buf, 0, len)); Thread.sleep(1000); } }
代码剽窃自CSDN博主,原文:深入浅出JAVA BIO、NIO和AIO(附详细代码实例)_java bio nio aio demo-CSDN博客
上述代码逻辑就和我们最开始说明的最基础的Socket通信一致:服务器一个主线程(代码层面抽象为ServerSocket)负责建立连接,fork的子线程(accept后产生的socket)完成接下来的等待数据 + 数据复制操作(也就是具体IO)。 这种写法问题很明显:一个client建立一个子线程必然是不行的,使用线程池优化也只是治标不治本,于是出现基于其他吞吐量更高的IO模型的API —— NIO 2. NIO
- NIO底层基于IO多路复用,但将过程中许多部分抽象成上层可使用的组件
核心的三大组件有:buffer,channel和selector,分别是多路复用模型中部分功能的映射或增强
-
channel: channel可以看作是对原始通过stream进行文件IO的一种增强;对于网络IO而言,往往一个服务器的socket对应一个inputStream & outputStream,用来完成对这个描述符所虚拟表示的文件进行读写;而channel不同点在于 它是全双工的,使用更方便(这个可能不是...)并且通过在channel两端使用buffer,可以增强读写效率。
实话实说我没理解到channel高深的地方,希望有大佬可以指点
-
buffer: buffer的工作和channel紧密联系。buffer被放置在一个channel的前后用来给IO提速,意味着使用channel读的时候是从buffer读到的,并且数据也是先到达buffer里的,使用channel写的时候也是先写到buffer,再一举写到channel,最后仍是写到目标文件的buffer后再写入文件。
buffer的构成:
- 我们Java中使用的buffer都是按照基本数据类型封装的Buffer对象,底层就是按照数据类型开辟的一块块内存区(使用确定元素类型的数组本身也是为了给IO提速,使得对buffer内部的读写能够更快)
- Buffer有两种内存分配的实现方式,一种是JVM堆内存中分配缓冲区,第二种是在直接内存分配
- buffer的API间博文:Java NIO 之 Buffer(缓冲区) - SnailClimb - 博客园 (cnblogs.com)
-
selector:
选择器,又名“多路复用器”
- 底层是select线程负责管理多个socket,NIO中抽象出一个selector监控多个channel
- NIO包在代码层面赋予多路复用更高的可操作性,体现在如下
-
可将channel注册到selector,明确哪些路被复用了
channel.configureBlocking(false); SelectionKey key = channel.register(selector, Selectionkey.OP_READ);
-
注册时可以声明感兴趣的事件,这在多路复用的基础上让事件和channel的对应关系更加灵活了。具体事件类型有四种:Connect,Accept,Read & Write。
-
selector的select方法:在内核中选择的过程就是一次询问每个通道是否就绪,过程中应用程序线程是被阻塞的,可以调用wakeup()打断阻塞。
-
-
-
AIO:
AIO人称NIO 2;是真正非阻塞的IO模型
正如上面对IO模型的讨论,多路复用做不到非阻塞(不光是select的过程会阻塞,将数据从内核复制会用户态的过程也会阻塞用户线程);而Java-AIO基于异步IO模型,可以做到真正非阻塞。
TODO:为什么AIO不如NIO流程
-
-
DMA
-
什么是DMA: DMA:直接内存访问技术 / 配合协处理器,指在进行IO设备和内存的数据传输过程中,将数据搬运的指挥工作全都交由DMA控制器,而CPU不再参与。
-
早期没有DMA的场景:
- CPU 发出对应的指令给磁盘控制器,然后返回;
- 磁盘控制器收到指令后,于是就开始准备数据,会把数据放入到磁盘控制器的内部缓冲区中,然后产生一个中断;
- CPU 收到中断信号后,停下手头的工作,接着把磁盘控制器的缓冲区的数据一次一个字节地读进自己的寄存器,然后再把寄存器里的数据写入到内(PageCache),再将PageCache的数据拷贝到用户缓冲区;而在数据传输的期间 CPU 是无法执行其他任务的。 问题:
-
-
为什么需要磁盘控制器以及磁盘设备的缓存,直接由CPU将磁盘读出的字节装入内存不行吗?
由于CPU的工作速度比外设快太多,所以只能采用CPU发起指令,磁盘控制器异步加载的模式(其实最理想的模式是:数据从外设直接到CPU完成计算,连内存都不需要了);另外磁盘控制器作为CPU和外设中间一层,可以将各种外设采集的信息转化成CPU可以识别的。
-
-
-
为什么CPU需要全量的参与磁盘控制器向内存的数据复制?难道不能由CPU的一个指令将控制器缓存的数据复制到内存吗?
网上查到的答案是:外设没有能力直接访问系统内存,所有数据必须通过CPU才能加载入内存。
-
-
-
有没有办法让CPU从中解脱?
DMA技术,将数据复制的任务委托给专门的DMA控制器
流程如下:
- 用户进程调用 read 方法,向操作系统发出 I/O 请求,请求读取数据到自己的内存缓冲区中,进程进入阻塞状态;
- 操作系统收到请求后,进一步将 I/O 请求发送 DMA,然后让 CPU 执行其他任务;
- DMA 进一步将 I/O 请求发送给磁盘;
- 磁盘收到 DMA 的 I/O 请求,把数据从磁盘读取到磁盘控制器的缓冲区中,当磁盘控制器的缓冲区被读满后,向 DMA 发起中断信号,告知自己缓冲区已满;
- DMA 收到磁盘的信号,将磁盘控制器缓冲区中的数据拷贝到内核缓冲区中,此时不占用 CPU,CPU 可以执行其他任务;
- 当 DMA 读取了足够多的数据,就会发送中断信号给 CPU;
- CPU 收到 DMA 的信号,知道数据已经准备好,于是将数据从内核拷贝到用户空间,系统调用返回; 区别在于:CPU不再参与将数据从磁盘控制器缓冲区搬运到内核空间的工作,而是交由DMA完成;但仍然要完成由PageCache内核缓冲区复制到用户缓冲区的任务。
-
-
于是整个IO的流程简述如下:
应用程序在内部缓存中没有查到数据,进行系统调用(int 0x80触发中断)尝试从磁盘获取;在内核中有大量的PageCache(默认不释放,满了才淘汰)用来缓存硬盘数据,当应用程序的系统调用在PageCache中没找到数据时,才会触发缺页,执行真正的磁盘IO——首先将应用程序线程挂起,保护现场,CPU将读取指令交给DMA协处理器,DMA完成通知硬件驱动,再操作硬件把数据读取到硬件缓冲区,之后由DMA负责将数据复制到PageCache,再通知CPU,由CPU将数据复制到用户态application中,并唤醒被挂起的线程,等待接下来的线程调度。
-
Page Cache
pagecache作为内存中对磁盘的缓存,其特点有:
- 操作系统会将未使用的内存全分配给PageCache,因此其大小动态变化,且能容纳不少东西
- 磁盘IO有两种方式,
- buffered IO 缓冲IO : 磁盘数据经过PageCache
- direct IO 直接IO : 读写都不经过pageCache缓冲
- 缓冲的刷盘策略:
- 用户进程调用sync()和fsync()时
- 空闲内存大小低于设定阈值
- 脏页在内存中驻留时间超过设定阈值
- mmap 一种内存映射文件的方法,mmap会将一个文件映射进内存中的多个pagecache页上,用户此时有权利直接操作pageCache的内容,内核会负责刷入磁盘
Java 与 PageCache 的矛盾
Java中的IO默认都是buffered IO,且在写入PageCahce后就返回写入成功,刷盘工作委托给操作系统;同时提供类似flush()的API可以强制刷盘。
下面做一个实验说明这个过程;以及磁盘IO相比内存读写的耗时。
public class Main { public static void main(String[] args) throws IOException { long start = System.currentTimeMillis(); testBasicIO(); System.out.println(System.currentTimeMillis() - start); start = System.currentTimeMillis(); testBufferedIO(); System.out.println(System.currentTimeMillis() - start); start = System.currentTimeMillis(); testRealIO(); System.out.println(System.currentTimeMillis() - start); } public static void testBasicIO() throws IOException { FileOutputStream outputStream = new FileOutputStream(new File("basic.txt")); for (int i = 0; i < 1000; i++) { outputStream.write("12345".getBytes(StandardCharsets.UTF_8)); } } public static void testBufferedIO() throws IOException { FileOutputStream outputStream = new FileOutputStream(new File("buffer.txt")); BufferedOutputStream bufferedOutputStream = new BufferedOutputStream(outputStream); for (int i = 0; i < 1000; i++) { bufferedOutputStream.write("12345".getBytes(StandardCharsets.UTF_8)); } } public static void testRealIO() throws IOException { RandomAccessFile file = new RandomAccessFile("real.txt","rw"); for (int i = 0; i < 1000; i++) { file.write("12345".getBytes(StandardCharsets.UTF_8)); file.getChannel().force(true); } } }
上面提供了三个IO写文件的API,BasicIO使用最直接的IO方式,BufferIO在应用程序层面添加一层缓存,减少IO频率,RealIO使用NIO包中getChannel().force()在每次写入内存后强制刷盘;以下是测试结果
结果和认识一致,一层OS层面的Buffer将结果加快几千倍,一层Java层面的Buffer将结果又加快几十倍。可以通过strace命令 具体查看过程中的系统调用。结果如下
-
basicIO
可以看到,就算系统调用了write,但其实也没有刷到磁盘里(因为刷盘的时间根本不可能这么短)
-
BufferedIO
可以看到Java层面的Buffer大小是8190,每次调用write也只是刷到pageCache
-
RealIO
可以看出,只有系统调用fSync()或者sync()方法,才能强制刷盘
但是使用Buffer的风险在于不安全,假如在刷盘前设备宕机就很尴尬——Java中显示写入成功(没有任何报错),但再启动设备后文件并不存在
测试思路:当调用write()写完后,强制关机(没想到虚拟机也提供强制关机的功能),再启动看看有无成功写入
//测试过程TODO
-
mmap
mmap是一种将文件映射到内存pageCache中,对用户而言操作这片内存区域就无需考虑刷盘的事情。
原本用户读写的流程是:应用程序使用open,write,read等系统调用,将用户态的数据赋值给内核态(或者相反),数据到达内核态PageCache后由操作系统完成刷盘。mmap提出的优化是:假如用户能够直接访问内核态的pageCache,就可以减少一次数据复制,顺利将用户态的数据就当作内核态(因为就写到了PageCache中);
-
mmap的API说明:
参数大致含义为
- *addr 告知内核选取哪块空间进行映射,addr表示空间开头指针
- len:需要映射的文件的长度
- filedes:文件描述符
- off:映射部分在文件中的便宜量
- prot:保护模式,可选可读,可写,可执行...
- flag:此映射文件是否共享
API示例:
可以看到直接修改p的内容,就可以实现更改具体文件。 而过程中没有任何传统VFS的接口,如open,read,write等
详情见大佬视频:www.bilibili.com/video/BV1Fk…
-
零拷贝
上述各种技术都是在给它铺垫,或者是零拷贝是集成了上述技术的一大实践。零拷贝是用来优化网络通信过程的。
-
我猜想下什么技术都没有的,最早期网络通信
- 用户发现发现想发的内容不在内存,触发缺页中断,试图从磁盘中加载相应文件。
- 用户线程在执行int 0x80后进入阻塞状态,等待数据准备好。
- int 0x80 执行加载文件的系统调用,内核运行磁盘驱动代码,磁盘先将文件加载到磁盘缓冲区,
- CPU负责将数据通过CPU内的寄存器复制【1】到内核缓冲区,
- 唤醒用户线程,由用户线程负责将数据再复制【2】到用户进程所在的内存空间。
- 经过可能的应用程序操作后,调用socket相关的系统调用;
- 首先将数据复制【3】到内核态的socket缓冲区,再由CPU将数据复制【4】到网卡,最后由网卡编码,发出。
-
DMA出现,CPU无需负责从各个IO设备缓冲区复制到内存的工作(1,4)
-
mmap出现,CPU无需负责数据在用户态和内核态的复制(2)
基于以上技术,仍然无法避免从mmap后的共享空间到socket缓冲区的复制;但是仔细想想,这两块空间都在内存中,如果用来扫描socket缓冲区的线程(进程)有办法直接找到mmap共享空间,那就是三方共享,完全不需要任何复制。
但可惜没有查到有这个技术,下面聊的sendfile也只是内核缓冲区和socket缓冲区共享,没有用户态参与就意味着程序无法修改这个文件;这样设计避免了用户态和内核态的两次切换,但也导致不太灵活;为什么没有sendfile + mmap 的终极版本呢?暂不明
第二种零拷贝的实现方案,就是当只需要文件传输,不需要文件在用户态经历任何操作时,可以使用sendfile系统调用,数据在内核缓冲区无需到应用程序,同时只需要把一些刻画数据的元信息复制到Socket缓冲区,比如offset,length等等,之后发送的任务就委托给DMA。 再加上SG-DMA这项网卡技术,使得socket缓冲区的数据可以引用内核缓冲区的数据,从而做到零拷贝。
-
-
使用零拷贝技术的项目
Kafka
在之前的博客里,我们说过Kafka的broker处理消息的方式是基于磁盘的(为了处理海量数据不得不舍弃基于内存的模式,这也是RocketMQ,Kafka和RabbitMQ的区别之一),基于磁盘却依然那么快似乎有违常识,但这恰好说明前辈开发者们敢想敢做。具体快原因就是在IO中做了许多优化,我认为可以分为微观优化和宏观优化:
-
微观:
- 写入时通过mmap减少复制次数
- 读取时通过零拷贝(sendfile)减少复制次数
- 顺序写每个segment文件
-
宏观:
- 批处理 + 异步的数据处理方式,更适合大数据场景
- 多partition并行处理
-
-
网络模型 Reactor & Proactor
Netty
-
Netty 物理架构(代码的分层)
三个模块为:
- Core 核心层
提供底层网络通信的通用抽象和实现
- Protocol Support 协议支持层
覆盖主流协议的编解码实现
- 传输服务层
提供了网络传输能力的定义和实现方法
- Core 核心层
-
Netty 逻辑架构(数据流转的方式)
简述流程:
-
网络通信层
客户端和服务器启动时分别由Bootstrap和ServerBootStrap引导,配置好各自的各种属性;其中客户端绑定一个EventLoop,服务器绑定两个EventLoopGroup,一主一从。
客户端的消息通过网络到达服务端的网卡,之后由OS复制到内核缓冲区,此时表示数据已准备好(或者是准备好建立连接了)
-
事件调度层
事件调度层中,Boss EventLoopGroup的一个EventLoop执行Select事件循环,检测到内核缓冲区有数据准备好了,分配group中一个EventLoop完成channel的建立;然后将这个channel注册到worker EventLoopGroup中,绑定某个EventLoop;
这个Eventloop中,selector多路复用模式下检测到数据准备好时,启动Eventloop内的线程完成IO任务。
-
服务编排层 创建channel时,将channel绑定到某个channelPipeline上,ChannelPipeline上绑定有许多入站和出站的Handler。当EventLoop对这个channel进行读写操作时,沿着先进后出的方式经过所有handler;最后返回给客户端。
特点:
- 两个Reactor复杂的数据流转 但内容都是控制流,并没有数据流的来回操作;数据流只经过各种Handler
- 从流程可见:开发者的业务代码只需要关注Handler如何编写,其他网络通信层和时间调度层的代码十分相近。
-
-
详解组件
示例代码应该在下面链接都有
-
ServerBootStrap
示例代码:
public class SimpleHttpServer { public static void main(String[] args) throws Exception { // 创建两个线程组,用于处理不同的任务 EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); // (1) b.group(bossGroup, workerGroup) // (2) .channel(NioServerSocketChannel.class) // (3) .childHandler(new ChannelInitializer<SocketChannel>() { // (4) @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast( new HttpRequestDecoder(), new HttpObjectAggregator(65536), new HttpResponseEncoder(), new SimpleHttpRequestHandler() ); } }); // 绑定端口并启动服务器 ChannelFuture f = b.bind(8080).sync(); // (5) // 关闭服务器 f.channel().closeFuture().sync(); } finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } private static class SimpleHttpRequestHandler extends SimpleChannelInboundHandler<Object> { @Override protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception { if (msg instanceof FullHttpRequest) { FullHttpRequest request = (FullHttpRequest) msg; String responseContent = "<html><body><h1>Hello, Netty!</h1></body></html>"; ByteBuf byteBuf = Unpooled.copiedBuffer(responseContent.getBytes()); FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, byteBuf); response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/html"); response.headers().set(HttpHeaderNames.CONTENT_LENGTH, responseContent.length()); ctx.writeAndFlush(response); } } } }
整个代码由bootstrap+建造者模式构造,主要完成三个工作
- 配置线程池
- ChannelPipeline装载handler
- 端口绑定
-
-
EventLoop
具体是这样:一个EventLoop包括两个核心组件:selector事件分发器 + testQueue任务队列
【我去,刚发现有点像Skynet的一个Actor,也就是一个服务,都是用一个消息队列接收外部消息,不过Skynet统一由全局线程池执行每个事件,这里的EventLoop中单独有线程执行】
每个EventLoop中只有一个线程在执行,这个线程需要完成的任务是:1:监听绑定的channel事件有无准备好;2:处理这些事件 3:EventLoop还TM能当作一个任务执行器,执行其他和IO无关的任务。
源码片段
protected void run() { int selectCnt = 0; while(true){ try { if (!this.hasTasks()) { strategy = this.select(curDeadlineNanos); } break; } finally { this.nextWakeupNanos.lazySet(-1L); } case -2: break label792; } } try { if (strategy > 0) { this.processSelectedKeys(); } } finally { ranTasks = this.runAllTasks(); } } else if (strategy > 0) { long ioStartTime = System.nanoTime(); boolean var75 = false; try { var75 = true; this.processSelectedKeys(); var75 = false; } finally { if (var75) { long ioTime = System.nanoTime() - ioStartTime; this.runAllTasks(ioTime * (long)(100 - ioRatio) / (long)ioRatio); } } long ioTime = System.nanoTime() - ioStartTime; ranTasks = this.runAllTasks(ioTime * (long)(100 - ioRatio) / (long)ioRatio); } else { ranTasks = this.runAllTasks(0L); } ```
原本方法很长,部分代码我删掉后看起来面目全非了,不过可以看到核心的几个功能:
方法在while(true)中实现了三个功能
-
源码第六行 : select(curDeadlineNanos)
private int select(long deadlineNanos) throws IOException { if (deadlineNanos == Long.MAX_VALUE) { return this.selector.select(); } else { long timeoutMillis = deadlineToDelayNanos(deadlineNanos + 995000L) / 1000000L; return timeoutMillis <= 0L ? this.selector.selectNow() : this.selector.select(timeoutMillis); } }
跟着核心方法一路向下,追踪到:
选择第一个wepoll(Windows版本实现的epoll) 进入JNI
static native int wait(long h, long pollAddress, int numfds, int timeout) throws IOException;
int epoll_wait(HANDLE ephnd, struct epoll_event* events, int maxevents, int timeout) { ts_tree_node_t* tree_node; port_state_t* port_state; int num_events; if (maxevents <= 0) return_set_error(-1, ERROR_INVALID_PARAMETER); if (init() < 0) return -1; tree_node = ts_tree_find_and_ref(&epoll__handle_tree, (uintptr_t) ephnd); if (tree_node == NULL) { err_set_win_error(ERROR_INVALID_PARAMETER); goto err; } port_state = port_state_from_handle_tree_node(tree_node); num_events = port_wait(port_state, events, maxevents, timeout); ts_tree_node_unref(tree_node); if (num_events < 0) goto err; return num_events; err: err_check_handle(ephnd); return -1; }
由于是win模拟Linux的epoll,所以虽然没有明显的epoll函数,但是整体流程和epoll相近
GPT解释如下:
- 查找树节点:使用
ts_tree_find_and_ref
函数查找epoll__handle_tree
树中对应的ephnd
节点,并获取引用。 - 获取状态:从找到的树节点中提取出
port_state
结构体。 - 等待事件:调用
port_wait
函数等待最多maxevents
个事件的发生,超时时间为timeout
。
- 查找树节点:使用
-
ProcessSelectedKeys() 用来处理IO事件
跟踪链路:
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) { AbstractNioChannel.NioUnsafe unsafe = ch.unsafe(); if (!k.isValid()) { NioEventLoop eventLoop; try { eventLoop = ch.eventLoop(); } catch (Throwable var6) { return; } if (eventLoop == this) { unsafe.close(unsafe.voidPromise()); } } else { try { int readyOps = k.readyOps(); if ((readyOps & 8) != 0) { int ops = k.interestOps(); ops &= -9; k.interestOps(ops); unsafe.finishConnect(); } if ((readyOps & 4) != 0) { unsafe.forceFlush(); } if ((readyOps & 17) != 0 || readyOps == 0) { unsafe.read(); } } catch (CancelledKeyException var7) { unsafe.close(unsafe.voidPromise()); } } }
public final void read() { ChannelConfig config = AbstractNioByteChannel.this.config(); if (AbstractNioByteChannel.this.shouldBreakReadReady(config)) { AbstractNioByteChannel.this.clearReadPending(); } else { ChannelPipeline pipeline = AbstractNioByteChannel.this.pipeline(); ByteBufAllocator allocator = config.getAllocator(); RecvByteBufAllocator.Handle allocHandle = this.recvBufAllocHandle(); allocHandle.reset(config); ByteBuf byteBuf = null; boolean close = false; try { do { byteBuf = allocHandle.allocate(allocator); allocHandle.lastBytesRead(AbstractNioByteChannel.this.doReadBytes(byteBuf)); if (allocHandle.lastBytesRead() <= 0) { byteBuf.release(); byteBuf = null; close = allocHandle.lastBytesRead() < 0; if (close) { AbstractNioByteChannel.this.readPending = false; } break; } allocHandle.incMessagesRead(1); AbstractNioByteChannel.this.readPending = false; pipeline.fireChannelRead(byteBuf); byteBuf = null; } while(allocHandle.continueReading()); allocHandle.readComplete(); pipeline.fireChannelReadComplete(); if (close) { this.closeOnRead(pipeline); } } catch (Throwable var11) { Throwable t = var11; this.handleReadException(pipeline, byteBuf, t, close, allocHandle); } finally { if (!AbstractNioByteChannel.this.readPending && !config.isAutoRead()) { this.removeReadOp(); } }
这里的read就是基于nio的方式,通过巧妙使用bytebuf,完成对直接内存的读取
-
runAllTasks(): 当处理完IO事件后,再处理异步任务队列
我们可以通过eventLoop的execute方法提交一个异步任务
EventLoop eventLoop = ctx.channel().eventLoop(); eventLoop.execute(() -> { // 异步任务的执行逻辑 // ... });
也可以用schedule方法提交定时任务
ScheduledFuture\<?> scheduledFuture = channel.eventLoop().schedule( () -> { // 延时任务的逻辑 System.out.println("延时任务执行"); }, 10, // 延时时间 TimeUnit.SECONDS );
真实代码:
protected boolean runAllTasks(long timeoutNanos) { this.fetchFromScheduledTaskQueue(); Runnable task = this.pollTask(); if (task == null) { this.afterRunningAllTasks(); return false; } else { long deadline = timeoutNanos > 0L ? this.getCurrentTimeNanos() + timeoutNanos : 0L; long runTasks = 0L; long lastExecutionTime; while(true) { safeExecute(task); ++runTasks; if ((runTasks & 63L) == 0L) { lastExecutionTime = this.getCurrentTimeNanos(); if (lastExecutionTime >= deadline) { break; } } task = this.pollTask(); if (task == null) { lastExecutionTime = this.getCurrentTimeNanos(); break; } } this.afterRunningAllTasks(); this.lastExecutionTime = lastExecutionTime; return true; } }
核心作用:从任务队列中循环取出任务(类型为Runnable),根据设定的事件规则执行这些任务; 核心代码:
protected static void safeExecute(Runnable task) { try { runTask(task); } catch (Throwable var2) { logger.warn("A task raised an exception. Task: {}", task, var2); } } protected static void runTask(@Execute Runnable task) { task.run(); }
所谓safeExecute就是在execute外加了一层try-catch,感觉可能后期还有更完善的版本来处理发生异常后的情况
随记: Java虽然没有函数式编程,但是通过将函数抽象成一个Runnable对象,也可以实现和函数式编程相近的行为;但是仅仅由Runnable无法实现参数传递和返回值传递;问题根源在于,我们使用Runnable接口其实是重写了其中的run方法,而run方法本身没有参数,想动态设置参数在Java中根本不可能,所以解决方法之一是:自定义一个函数式接口,其中的run方法接收参数,如下:
@FunctionalInterface
interface MyFunction {
void execute(int a, int b);
}
public class Example {
public static void main(String[] args) {
MyFunction sumFunction = (a, b) -> System.out.println("Sum: " + (a + b));
// Pass parameters to the function
executeFunction(sumFunction, 10, 20);
}
public static void executeFunction(MyFunction function, int a, int b) {
function.execute(a, b);
}
}
这确实有点太麻烦了,Java还提供了一些内置的函数式接口,比如Function,Consumer,Supplier等 其中,Function可以接收一个参数并返回一个结果
import java.util.function.Function;
public class Example {
public static void main(String[] args) {
Function<Integer, String> intToString = (i) -> "Converted: " + i;
// Pass a parameter and get the result
String result = intToString.apply(10);
System.out.println(result);
}
}
我们可以将多个参数封装到一个集合中,实现动态参数的效果
但是绕了一圈实现的函数式编程,还是不如真正函数式编程那样简单灵活
- 在写lua时,一个函数接收几个参数,都是什么类型,返回几个参数,又都是什么类型,这些信息只在函数定义阶段明确,而在调用时全然不知,只靠程序员确保两端能够对齐
- 主要原因在于lua这种脚本语言没有编译期间校验的能力,也没有完全OOP需要承担的桎梏;而Java在编译阶段就做了许多安全校验,OOP中对类型的明确也强化了这种校验
总结下来,EventPool两个功能为:事件轮询(不停检查channel上有无事件,当事件发生时,调用对应的handler进行处理,事件可能是建立连接,IO事件等);任务调度;两个功能都在一个while(true)中执行。
还有很多知识点待补充,文中有理解错误希望大佬可以指出
转载自:https://juejin.cn/post/7407271112994291738