likes
comments
collection
share

Golang调度器(7)—网络轮询器

作者站长头像
站长
· 阅读数 12

0. 简介

上篇博客提到了goroutine有关系统调用的调度进行了叙述,对于IO密集型的访问,每次请求都可能导致一次M的创建,这其实是不能接受的。Go SDK为了解决网络IO密集型的应用场景,开发了网络轮询器这一关键组件。

网络轮询器利用了操作系统提供的IO多路复用模型来提升网络设备的利用率以及程序的性能,下面,我们将分别介绍几种常见的IO模型以及网络轮询器在以Linux为例子的系统上的实现。

1. IO多路复用

所谓IO多路复用就是使用select/poll/epoll这一系列的多路选择器,实现一个线程监控多个文件句柄,一旦某个文件句柄就绪(ready),就能够通知到对应应用程序的读写操作;没有文件句柄就绪时,就会阻塞应用程序,从而释放CPU资源。

1.1 select&poll

select实现多路复用的方式是,将需要监听的描述符都放到一个文件描述符集合,然后调用select函数将文件描述符集合拷贝到内核里,让内核来检查是否有事件产生,检查的方式很粗暴,就是通过遍历文件描述符集合的方式,当检查到有事件产生后,将此描述符标记为可读或可写,接着再把整个文件描述符集合拷贝回用户态里,然后用户态还需要再通过遍历的方法找到可读或可写的描述符,然后再对其处理。

所以,对于select方式,在时间上,需要进行两遍遍历,一次在内核态,一次在用户态,所以其时间复杂度是O(N);在空间上,会发生两次文件描述符集合的拷贝,一次由用户空间拷贝到内核空间,一次由内核空间拷贝到用户空间,但是由于select使用的是固定长度的bitsmap,默认最多1024个文件描述符。

pollselect没有本质区别,只是不再使用bitsmap来存储关注的描述符,而是采用链表的方式存储,突破了文件描述符的个数限制,但是随着文件描述符的个数增加,其O(N)的时间复杂度也会使得效率越来越低下。

1.2 epoll

int s = socket(AF_INET, SOCK_STREAM, 0);
bind(s, ...);
listen(s, ...)

int epfd = epoll_create(...);
epoll_ctl(epfd, ...); // 将所有需要监听的socket添加到epfd中

while(1) {
    int n = epoll_wait(...);
    for(events){
        // 处理逻辑
    }
}

以上是一个很经典的epoll使用逻辑:先用epoll_create创建一个epfd对象,然后通过epoll_ctl将需要监控的文件描述符放到epfd中,最后调用epoll_wait等待数据。

相比于selectpollepoll很好地解决了前二者在时间和空间上效率问题:

  1. epoll在内核中使用红黑树跟踪文件描述符集合,大大缩减了时间,相比于select/poll遍历集合的 O(N) 的时间复杂度,红黑树的时间复杂度是 O(logN)
  2. epoll使用事件驱动的机制(前缀e应该就是event的首字母),当某个文件描述符有消息时,当用户调用epoll_wait函数时,只会返回有事件发生的文件描述符的个数,在空间上也大大节省了。

插个题外话,网上不少文章以讹传讹,认为epoll使用mmap进行内存映射,从而节省了内存拷贝,这完全是错误的,可以查看源码,或者参考epoll以及Epoll到底用没用到mmap之一

1.3 磁盘IO是否适用多路复用IO

需要说明的是,以下讨论均指发生在Linux系统中的多路复用;国外文件一般不称磁盘IO为disk io,而是regular file

我们先说结论:磁盘IO不适用多路复用IO!!!

多路复用IO最好搭配非阻塞IO使用,Linux手册有以下一段话:

Under Linux, select() may report a socket file descriptor as "ready for reading", while nevertheless a subsequent read blocks. This could for example happen when data has arrived but upon examination has wrong checksum and is discarded. There may be other circumstances in which a file descriptor is spuriously reported as ready. Thus it may be safer to use O_NONBLOCK on sockets that should not block.

谷歌翻译如下:

在 Linux 下,select() 可能会将套接字文件描述符报告为“准备好读取”,但随后的读取会阻塞。 例如,这可能发生在数据已到达但检查时校验和错误并被丢弃的情况下。 可能存在文件描述符被虚假报告为就绪的其他情况。 因此,在不应阻塞的套接字上使用 O_NONBLOCK 可能更安全。

简单来说,至少多路复用的select方式被官方推荐使用O_NONBLOCK来设置成非阻塞IO。不过,磁盘IO并不支持非阻塞IO,根据Non-blocking I/O with regular files

Regular files are always readable and they are also always writeable. This is clearly stated in the relevant POSIX specifications. I cannot stress this enough. Putting a regular file in non-blocking has ABSOLUTELY no effects other than changing one bit in the file flags.

简单而言,就是磁盘IO永远是可读可写的,非阻塞IO对其没有效果。

但是以上还不是磁盘IO不支持多路复用的最主要原因,请看以下分析:

select使用磁盘IO无意义,因为其永远是ready的

File descriptors associated with regular files always select true for ready to read, ready to write, and error conditions.

注意,以上标注来源于Linux手册,但是我本地man select却并没有这段话,只能贴出链接

epoll直接不支持磁盘IO

如果说,selectpoll只是对磁盘IO不起作用,因为其状态使用是ready,所以每次循环读都会得到结果,那么epoll做的更加决绝:不支持磁盘IO的句柄,直接报EPERM,通过man epoll_ctl得到:

EPERM The target file fd does not support epoll. This error can occur if fd refers to, for example, a regular file or a directory.

在Linux环境上,Go中的网络轮询器的底层是基于epoll实现的,所以其根本不支持磁盘IO的多路复用,任何磁盘IO的操作都会向上篇博客中描述的那样,陷入内核调用。而且在os.file_unix.go中,在创建文件时会尝试将其纳入到网络轮询器的体系中,有如下描述,有些情况下会无法将文件注册到网络轮询器中,譬如Linux系统中的磁盘文件。

// An error here indicates a failure to register
// with the netpoll system. That can happen for
// a file descriptor that is not supported by
// epoll/kqueue; for example, disk files on
// Linux systems. We assume that any real error
// will show up in later I/O.

所以,在《Go 语言设计与实现》的6.6节,作者反复提及网络轮询器不仅用于监控网络IO,还能用于监控文件IO,这个结论是错误的!至少在Linux环境下,磁盘IO不能使用多路复用。我看有很多小伙伴的博客转述了其观点,恐怕会造成更大的谬误传播。

2. Go netpoller

Go是一门跨平台的编程语言,而不同平台针对IO多路复用也有不同的实现方式,Go netpoller通过在底层对Linux下的epoll、freeBSD或者MacOS下的kqueue或者Windows下的iocp进行封装,实现使用同步编程模式达到异步执行的效果,实现Go语言的网络轮询器,提升网络IO密集型应用的效率。这里需要说明的是,kqueue支持对磁盘IO做多路复用,但是如今大多数的服务器都是Linux系统的,所以我们以下讨论只针对Linux系统。

2.1 场景

以下,使用Go语言实现一个典型的TCP echo server

package main

import (
   "fmt"
   "net"
)

func main() {
   lis, err := net.Listen("tcp", ":8080")
   if err != nil {
      panic(err)
   }

   for {
      conn, err := lis.Accept()
      if err != nil {
         panic(err)
      }

      go func() {
         defer conn.Close()
         handle(conn)
      }()
   }
}

func handle(conn net.Conn) {
   buf := make([]byte, 1024)
   for {
      n, err := conn.Read(buf)
      if err != nil {
         fmt.Printf("read err: %+v\n", err)
         return
      }

      _, _ = conn.Write(buf[:n])
   }

}

上述模式是典型的goroutine-per-connection模式,下面,我们就来看看Go调度器是怎么做到这种模式的。

我们可以通过以下指令反编译代码

$ go build -gcflags "-N -l" -o main main.go
$ objdump -d main >> main.i

2.2 net.Listen

main.main -> net.Listen -> net.(*ListenConfig).Listen -> net.(*sysListener).listenTCP -> net.internetSocket -> net.socket -> net.sysSocket,注意这个net.sysSocketnet.sock_cloexec.go中,其调用的函数如下,可以发现,其使用的IO是非阻塞IO。

s, err := socketFunc(family, sotype|syscall.SOCK_NONBLOCK|syscall.SOCK_CLOEXEC, proto)

其实所谓socketFunc就是系统调用socket

var socketFunc        func(int, int, int) (int, error)  = syscall.Socket

以上是整个系统调用的过程,net.socket函数在通过net.sysSocket创建完socket句柄后,会通过newFD函数创建net.netFD对象,而这个对象包含着最重要的网络轮询器的重要对象poll.FD

func socket(ctx context.Context, net string, family, sotype, proto int, ipv6only bool, laddr, raddr sockaddr, ctrlFn func(string, string, syscall.RawConn) error) (fd *netFD, err error) {
   s, err := sysSocket(family, sotype, proto)
   if err != nil {
      return nil, err
   }
   if err = setDefaultSockopts(s, family, sotype, ipv6only); err != nil {
      poll.CloseFunc(s)
      return nil, err
   }
   if fd, err = newFD(s, family, sotype, net); err != nil {
      poll.CloseFunc(s)
      return nil, err
   }

   if laddr != nil && raddr == nil {
      switch sotype {
      case syscall.SOCK_STREAM, syscall.SOCK_SEQPACKET:
         if err := fd.listenStream(laddr, listenerBacklog(), ctrlFn); err != nil {
            fd.Close()
            return nil, err
         }
         return fd, nil
      case syscall.SOCK_DGRAM:
         if err := fd.listenDatagram(laddr, ctrlFn); err != nil {
            fd.Close()
            return nil, err
         }
         return fd, nil
      }
   }
   if err := fd.dial(ctx, laddr, raddr, ctrlFn); err != nil {
      fd.Close()
      return nil, err
   }
   return fd, nil
}

然后,由于我们建立的是TCP连接,所以会走入到listenStream方法,里面调用了系统调用bindlisten(可以看到,Go中的net.Listen封装了原来C语言建立tcp服务时调用系统调用的socket -> bind -> listen三个步骤)。

func (fd *netFD) listenStream(laddr sockaddr, backlog int, ctrlFn func(string, string, syscall.RawConn) error) error {
   var err error
   if err = setDefaultListenerSockopts(fd.pfd.Sysfd); err != nil {
      return err
   }
   var lsa syscall.Sockaddr
   if lsa, err = laddr.sockaddr(fd.family); err != nil {
      return err
   }
   if ctrlFn != nil {
      c, err := newRawConn(fd)
      if err != nil {
         return err
      }
      if err := ctrlFn(fd.ctrlNetwork(), laddr.String(), c); err != nil {
         return err
      }
   }
   if err = syscall.Bind(fd.pfd.Sysfd, lsa); err != nil {
      return os.NewSyscallError("bind", err)
   }
   if err = listenFunc(fd.pfd.Sysfd, backlog); err != nil {
      return os.NewSyscallError("listen", err)
   }
   if err = fd.init(); err != nil {
      return err
   }
   lsa, _ = syscall.Getsockname(fd.pfd.Sysfd)
   fd.setAddr(fd.addrFunc()(lsa), nil)
   return nil
}

然后,以上函数调用了fd.init()来初始化我们的句柄操作,为了这部分能继续讲下去,也是我们网络轮询器的重点,接下来,我们需要介绍一下Go网络轮询器几个重要的结构体。

2.3 数据结构以及fd.init()做了什么

net.netFD

// Network file descriptor.
type netFD struct {
   pfd poll.FD

   // immutable until Close
   family      int
   sotype      int
   isConnected bool // handshake completed or use of association with peer
   net         string
   laddr       Addr
   raddr       Addr
}

net.netFD是网络描述符,其包含了重要的网络轮询器的结构体pfd,类型是poll.FD

poll.FD

// FD is a file descriptor. The net and os packages use this type as a
// field of a larger type representing a network connection or OS file.
type FD struct {
   // Lock sysfd and serialize access to Read and Write methods.
   fdmu fdMutex

   // System file descriptor. Immutable until Close.
   Sysfd int

   // I/O poller.
   pd pollDesc

   // Writev cache.
   iovecs *[]syscall.Iovec

   // Semaphore signaled when file is closed.
   csema uint32

   // Non-zero if this file has been set to blocking mode.
   isBlocking uint32

   // Whether this is a streaming descriptor, as opposed to a
   // packet-based descriptor like a UDP socket. Immutable.
   IsStream bool

   // Whether a zero byte read indicates EOF. This is false for a
   // message based socket connection.
   ZeroReadIsEOF bool

   // Whether this is a file rather than a network socket.
   isFile bool
}

poll.FD定义如上,其包含系统的描述符Sysfd和轮询器的重要结构pd,类型是poll.pollDesc

pollDesc

type pollDesc struct {
   runtimeCtx uintptr
}

这里的这个struct只包含一个指针,其实这是为了进行不同系统兼容而设置的,3.2中最后分析到fd.init()函数进行初始化,最终会调用到以下函数:

func (pd *pollDesc) init(fd *FD) error {
   serverInit.Do(runtime_pollServerInit)
   ctx, errno := runtime_pollOpen(uintptr(fd.Sysfd))
   if errno != 0 {
      return errnoErr(syscall.Errno(errno))
   }
   pd.runtimeCtx = ctx
   return nil
}

以上init函数,在编译条件是linux的情形下,会定位到runtime.netpoll.go中以下函数:

func poll_runtime_pollServerInit() {
   netpollGenericInit()
}

func netpollGenericInit() {
   if atomic.Load(&netpollInited) == 0 {
      lockInit(&netpollInitLock, lockRankNetpollInit)
      lock(&netpollInitLock)
      if netpollInited == 0 {
         netpollinit()
         atomic.Store(&netpollInited, 1)
      }
      unlock(&netpollInitLock)
   }
}

可以看到,以上操作最终又会调用到netpollinit函数,而这个函数会根据系统不同编译不同文件,比如Linux系统中是runtime.netpoll_epoll.go,若是Mac,则会定位到runtime.netpoll_kqueue.go,我们依旧选择Linux平台进行分析:

func netpollinit() {
   epfd = epollcreate1(_EPOLL_CLOEXEC)
   if epfd < 0 {
      epfd = epollcreate(1024)
      if epfd < 0 {
         println("runtime: epollcreate failed with", -epfd)
         throw("runtime: netpollinit failed")
      }
      closeonexec(epfd)
   }
   r, w, errno := nonblockingPipe()
   if errno != 0 {
      println("runtime: pipe failed with", -errno)
      throw("runtime: pipe failed")
   }
   ev := epollevent{
      events: _EPOLLIN,
   }
   *(**uintptr)(unsafe.Pointer(&ev.data)) = &netpollBreakRd
   errno = epollctl(epfd, _EPOLL_CTL_ADD, r, &ev)
   if errno != 0 {
      println("runtime: epollctl failed with", -errno)
      throw("runtime: epollctl failed")
   }
   netpollBreakRd = uintptr(r)
   netpollBreakWr = uintptr(w)
}

其中需要注意一点,netpollBreakRdnetpollBreakWr是注册的管道,用于唤醒epollwait函数的,将netpollBreakRd注册到epoll中,如果需要唤醒epollwait阻塞,则利用netpollBreakWr在管道这端写入即可。

最后需要注意的是,在以上init中,还会调用runtime_pollOpen(uintptr(fd.Sysfd))listenFD注册到epoll中。

分析完以上代码可以发现,在创建tcp服务并且监听的过程中,同时初进行了epoll的相关操作,并且预置了可以唤醒epollwait阻塞的管道。

2.4 Listener.Accept

Listener.Accept的调用过程为Listener.Accept -> net.(*TCPListener).Accept -> net.(*TCPListener).accept -> net.(*netFD).accept -> poll.(*FD).Accept -> poll.accept -> syscall.Accept。在poll.(*FD).Accept中,做如下处理:

func (fd *FD) Accept() (int, syscall.Sockaddr, string, error) { // 这里的fd,就是之前listen的fd
   if err := fd.readLock(); err != nil {
      return -1, nil, "", err
   }
   defer fd.readUnlock()

   if err := fd.pd.prepareRead(fd.isFile); err != nil {
      return -1, nil, "", err
   }
   for {
      s, rsa, errcall, err := accept(fd.Sysfd) // 使用listen时的fd接收链接,因为listen fd设置为非阻塞,所以肯定返回
      if err == nil { // 如果没有错,说明真的有连接,返回
         return s, rsa, "", err
      }
      switch err {
      case syscall.EINTR:
         continue
      case syscall.EAGAIN: // 如果返回EAGAIN,判断可以使用poll系统后使用waitRead等待
         if fd.pd.pollable() {
            if err = fd.pd.waitRead(fd.isFile); err == nil {
               continue
            }
         }
      case syscall.ECONNABORTED:
         // This means that a socket on the listen
         // queue was closed before we Accept()ed it;
         // it's a silly error, so try again.
         continue
      }
      return -1, nil, errcall, err
   }
}

pollDesc.waitRead函数是怎么做到等待的呢,其内部调用了pollDesc.wait -> runtime_pollWait -> runtime.poll_runtime_pollWait

//go:linkname poll_runtime_pollWait internal/poll.runtime_pollWait
func poll_runtime_pollWait(pd *pollDesc, mode int) int {
   errcode := netpollcheckerr(pd, int32(mode))
   if errcode != pollNoError {
      return errcode
   }
   // As for now only Solaris, illumos, and AIX use level-triggered IO.
   if GOOS == "solaris" || GOOS == "illumos" || GOOS == "aix" {
      netpollarm(pd, mode)
   }
   
   // 进入 netpollblock 判断是否有期待的IO事件发生
   for !netpollblock(pd, int32(mode), false) {
      errcode = netpollcheckerr(pd, int32(mode))
      if errcode != pollNoError {
         return errcode
      }
      // Can happen if timeout has fired and unblocked us,
      // but before we had a chance to run, timeout has been reset.
      // Pretend it has not happened and retry.
   }
   return pollNoError
}

netpollblock函数如下:

func netpollblock(pd *pollDesc, mode int32, waitio bool) bool {
   gpp := &pd.rg
   if mode == 'w' {
      gpp = &pd.wg
   }

   // set the gpp semaphore to pdWait
   for {
      // Consume notification if already ready.
      if gpp.CompareAndSwap(pdReady, 0) { // 如果当前IO已经ready,那么直接返回true
         return true
      }
      if gpp.CompareAndSwap(0, pdWait) { // 如果没有,则跳出循环
         break
      }

      // Double check that this isn't corrupt; otherwise we'd loop
      // forever.
      if v := gpp.Load(); v != pdReady && v != 0 {
         throw("runtime: double wait")
      }
   }

   // need to recheck error states after setting gpp to pdWait
   // this is necessary because runtime_pollUnblock/runtime_pollSetDeadline/deadlineimpl
   // do the opposite: store to closing/rd/wd, publishInfo, load of rg/wg
   if waitio || netpollcheckerr(pd, mode) == pollNoError {
   // waitio = false;
   // netpollcheckerr(pd, mode) == pollNoError 此时一般是成立的,所以gopark住当前的goroutine
      gopark(netpollblockcommit, unsafe.Pointer(gpp), waitReasonIOWait, traceEvGoBlockNet, 5)
   }
   // be careful to not lose concurrent pdReady notification
   old := gpp.Swap(0)
   if old > pdWait {
      throw("runtime: corrupted polldesc")
   }
   return old == pdReady
}

通过以上分析可以看到,在Go中调用Listener.Accept函数后,会首先调用一次系统调用的accept,如果有连接请求则建立连接;如果没有连接请求,那么内核返回EAGAIN错误,那么会通过一系列调用,最终通过gopark函数挂起协程,等待接入事件的到来再唤醒协程。

在被唤醒之后,让我们回到net.(*netFD).accept函数,接下来会将accept系统操作返回的句柄重新生成一个FD,并且同样注册到epoll中,为之后的read、write操作做铺垫。

func (fd *netFD) accept() (netfd *netFD, err error) {
   d, rsa, errcall, err := fd.pfd.Accept()
   if err != nil {
      if errcall != "" {
         err = wrapSyscallError(errcall, err)
      }
      return nil, err
   }

   if netfd, err = newFD(d, fd.family, fd.sotype, fd.net); err != nil {
      poll.CloseFunc(d)
      return nil, err
   }
   if err = netfd.init(); err != nil {
      netfd.Close()
      return nil, err
   }
   lsa, _ := syscall.Getsockname(netfd.pfd.Sysfd)
   netfd.setAddr(netfd.addrFunc()(lsa), netfd.addrFunc()(rsa))
   return netfd, nil
}

2.4 conn.Read/conn.Write

通过前面的分析我们知道,net.Listen操作返回的FD被注册到epoll是为了Listener.Accept,而Listener.Accept操作返回的FD注册到epoll中是为了conn.Read/conn.Write的。

Listener.Accept一样,我们以conn.Read为例,经历如下调用:conn.Read -> net.(*conn).Read -> net.(*netFD).Read -> poll.(*FD).Read

func (fd *FD) Read(p []byte) (int, error) {
   if err := fd.readLock(); err != nil {
      return 0, err
   }
   defer fd.readUnlock()
   if len(p) == 0 {
      // If the caller wanted a zero byte read, return immediately
      // without trying (but after acquiring the readLock).
      // Otherwise syscall.Read returns 0, nil which looks like
      // io.EOF.
      // TODO(bradfitz): make it wait for readability? (Issue 15735)
      return 0, nil
   }
   if err := fd.pd.prepareRead(fd.isFile); err != nil {
      return 0, err
   }
   if fd.IsStream && len(p) > maxRW {
      p = p[:maxRW]
   }
   for {
      n, err := ignoringEINTRIO(syscall.Read, fd.Sysfd, p) // 尝试去读一次,如果读到则返回,因为是非阻塞IO,所以读不到也会返回EAGAIN
      if err != nil {
         n = 0
         if err == syscall.EAGAIN && fd.pd.pollable() {
            if err = fd.pd.waitRead(fd.isFile); err == nil { // 同样调用waitRead等待事件
               continue
            }
         }
      }
      err = fd.eofError(n, err)
      return n, err
   }
}

可以发现,读的时候和Accept操作类似,也会先尝试读一次,没读到之后同样调用waitRead挂起协程,营造阻塞读的假象。写的过程很类似,我们就不分析了。

以上,我们分析了accept和读写操作时,利用poller机制营造出同步阻塞调用的假象,实际上系统使用多路IO的工作过程,那么,协程什么时候会被唤醒呢?

2.5 协程的唤醒

实际上,对于网络轮询器,是通过runtime.netpoll函数去唤醒的,比如之前我们提过的findrunnable函数,就会调用runtime.netpoll函数去检查是否有ready的读写事件,具体调度我们就不分析了,下面看看runtime.netpoll是怎么工作的:

func netpoll(delay int64) gList {
   if epfd == -1 {
      return gList{}
   }
   var waitms int32 // 以下逻辑讲delay转换为 epollwait 的 timeout 值
   if delay < 0 {
      waitms = -1
   } else if delay == 0 {
      waitms = 0
   } else if delay < 1e6 {
      waitms = 1
   } else if delay < 1e15 {
      waitms = int32(delay / 1e6)
   } else {
      // An arbitrary cap on how long to wait for a timer.
      // 1e9 ms == ~11.5 days.
      waitms = 1e9
   }
   var events [128]epollevent
retry:
   n := epollwait(epfd, &events[0], int32(len(events)), waitms)
   if n < 0 {
      if n != -_EINTR {
         println("runtime: epollwait on fd", epfd, "failed with", -n)
         throw("runtime: netpoll failed")
      }
      // If a timed sleep was interrupted, just return to
      // recalculate how long we should sleep now.
      if waitms > 0 {
         return gList{}
      }
      goto retry
   }
   var toRun gList // goroutine链表,最后返回给调用方
   for i := int32(0); i < n; i++ {
      ev := &events[i]
      if ev.events == 0 {
         continue
      }

      if *(**uintptr)(unsafe.Pointer(&ev.data)) == &netpollBreakRd { // 时间过长会被通过管道操作中断 epollwait
         if ev.events != _EPOLLIN {
            println("runtime: netpoll: break fd ready for", ev.events)
            throw("runtime: netpoll: break fd ready for something unexpected")
         }
         if delay != 0 {
            // netpollBreak could be picked up by a
            // nonblocking poll. Only read the byte
            // if blocking.
            var tmp [16]byte
            read(int32(netpollBreakRd), noescape(unsafe.Pointer(&tmp[0])), int32(len(tmp)))
            atomic.Store(&netpollWakeSig, 0)
         }
         continue
      }

      var mode int32
      if ev.events&(_EPOLLIN|_EPOLLRDHUP|_EPOLLHUP|_EPOLLERR) != 0 {
         mode += 'r'
      }
      if ev.events&(_EPOLLOUT|_EPOLLHUP|_EPOLLERR) != 0 {
         mode += 'w'
      }
      if mode != 0 {
         pd := *(**pollDesc)(unsafe.Pointer(&ev.data))
         pd.setEventErr(ev.events == _EPOLLERR)
         netpollready(&toRun, pd, mode) // 将g加入到队列中
      }
   }
   return toRun
}

其实以上工作主要就是把已经就绪的goroutine加入到toRun队列中,而runtime.findrunnable有如下代码处理就绪的

if netpollinited() && (atomic.Load(&netpollWaiters) > 0 || pollUntil != 0) && atomic.Xchg64(&sched.lastpoll, 0) != 0 {
   ...
   list := netpoll(delay) // block until new work is available
   atomic.Store64(&sched.pollUntil, 0)
   atomic.Store64(&sched.lastpoll, uint64(nanotime()))
   if faketime != 0 && list.empty() {
      // Using fake time and nothing is ready; stop M.
      // When all M's stop, checkdead will call timejump.
      stopm()
      goto top
   }
   lock(&sched.lock)
   _p_ = pidleget()
   unlock(&sched.lock)
   if _p_ == nil { // 如果没有P,将list队列中的所有g加到全局队列中
      injectglist(&list)
   } else { // 否则pop出一个g来运行,剩下的根据策略,放置到本P的本地队列或者全局队列中
      acquirep(_p_)
      if !list.empty() {
         gp := list.pop()
         injectglist(&list)
         casgstatus(gp, _Gwaiting, _Grunnable)
         if trace.enabled {
            traceGoUnpark(gp, 0)
         }
         return gp, false
      }
      if wasSpinning {
         _g_.m.spinning = true
         atomic.Xadd(&sched.nmspinning, 1)
      }
      goto top
   }
}

通过以上分析,我们知道了被park住的协程是如何被唤醒的。

3. 小结

Go netpoller利用多路IO复+非阻塞IO+Go runtime scheduler打造的原生网络模型,可以防止每次调用都陷入内核中,并且为上层封装了阻塞IO接口,提供goroutine-per-connection这种简单优雅的网络模型,大大提升了网络处理的能力,可以说,Go是一门适用于网络IO密集型应用的语言。

4. 参考文献

9.2 I/O 多路复用:select/poll/epoll

Go netpoller原生网络模型之源码全面解密

Epoll到底用没用到mmap之一

转载自:https://juejin.cn/post/7219847723092738106
评论
请登录