Golang网络IO模型源码分析-goroutinue+epoll
在了解Golang的网络IO模型的具体实现之前,可以先了解一下Linux的几种经典网络模型
Linux经典网络模型
阻塞式IO(BIO)
应用进程从发起IO系统调用一直到返回结果,整个期间都是处于阻塞状态,当前线程被挂起
非阻塞式IO(NIO)
Socket可以设置为非阻塞,这样应用进程可发起IO系统调用后可以立刻返回。轮询发起的IO系统调用直到返回结束标识,需要应用进程不停地访问执行结果。
IO多路复用模型
可以将多个应用进程的Socket注册到一个Select上,用一个进程来监听该Select(阻塞)。这样就可以实现有一个Socket准备好立刻返回,并发起IO系统调用来完成数据读取。有select/poll/epoll这一系列的多路选择器,实现了对线程的复用,一个线程可以处理多个IO
Golang的代码实现
我们从服务端与客户端的连接建立看起:
// 服务端
func server() {
listener, err := net.Listen("tcp", "127.0.0.1:8081")
if err != nil {
return
}
defer listener.Close()
for {
conn, err := listener.Accept()
if err != nil {
return
}
fmt.Println(conn)
}
}
// 客户端
func client() {
conn, err := net.Dial("tcp", "127.0.0.1:8081")
if err != nil {
return
}
defer conn.Close()
}
可以分别通过服务端的 net.Listen()、Accept() 与 客户端的 net.Dial() 函数来深入看看Golang的内部实现
服务端的 net.Listen()
进入net.Listen()方法,可以看到实际上对ListenConfig.Listen()方法的一层封装,直接进入 net/dial.go
文件可以查看到 ListenConfig 的成员函数 Listen():
// 在本地网络地址上监听通告
func (lc *ListenConfig) Listen(ctx context.Context, network, address string) (Listener, error) {
// 根据协议名称和地址获得Internet协议族地址列表
addrs, err := DefaultResolver.resolveAddrList(ctx, "listen", network, address, nil)
if err != nil {
return nil, &OpError{Op: "listen", Net: network, Source: nil, Addr: nil, Err: err}
}
sl := &sysListener{
ListenConfig: *lc,
network: network,
address: address,
}
var l Listener
la := addrs.first(isIPv4)
switch la := la.(type) {
case *TCPAddr:
// TCP监听
l, err = sl.listenTCP(ctx, la)
case *UnixAddr:
// Unix
l, err = sl.listenUnix(ctx, la)
default:
return nil, &OpError{Op: "listen", Net: sl.network, Source: nil, Addr: la, Err: &AddrError{Err: "unexpected address type", Addr: address}}
}
if err != nil {
return nil, &OpError{Op: "listen", Net: sl.network, Source: nil, Addr: la, Err: err} // l is non-nil interface containing nil pointer
}
return l, nil
}
实例中我们创建的是TCP连接,所以通过一个 switch-case 语句可以进入 TCP 类型对应的处理分支内,也就是函数 listenTCP() 为实际处理函数
// 返回值关键,包含了 poll.FD
func (sl *sysListener) listenTCP(ctx context.Context, laddr *TCPAddr) (*TCPListener, error) {
// 各平台对应调用生产socket具体描述符
fd, err := internetSocket(ctx, sl.network, laddr, nil, syscall.SOCK_STREAM, 0, "listen", sl.ListenConfig.Control)
if err != nil {
return nil, err
}
// 关键结构体
return &TCPListener{fd: fd, lc: sl.ListenConfig}, nil
}
这个方法首先通过对 internetSocket() 方法的调用,根据不同的平台生成socket描述符,返回值为关键结果
func internetSocket(ctx context.Context, net string, laddr, raddr sockaddr, sotype, proto int, mode string, ctrlFn func(string, string, syscall.RawConn) error) (fd *netFD, err error) {
if (runtime.GOOS == "aix" || runtime.GOOS == "windows" || runtime.GOOS == "openbsd") && mode == "dial" && raddr.isWildcard() {
raddr = raddr.toLocal(net)
}
family, ipv6only := favoriteAddrFamily(net, laddr, raddr, mode)
// 返回socket描述符具体地址,此函数:1.调用sysSocket生产描述符 2.调用newFD封装描述符 构造netFD 3.调用netFD实现bind与listen
return socket(ctx, net, family, sotype, proto, ipv6only, laddr, raddr, ctrlFn)
}
在 internetSocket() 方法内部,可以看到会先将零地址值转换为具体的地址,随后调用了比较重要的 socket() 方法,这个方法实现了:
1.调用sysSocket生产描述符
2.调用newFD封装描述符 构造netFD
3.调用netFD实现bind与listen
// 返回一个网络文件描述用于网络轮询器进行异步 IO
func socket(ctx context.Context, net string, family, sotype, proto int, ipv6only bool, laddr, raddr sockaddr, ctrlFn func(string, string, syscall.RawConn) error) (fd *netFD, err error) {
// 根据操作系统获取对应的socket
s, err := sysSocket(family, sotype, proto)
if err != nil {
return nil, err
}
// 设置socket选项
if err = setDefaultSockopts(s, family, sotype, ipv6only); err != nil {
poll.CloseFunc(s)
return nil, err
}
// 根据socket创建fd,实际上对于底层socket进行封装,包含文件句柄等关键信息
if fd, err = newFD(s, family, sotype, net); err != nil {
poll.CloseFunc(s)
return nil, err
}
/* 机器翻译:
此函数为以下应用程序创建网络文件描述符:
- 打开被动流连接的端点持有者,称为流侦听器
- 打开目标非特定数据报连接的端点持有者,称为数据报侦听器
- 端点持有者打开活动流或特定于目标的数据报连接,称为拨号器
- 打开其他连接的端点持有者,例如与内核内的协议栈对话
对于流和数据报侦听器,它们只需要命名套接字
所以我们可以假设当 laddr 不为 nil 但 raddr 为 nil 时,是来自流或数据报侦听器的请求。否则我们假设它仅用于拨号器或其他连接持有者*/
// 监听
if laddr != nil && raddr == nil {
switch sotype {
// windows实现在socket_windows.go linux实现在socket_cloexec.go
case syscall.SOCK_STREAM, syscall.SOCK_SEQPACKET:
if err := fd.listenStream(laddr, listenerBacklog(), ctrlFn); err != nil {
fd.Close()
return nil, err
}
return fd, nil
case syscall.SOCK_DGRAM:
// UDP
if err := fd.listenDatagram(laddr, ctrlFn); err != nil {
fd.Close()
return nil, err
}
return fd, nil
}
}
// 发起连接,非listen socket处理
if err := fd.dial(ctx, laddr, raddr, ctrlFn); err != nil {
fd.Close()
return nil, err
}
return fd, nil
}
前面对 internetSocket() 方法调用时的参数 syscall.SOCK_STREAM,程序会进行 listenStream() 方法的调用。
func (fd *netFD) listenStream(laddr sockaddr, backlog int, ctrlFn func(string, string, syscall.RawConn) error) error {
...
// 将socket与 ip port进行绑定
if err = syscall.Bind(fd.pfd.Sysfd, lsa); err != nil {
return os.NewSyscallError("bind", err)
}
// 调用系统的 syscall.Listen,开启监听
if err = listenFunc(fd.pfd.Sysfd, backlog); err != nil {
return os.NewSyscallError("listen", err)
}
// 初始化 fd
if err = fd.init(); err != nil {
return err
}
lsa, _ = syscall.Getsockname(fd.pfd.Sysfd)
fd.setAddr(fd.addrFunc()(lsa), nil)
return nil
}
可以看到,到这里基本上印证了前面的三点作用。服务端通过对方法 internetSocket() 方法的调用,生成 netFD 实例并将地址返回并包装进 TCPListener 结构体中。
到这里对服务端的流程基本结束,让我们先看看客户端的连接过程再去看看 netFD 是怎么回事
客户端的 net.Dial()
进入net.Dial()方法,可以看到与服务端调用的Listen()类似,实际上是对Dialer.Dial()
方法的一层封装,直接进入 net/dial.go
文件可以查看到 Dial():
// 连接到指定网络上的地址
func (d *Dialer) Dial(network, address string) (Conn, error) {
// conn用TCPConn
return d.DialContext(context.Background(), network, address)
}
// 使用提供的上下文连接到指定网络上的地址。
// 提供的上下文必须是非空的。
// 如果上下文在连接完成之前过期,则会返回错误。一旦连接成功,上下文的任何过期都不会影响连接
// 最终返回TCPConn > Conn 里面就是是基于关键网络描述符 netFD,与服务端对应上了
func (d *Dialer) DialContext(ctx context.Context, network, address string) (Conn, error) {
...
var c Conn
if len(fallbacks) > 0 {
c, err = sd.dialParallel(ctx, primaries, fallbacks)
} else {
c, err = sd.dialSerial(ctx, primaries)
}
if err != nil {
return nil, err
}
if tc, ok := c.(*TCPConn); ok && d.KeepAlive >= 0 {
setKeepAlive(tc.fd, true)
ka := d.KeepAlive
if d.KeepAlive == 0 {
ka = defaultTCPKeepAlive
}
setKeepAlivePeriod(tc.fd, ka)
testHookSetKeepAlive(ka)
}
return c, nil
}
在 DialContext() 方法的最后声明了一个 Conn类型的interface(),通过断言转为 *TCPConn 类型进行后续操作
// TCPConn 是用于 TCP 网络连接的 Conn 接口的实现
type TCPConn struct {
conn
}
type conn struct {
fd *netFD // 关键 网络描述符/句柄 无论net.listener还是dial都是基于此
}
逐步进入源码中 conn 结构体就可以看到,客户端最后也是生成了一个包含 netFD 的结构体,至此客户端与服务端的连接过程就比较清晰了。
服务端与客户端的总结
netFD
poll.FD
接下来就是最重要的 netFD 结构体,server端在创建socket的时候会创建好 fd 结构体,可以从源码 net/fd_posix.go
中看到该结构体的详细信息
// Network file descriptor.
// 包含在Conn结构中,而Conn又包含在TCPConn结构中,所以应该处于用户接口层
type netFD struct {
// 包含两个重要的数据结构:Sysfd 和 pollDesc,用户层接口调用完成交互
// 1.前者是真正的系统文件描述符
// 2.后者是底层事件驱动的封装,所有的读写超时等操作都是通过调用后者的对应方法所实现的
pfd poll.FD
// immutable until Close
family int
sotype int
isConnected bool // handshake completed or use of association with peer
net string
laddr Addr
raddr Addr
}
关键结构体:poll.FD,源码位置 internal/poll/fd_unix.go
// FD 是一个文件描述符。 net 和 os 包使用此类型作为表示网络连接或操作系统文件的较大类型的字段
type FD struct {
// Lock sysfd and serialize access to Read and Write methods.
fdmu fdMutex // 读写锁 锁定sysfd并序列化对Read和Write方法的访问
// System file descriptor. Immutable until Close.
Sysfd int // 关键 系统文件描述符
// I/O poller.
pd pollDesc // 底层事件驱动的封装 所有的读写超时等操作都是通过它
// Writev cache.
iovecs *[]syscall.Iovec
// Semaphore signaled when file is closed. 关闭文件时的信号
csema uint32
// Non-zero if this file has been set to blocking mode.
isBlocking uint32
// Whether this is a streaming descriptor, as opposed to a
// packet-based descriptor like a UDP socket. Immutable. TCP还是UDP
IsStream bool
// Whether a zero byte read indicates EOF. This is false for a
// message based socket connection. 读取到0字节是是否为错误
ZeroReadIsEOF bool
// Whether this is a file rather than a network socket. 是否是系统文件或者网络socket连接
isFile bool
}
进入到关键字段 pd(pollDesc) 的源码,位于internal/poll/fd_poll_runtime.go
,可以看到:
type pollDesc struct {
runtimeCtx uintptr // 只包含了一个指针 指针具体内容是关键 运行时上下文
// 通过init进行初始化
}
var serverInit sync.Once
func (pd *pollDesc) init(fd *FD) error {
// 一次、首次
serverInit.Do(runtime_pollServerInit)
// 关键 内核态用户共享的关联切换
// 注册epoll实例到fd 实际link到runtime包下的 poll_runtime_pollOpen 函数。具体实现在 runtime/netpoll.go
// go:linkname poll_runtime_pollOpen internal/poll.runtime_pollOpen
ctx, errno := runtime_pollOpen(uintptr(fd.Sysfd))
if errno != 0 {
return errnoErr(syscall.Errno(errno))
}
pd.runtimeCtx = ctx
return nil
}
结构 pollDesc() 中只包含了一个指针,内容是运行时的上下文信息,结构体通过自己的 init 方法进行初始化。
从源码中可以看到,先通过 sync.Once 来执行函数,保证只执行一次初始化,随后调用方法 runtime_pollOpen
来将 fd 加入 epoll中,实现了内核态用户态的关联切换。
几个关键的方法在文件开始的地方均有函数签名,但却没有具体实现:
func runtime_pollServerInit()
func runtime_pollOpen(fd uintptr) (uintptr, int)
func runtime_pollClose(ctx uintptr)
func runtime_pollWait(ctx uintptr, mode int) int
func runtime_pollWaitCanceled(ctx uintptr, mode int) int
func runtime_pollReset(ctx uintptr, mode int) int
func runtime_pollSetDeadline(ctx uintptr, d int64, mode int)
func runtime_pollUnblock(ctx uintptr)
func runtime_isPollServerDescriptor(fd uintptr) bool
这里的方法是通过Golang的链接器这一骚操作来链接过来的,实际的代码都位于 runtime
包下,通过 linkname 可以实现对另一个包的 unexported 的方法或者变量的引用,达成定向访问的目的,在Golang的源码中有非常多的使用场景。
随后打开 runtime/netpoll.go
文件,可以看到上面只有签名的函数的具体实现
runtime/netpoll.go
不同系统的具体实现是有差异的,以 Linux 为例,具体实现位于 runtime/netpoll_epoll.go
,但 darwin 系统的实现位于 runtime/netpoll_kqueque.go
中,而 windows 实现位于 runtime/netpoll_windows.go
中,这里以 Linux 系统为例:
初始化
//go:linkname poll_runtime_pollServerInit internal/poll.runtime_pollServerInit
// 初始化
func poll_runtime_pollServerInit() {
netpollGenericInit()
}
func netpollGenericInit() {
...
// 实际调用
netpollinit()
...
}
// 初始化网络轮询器 通过sync.Once 以及 netpollInited 状态值保证只遍历一次
func netpollinit() {
// 尝试使用create1系统调用创建epoll,否则使用create来创建epoll
epfd = epollcreate1(_EPOLL_CLOEXEC)
if epfd < 0 {
epfd = epollcreate(1024)
if epfd < 0 {
println("runtime: epollcreate failed with", -epfd)
throw("runtime: netpollinit failed")
}
closeonexec(epfd)
}
// IO多路复用
r, w, errno := nonblockingPipe()
if errno != 0 {
println("runtime: pipe failed with", -errno)
throw("runtime: pipe failed")
}
ev := epollevent{
events: _EPOLLIN,
}
*(**uintptr)(unsafe.Pointer(&ev.data)) = &netpollBreakRd
// 调用三个关键函数的创建
errno = epollctl(epfd, _EPOLL_CTL_ADD, r, &ev)
if errno != 0 {
println("runtime: epollctl failed with", -errno)
throw("runtime: epollctl failed")
}
netpollBreakRd = uintptr(r)
netpollBreakWr = uintptr(w)
}
在初始化过程中,可以看到主要是对 epoll 的初始化,同时生成r w,r和w是最底层的读写fd,也是要进行复用的fd,之后在netpoll注册监听的所有fd都是复用这个netpollBreakRd
和netpollBreakWr
,在函数其中有三个关于epoll的关键函数:epollcreate1()、epollcreate()、epollctl(),从文件开头可以看到对几个函数的签名:
// 三大关键函数
// 1.创建eventpoll,返回一个epfd句柄,后续根据这个句柄进行对fd的添加删除等操作
func epollcreate(size int32) int32
func epollcreate1(flags int32) int32
//go:noescape
// 2.创建红黑树结构epitem 完成epoll监听事件咋注册 向epfd添加、删除、修改要监听的fd
func epollctl(epfd, op, fd int32, ev *epollevent) int32
//go:noescape
// 3.轮询获取就绪的epitem维护到双向链表,传入创建返回的epfd句柄,以及超时时间,返回就绪的fd句柄
func epollwait(epfd int32, ev *epollevent, nev, timeout int32) int32
func closeonexec(fd int32)
golang的 epoll 结构最初是通过 hash表来实现,后面改用红黑树来实现,通过方法 epollctl来维持红黑树结构,关于红黑树:
以上的三个函数(具体用汇编实现,不再研究) :
- epollcreate:在内核创建一个epoll对象,返回一个epfd句柄,后续根据句柄来操作对象
- epollctl:创建红黑树结构体,将fd封装为一个epitem加入红黑树结构,并加一个回调函数注册到内核,状态改变时将item加入rdlist就绪队列中,同时维持红黑树的结构
- epollwait:轮询获取就绪的epitem维护到rdlist双向链表中,传入创建返回的epfd句柄,以及超时时间,返回就绪的fd句柄。如果就绪队列为空就阻塞等待直到超时
整个过程整理流程可以如图所示,上面部分为用户态,下面部分为内核态:
open
随后我们可以看 poll_runtime_pollOpen()
的具体实现:
//go:linkname poll_runtime_pollOpen internal/poll.runtime_pollOpen
// pollDesc open 连接至此 打开
func poll_runtime_pollOpen(fd uintptr) (*pollDesc, int) {
//通过此,分配一个pollDesc,返回*pollDesc类型的变量,这个是runtime的pollDesc,和之前提到的不是完全相同的
pd := pollcache.alloc()
lock(&pd.lock)
wg := pd.wg.Load()
if wg != 0 && wg != pdReady {
throw("runtime: blocked write on free polldesc")
}
rg := pd.rg.Load()
if rg != 0 && rg != pdReady {
throw("runtime: blocked read on free polldesc")
}
pd.fd = fd
pd.closing = false
pd.setEventErr(false)
pd.rseq++
pd.rg.Store(0)
pd.rd = 0
pd.wseq++
pd.wg.Store(0)
pd.wd = 0
pd.self = pd
pd.publishInfo()
unlock(&pd.lock)
// 初始化 netpollopen
errno := netpollopen(fd, pd)
if errno != 0 {
// poll 缓存区
pollcache.free(pd)
return nil, int(errno)
}
// 关键 pollDesc的真正结构
return pd, 0
}
// 监听文件描述符上的边缘触发事件,创建事件并加入监听
// 这个函数将用户态协程的pollDesc信息写入到epoll所在的单独线程,从而实现用户态和内核态的关联
func netpollopen(fd uintptr, pd *pollDesc) int32 {
var ev epollevent
// 具体事件
// 注册event事件,这里使用了epoll的ET模式,相对于ET,ET需要每次产生时间是就处理时间,否则容易丢失事件
ev.events = _EPOLLIN | _EPOLLOUT | _EPOLLRDHUP | _EPOLLET
// events记录上pd的指针
*(**pollDesc)(unsafe.Pointer(&ev.data)) = pd
// 系统调用将该fd加到eventpoll对象中,交由内核监听
return -epollctl(epfd, _EPOLL_CTL_ADD, int32(fd), &ev)
}
函数 poll_runtime_pollOpen()
最后返回的 pd 是 runtime 包的pollDesc的指针,与之前提到的不是相同的,这里我们可以理解到,之前我们提到的指针类型的 pollDesc 保存的地址就是这个 runtime.Polldesc 的实例地址!
后面实际调用了 netpollopen() 函数,这个函数将用户态协程的pollDesc信息写入到epoll所在的单独线程,从而实现用户态和内核态的关联,通过epollctl的调用将fd加入到epoll中,交由内核监听
到这里服务端的 net.Listener 创建过程就结束了,最后就是对服务端调用的Accept()的研究,关于epoll三大函数的 wait 也将在 Accept 中展开应用
Accept()
从前面的一系列过程可以得知,net.Listen()返回了一个 TCPListener,这个结构体实现了 Listener() 接口,直接找到他的 Accept() 方法来看源码,位置在 net/tcpscok.go
中:
// Accept 实现了Listener接口中的Accept方法;它等待下一次调用并返回一个通用的 Conn
func (l *TCPListener) Accept() (Conn, error) {
...
c, err := l.accept()
...
}
func (ln *TCPListener) accept() (*TCPConn, error) {
fd, err := ln.fd.accept()
if err != nil {
return nil, err
}
tc := newTCPConn(fd)
if ln.lc.KeepAlive >= 0 {
setKeepAlive(fd, true)
ka := ln.lc.KeepAlive
if ln.lc.KeepAlive == 0 {
ka = defaultTCPKeepAlive
}
setKeepAlivePeriod(fd, ka)
}
return tc, nil
}
可以看到,TCPListener的Accept经过包装,最后调用的函数为 fd.accept(),也就是netFD.Accept(),再次和我们前面的关键结构netFD关联了起来!
func (fd *netFD) accept() (netfd *netFD, err error) {
// 调用了 poll.FD 的 Accept() 方法
d, rsa, errcall, err := fd.pfd.Accept()
if err != nil {
if errcall != "" {
err = wrapSyscallError(errcall, err)
}
return nil, err
}
...
}
// Accept wraps the accept network call.
func (fd *FD) Accept() (int, syscall.Sockaddr, string, error) {
// 加锁
if err := fd.readLock(); err != nil {
return -1, nil, "", err
}
defer fd.readUnlock()
if err := fd.pd.prepareRead(fd.isFile); err != nil {
return -1, nil, "", err
}
for {
// 尝试直接获取客户端连接,成功直接返回
s, rsa, errcall, err := accept(fd.Sysfd)
if err == nil {
return s, rsa, "", err
}
switch err {
case syscall.EINTR:
continue
// 创建的socket非阻塞,没有新连接返回 EAGAIN 而不是阻塞
case syscall.EAGAIN:
// 可轮询的调用 epoll wait来等待通知
if fd.pd.pollable() {
if err = fd.pd.waitRead(fd.isFile); err == nil {
continue
}
}
case syscall.ECONNABORTED:
// 这意味着监听队列中的套接字在我们接受()之前已经关闭
continue
}
return -1, nil, errcall, err
}
}
通过代码可以看到一些关键逻辑,其中关键函数有 accept() 与 waitRead(),来看一下 waitRead() 是如何做的:
func (pd *pollDesc) waitRead(isFile bool) error {
return pd.wait('r', isFile)
}
func (pd *pollDesc) wait(mode int, isFile bool) error {
if pd.runtimeCtx == 0 {
return errors.New("waiting for unsupported file type")
}
res := runtime_pollWait(pd.runtimeCtx, mode)
return convertErr(res, isFile)
}
可以看到,像前面的函数一样,调用了 runtime 包的 epollwait,让我们再次回到 runtime/netpoll_epoll.go
来看看具体实现:
//go:linkname poll_runtime_pollWait internal/poll.runtime_pollWait
// 等待 检测是否需要等待
func poll_runtime_pollWait(pd *pollDesc, mode int) int {
errcode := netpollcheckerr(pd, int32(mode))
if errcode != pollNoError {
return errcode
}
// As for now only Solaris, illumos, and AIX use level-triggered IO.
if GOOS == "solaris" || GOOS == "illumos" || GOOS == "aix" {
netpollarm(pd, mode)
}
for !netpollblock(pd, int32(mode), false) {
errcode = netpollcheckerr(pd, int32(mode))
if errcode != pollNoError {
return errcode
}
}
return pollNoError
}
关键函数为 netpollblock():
// 轮询时调用的方法,如果ip就绪了返回true,如果没有就绪返回false,分为 'r' 'w' 两种模式
func netpollblock(pd *pollDesc, mode int32, waitio bool) bool {
// 如果是'r'模式,则gpp是&pd.rg,否则是 &pd.wg
gpp := &pd.rg
if mode == 'w' {
gpp = &pd.wg
}
// set the gpp semaphore to pdWait
for {
// Consume notification if already ready.
if gpp.CompareAndSwap(pdReady, 0) {
return true
}
if gpp.CompareAndSwap(0, pdWait) {
break
}
// Double check that this isn't corrupt; otherwise we'd loop
// forever.
if v := gpp.Load(); v != pdReady && v != 0 {
throw("runtime: double wait")
}
}
// 在将 gpp 设置为 pdWait 后需要重新检查错误状态这是必要的,
// 因为runtime_pollUnblock/runtime_pollSetDeadline/deadlineimpl 做相反的事情:
// 存储到 closing/rd/wd,publishInfo,rg/wg 的加载
if waitio || netpollcheckerr(pd, mode) == pollNoError {
// gopark是很重要的方法,让出当前协程的执行权,一般是返回到g0,让g0重新调度
// 调用 netpollblockcommit,将当前的 G 保存进 pollDesc 的rg或wg中
gopark(netpollblockcommit, unsafe.Pointer(gpp), waitReasonIOWait, traceEvGoBlockNet, 5)
}
// be careful to not lose concurrent pdReady notification
old := gpp.Swap(0)
if old > pdWait {
throw("runtime: corrupted polldesc")
}
return old == pdReady
}
到这里,Accept() 的执行逻辑也基本清楚了:
- 通过socket执行accept直接获取客户端连接
- 客户端没有连接返回 EAGAIN 信号量
- 调用runtime.poll_runtime_pollWait挂起当前协程并保存到pollDesc中
- 有新客户端连接来的时候恢复阻塞的协程重复执行第一步
但是这里没有关于 epoll wait 的执行逻辑呀,其实epoll wait的相关执行位于golang的协程调度代码中,也就是位置于:runtime/proc.go
文件中,调用的是函数 netpoll()
:
// netpoll 轮询网络并返回可运行的准备就绪的 goroutines 列表,传入的参数会决定他的行为
// - 参数 < 0: 无限期阻塞等待文件就绪
// - 参数 == 0: 非阻塞轮询
// - 参数 > 0: 阻塞定期轮询
func netpoll(delay int64) gList {
if epfd == -1 {
return gList{}
}
var waitms int32
if delay < 0 {
waitms = -1
} else if delay == 0 {
waitms = 0
} else if delay < 1e6 {
waitms = 1
} else if delay < 1e15 {
waitms = int32(delay / 1e6)
} else {
// An arbitrary cap on how long to wait for a timer.
// 1e9 ms == ~11.5 days.
waitms = 1e9
}
// 声明一个epollevent事件,在epoll_wait系统调用的时候会给该数组赋值并返回一个索引值
// 之后遍历数组取出就绪的fd事件
var events [128]epollevent
retry:
// 陷入系统调用,取出内核eventpoll中的rdlist,返回就绪的事件
n := epollwait(epfd, &events[0], int32(len(events)), waitms)
if n < 0 {
if n != -_EINTR {
println("runtime: epollwait on fd", epfd, "failed with", -n)
throw("runtime: netpoll failed")
}
// If a timed sleep was interrupted, just return to
// recalculate how long we should sleep now.
if waitms > 0 {
return gList{}
}
goto retry
}
var toRun gList
// 遍历evnet事件数组
for i := int32(0); i < n; i++ {
ev := &events[i]
if ev.events == 0 {
continue
}
if *(**uintptr)(unsafe.Pointer(&ev.data)) == &netpollBreakRd {
if ev.events != _EPOLLIN {
println("runtime: netpoll: break fd ready for", ev.events)
throw("runtime: netpoll: break fd ready for something unexpected")
}
if delay != 0 {
// netpollBreak could be picked up by a
// nonblocking poll. Only read the byte
// if blocking.
var tmp [16]byte
read(int32(netpollBreakRd), noescape(unsafe.Pointer(&tmp[0])), int32(len(tmp)))
atomic.Store(&netpollWakeSig, 0)
}
continue
}
var mode int32
// 是否有就绪的读写事件,放入mode标志位
if ev.events&(_EPOLLIN|_EPOLLRDHUP|_EPOLLHUP|_EPOLLERR) != 0 {
mode += 'r'
}
if ev.events&(_EPOLLOUT|_EPOLLHUP|_EPOLLERR) != 0 {
mode += 'w'
}
if mode != 0 {
// 取出存入的pollDesc指针
pd := *(**pollDesc)(unsafe.Pointer(&ev.data))
pd.setEventErr(ev.events == _EPOLLERR)
// 具体实现在 netpoll.go 中
// 取出 pd 中的 rg 或 wg,后面放到运行队列
netpollready(&toRun, pd, mode)
}
}
return toRun
}
// 将就绪好的io事件,写入就绪的goroutinue队列
func netpollready(toRun *gList, pd *pollDesc, mode int32) {
var rg, wg *g
if mode == 'r' || mode == 'r'+'w' {
rg = netpollunblock(pd, 'r', true)
}
if mode == 'w' || mode == 'r'+'w' {
wg = netpollunblock(pd, 'w', true)
}
// 将阻塞的goroutinue加入gList队列返回
if rg != nil {
toRun.push(rg)
}
if wg != nil {
toRun.push(wg)
}
}
结合前面的 epoll 流程图可以较快理解上面的代码,可以看到在协程调度过程中会获取准备好的socket并唤醒对应的Goroutinue
epoll的其他方法
close()
关于 close()的核心逻辑 同样位于 runtime/netpoll.go 与 runtime/netpoll_epoll.go 中
//go:linkname poll_runtime_pollClose internal/poll.runtime_pollClose
// 当发生某些情况,如断开连接,fd销毁都会调用到此
func poll_runtime_pollClose(pd *pollDesc) {
if !pd.closing {
throw("runtime: close polldesc w/o unblock")
}
wg := pd.wg.Load()
if wg != 0 && wg != pdReady {
throw("runtime: blocked write on closing polldesc")
}
rg := pd.rg.Load()
if rg != 0 && rg != pdReady {
throw("runtime: blocked read on closing polldesc")
}
// 调用epoll_ctl系统调用,删除该fd在eventpoll上对应的epitem
netpollclose(pd.fd)
// 释放对应的pd
pollcache.free(pd)
}
// 关闭
func netpollclose(fd uintptr) int32 {
var ev epollevent
return -epollctl(epfd, _EPOLL_CTL_DEL, int32(fd), &ev)
}
// 释放内存 对应的pd
func (c *pollCache) free(pd *pollDesc) {
lock(&c.lock)
pd.link = c.first
c.first = pd
unlock(&c.lock)
}
aloc()
pollDesc的空间申请方法
// 关键 缓存pollDesc实现
// go runtime调用 poll_runtime_pollOpen时,往epoll实例注册fd 首次调用次方法 初始大小 4k
func (c *pollCache) alloc() *pollDesc {
lock(&c.lock)
// 首次
if c.first == nil {
const pdSize = unsafe.Sizeof(pollDesc{})
n := pollBlockSize / pdSize
if n == 0 {
n = 1
}
// Must be in non-GC memory because can be referenced
// only from epoll/kqueue internals.
// 分配不会被GC回收的内存,确保这些数据结构只能被 epoll和kqueue的内核空间去引用
// GC操作会在go runtime关闭时调用pollcache.free释放内存
mem := persistentalloc(n*pdSize, 0, &memstats.other_sys)
for i := uintptr(0); i < n; i++ {
pd := (*pollDesc)(add(mem, i*pdSize))
pd.link = c.first
c.first = pd
}
}
// 往后每次会先判断链表头是否分配过值
pd := c.first
// 如果是则直接返回表头的pollDesc
// 常见优化设计 批量初始化数据进行缓存 提高netpoller吞吐量
c.first = pd.link
lockInit(&pd.lock, lockRankPollDesc)
unlock(&c.lock)
return pd
}
Golang netpoll 总结
同样用一个流程图总结一下吧,可以与上面的流程图结合理解
参考
segmentfault.com/a/119000003… mcll.top/2019/04/07/… strikefreedom.top/archives/go… int64.ink/blog/golang…
转载自:https://juejin.cn/post/7230745439125504061