一看就会的Netty源码
整理下Netty,之前写了几篇博客,翻来翻去太累了,这里我就整理了下整理在了一起,并且,去掉了繁杂了源码解读,demo我也顺便精简了一下,完全当一个备忘录来看,完全没有一点点的源码。方便快速会议和整理,如果想看的话这里附上前几个博客的链接,估计也没谁想看
- linuxApi:juejin.cn/post/714538…
- java->linux:juejin.cn/post/714638…
- netty启动流程分析:juejin.cn/post/715527…
C中网络编程
这里是伪代码!伪代码!伪代码!懂个大概即可,麻烦各位dalao就不要深究,这是我之前一篇博客的精简(链接),再次重申,以下c部分的代码只做梳理使用及api的介绍。
BIO|server
struct sockaddr_in stSockAddr;
//创建套接字
int SocketFD = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
//初始化 server_addr
memset(&stSockAddr, 0, sizeof(struct sockaddr_in));
//绑定
bind(serverSocket, (struct sockaddr *)&server_addr, sizeof(server_addr)
//监听状态
listen(SocketFD, 10)
while(1) {
//阻塞
client = accept(serverSocket, (struct sockaddr*)&clientAddr, (socklen_t*)&addr_len);
while(1) {
recv(client, buffer, 1024, 0);
//dosomething
send(client, buffer, strlen(buffer), 0); //服务端也向客户端发送消息
}
}
上述api list
socket()
创建一个新的确定类型的套接字,返回套接字。bind()
为一个套接字分配地址。listen()
开始监听可能的连接请求。accept()
当应用程序监听来自其他主机的面对数据流的连接时,必须用accept()
函数初始化连接。recv()
函数用于从已连接的套接字中接收信息send()
函数用于向处于连接状态的套接字中发送数据
BIO|client
struct sockaddr_in stSockAddr;
//创建套接字
int SocketFD = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
//连接服务器
connect(serverSocket, (struct sockaddr *)&serverAddr, sizeof(serverAddr)
//发送
send(serverSocket, sendbuf, strlen(sendbuf), 0); //向服务端发送消息
上述api list
socket()
创建一个新的确定类型的套接字,返回套接字。connect()
函数用于客户端,该函数的功能为向服务器发起连接请求。send()
函数用于向处于连接状态的套接字中发送数据
Epoll|server
struct epoll_event tep,ep[MAX_OPEN_FD];
listenfd = socket(AF_INET,SOCK_STREAM,0);
fcntl(listenfd, F_SETFL, fdflags | O_NONBLOCK);
bind(listenfd,(struct sockaddr*)&servaddr,sizeof(servaddr));
listen(listenfd,20);
efd = epoll_create(MAX_OPEN_FD);
epoll_ctl(efd,EPOLL_CTL_ADD,listenfd,&tep);
for (;;){
size_t nready = epoll_wait(efd,ep,MAX_OPEN_FD,-1);
for (int i = 0; i < nready; ++i){
// 如果是新的连接,需要把新的socket添加到efd中
if (ep[i].data.fd == listenfd ){
connfd = accept(listenfd,(struct sockaddr*)&cliaddr,&clilen);
tep.events = EPOLLIN;
tep.data.fd = connfd;
ret = epoll_ctl(efd,EPOLL_CTL_ADD,connfd,&tep);
}// 否则,读取数据
else
{
connfd = ep[i].data.fd;
int bytes = read(connfd,buf,MAXLEN);
// 客户端关闭连接
if (bytes == 0){
ret =epoll_ctl(efd,EPOLL_CTL_DEL,connfd,NULL);
close(connfd);
}
else
{
for (int j = 0; j < bytes; ++j)
{
buf[j] = toupper(buf[j]);
}
// 向客户端发送数据
write(connfd,buf,bytes);
}
}
}
fcntl()
打开文件描述符,具体操作由cmd决定epoll_create()
在内核中创建epoll
实例并返回一个epoll
文件描述符,结构体为eventpoll
。epoll_ctl()
向 epfd 对应的内核epoll
实例添加、修改或删除对 fd 上事件 event 的监听。epoll_wait()
等待其管理的连接上的 IO 事件accept()
当应用程序监听来自其他主机的面对数据流的连接时,必须用accept()
函数初始化连接。read()
函数用于从已连接的套接字中接收信息write()
函数用于向处于连接状态的套接字中发送数据
Java Nio对于c的封装
操作系统底层就开了这么多接口,Java这种汇编语言底层也是调用这些方法,所以要看下怎么封装就理解几个头疼的对象(ByteBuffer
,Channel
,Selector
,SelectionKey
)分别代表什么
这里之前的博客(链接)也分析了,这里只简写
public class ServerConnect
{
public static void main(String[] args)
{
selector();
}
public static void handleAccept(SelectionKey key) throws IOException{
ServerSocketChannel ssChannel = (ServerSocketChannel)key.channel();
SocketChannel sc = ssChannel.accept();
sc.configureBlocking(false);
sc.register(key.selector(), SelectionKey.OP_READ,ByteBuffer.allocateDirect(1024));
}
public static void handleRead(SelectionKey key) throws IOException{
SocketChannel sc = (SocketChannel)key.channel();
ByteBuffer buf = (ByteBuffer)key.attachment();
long bytesRead = sc.read(buf);
while(bytesRead>0){
buf.flip();
while(buf.hasRemaining()){
System.out.print((char)buf.get());
}
buf.clear();
bytesRead = sc.read(buf);
}
if(bytesRead == -1){
sc.close();
}
}
public static void handleWrite(SelectionKey key) throws IOException{
ByteBuffer buf = (ByteBuffer)key.attachment();
buf.flip();
SocketChannel sc = (SocketChannel) key.channel();
while(buf.hasRemaining()){
sc.write(buf);
}
buf.compact();
}
public static void selector() {
Selector selector = null;
ServerSocketChannel ssc = null;
try{
selector = Selector.open();
ssc= ServerSocketChannel.open();
ssc.socket().bind(new InetSocketAddress(8080));
ssc.configureBlocking(false);
ssc.register(selector, SelectionKey.OP_ACCEPT);
while(true){
if(selector.select(3000) == 0){
continue;
}
Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
while(iter.hasNext()){
SelectionKey key = iter.next();
if(key.isAcceptable()){
handleAccept(key);
}
if(key.isReadable()){
handleRead(key);
}
if(key.isWritable() && key.isValid()){
handleWrite(key);
}
if(key.isConnectable()){
System.out.println("isConnectable = true");
}
iter.remove();
}
}
}catch(IOException e){
e.printStackTrace();
}finally{
try{
if(selector!=null){
selector.close();
}
if(ssc!=null){
ssc.close();
}
}catch(IOException e){
e.printStackTrace();
}
}
}
}
直接说结论
- Channel:对socket的封装
ServerSocketChannel.open();
<--->socket()ssc.socket().bind()
<--->bind()+listen()
- Selector:对epoll的封装
Selector.open()
<--->epoll_create()+epoll_ctl()selector.select(3000)
<--->epoll_ctl()+epoll_wait()
- SelectionKey:关联上面2个,有了文件描述符快速找到对应Socket
NettyApi
之前博客整理的有点问题,这里先介绍下各个api是怎么用的,再简单讲述下流程
EventLoopGroup
这个类似于线程池,前面的博客也写到了,线程池是通过Worker
和阻塞队列实现解耦,EventLoopGroup
的Worker
就是EventLoop
,这里EventLoop
呢又有点不同,他是个只有单线程的线程池,可以这么理解,具体执行就是通过EventLoop
,当然还有Nio相关的一些封装,这里后面再说(后续EventLoop
都是指代的NioEventLoop
其他系列也都是指代的Nio=。=)
public static void main(String[] args) {
EventLoopGroup group = new NioEventLoopGroup();
for (int i = 0; i < 10000; i++) {
group.execute(()->{
System.out.println(111111111);
});
}
}
Future
这个和java原生的还是类似,这里主要是回调,添加了回调,在任务完成的时候,会调用回调。异步这个东西,自己写的时候很爽,但是别人看的时候会忍不住一句mmp,回调地狱了解下。这里呢看下使用即可
public static void main(String[] args) throws ExecutionException, InterruptedException {
EventLoopGroup group = new NioEventLoopGroup();
Integer ans=null;
Future<Integer> future=group.submit(()->{
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("wait complete");
return 1;
});
//通过wait,notify阻塞
// Object o=future.get();
// System.out.println(o);
future.addListener(temp->{
System.out.println(temp.getNow());
});
}
promise同理,不解释了
EventLoop
之前说过类似线程池,它同时也封装了Selector
和SelectionKey
,也就是相当于Epoll结构体,主要管理各种网络事件
Channel
Netty的ServerSocketChannel
是对于javanio的Channel
的封装,你可以看到javachannel()
这种方法,accept事件在这里处理,因为要把返回的Channel
继续封装成ServerSocketChannel
,其余的read之类的事件则是交给pipeline中的各个Handler
进行处理,
- BossGroup:由于初始化的时候添加了
ServerBootstrapAcceptor
(这个是netty自己添加的),对应BossGroup对于事件的处理都在这 - WorkGroup:这里就是上面的
ServerBootstrapAcceptor
中的实现,这里就贴一小段代码,多了没人看。
ByteBuf
ByteBuffer
的封装,单指针变成了双指针
流程梳理
多的图我也不放了,核心就是Reactor,这里放一下Doug Lea大佬ppt的图片,完美诠释了Netty
的核心流程
封装梳理
说多了也没人看,这里精简成一张图
其他
粘包半包问题
- 粘包:服务端一次性读到了多个数据包
- 半包:数据量过大读了一半 解决方案
- 固定报文长度
- 缺点:长度不好把握,长度定的太短还是有半包问题
- 特定分隔符
- 缺点:报文包含了这个分隔符,会报错
- 预设长度
- 用前几个字节说明报文长度,
Netty
实现LengthFieldBasedFrameDecoder
- 用前几个字节说明报文长度,
转载自:https://juejin.cn/post/7243342597002461241