likes
comments
collection
share

Golang网络IO模型源码分析-goroutinue+epoll

作者站长头像
站长
· 阅读数 18

在了解Golang的网络IO模型的具体实现之前,可以先了解一下Linux的几种经典网络模型


Linux经典网络模型

阻塞式IO(BIO)

应用进程从发起IO系统调用一直到返回结果,整个期间都是处于阻塞状态,当前线程被挂起

Golang网络IO模型源码分析-goroutinue+epoll

非阻塞式IO(NIO)

Socket可以设置为非阻塞,这样应用进程可发起IO系统调用后可以立刻返回。轮询发起的IO系统调用直到返回结束标识,需要应用进程不停地访问执行结果。

Golang网络IO模型源码分析-goroutinue+epoll

IO多路复用模型

可以将多个应用进程的Socket注册到一个Select上,用一个进程来监听该Select(阻塞)。这样就可以实现有一个Socket准备好立刻返回,并发起IO系统调用来完成数据读取。有select/poll/epoll这一系列的多路选择器,实现了对线程的复用,一个线程可以处理多个IO

Golang网络IO模型源码分析-goroutinue+epoll


Golang的代码实现

我们从服务端与客户端的连接建立看起:

// 服务端
func server() {
    listener, err := net.Listen("tcp", "127.0.0.1:8081")
    if err != nil {
        return
    }
    defer listener.Close()
    
    for {
        conn, err := listener.Accept()
        if err != nil {
            return
        }
        fmt.Println(conn)
    }
}

// 客户端
func client()  {
    conn, err := net.Dial("tcp", "127.0.0.1:8081")
    if err != nil {
        return
    }
    defer conn.Close()
}

可以分别通过服务端的 net.Listen()、Accept() 与 客户端的 net.Dial() 函数来深入看看Golang的内部实现


服务端的 net.Listen()

进入net.Listen()方法,可以看到实际上对ListenConfig.Listen()方法的一层封装,直接进入 net/dial.go 文件可以查看到 ListenConfig 的成员函数 Listen():

// 在本地网络地址上监听通告
func (lc *ListenConfig) Listen(ctx context.Context, network, address string) (Listener, error) {
   // 根据协议名称和地址获得Internet协议族地址列表
   addrs, err := DefaultResolver.resolveAddrList(ctx, "listen", network, address, nil)
   if err != nil {
      return nil, &OpError{Op: "listen", Net: network, Source: nil, Addr: nil, Err: err}
   }
   sl := &sysListener{
      ListenConfig: *lc,
      network:      network,
      address:      address,
   }
   var l Listener
   la := addrs.first(isIPv4)
   switch la := la.(type) {
   case *TCPAddr:
      // TCP监听
      l, err = sl.listenTCP(ctx, la)
   case *UnixAddr:
      // Unix
      l, err = sl.listenUnix(ctx, la)
   default:
      return nil, &OpError{Op: "listen", Net: sl.network, Source: nil, Addr: la, Err: &AddrError{Err: "unexpected address type", Addr: address}}
   }
   if err != nil {
      return nil, &OpError{Op: "listen", Net: sl.network, Source: nil, Addr: la, Err: err} // l is non-nil interface containing nil pointer
   }
   return l, nil
}

实例中我们创建的是TCP连接,所以通过一个 switch-case 语句可以进入 TCP 类型对应的处理分支内,也就是函数 listenTCP() 为实际处理函数

// 返回值关键,包含了 poll.FD
func (sl *sysListener) listenTCP(ctx context.Context, laddr *TCPAddr) (*TCPListener, error) {
   // 各平台对应调用生产socket具体描述符
   fd, err := internetSocket(ctx, sl.network, laddr, nil, syscall.SOCK_STREAM, 0, "listen", sl.ListenConfig.Control)
   if err != nil {
      return nil, err
   }
   // 关键结构体
   return &TCPListener{fd: fd, lc: sl.ListenConfig}, nil
}

这个方法首先通过对 internetSocket() 方法的调用,根据不同的平台生成socket描述符,返回值为关键结果

func internetSocket(ctx context.Context, net string, laddr, raddr sockaddr, sotype, proto int, mode string, ctrlFn func(string, string, syscall.RawConn) error) (fd *netFD, err error) {
   if (runtime.GOOS == "aix" || runtime.GOOS == "windows" || runtime.GOOS == "openbsd") && mode == "dial" && raddr.isWildcard() {
      raddr = raddr.toLocal(net)
   }
   family, ipv6only := favoriteAddrFamily(net, laddr, raddr, mode)
   // 返回socket描述符具体地址,此函数:1.调用sysSocket生产描述符 2.调用newFD封装描述符 构造netFD 3.调用netFD实现bind与listen
   return socket(ctx, net, family, sotype, proto, ipv6only, laddr, raddr, ctrlFn)
}

在 internetSocket() 方法内部,可以看到会先将零地址值转换为具体的地址,随后调用了比较重要的 socket() 方法,这个方法实现了:

    1.调用sysSocket生产描述符   
    2.调用newFD封装描述符 构造netFD    
    3.调用netFD实现bindlisten   
// 返回一个网络文件描述用于网络轮询器进行异步 IO
func socket(ctx context.Context, net string, family, sotype, proto int, ipv6only bool, laddr, raddr sockaddr, ctrlFn func(string, string, syscall.RawConn) error) (fd *netFD, err error) {
   // 根据操作系统获取对应的socket
   s, err := sysSocket(family, sotype, proto)
   if err != nil {
      return nil, err
   }
   // 设置socket选项
   if err = setDefaultSockopts(s, family, sotype, ipv6only); err != nil {
      poll.CloseFunc(s)
      return nil, err
   }
   // 根据socket创建fd,实际上对于底层socket进行封装,包含文件句柄等关键信息
   if fd, err = newFD(s, family, sotype, net); err != nil {
      poll.CloseFunc(s)
      return nil, err
   }

   /* 机器翻译:
      此函数为以下应用程序创建网络文件描述符: 
       - 打开被动流连接的端点持有者,称为流侦听器 
       - 打开目标非特定数据报连接的端点持有者,称为数据报侦听器 
       - 端点持有者打开活动流或特定于目标的数据报连接,称为拨号器 
       - 打开其他连接的端点持有者,例如与内核内的协议栈对话 
      对于流和数据报侦听器,它们只需要命名套接字
      所以我们可以假设当 laddr 不为 nil 但 raddr 为 nil 时,是来自流或数据报侦听器的请求。否则我们假设它仅用于拨号器或其他连接持有者*/

   // 监听
   if laddr != nil && raddr == nil {
      switch sotype {
      // windows实现在socket_windows.go  linux实现在socket_cloexec.go
      case syscall.SOCK_STREAM, syscall.SOCK_SEQPACKET:
         if err := fd.listenStream(laddr, listenerBacklog(), ctrlFn); err != nil {
            fd.Close()
            return nil, err
         }
         return fd, nil
      case syscall.SOCK_DGRAM:
         // UDP
         if err := fd.listenDatagram(laddr, ctrlFn); err != nil {
            fd.Close()
            return nil, err
         }
         return fd, nil
      }
   }
   // 发起连接,非listen socket处理
   if err := fd.dial(ctx, laddr, raddr, ctrlFn); err != nil {
      fd.Close()
      return nil, err
   }
   return fd, nil
}

前面对 internetSocket() 方法调用时的参数 syscall.SOCK_STREAM,程序会进行 listenStream() 方法的调用。

func (fd *netFD) listenStream(laddr sockaddr, backlog int, ctrlFn func(string, string, syscall.RawConn) error) error {
   ...
   // 将socket与 ip port进行绑定
   if err = syscall.Bind(fd.pfd.Sysfd, lsa); err != nil {
      return os.NewSyscallError("bind", err)
   }
   // 调用系统的 syscall.Listen,开启监听
   if err = listenFunc(fd.pfd.Sysfd, backlog); err != nil {
      return os.NewSyscallError("listen", err)
   }
   // 初始化 fd
   if err = fd.init(); err != nil {
      return err
   }
   lsa, _ = syscall.Getsockname(fd.pfd.Sysfd)
   fd.setAddr(fd.addrFunc()(lsa), nil)
   return nil
}

可以看到,到这里基本上印证了前面的三点作用。服务端通过对方法 internetSocket() 方法的调用,生成 netFD 实例并将地址返回并包装进 TCPListener 结构体中。

到这里对服务端的流程基本结束,让我们先看看客户端的连接过程再去看看 netFD 是怎么回事


客户端的 net.Dial()

进入net.Dial()方法,可以看到与服务端调用的Listen()类似,实际上是对Dialer.Dial()方法的一层封装,直接进入 net/dial.go 文件可以查看到 Dial():

// 连接到指定网络上的地址
func (d *Dialer) Dial(network, address string) (Conn, error) {
   // conn用TCPConn
   return d.DialContext(context.Background(), network, address)
}

// 使用提供的上下文连接到指定网络上的地址。
// 提供的上下文必须是非空的。
// 如果上下文在连接完成之前过期,则会返回错误。一旦连接成功,上下文的任何过期都不会影响连接

// 最终返回TCPConn > Conn 里面就是是基于关键网络描述符 netFD,与服务端对应上了
func (d *Dialer) DialContext(ctx context.Context, network, address string) (Conn, error) {
   ...
   var c Conn
   if len(fallbacks) > 0 {
      c, err = sd.dialParallel(ctx, primaries, fallbacks)
   } else {
      c, err = sd.dialSerial(ctx, primaries)
   }
   if err != nil {
      return nil, err
   }

   if tc, ok := c.(*TCPConn); ok && d.KeepAlive >= 0 {
      setKeepAlive(tc.fd, true)
      ka := d.KeepAlive
      if d.KeepAlive == 0 {
         ka = defaultTCPKeepAlive
      }
      setKeepAlivePeriod(tc.fd, ka)
      testHookSetKeepAlive(ka)
   }
   return c, nil
}

在 DialContext() 方法的最后声明了一个 Conn类型的interface(),通过断言转为 *TCPConn 类型进行后续操作

// TCPConn 是用于 TCP 网络连接的 Conn 接口的实现
type TCPConn struct {
   conn
}

type conn struct {
   fd *netFD // 关键 网络描述符/句柄 无论net.listener还是dial都是基于此
}

逐步进入源码中 conn 结构体就可以看到,客户端最后也是生成了一个包含 netFD 的结构体,至此客户端与服务端的连接过程就比较清晰了。


服务端与客户端的总结

Golang网络IO模型源码分析-goroutinue+epoll


netFD

poll.FD

接下来就是最重要的 netFD 结构体,server端在创建socket的时候会创建好 fd 结构体,可以从源码 net/fd_posix.go 中看到该结构体的详细信息

// Network file descriptor.
// 包含在Conn结构中,而Conn又包含在TCPConn结构中,所以应该处于用户接口层
type netFD struct {
   // 包含两个重要的数据结构:Sysfd 和 pollDesc,用户层接口调用完成交互
   // 1.前者是真正的系统文件描述符
   // 2.后者是底层事件驱动的封装,所有的读写超时等操作都是通过调用后者的对应方法所实现的
   pfd poll.FD

   // immutable until Close
   family      int
   sotype      int
   isConnected bool // handshake completed or use of association with peer
   net         string
   laddr       Addr
   raddr       Addr
}

关键结构体:poll.FD,源码位置 internal/poll/fd_unix.go

// FD 是一个文件描述符。 net 和 os 包使用此类型作为表示网络连接或操作系统文件的较大类型的字段
type FD struct {
   // Lock sysfd and serialize access to Read and Write methods.
   fdmu fdMutex   // 读写锁 锁定sysfd并序列化对Read和Write方法的访问

   // System file descriptor. Immutable until Close.
   Sysfd int  // 关键 系统文件描述符

   // I/O poller.
   pd pollDesc    // 底层事件驱动的封装 所有的读写超时等操作都是通过它

   // Writev cache.
   iovecs *[]syscall.Iovec

   // Semaphore signaled when file is closed. 关闭文件时的信号
   csema uint32

   // Non-zero if this file has been set to blocking mode.
   isBlocking uint32

   // Whether this is a streaming descriptor, as opposed to a
   // packet-based descriptor like a UDP socket. Immutable. TCP还是UDP
   IsStream bool

   // Whether a zero byte read indicates EOF. This is false for a
   // message based socket connection. 读取到0字节是是否为错误
   ZeroReadIsEOF bool

   // Whether this is a file rather than a network socket. 是否是系统文件或者网络socket连接
   isFile bool
}

进入到关键字段 pd(pollDesc) 的源码,位于internal/poll/fd_poll_runtime.go,可以看到:

type pollDesc struct {
   runtimeCtx uintptr // 只包含了一个指针 指针具体内容是关键 运行时上下文
   // 通过init进行初始化
}

var serverInit sync.Once

func (pd *pollDesc) init(fd *FD) error {
   // 一次、首次
   serverInit.Do(runtime_pollServerInit)
   // 关键 内核态用户共享的关联切换
   // 注册epoll实例到fd 实际link到runtime包下的 poll_runtime_pollOpen 函数。具体实现在 runtime/netpoll.go
   // go:linkname poll_runtime_pollOpen internal/poll.runtime_pollOpen
   ctx, errno := runtime_pollOpen(uintptr(fd.Sysfd))
   if errno != 0 {
      return errnoErr(syscall.Errno(errno))
   }
   pd.runtimeCtx = ctx
   return nil
}

结构 pollDesc() 中只包含了一个指针,内容是运行时的上下文信息,结构体通过自己的 init 方法进行初始化。

从源码中可以看到,先通过 sync.Once 来执行函数,保证只执行一次初始化,随后调用方法 runtime_pollOpen 来将 fd 加入 epoll中,实现了内核态用户态的关联切换。

几个关键的方法在文件开始的地方均有函数签名,但却没有具体实现:

func runtime_pollServerInit()
func runtime_pollOpen(fd uintptr) (uintptr, int)
func runtime_pollClose(ctx uintptr)
func runtime_pollWait(ctx uintptr, mode int) int
func runtime_pollWaitCanceled(ctx uintptr, mode int) int
func runtime_pollReset(ctx uintptr, mode int) int
func runtime_pollSetDeadline(ctx uintptr, d int64, mode int)
func runtime_pollUnblock(ctx uintptr)
func runtime_isPollServerDescriptor(fd uintptr) bool

这里的方法是通过Golang的链接器这一骚操作来链接过来的,实际的代码都位于 runtime 包下,通过 linkname 可以实现对另一个包的 unexported 的方法或者变量的引用,达成定向访问的目的,在Golang的源码中有非常多的使用场景。

随后打开 runtime/netpoll.go 文件,可以看到上面只有签名的函数的具体实现


runtime/netpoll.go

不同系统的具体实现是有差异的,以 Linux 为例,具体实现位于 runtime/netpoll_epoll.go,但 darwin 系统的实现位于 runtime/netpoll_kqueque.go中,而 windows 实现位于 runtime/netpoll_windows.go 中,这里以 Linux 系统为例:

初始化

//go:linkname poll_runtime_pollServerInit internal/poll.runtime_pollServerInit
// 初始化
func poll_runtime_pollServerInit() {
   netpollGenericInit()
}

func netpollGenericInit() {
    ...
    // 实际调用
         netpollinit()
    ...
}

// 初始化网络轮询器 通过sync.Once 以及 netpollInited 状态值保证只遍历一次
func netpollinit() {
   // 尝试使用create1系统调用创建epoll,否则使用create来创建epoll
   epfd = epollcreate1(_EPOLL_CLOEXEC)
   if epfd < 0 {
      epfd = epollcreate(1024)
      if epfd < 0 {
         println("runtime: epollcreate failed with", -epfd)
         throw("runtime: netpollinit failed")
      }
      closeonexec(epfd)
   }
   // IO多路复用
   r, w, errno := nonblockingPipe()
   if errno != 0 {
      println("runtime: pipe failed with", -errno)
      throw("runtime: pipe failed")
   }
   ev := epollevent{
      events: _EPOLLIN,
   }
   *(**uintptr)(unsafe.Pointer(&ev.data)) = &netpollBreakRd
   // 调用三个关键函数的创建
   errno = epollctl(epfd, _EPOLL_CTL_ADD, r, &ev)
   if errno != 0 {
      println("runtime: epollctl failed with", -errno)
      throw("runtime: epollctl failed")
   }
   netpollBreakRd = uintptr(r)
   netpollBreakWr = uintptr(w)
}

在初始化过程中,可以看到主要是对 epoll 的初始化,同时生成r w,r和w是最底层的读写fd,也是要进行复用的fd,之后在netpoll注册监听的所有fd都是复用这个netpollBreakRdnetpollBreakWr,在函数其中有三个关于epoll的关键函数:epollcreate1()、epollcreate()、epollctl(),从文件开头可以看到对几个函数的签名:

// 三大关键函数
// 1.创建eventpoll,返回一个epfd句柄,后续根据这个句柄进行对fd的添加删除等操作
func epollcreate(size int32) int32
func epollcreate1(flags int32) int32

//go:noescape
// 2.创建红黑树结构epitem 完成epoll监听事件咋注册 向epfd添加、删除、修改要监听的fd
func epollctl(epfd, op, fd int32, ev *epollevent) int32

//go:noescape
// 3.轮询获取就绪的epitem维护到双向链表,传入创建返回的epfd句柄,以及超时时间,返回就绪的fd句柄
func epollwait(epfd int32, ev *epollevent, nev, timeout int32) int32
func closeonexec(fd int32)

golang的 epoll 结构最初是通过 hash表来实现,后面改用红黑树来实现,通过方法 epollctl来维持红黑树结构,关于红黑树:

Golang网络IO模型源码分析-goroutinue+epoll

以上的三个函数(具体用汇编实现,不再研究) :

  1. epollcreate:在内核创建一个epoll对象,返回一个epfd句柄,后续根据句柄来操作对象
  2. epollctl:创建红黑树结构体,将fd封装为一个epitem加入红黑树结构,并加一个回调函数注册到内核,状态改变时将item加入rdlist就绪队列中,同时维持红黑树的结构
  3. epollwait:轮询获取就绪的epitem维护到rdlist双向链表中,传入创建返回的epfd句柄,以及超时时间,返回就绪的fd句柄。如果就绪队列为空就阻塞等待直到超时

整个过程整理流程可以如图所示,上面部分为用户态,下面部分为内核态:

Golang网络IO模型源码分析-goroutinue+epoll

open

随后我们可以看 poll_runtime_pollOpen() 的具体实现:

//go:linkname poll_runtime_pollOpen internal/poll.runtime_pollOpen
// pollDesc open 连接至此 打开
func poll_runtime_pollOpen(fd uintptr) (*pollDesc, int) {
   //通过此,分配一个pollDesc,返回*pollDesc类型的变量,这个是runtime的pollDesc,和之前提到的不是完全相同的
   pd := pollcache.alloc()
   lock(&pd.lock)
   wg := pd.wg.Load()
   if wg != 0 && wg != pdReady {
      throw("runtime: blocked write on free polldesc")
   }
   rg := pd.rg.Load()
   if rg != 0 && rg != pdReady {
      throw("runtime: blocked read on free polldesc")
   }
   pd.fd = fd
   pd.closing = false
   pd.setEventErr(false)
   pd.rseq++
   pd.rg.Store(0)
   pd.rd = 0
   pd.wseq++
   pd.wg.Store(0)
   pd.wd = 0
   pd.self = pd
   pd.publishInfo()
   unlock(&pd.lock)
   // 初始化 netpollopen
   errno := netpollopen(fd, pd)
   if errno != 0 {
      // poll 缓存区
      pollcache.free(pd)
      return nil, int(errno)
   }
   // 关键 pollDesc的真正结构
   return pd, 0
}

// 监听文件描述符上的边缘触发事件,创建事件并加入监听
// 这个函数将用户态协程的pollDesc信息写入到epoll所在的单独线程,从而实现用户态和内核态的关联
func netpollopen(fd uintptr, pd *pollDesc) int32 {
   var ev epollevent
   // 具体事件
   // 注册event事件,这里使用了epoll的ET模式,相对于ET,ET需要每次产生时间是就处理时间,否则容易丢失事件
   ev.events = _EPOLLIN | _EPOLLOUT | _EPOLLRDHUP | _EPOLLET
   // events记录上pd的指针
   *(**pollDesc)(unsafe.Pointer(&ev.data)) = pd
   // 系统调用将该fd加到eventpoll对象中,交由内核监听
   return -epollctl(epfd, _EPOLL_CTL_ADD, int32(fd), &ev)
}

函数 poll_runtime_pollOpen() 最后返回的 pd 是 runtime 包的pollDesc的指针,与之前提到的不是相同的,这里我们可以理解到,之前我们提到的指针类型的 pollDesc 保存的地址就是这个 runtime.Polldesc 的实例地址!

后面实际调用了 netpollopen() 函数,这个函数将用户态协程的pollDesc信息写入到epoll所在的单独线程,从而实现用户态和内核态的关联,通过epollctl的调用将fd加入到epoll中,交由内核监听

到这里服务端的 net.Listener 创建过程就结束了,最后就是对服务端调用的Accept()的研究,关于epoll三大函数的 wait 也将在 Accept 中展开应用


Accept()

从前面的一系列过程可以得知,net.Listen()返回了一个 TCPListener,这个结构体实现了 Listener() 接口,直接找到他的 Accept() 方法来看源码,位置在 net/tcpscok.go 中:

// Accept 实现了Listener接口中的Accept方法;它等待下一次调用并返回一个通用的 Conn
func (l *TCPListener) Accept() (Conn, error) {
   ...
   c, err := l.accept()
   ...
}

func (ln *TCPListener) accept() (*TCPConn, error) {
   fd, err := ln.fd.accept()
   if err != nil {
      return nil, err
   }
   tc := newTCPConn(fd)
   if ln.lc.KeepAlive >= 0 {
      setKeepAlive(fd, true)
      ka := ln.lc.KeepAlive
      if ln.lc.KeepAlive == 0 {
         ka = defaultTCPKeepAlive
      }
      setKeepAlivePeriod(fd, ka)
   }
   return tc, nil
}

可以看到,TCPListener的Accept经过包装,最后调用的函数为 fd.accept(),也就是netFD.Accept(),再次和我们前面的关键结构netFD关联了起来!

func (fd *netFD) accept() (netfd *netFD, err error) {
   // 调用了 poll.FD 的 Accept() 方法
   d, rsa, errcall, err := fd.pfd.Accept()
   if err != nil {
      if errcall != "" {
         err = wrapSyscallError(errcall, err)
      }
      return nil, err
   }
   ...
}

// Accept wraps the accept network call.
func (fd *FD) Accept() (int, syscall.Sockaddr, string, error) {
   // 加锁
   if err := fd.readLock(); err != nil {
      return -1, nil, "", err
   }
   defer fd.readUnlock()

   if err := fd.pd.prepareRead(fd.isFile); err != nil {
      return -1, nil, "", err
   }
   for {
      // 尝试直接获取客户端连接,成功直接返回
      s, rsa, errcall, err := accept(fd.Sysfd)
      if err == nil {
         return s, rsa, "", err
      }
      switch err {
      case syscall.EINTR:
         continue
      // 创建的socket非阻塞,没有新连接返回 EAGAIN 而不是阻塞
      case syscall.EAGAIN:
         // 可轮询的调用 epoll wait来等待通知
         if fd.pd.pollable() {
            if err = fd.pd.waitRead(fd.isFile); err == nil {
               continue
            }
         }
      case syscall.ECONNABORTED:
         // 这意味着监听队列中的套接字在我们接受()之前已经关闭
         continue
      }
      return -1, nil, errcall, err
   }
}

通过代码可以看到一些关键逻辑,其中关键函数有 accept() 与 waitRead(),来看一下 waitRead() 是如何做的:

func (pd *pollDesc) waitRead(isFile bool) error {
   return pd.wait('r', isFile)
}

func (pd *pollDesc) wait(mode int, isFile bool) error {
   if pd.runtimeCtx == 0 {
      return errors.New("waiting for unsupported file type")
   }
   res := runtime_pollWait(pd.runtimeCtx, mode)
   return convertErr(res, isFile)
}

可以看到,像前面的函数一样,调用了 runtime 包的 epollwait,让我们再次回到 runtime/netpoll_epoll.go 来看看具体实现:

//go:linkname poll_runtime_pollWait internal/poll.runtime_pollWait
// 等待 检测是否需要等待
func poll_runtime_pollWait(pd *pollDesc, mode int) int {
   errcode := netpollcheckerr(pd, int32(mode))
   if errcode != pollNoError {
      return errcode
   }
   // As for now only Solaris, illumos, and AIX use level-triggered IO.
   if GOOS == "solaris" || GOOS == "illumos" || GOOS == "aix" {
      netpollarm(pd, mode)
   }
   for !netpollblock(pd, int32(mode), false) {
      errcode = netpollcheckerr(pd, int32(mode))
      if errcode != pollNoError {
         return errcode
      }
   }
   return pollNoError
}

关键函数为 netpollblock():

// 轮询时调用的方法,如果ip就绪了返回true,如果没有就绪返回false,分为 'r' 'w' 两种模式
func netpollblock(pd *pollDesc, mode int32, waitio bool) bool {
   // 如果是'r'模式,则gpp是&pd.rg,否则是 &pd.wg
   gpp := &pd.rg
   if mode == 'w' {
      gpp = &pd.wg
   }

   // set the gpp semaphore to pdWait
   for {
      // Consume notification if already ready.
      if gpp.CompareAndSwap(pdReady, 0) {
         return true
      }
      if gpp.CompareAndSwap(0, pdWait) {
         break
      }

      // Double check that this isn't corrupt; otherwise we'd loop
      // forever.
      if v := gpp.Load(); v != pdReady && v != 0 {
         throw("runtime: double wait")
      }
   }

   // 在将 gpp 设置为 pdWait 后需要重新检查错误状态这是必要的,
   // 因为runtime_pollUnblock/runtime_pollSetDeadline/deadlineimpl 做相反的事情:
   // 存储到 closing/rd/wd,publishInfo,rg/wg 的加载
   if waitio || netpollcheckerr(pd, mode) == pollNoError {
      // gopark是很重要的方法,让出当前协程的执行权,一般是返回到g0,让g0重新调度
      // 调用 netpollblockcommit,将当前的 G 保存进 pollDesc 的rg或wg中
      gopark(netpollblockcommit, unsafe.Pointer(gpp), waitReasonIOWait, traceEvGoBlockNet, 5)
   }
   // be careful to not lose concurrent pdReady notification
   old := gpp.Swap(0)
   if old > pdWait {
      throw("runtime: corrupted polldesc")
   }
   return old == pdReady
}

到这里,Accept() 的执行逻辑也基本清楚了:

  1. 通过socket执行accept直接获取客户端连接
  2. 客户端没有连接返回 EAGAIN 信号量
  3. 调用runtime.poll_runtime_pollWait挂起当前协程并保存到pollDesc中
  4. 有新客户端连接来的时候恢复阻塞的协程重复执行第一步

但是这里没有关于 epoll wait 的执行逻辑呀,其实epoll wait的相关执行位于golang的协程调度代码中,也就是位置于:runtime/proc.go 文件中,调用的是函数 netpoll()

// netpoll 轮询网络并返回可运行的准备就绪的 goroutines 列表,传入的参数会决定他的行为
// - 参数 < 0: 无限期阻塞等待文件就绪
// - 参数 == 0: 非阻塞轮询
// - 参数 > 0: 阻塞定期轮询
func netpoll(delay int64) gList {
   if epfd == -1 {
      return gList{}
   }
   var waitms int32
   if delay < 0 {
      waitms = -1
   } else if delay == 0 {
      waitms = 0
   } else if delay < 1e6 {
      waitms = 1
   } else if delay < 1e15 {
      waitms = int32(delay / 1e6)
   } else {
      // An arbitrary cap on how long to wait for a timer.
      // 1e9 ms == ~11.5 days.
      waitms = 1e9
   }
   // 声明一个epollevent事件,在epoll_wait系统调用的时候会给该数组赋值并返回一个索引值
   // 之后遍历数组取出就绪的fd事件
   var events [128]epollevent
retry:
   // 陷入系统调用,取出内核eventpoll中的rdlist,返回就绪的事件
   n := epollwait(epfd, &events[0], int32(len(events)), waitms)
   if n < 0 {
      if n != -_EINTR {
         println("runtime: epollwait on fd", epfd, "failed with", -n)
         throw("runtime: netpoll failed")
      }
      // If a timed sleep was interrupted, just return to
      // recalculate how long we should sleep now.
      if waitms > 0 {
         return gList{}
      }
      goto retry
   }
   var toRun gList
   // 遍历evnet事件数组
   for i := int32(0); i < n; i++ {
      ev := &events[i]
      if ev.events == 0 {
         continue
      }

      if *(**uintptr)(unsafe.Pointer(&ev.data)) == &netpollBreakRd {
         if ev.events != _EPOLLIN {
            println("runtime: netpoll: break fd ready for", ev.events)
            throw("runtime: netpoll: break fd ready for something unexpected")
         }
         if delay != 0 {
            // netpollBreak could be picked up by a
            // nonblocking poll. Only read the byte
            // if blocking.
            var tmp [16]byte
            read(int32(netpollBreakRd), noescape(unsafe.Pointer(&tmp[0])), int32(len(tmp)))
            atomic.Store(&netpollWakeSig, 0)
         }
         continue
      }

      var mode int32
      // 是否有就绪的读写事件,放入mode标志位
      if ev.events&(_EPOLLIN|_EPOLLRDHUP|_EPOLLHUP|_EPOLLERR) != 0 {
         mode += 'r'
      }
      if ev.events&(_EPOLLOUT|_EPOLLHUP|_EPOLLERR) != 0 {
         mode += 'w'
      }
      if mode != 0 {
         // 取出存入的pollDesc指针
         pd := *(**pollDesc)(unsafe.Pointer(&ev.data))
         pd.setEventErr(ev.events == _EPOLLERR)
         // 具体实现在 netpoll.go 中
         // 取出 pd 中的 rg 或 wg,后面放到运行队列
         netpollready(&toRun, pd, mode)
      }
   }
   return toRun
}

// 将就绪好的io事件,写入就绪的goroutinue队列
func netpollready(toRun *gList, pd *pollDesc, mode int32) {
   var rg, wg *g
   if mode == 'r' || mode == 'r'+'w' {
      rg = netpollunblock(pd, 'r', true)
   }
   if mode == 'w' || mode == 'r'+'w' {
      wg = netpollunblock(pd, 'w', true)
   }
   // 将阻塞的goroutinue加入gList队列返回
   if rg != nil {
      toRun.push(rg)
   }
   if wg != nil {
      toRun.push(wg)
   }
}

结合前面的 epoll 流程图可以较快理解上面的代码,可以看到在协程调度过程中会获取准备好的socket并唤醒对应的Goroutinue


epoll的其他方法

close()

关于 close()的核心逻辑 同样位于 runtime/netpoll.go 与 runtime/netpoll_epoll.go 中

//go:linkname poll_runtime_pollClose internal/poll.runtime_pollClose
// 当发生某些情况,如断开连接,fd销毁都会调用到此
func poll_runtime_pollClose(pd *pollDesc) {
   if !pd.closing {
      throw("runtime: close polldesc w/o unblock")
   }
   wg := pd.wg.Load()
   if wg != 0 && wg != pdReady {
      throw("runtime: blocked write on closing polldesc")
   }
   rg := pd.rg.Load()
   if rg != 0 && rg != pdReady {
      throw("runtime: blocked read on closing polldesc")
   }
   // 调用epoll_ctl系统调用,删除该fd在eventpoll上对应的epitem
   netpollclose(pd.fd)
   // 释放对应的pd
   pollcache.free(pd)
}

// 关闭
func netpollclose(fd uintptr) int32 {
   var ev epollevent
   return -epollctl(epfd, _EPOLL_CTL_DEL, int32(fd), &ev)
}

// 释放内存 对应的pd
func (c *pollCache) free(pd *pollDesc) {
   lock(&c.lock)
   pd.link = c.first
   c.first = pd
   unlock(&c.lock)
}

aloc()

pollDesc的空间申请方法

// 关键 缓存pollDesc实现
// go runtime调用 poll_runtime_pollOpen时,往epoll实例注册fd 首次调用次方法 初始大小 4k
func (c *pollCache) alloc() *pollDesc {
   lock(&c.lock)
   // 首次
   if c.first == nil {
      const pdSize = unsafe.Sizeof(pollDesc{})
      n := pollBlockSize / pdSize
      if n == 0 {
         n = 1
      }
      // Must be in non-GC memory because can be referenced
      // only from epoll/kqueue internals.
      // 分配不会被GC回收的内存,确保这些数据结构只能被 epoll和kqueue的内核空间去引用
      // GC操作会在go runtime关闭时调用pollcache.free释放内存
      mem := persistentalloc(n*pdSize, 0, &memstats.other_sys)
      for i := uintptr(0); i < n; i++ {
         pd := (*pollDesc)(add(mem, i*pdSize))
         pd.link = c.first
         c.first = pd
      }
   }
   // 往后每次会先判断链表头是否分配过值
   pd := c.first
   // 如果是则直接返回表头的pollDesc
   // 常见优化设计 批量初始化数据进行缓存 提高netpoller吞吐量
   c.first = pd.link
   lockInit(&pd.lock, lockRankPollDesc)
   unlock(&c.lock)
   return pd
}

Golang netpoll 总结

同样用一个流程图总结一下吧,可以与上面的流程图结合理解

Golang网络IO模型源码分析-goroutinue+epoll

参考

segmentfault.com/a/119000003… mcll.top/2019/04/07/… strikefreedom.top/archives/go… int64.ink/blog/golang…