likes
comments
collection
share

一看就会的Netty源码

作者站长头像
站长
· 阅读数 85

整理下Netty,之前写了几篇博客,翻来翻去太累了,这里我就整理了下整理在了一起,并且,去掉了繁杂了源码解读,demo我也顺便精简了一下,完全当一个备忘录来看,完全没有一点点的源码。方便快速会议和整理,如果想看的话这里附上前几个博客的链接,估计也没谁想看

C中网络编程

这里是伪代码!伪代码!伪代码!懂个大概即可,麻烦各位dalao就不要深究,这是我之前一篇博客的精简(链接),再次重申,以下c部分的代码只做梳理使用及api的介绍。

BIO|server

struct sockaddr_in stSockAddr;
//创建套接字
int SocketFD = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
//初始化 server_addr
memset(&stSockAddr, 0, sizeof(struct sockaddr_in));
//绑定
bind(serverSocket, (struct sockaddr *)&server_addr, sizeof(server_addr)
//监听状态
listen(SocketFD, 10)
while(1) {
    //阻塞
    client = accept(serverSocket, (struct sockaddr*)&clientAddr, (socklen_t*)&addr_len);
    while(1) {
        recv(client, buffer, 1024, 0);
        //dosomething
        send(client, buffer, strlen(buffer), 0); //服务端也向客户端发送消息
    }
}

上述api list

  • socket() 创建一个新的确定类型的套接字,返回套接字。
  • bind() 为一个套接字分配地址。
  • listen()开始监听可能的连接请求。
  • accept()当应用程序监听来自其他主机的面对数据流的连接时,必须用 accept()函数初始化连接。
  • recv()函数用于从已连接的套接字中接收信息
  • send()函数用于向处于连接状态的套接字中发送数据

BIO|client

struct sockaddr_in stSockAddr;
//创建套接字
int SocketFD = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
//连接服务器
connect(serverSocket, (struct sockaddr *)&serverAddr, sizeof(serverAddr)
//发送
send(serverSocket, sendbuf, strlen(sendbuf), 0); //向服务端发送消息

上述api list

  • socket() 创建一个新的确定类型的套接字,返回套接字。
  • connect()函数用于客户端,该函数的功能为向服务器发起连接请求。
  • send()函数用于向处于连接状态的套接字中发送数据

Epoll|server

struct epoll_event tep,ep[MAX_OPEN_FD];
listenfd = socket(AF_INET,SOCK_STREAM,0);
fcntl(listenfd, F_SETFL, fdflags | O_NONBLOCK);
bind(listenfd,(struct sockaddr*)&servaddr,sizeof(servaddr));
listen(listenfd,20);
efd = epoll_create(MAX_OPEN_FD);
epoll_ctl(efd,EPOLL_CTL_ADD,listenfd,&tep);
for (;;){
    size_t nready = epoll_wait(efd,ep,MAX_OPEN_FD,-1);
    for (int i = 0; i < nready; ++i){
        // 如果是新的连接,需要把新的socket添加到efd中
        if (ep[i].data.fd == listenfd ){
            connfd = accept(listenfd,(struct sockaddr*)&cliaddr,&clilen);
            tep.events = EPOLLIN;
            tep.data.fd = connfd;
            ret = epoll_ctl(efd,EPOLL_CTL_ADD,connfd,&tep);
        }// 否则,读取数据
        else
        {
            connfd = ep[i].data.fd;
            int bytes = read(connfd,buf,MAXLEN);
            // 客户端关闭连接
            if (bytes == 0){
                ret =epoll_ctl(efd,EPOLL_CTL_DEL,connfd,NULL);
                close(connfd);
            }
            else
            {
                for (int j = 0; j < bytes; ++j)
                {
                    buf[j] = toupper(buf[j]);
                }
                // 向客户端发送数据
                write(connfd,buf,bytes);
            }
        }
}
  • fcntl()打开文件描述符,具体操作由cmd决定
  • epoll_create()在内核中创建epoll实例并返回一个epoll文件描述符,结构体为eventpoll
  • epoll_ctl()向 epfd 对应的内核epoll 实例添加、修改或删除对 fd 上事件 event 的监听。
  • epoll_wait()等待其管理的连接上的 IO 事件
  • accept()当应用程序监听来自其他主机的面对数据流的连接时,必须用 accept()函数初始化连接。
  • read()函数用于从已连接的套接字中接收信息
  • write()函数用于向处于连接状态的套接字中发送数据

Java Nio对于c的封装

操作系统底层就开了这么多接口,Java这种汇编语言底层也是调用这些方法,所以要看下怎么封装就理解几个头疼的对象(ByteBuffer,Channel,Selector,SelectionKey)分别代表什么 这里之前的博客(链接)也分析了,这里只简写

public class ServerConnect  
{  
    public static void main(String[] args)  
    {  
        selector();  
    }  
    public static void handleAccept(SelectionKey key) throws IOException{  
        ServerSocketChannel ssChannel = (ServerSocketChannel)key.channel();  
        SocketChannel sc = ssChannel.accept();  
        sc.configureBlocking(false);  
        sc.register(key.selector(), SelectionKey.OP_READ,ByteBuffer.allocateDirect(1024));  
    }  
    public static void handleRead(SelectionKey key) throws IOException{  
        SocketChannel sc = (SocketChannel)key.channel();  
        ByteBuffer buf = (ByteBuffer)key.attachment();  
        long bytesRead = sc.read(buf);  
        while(bytesRead>0){  
            buf.flip();  
            while(buf.hasRemaining()){  
                System.out.print((char)buf.get());  
            }  
            buf.clear();  
            bytesRead = sc.read(buf);  
        }  
        if(bytesRead == -1){  
            sc.close();  
        }  
    }  
    public static void handleWrite(SelectionKey key) throws IOException{  
        ByteBuffer buf = (ByteBuffer)key.attachment();  
        buf.flip();  
        SocketChannel sc = (SocketChannel) key.channel();  
        while(buf.hasRemaining()){  
            sc.write(buf);  
        }  
        buf.compact();  
    }  
    public static void selector() {  
        Selector selector = null;  
        ServerSocketChannel ssc = null;  
        try{  
            selector = Selector.open();  
            ssc= ServerSocketChannel.open();  
            ssc.socket().bind(new InetSocketAddress(8080));  
            ssc.configureBlocking(false);  
            ssc.register(selector, SelectionKey.OP_ACCEPT);  
            while(true){  
                if(selector.select(3000) == 0){  
                    continue;  
                }  
                Iterator<SelectionKey> iter = selector.selectedKeys().iterator();  
                while(iter.hasNext()){  
                    SelectionKey key = iter.next();  
                    if(key.isAcceptable()){  
                        handleAccept(key);  
                    }  
                    if(key.isReadable()){  
                        handleRead(key);  
                    }  
                    if(key.isWritable() && key.isValid()){  
                        handleWrite(key);  
                    }  
                    if(key.isConnectable()){  
                        System.out.println("isConnectable = true");  
                    }  
                    iter.remove();  
                }  
            }  
        }catch(IOException e){  
            e.printStackTrace();  
        }finally{  
            try{  
                if(selector!=null){  
                    selector.close();  
                }  
                if(ssc!=null){  
                    ssc.close();  
                }  
            }catch(IOException e){  
                e.printStackTrace();  
            }  
        }  
    }  
}

直接说结论

  • Channel:对socket的封装
    • ServerSocketChannel.open();<--->socket()
    • ssc.socket().bind()<--->bind()+listen()
  • Selector:对epoll的封装
    • Selector.open()<--->epoll_create()+epoll_ctl()
    • selector.select(3000)<--->epoll_ctl()+epoll_wait()
  • SelectionKey:关联上面2个,有了文件描述符快速找到对应Socket

NettyApi

之前博客整理的有点问题,这里先介绍下各个api是怎么用的,再简单讲述下流程

EventLoopGroup

这个类似于线程池,前面的博客也写到了,线程池是通过Worker和阻塞队列实现解耦,EventLoopGroupWorker就是EventLoop,这里EventLoop呢又有点不同,他是个只有单线程的线程池,可以这么理解,具体执行就是通过EventLoop,当然还有Nio相关的一些封装,这里后面再说(后续EventLoop都是指代的NioEventLoop其他系列也都是指代的Nio=。=)

public static void main(String[] args) {
    EventLoopGroup group = new NioEventLoopGroup();

    for (int i = 0; i < 10000; i++) {
        group.execute(()->{
            System.out.println(111111111);
        });
    }

}

一看就会的Netty源码

Future

这个和java原生的还是类似,这里主要是回调,添加了回调,在任务完成的时候,会调用回调。异步这个东西,自己写的时候很爽,但是别人看的时候会忍不住一句mmp,回调地狱了解下。这里呢看下使用即可

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        EventLoopGroup group = new NioEventLoopGroup();
        Integer ans=null;
        Future<Integer> future=group.submit(()->{
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("wait complete");
            return 1;

        });
        //通过wait,notify阻塞
//        Object o=future.get();
//        System.out.println(o);

        future.addListener(temp->{
            System.out.println(temp.getNow());
        });

    }

promise同理,不解释了 一看就会的Netty源码

EventLoop

之前说过类似线程池,它同时也封装了SelectorSelectionKey,也就是相当于Epoll结构体,主要管理各种网络事件

Channel

Netty的ServerSocketChannel是对于javanio的Channel的封装,你可以看到javachannel()这种方法,accept事件在这里处理,因为要把返回的Channel继续封装成ServerSocketChannel,其余的read之类的事件则是交给pipeline中的各个Handler进行处理,

  • BossGroup:由于初始化的时候添加了ServerBootstrapAcceptor(这个是netty自己添加的),对应BossGroup对于事件的处理都在这
  • WorkGroup:这里就是上面的ServerBootstrapAcceptor中的实现,这里就贴一小段代码,多了没人看。 一看就会的Netty源码

ByteBuf

ByteBuffer的封装,单指针变成了双指针

流程梳理

多的图我也不放了,核心就是Reactor,这里放一下Doug Lea大佬ppt的图片,完美诠释了Netty的核心流程

一看就会的Netty源码

封装梳理

说多了也没人看,这里精简成一张图 一看就会的Netty源码

其他

粘包半包问题

  • 粘包:服务端一次性读到了多个数据包
  • 半包:数据量过大读了一半 解决方案
  • 固定报文长度
    • 缺点:长度不好把握,长度定的太短还是有半包问题
  • 特定分隔符
    • 缺点:报文包含了这个分隔符,会报错
  • 预设长度
    • 用前几个字节说明报文长度,Netty实现LengthFieldBasedFrameDecoder
转载自:https://juejin.cn/post/7243342597002461241
评论
请登录