Golang调度器(7)—网络轮询器
0. 简介
上篇博客提到了goroutine
有关系统调用的调度进行了叙述,对于IO密集型的访问,每次请求都可能导致一次M的创建,这其实是不能接受的。Go SDK
为了解决网络IO密集型的应用场景,开发了网络轮询器
这一关键组件。
网络轮询器利用了操作系统提供的IO多路复用模型来提升网络设备的利用率以及程序的性能,下面,我们将分别介绍几种常见的IO模型以及网络轮询器在以Linux为例子的系统上的实现。
1. IO多路复用
所谓IO多路复用就是使用select/poll/epoll
这一系列的多路选择器,实现一个线程监控多个文件句柄,一旦某个文件句柄就绪(ready)
,就能够通知到对应应用程序的读写操作;没有文件句柄就绪时,就会阻塞应用程序,从而释放CPU资源。
1.1 select&poll
select
实现多路复用的方式是,将需要监听的描述符都放到一个文件描述符集合,然后调用select
函数将文件描述符集合拷贝到内核里,让内核来检查是否有事件产生,检查的方式很粗暴,就是通过遍历文件描述符集合的方式,当检查到有事件产生后,将此描述符标记为可读或可写,接着再把整个文件描述符集合拷贝回用户态里,然后用户态还需要再通过遍历的方法找到可读或可写的描述符,然后再对其处理。
所以,对于select
方式,在时间上,需要进行两遍遍历,一次在内核态,一次在用户态,所以其时间复杂度是O(N);在空间上,会发生两次文件描述符集合的拷贝,一次由用户空间拷贝到内核空间,一次由内核空间拷贝到用户空间,但是由于select
使用的是固定长度的bitsmap,默认最多1024个文件描述符。
poll
和select
没有本质区别,只是不再使用bitsmap来存储关注的描述符,而是采用链表的方式存储,突破了文件描述符的个数限制,但是随着文件描述符的个数增加,其O(N)的时间复杂度也会使得效率越来越低下。
1.2 epoll
int s = socket(AF_INET, SOCK_STREAM, 0);
bind(s, ...);
listen(s, ...)
int epfd = epoll_create(...);
epoll_ctl(epfd, ...); // 将所有需要监听的socket添加到epfd中
while(1) {
int n = epoll_wait(...);
for(events){
// 处理逻辑
}
}
以上是一个很经典的epoll
使用逻辑:先用epoll_create
创建一个epfd
对象,然后通过epoll_ctl
将需要监控的文件描述符放到epfd
中,最后调用epoll_wait
等待数据。
相比于select
和poll
,epoll
很好地解决了前二者在时间和空间上效率问题:
epoll
在内核中使用红黑树跟踪文件描述符集合,大大缩减了时间,相比于select/poll
遍历集合的 O(N) 的时间复杂度,红黑树的时间复杂度是 O(logN);epoll
使用事件驱动的机制(前缀e
应该就是event
的首字母),当某个文件描述符有消息时,当用户调用epoll_wait
函数时,只会返回有事件发生的文件描述符的个数,在空间上也大大节省了。
插个题外话,网上不少文章以讹传讹,认为epoll
使用mmap
进行内存映射,从而节省了内存拷贝,这完全是错误的,可以查看源码,或者参考epoll以及Epoll到底用没用到mmap之一。
1.3 磁盘IO是否适用多路复用IO
需要说明的是,以下讨论均指发生在Linux系统中的多路复用;国外文件一般不称磁盘IO为disk io
,而是regular file
。
我们先说结论:磁盘IO不适用多路复用IO!!!
多路复用IO最好搭配非阻塞IO使用,Linux手册有以下一段话:
Under Linux, select() may report a socket file descriptor as "ready for reading", while nevertheless a subsequent read blocks. This could for example happen when data has arrived but upon examination has wrong checksum and is discarded. There may be other circumstances in which a file descriptor is spuriously reported as ready. Thus it may be safer to use O_NONBLOCK on sockets that should not block.
谷歌翻译如下:
在 Linux 下,select() 可能会将套接字文件描述符报告为“准备好读取”,但随后的读取会阻塞。 例如,这可能发生在数据已到达但检查时校验和错误并被丢弃的情况下。 可能存在文件描述符被虚假报告为就绪的其他情况。 因此,在不应阻塞的套接字上使用 O_NONBLOCK 可能更安全。
简单来说,至少多路复用的select
方式被官方推荐使用O_NONBLOCK
来设置成非阻塞IO。不过,磁盘IO并不支持非阻塞IO,根据Non-blocking I/O with regular files:
Regular files are always readable and they are also always writeable. This is clearly stated in the relevant POSIX specifications. I cannot stress this enough. Putting a regular file in non-blocking has ABSOLUTELY no effects other than changing one bit in the file flags.
简单而言,就是磁盘IO永远是可读可写的,非阻塞IO对其没有效果。
但是以上还不是磁盘IO不支持多路复用的最主要原因,请看以下分析:
select使用磁盘IO无意义,因为其永远是ready的:
File descriptors associated with regular files always select true for ready to read, ready to write, and error conditions.
注意,以上标注来源于Linux手册,但是我本地man select
却并没有这段话,只能贴出链接。
如果说,select
和poll
只是对磁盘IO不起作用,因为其状态使用是ready,所以每次循环读都会得到结果,那么epoll
做的更加决绝:不支持磁盘IO的句柄,直接报EPERM
,通过man epoll_ctl
得到:
EPERM The target file fd does not support epoll. This error can occur if fd refers to, for example, a regular file or a directory.
在Linux环境上,Go
中的网络轮询器的底层是基于epoll
实现的,所以其根本不支持磁盘IO的多路复用,任何磁盘IO的操作都会向上篇博客中描述的那样,陷入内核调用。而且在os.file_unix.go
中,在创建文件时会尝试将其纳入到网络轮询器的体系中,有如下描述,有些情况下会无法将文件注册到网络轮询器中,譬如Linux系统中的磁盘文件。
// An error here indicates a failure to register
// with the netpoll system. That can happen for
// a file descriptor that is not supported by
// epoll/kqueue; for example, disk files on
// Linux systems. We assume that any real error
// will show up in later I/O.
所以,在《Go 语言设计与实现》的6.6节,作者反复提及网络轮询器不仅用于监控网络IO,还能用于监控文件IO,这个结论是错误的!至少在Linux环境下,磁盘IO不能使用多路复用。我看有很多小伙伴的博客转述了其观点,恐怕会造成更大的谬误传播。
2. Go netpoller
Go
是一门跨平台的编程语言,而不同平台针对IO多路复用也有不同的实现方式,Go netpoller
通过在底层对Linux下的epoll
、freeBSD或者MacOS下的kqueue
或者Windows下的iocp
进行封装,实现使用同步编程模式达到异步执行的效果,实现Go
语言的网络轮询器,提升网络IO密集型应用的效率。这里需要说明的是,kqueue
支持对磁盘IO做多路复用,但是如今大多数的服务器都是Linux系统的,所以我们以下讨论只针对Linux系统。
2.1 场景
以下,使用Go
语言实现一个典型的TCP echo server
:
package main
import (
"fmt"
"net"
)
func main() {
lis, err := net.Listen("tcp", ":8080")
if err != nil {
panic(err)
}
for {
conn, err := lis.Accept()
if err != nil {
panic(err)
}
go func() {
defer conn.Close()
handle(conn)
}()
}
}
func handle(conn net.Conn) {
buf := make([]byte, 1024)
for {
n, err := conn.Read(buf)
if err != nil {
fmt.Printf("read err: %+v\n", err)
return
}
_, _ = conn.Write(buf[:n])
}
}
上述模式是典型的goroutine-per-connection
模式,下面,我们就来看看Go
调度器是怎么做到这种模式的。
我们可以通过以下指令反编译代码
$ go build -gcflags "-N -l" -o main main.go
$ objdump -d main >> main.i
2.2 net.Listen
main.main -> net.Listen -> net.(*ListenConfig).Listen -> net.(*sysListener).listenTCP -> net.internetSocket -> net.socket -> net.sysSocket
,注意这个net.sysSocket
在net.sock_cloexec.go
中,其调用的函数如下,可以发现,其使用的IO是非阻塞IO。
s, err := socketFunc(family, sotype|syscall.SOCK_NONBLOCK|syscall.SOCK_CLOEXEC, proto)
其实所谓socketFunc
就是系统调用socket
。
var socketFunc func(int, int, int) (int, error) = syscall.Socket
以上是整个系统调用的过程,net.socket
函数在通过net.sysSocket
创建完socket句柄后,会通过newFD
函数创建net.netFD
对象,而这个对象包含着最重要的网络轮询器的重要对象poll.FD
。
func socket(ctx context.Context, net string, family, sotype, proto int, ipv6only bool, laddr, raddr sockaddr, ctrlFn func(string, string, syscall.RawConn) error) (fd *netFD, err error) {
s, err := sysSocket(family, sotype, proto)
if err != nil {
return nil, err
}
if err = setDefaultSockopts(s, family, sotype, ipv6only); err != nil {
poll.CloseFunc(s)
return nil, err
}
if fd, err = newFD(s, family, sotype, net); err != nil {
poll.CloseFunc(s)
return nil, err
}
if laddr != nil && raddr == nil {
switch sotype {
case syscall.SOCK_STREAM, syscall.SOCK_SEQPACKET:
if err := fd.listenStream(laddr, listenerBacklog(), ctrlFn); err != nil {
fd.Close()
return nil, err
}
return fd, nil
case syscall.SOCK_DGRAM:
if err := fd.listenDatagram(laddr, ctrlFn); err != nil {
fd.Close()
return nil, err
}
return fd, nil
}
}
if err := fd.dial(ctx, laddr, raddr, ctrlFn); err != nil {
fd.Close()
return nil, err
}
return fd, nil
}
然后,由于我们建立的是TCP连接,所以会走入到listenStream
方法,里面调用了系统调用bind
和listen
(可以看到,Go
中的net.Listen
封装了原来C语言建立tcp服务时调用系统调用的socket -> bind -> listen
三个步骤)。
func (fd *netFD) listenStream(laddr sockaddr, backlog int, ctrlFn func(string, string, syscall.RawConn) error) error {
var err error
if err = setDefaultListenerSockopts(fd.pfd.Sysfd); err != nil {
return err
}
var lsa syscall.Sockaddr
if lsa, err = laddr.sockaddr(fd.family); err != nil {
return err
}
if ctrlFn != nil {
c, err := newRawConn(fd)
if err != nil {
return err
}
if err := ctrlFn(fd.ctrlNetwork(), laddr.String(), c); err != nil {
return err
}
}
if err = syscall.Bind(fd.pfd.Sysfd, lsa); err != nil {
return os.NewSyscallError("bind", err)
}
if err = listenFunc(fd.pfd.Sysfd, backlog); err != nil {
return os.NewSyscallError("listen", err)
}
if err = fd.init(); err != nil {
return err
}
lsa, _ = syscall.Getsockname(fd.pfd.Sysfd)
fd.setAddr(fd.addrFunc()(lsa), nil)
return nil
}
然后,以上函数调用了fd.init()
来初始化我们的句柄操作,为了这部分能继续讲下去,也是我们网络轮询器的重点,接下来,我们需要介绍一下Go
网络轮询器几个重要的结构体。
2.3 数据结构以及fd.init()
做了什么
net.netFD
// Network file descriptor.
type netFD struct {
pfd poll.FD
// immutable until Close
family int
sotype int
isConnected bool // handshake completed or use of association with peer
net string
laddr Addr
raddr Addr
}
net.netFD
是网络描述符,其包含了重要的网络轮询器的结构体pfd
,类型是poll.FD
。
poll.FD
// FD is a file descriptor. The net and os packages use this type as a
// field of a larger type representing a network connection or OS file.
type FD struct {
// Lock sysfd and serialize access to Read and Write methods.
fdmu fdMutex
// System file descriptor. Immutable until Close.
Sysfd int
// I/O poller.
pd pollDesc
// Writev cache.
iovecs *[]syscall.Iovec
// Semaphore signaled when file is closed.
csema uint32
// Non-zero if this file has been set to blocking mode.
isBlocking uint32
// Whether this is a streaming descriptor, as opposed to a
// packet-based descriptor like a UDP socket. Immutable.
IsStream bool
// Whether a zero byte read indicates EOF. This is false for a
// message based socket connection.
ZeroReadIsEOF bool
// Whether this is a file rather than a network socket.
isFile bool
}
poll.FD
定义如上,其包含系统的描述符Sysfd
和轮询器的重要结构pd
,类型是poll.pollDesc
。
pollDesc
type pollDesc struct {
runtimeCtx uintptr
}
这里的这个struct只包含一个指针,其实这是为了进行不同系统兼容而设置的,3.2中最后分析到fd.init()
函数进行初始化,最终会调用到以下函数:
func (pd *pollDesc) init(fd *FD) error {
serverInit.Do(runtime_pollServerInit)
ctx, errno := runtime_pollOpen(uintptr(fd.Sysfd))
if errno != 0 {
return errnoErr(syscall.Errno(errno))
}
pd.runtimeCtx = ctx
return nil
}
以上init
函数,在编译条件是linux
的情形下,会定位到runtime.netpoll.go
中以下函数:
func poll_runtime_pollServerInit() {
netpollGenericInit()
}
func netpollGenericInit() {
if atomic.Load(&netpollInited) == 0 {
lockInit(&netpollInitLock, lockRankNetpollInit)
lock(&netpollInitLock)
if netpollInited == 0 {
netpollinit()
atomic.Store(&netpollInited, 1)
}
unlock(&netpollInitLock)
}
}
可以看到,以上操作最终又会调用到netpollinit
函数,而这个函数会根据系统不同编译不同文件,比如Linux系统中是runtime.netpoll_epoll.go
,若是Mac,则会定位到runtime.netpoll_kqueue.go
,我们依旧选择Linux平台进行分析:
func netpollinit() {
epfd = epollcreate1(_EPOLL_CLOEXEC)
if epfd < 0 {
epfd = epollcreate(1024)
if epfd < 0 {
println("runtime: epollcreate failed with", -epfd)
throw("runtime: netpollinit failed")
}
closeonexec(epfd)
}
r, w, errno := nonblockingPipe()
if errno != 0 {
println("runtime: pipe failed with", -errno)
throw("runtime: pipe failed")
}
ev := epollevent{
events: _EPOLLIN,
}
*(**uintptr)(unsafe.Pointer(&ev.data)) = &netpollBreakRd
errno = epollctl(epfd, _EPOLL_CTL_ADD, r, &ev)
if errno != 0 {
println("runtime: epollctl failed with", -errno)
throw("runtime: epollctl failed")
}
netpollBreakRd = uintptr(r)
netpollBreakWr = uintptr(w)
}
其中需要注意一点,netpollBreakRd
和netpollBreakWr
是注册的管道,用于唤醒epollwait
函数的,将netpollBreakRd
注册到epoll
中,如果需要唤醒epollwait
阻塞,则利用netpollBreakWr
在管道这端写入即可。
最后需要注意的是,在以上init
中,还会调用runtime_pollOpen(uintptr(fd.Sysfd))
将listenFD
注册到epoll
中。
分析完以上代码可以发现,在创建tcp
服务并且监听的过程中,同时初进行了epoll
的相关操作,并且预置了可以唤醒epollwait
阻塞的管道。
2.4 Listener.Accept
Listener.Accept
的调用过程为Listener.Accept -> net.(*TCPListener).Accept -> net.(*TCPListener).accept -> net.(*netFD).accept -> poll.(*FD).Accept -> poll.accept -> syscall.Accept
。在poll.(*FD).Accept
中,做如下处理:
func (fd *FD) Accept() (int, syscall.Sockaddr, string, error) { // 这里的fd,就是之前listen的fd
if err := fd.readLock(); err != nil {
return -1, nil, "", err
}
defer fd.readUnlock()
if err := fd.pd.prepareRead(fd.isFile); err != nil {
return -1, nil, "", err
}
for {
s, rsa, errcall, err := accept(fd.Sysfd) // 使用listen时的fd接收链接,因为listen fd设置为非阻塞,所以肯定返回
if err == nil { // 如果没有错,说明真的有连接,返回
return s, rsa, "", err
}
switch err {
case syscall.EINTR:
continue
case syscall.EAGAIN: // 如果返回EAGAIN,判断可以使用poll系统后使用waitRead等待
if fd.pd.pollable() {
if err = fd.pd.waitRead(fd.isFile); err == nil {
continue
}
}
case syscall.ECONNABORTED:
// This means that a socket on the listen
// queue was closed before we Accept()ed it;
// it's a silly error, so try again.
continue
}
return -1, nil, errcall, err
}
}
而pollDesc.waitRead
函数是怎么做到等待的呢,其内部调用了pollDesc.wait -> runtime_pollWait -> runtime.poll_runtime_pollWait
:
//go:linkname poll_runtime_pollWait internal/poll.runtime_pollWait
func poll_runtime_pollWait(pd *pollDesc, mode int) int {
errcode := netpollcheckerr(pd, int32(mode))
if errcode != pollNoError {
return errcode
}
// As for now only Solaris, illumos, and AIX use level-triggered IO.
if GOOS == "solaris" || GOOS == "illumos" || GOOS == "aix" {
netpollarm(pd, mode)
}
// 进入 netpollblock 判断是否有期待的IO事件发生
for !netpollblock(pd, int32(mode), false) {
errcode = netpollcheckerr(pd, int32(mode))
if errcode != pollNoError {
return errcode
}
// Can happen if timeout has fired and unblocked us,
// but before we had a chance to run, timeout has been reset.
// Pretend it has not happened and retry.
}
return pollNoError
}
netpollblock
函数如下:
func netpollblock(pd *pollDesc, mode int32, waitio bool) bool {
gpp := &pd.rg
if mode == 'w' {
gpp = &pd.wg
}
// set the gpp semaphore to pdWait
for {
// Consume notification if already ready.
if gpp.CompareAndSwap(pdReady, 0) { // 如果当前IO已经ready,那么直接返回true
return true
}
if gpp.CompareAndSwap(0, pdWait) { // 如果没有,则跳出循环
break
}
// Double check that this isn't corrupt; otherwise we'd loop
// forever.
if v := gpp.Load(); v != pdReady && v != 0 {
throw("runtime: double wait")
}
}
// need to recheck error states after setting gpp to pdWait
// this is necessary because runtime_pollUnblock/runtime_pollSetDeadline/deadlineimpl
// do the opposite: store to closing/rd/wd, publishInfo, load of rg/wg
if waitio || netpollcheckerr(pd, mode) == pollNoError {
// waitio = false;
// netpollcheckerr(pd, mode) == pollNoError 此时一般是成立的,所以gopark住当前的goroutine
gopark(netpollblockcommit, unsafe.Pointer(gpp), waitReasonIOWait, traceEvGoBlockNet, 5)
}
// be careful to not lose concurrent pdReady notification
old := gpp.Swap(0)
if old > pdWait {
throw("runtime: corrupted polldesc")
}
return old == pdReady
}
通过以上分析可以看到,在Go
中调用Listener.Accept
函数后,会首先调用一次系统调用的accept
,如果有连接请求则建立连接;如果没有连接请求,那么内核返回EAGAIN
错误,那么会通过一系列调用,最终通过gopark
函数挂起协程,等待接入事件的到来再唤醒协程。
在被唤醒之后,让我们回到net.(*netFD).accept
函数,接下来会将accept
系统操作返回的句柄重新生成一个FD,并且同样注册到epoll
中,为之后的read、write操作做铺垫。
func (fd *netFD) accept() (netfd *netFD, err error) {
d, rsa, errcall, err := fd.pfd.Accept()
if err != nil {
if errcall != "" {
err = wrapSyscallError(errcall, err)
}
return nil, err
}
if netfd, err = newFD(d, fd.family, fd.sotype, fd.net); err != nil {
poll.CloseFunc(d)
return nil, err
}
if err = netfd.init(); err != nil {
netfd.Close()
return nil, err
}
lsa, _ := syscall.Getsockname(netfd.pfd.Sysfd)
netfd.setAddr(netfd.addrFunc()(lsa), netfd.addrFunc()(rsa))
return netfd, nil
}
2.4 conn.Read/conn.Write
通过前面的分析我们知道,net.Listen
操作返回的FD被注册到epoll
是为了Listener.Accept
,而Listener.Accept
操作返回的FD注册到epoll
中是为了conn.Read/conn.Write
的。
和Listener.Accept
一样,我们以conn.Read
为例,经历如下调用:conn.Read -> net.(*conn).Read -> net.(*netFD).Read -> poll.(*FD).Read
:
func (fd *FD) Read(p []byte) (int, error) {
if err := fd.readLock(); err != nil {
return 0, err
}
defer fd.readUnlock()
if len(p) == 0 {
// If the caller wanted a zero byte read, return immediately
// without trying (but after acquiring the readLock).
// Otherwise syscall.Read returns 0, nil which looks like
// io.EOF.
// TODO(bradfitz): make it wait for readability? (Issue 15735)
return 0, nil
}
if err := fd.pd.prepareRead(fd.isFile); err != nil {
return 0, err
}
if fd.IsStream && len(p) > maxRW {
p = p[:maxRW]
}
for {
n, err := ignoringEINTRIO(syscall.Read, fd.Sysfd, p) // 尝试去读一次,如果读到则返回,因为是非阻塞IO,所以读不到也会返回EAGAIN
if err != nil {
n = 0
if err == syscall.EAGAIN && fd.pd.pollable() {
if err = fd.pd.waitRead(fd.isFile); err == nil { // 同样调用waitRead等待事件
continue
}
}
}
err = fd.eofError(n, err)
return n, err
}
}
可以发现,读的时候和Accept操作类似,也会先尝试读一次,没读到之后同样调用waitRead
挂起协程,营造阻塞读的假象。写的过程很类似,我们就不分析了。
以上,我们分析了accept
和读写操作时,利用poller
机制营造出同步阻塞调用的假象,实际上系统使用多路IO的工作过程,那么,协程什么时候会被唤醒呢?
2.5 协程的唤醒
实际上,对于网络轮询器,是通过runtime.netpoll
函数去唤醒的,比如之前我们提过的findrunnable
函数,就会调用runtime.netpoll
函数去检查是否有ready
的读写事件,具体调度我们就不分析了,下面看看runtime.netpoll
是怎么工作的:
func netpoll(delay int64) gList {
if epfd == -1 {
return gList{}
}
var waitms int32 // 以下逻辑讲delay转换为 epollwait 的 timeout 值
if delay < 0 {
waitms = -1
} else if delay == 0 {
waitms = 0
} else if delay < 1e6 {
waitms = 1
} else if delay < 1e15 {
waitms = int32(delay / 1e6)
} else {
// An arbitrary cap on how long to wait for a timer.
// 1e9 ms == ~11.5 days.
waitms = 1e9
}
var events [128]epollevent
retry:
n := epollwait(epfd, &events[0], int32(len(events)), waitms)
if n < 0 {
if n != -_EINTR {
println("runtime: epollwait on fd", epfd, "failed with", -n)
throw("runtime: netpoll failed")
}
// If a timed sleep was interrupted, just return to
// recalculate how long we should sleep now.
if waitms > 0 {
return gList{}
}
goto retry
}
var toRun gList // goroutine链表,最后返回给调用方
for i := int32(0); i < n; i++ {
ev := &events[i]
if ev.events == 0 {
continue
}
if *(**uintptr)(unsafe.Pointer(&ev.data)) == &netpollBreakRd { // 时间过长会被通过管道操作中断 epollwait
if ev.events != _EPOLLIN {
println("runtime: netpoll: break fd ready for", ev.events)
throw("runtime: netpoll: break fd ready for something unexpected")
}
if delay != 0 {
// netpollBreak could be picked up by a
// nonblocking poll. Only read the byte
// if blocking.
var tmp [16]byte
read(int32(netpollBreakRd), noescape(unsafe.Pointer(&tmp[0])), int32(len(tmp)))
atomic.Store(&netpollWakeSig, 0)
}
continue
}
var mode int32
if ev.events&(_EPOLLIN|_EPOLLRDHUP|_EPOLLHUP|_EPOLLERR) != 0 {
mode += 'r'
}
if ev.events&(_EPOLLOUT|_EPOLLHUP|_EPOLLERR) != 0 {
mode += 'w'
}
if mode != 0 {
pd := *(**pollDesc)(unsafe.Pointer(&ev.data))
pd.setEventErr(ev.events == _EPOLLERR)
netpollready(&toRun, pd, mode) // 将g加入到队列中
}
}
return toRun
}
其实以上工作主要就是把已经就绪的goroutine
加入到toRun
队列中,而runtime.findrunnable
有如下代码处理就绪的
if netpollinited() && (atomic.Load(&netpollWaiters) > 0 || pollUntil != 0) && atomic.Xchg64(&sched.lastpoll, 0) != 0 {
...
list := netpoll(delay) // block until new work is available
atomic.Store64(&sched.pollUntil, 0)
atomic.Store64(&sched.lastpoll, uint64(nanotime()))
if faketime != 0 && list.empty() {
// Using fake time and nothing is ready; stop M.
// When all M's stop, checkdead will call timejump.
stopm()
goto top
}
lock(&sched.lock)
_p_ = pidleget()
unlock(&sched.lock)
if _p_ == nil { // 如果没有P,将list队列中的所有g加到全局队列中
injectglist(&list)
} else { // 否则pop出一个g来运行,剩下的根据策略,放置到本P的本地队列或者全局队列中
acquirep(_p_)
if !list.empty() {
gp := list.pop()
injectglist(&list)
casgstatus(gp, _Gwaiting, _Grunnable)
if trace.enabled {
traceGoUnpark(gp, 0)
}
return gp, false
}
if wasSpinning {
_g_.m.spinning = true
atomic.Xadd(&sched.nmspinning, 1)
}
goto top
}
}
通过以上分析,我们知道了被park
住的协程是如何被唤醒的。
3. 小结
Go netpoller
利用多路IO复+非阻塞IO+Go runtime scheduler
打造的原生网络模型,可以防止每次调用都陷入内核中,并且为上层封装了阻塞IO接口,提供goroutine-per-connection
这种简单优雅的网络模型,大大提升了网络处理的能力,可以说,Go
是一门适用于网络IO密集型应用的语言。
4. 参考文献
转载自:https://juejin.cn/post/7219847723092738106