likes
comments
collection
share

从 net 标准库源码探索 go 网络编程高效的秘密

作者站长头像
站长
· 阅读数 17

go net

golang 官方 net 标准库提供一套网络编程的接口,一个简单的网络编程范式如下,该程序会开启一个监听在 40000 端口的 TCP 服务,在主循环中接收用户的请求,请求到来后开启一个 goroutine 进行处理。

func main() {
    listen, _ := net.Listen("tcp", ":40000")
    defer listen.Close()

    for {
        conn, _ := listen.Accept()
        // 处理连接
        go process(conn)
    }
}

将这段代码对应到 c 的网络编程,看起来就像这样:

int main() {
    server_fd = socket(AF_INET, SOCK_STREAM, 0)
    bind(server_fd...)
    listen(server_fd...)
        
    while (1) {
        client_fd = accept(server_fd...)
        // 创建线程处理请求
        pthread_create(...)
    }
    return 0;
}

虽然二者的代码看来起十分相似,但实际行为却大不一样。golang 对 socket 编程有着更高层的封装,go 在内部实际封装了epoll 多路复用机制,在一次 net.Listen 方法调用中,相当于 c 语言中调用了 socket、bind、listen、epoll_create、epoll_ctl 等方法,虽然上层调用看起来相似,但实际 go 的这套范式的运行机制和上面 c 代码是不一样的。

net.Listen

以我们的 TCP 服务为例,net.Listen() 调用了 ListenConfig.Listen() ,方法会根据用户传入的 networkaddress 地址解析出一个可用的地址列表,然后在合适的地址上开始监听服务:

func (lc *ListenConfig) Listen(ctx context.Context, network, address string) (Listener, error) {
    // 解析地址列表
    addrs, err := DefaultResolver.resolveAddrList(ctx, "listen", network, address, nil)
    if err != nil {
        return nil, &OpError{Op: "listen", Net: network, Source: nil, Addr: nil, Err: err}
    }
    
    sl := &sysListener{
        ListenConfig: *lc,
        network:      network,
        address:      address,
    }
    var l Listener
    // 从地址列表中选出第一个 ipv4 地址,如何没有,就选择第一个地址
    la := addrs.first(isIPv4)
    switch la := la.(type) {
    case *TCPAddr:
        // 开始监听服务
        l, err = sl.listenTCP(ctx, la)
        ...

    return l, nil
}

listenTCP() 方法创建一个 TCP 服务,它主要就是调用了内部的 socket 入口方法 internetSocket(),再经过一系列调用,最终到达 socket() 函数。

func (sl *sysListener) listenTCP(ctx context.Context, laddr *TCPAddr) (*TCPListener, error) {
    fd, err := internetSocket(ctx, sl.network, laddr, nil, syscall.SOCK_STREAM, 0, "listen", ctrlCtxFn)
    ...
}

func internetSocket(ctx context.Context, net string, laddr, raddr sockaddr, sotype, proto int, mode string, ctrlCtxFn func(context.Context, string, string, syscall.RawConn) error) (fd *netFD, err error) {
    return socket(ctx, net, family, sotype, proto, ipv6only, laddr, raddr, ctrlCtxFn)
}

socket() 函数位于源码 src/net/sock_posix.go 文件,函数首先会进行 socket() 系统调用,拿到监听描述符,然后设置一些默认的 socketopt,将 fd 包装起来,随后调用 listenStream() 进行 bind 和 listen。

func socket(ctx context.Context, net string, family, sotype, proto int, ipv6only bool, laddr, raddr sockaddr, ctrlCtxFn func(context.Context, string, string, syscall.RawConn) error) (fd *netFD, err error) {
    // socket 系统调用,返回 socket fd
    s, err := sysSocket(family, sotype, proto)
    ...
    // 一些 socketopt
    if err = setDefaultSockopts(s, family, sotype, ipv6only); err != nil {
        poll.CloseFunc(s)
        return nil, err
    }
    // 封装 fd
    if fd, err = newFD(s, family, sotype, net); err != nil {
        poll.CloseFunc(s)
        return nil, err
    }
    
    // laddr: local addr
    // raddr: remote addr
    if laddr != nil && raddr == nil {
        switch sotype {
        case syscall.SOCK_STREAM, syscall.SOCK_SEQPACKET:
            // bind 和 listen 操作
            if err := fd.listenStream(ctx, laddr, listenerBacklog(), ctrlCtxFn); err != nil {
                fd.Close()
                return nil, err
            }
            return fd, nil
    }
    ...
}

syscall socket

使用 syscall 进行了 socket 系统调用,并将返回的描述符设置为 non-block 非阻塞模式,非阻塞模式的作用是使程序在等待数据到达时不会被阻塞,而是立即返回,这样可以让程序在等待数据到达的同时继续执行其他任务,提高程序的并发性和响应性。

func sysSocket(family, sotype, proto int) (int, error) {
    ...
    s, err := socketFunc(family, sotype, proto)

    if err = syscall.SetNonblock(s, true); err != nil {
        poll.CloseFunc(s)
        return -1, os.NewSyscallError("setnonblock", err)
    }
    return s, nil
}

// src/net/hook_unix.go
var (
    // 对应的一些系统调用
    socketFunc        func(int, int, int) (int, error)  = syscall.Socket
    connectFunc       func(int, syscall.Sockaddr) error = syscall.Connect
    listenFunc        func(int, int) error              = syscall.Listen
    getsockoptIntFunc func(int, int, int) (int, error)  = syscall.GetsockoptInt
)

syscall bind & listen

listenStream() 中对 socket 调用了 bindlisten,这里的行为和 c 中的一致,最关键的地方是 fd.init(),这里初始化了 socket 与 epoll 相关的内容。

func (fd *netFD) listenStream(ctx context.Context, laddr sockaddr, backlog int, ctrlCtxFn func(context.Context, string, string, syscall.RawConn) error) error {
    ...
    // bind 系统调用
    if err = syscall.Bind(fd.pfd.Sysfd, lsa); err != nil {
        return os.NewSyscallError("bind", err)
    }
    // listen 系统调用
    if err = listenFunc(fd.pfd.Sysfd, backlog); err != nil {
        return os.NewSyscallError("listen", err)
    }
    // 初始化 fd
    if err = fd.init(); err != nil {
        return err
    }
    lsa, _ = syscall.Getsockname(fd.pfd.Sysfd)
    fd.setAddr(fd.addrFunc()(lsa), nil)
    return nil
}

fd.init() 最终会执行到 pollDesc.init() 方法,它执行了 runtime_pollServerInit()runtime_pollOpen() 两个运行时的函数,并使用 sync 包保证了只初始化一次,两个函数的函数体都定义在源码的 runtime 包下,前者对应 runtime 包下的 poll_runtime_pollServerInit() 函数,后者对应 poll_runtime_pollOpen() 函数:

// src/internal/poll/fd_poll_runtime.go
var serverInit sync.Once

func (pd *pollDesc) init(fd *FD) error {
    serverInit.Do(runtime_pollServerInit)
    ctx, errno := runtime_pollOpen(uintptr(fd.Sysfd))
    ...
    return nil
}

// src/runtime/netpoll.go
//go:linkname poll_runtime_pollServerInit internal/poll.runtime_pollServerInit
func poll_runtime_pollServerInit() {
    ...
}

// src/runtime/netpoll.go
//go:linkname poll_runtime_pollOpen internal/poll.runtime_pollOpen
func poll_runtime_pollOpen(fd uintptr) (*pollDesc, int) {
    ...
}

syscall epoll

在 runtime 包的实际函数定义针对不同操作系统以文件的形式进行了区分,如在 linux 中使用 epoll,对应 runtime/netpoll_epoll.go;在 darwin/macOS 中使用 kqueue,对应 runtime/netpoll_kqueue.go

初始化的主要工作就是创建了一个 epoll 实例,poll_runtime_pollServerInit() 函数最终会调用到 netpollinit(),在这里会进行 epoll 系统调用创建一个 epoll 实例(在 MacOS 中为 kqueue),这里与 c 语言中的 epoll api 行为也是一致的。

func poll_runtime_pollServerInit() {
    netpollGenericInit()
}

func netpollGenericInit() {
    ...
    netpollinit()
}

// src/runtime/netpoll_epoll.go
func netpollinit() {
    // 创建 epoll 实例
    epfd, errno = syscall.EpollCreate1(syscall.EPOLL_CLOEXEC)
    ...
    ev := syscall.EpollEvent{
        Events: syscall.EPOLLIN,
    }
    errno = syscall.EpollCtl(epfd, syscall.EPOLL_CTL_ADD, r, &ev)
    ...
}

初始化完成之后,继续调用 poll_runtime_pollOpen() ,它主要调用了 netpollopen() 函数将 fd 交由 epoll 进行管理,这里也是我们熟悉的 epoll api,给 fd (这里是我们的 listen fd)赋予要监听的事件然后加入到 epoll 中。

func poll_runtime_pollOpen(fd uintptr) (*pollDesc, int) {
    ...
    errno := netpollopen(fd, pd)
}

// src/runtime/netpoll_epoll.go
func netpollopen(fd uintptr, pd *pollDesc) uintptr {
    var ev syscall.EpollEvent
    ev.Events = syscall.EPOLLIN | syscall.EPOLLOUT | syscall.EPOLLRDHUP | syscall.EPOLLET
    *(**pollDesc)(unsafe.Pointer(&ev.Data)) = pd
    return syscall.EpollCtl(epfd, syscall.EPOLL_CTL_ADD, int32(fd), &ev)
}

Accept

在我们的 TCP 服务示例中,Accept() 会调用到 TCPListenerAccept 实现,当有客户端请求时,会返回一个已连接对象 Conn

func (l *TCPListener) Accept() (Conn, error) {
    ...
    c, err := l.accept()
    return c, nil
}

Accept() 会一路执行到 netFDaccept() 方法,首先会调用 fd.pfd.Accept() 尝试获取接收一个客户端的连接,如果没有客户端请求,会阻塞当前 goroutine。接收到客户端的请求后,流程与之前 socket() 方法一样,创建一个 netFD 对象并进行初始化,即将新的已连接 fd 加入到 epoll。

func (fd *netFD) accept() (netfd *netFD, err error) {
    d, rsa, errcall, err := fd.pfd.Accept()
    ...
    // 创建 netFD
    if netfd, err = newFD(d, fd.family, fd.sotype, fd.net); err != nil {
        ...
    }
    // 初始化 netFD,加入 epoll
    if err = netfd.init(); err != nil {
        ...
    }
    return netfd, nil
}

fd.pfd.Accept() 方法是 Accept 流程的核心内容,它在一个无限循环中调用 accept() ,它的流程就是进行 accept 系统调用,在系统调用成功获取到一个已连接 fd 后,会将其变成 non-block 非阻塞模式,这里就不展开看了。

还记得我们之前在 socket() 中将监听描述也设置成了 non-block 模式,在没有客户端连接的情况下,accept 系统调用会立刻返回一个 EAGAIN 错误,不再阻塞线程/协程。捕获到 EAGAIN 错误后,会进入到 fd.pd.waitRead()

func (fd *FD) Accept() (int, syscall.Sockaddr, string, error) {
    ...
    for {
        // accept 系统调用以及设置 non-block 模式
        s, rsa, errcall, err := accept(fd.Sysfd)
        if err == nil {
            return s, rsa, "", err
        }
        switch err {
        // 由于监听 fd 是 non-block 模式,accept 在没有客户请求的时候会立刻返回
        case syscall.EAGAIN:
            if fd.pd.pollable() {
                if err = fd.pd.waitRead(fd.isFile); err == nil {
                    continue
                }
            }
        ...
        }
        return -1, nil, errcall, err
    }
}

fd.pd.waitRead() 最终会执行到 runtime_pollWait()

func (pd *pollDesc) wait(mode int, isFile bool) error {
    ...
    res := runtime_pollWait(pd.runtimeCtx, mode)
}

与之前的 runtime_pollOpen() 一样,我们可以在 runtime/netpoll.go 下看到 runtime_pollWait() 具体的函数定义,其会根据 mode 参数调用 netpollblock() 阻塞当前协程等待一个描述符准备好读或写。

func poll_runtime_pollWait(pd *pollDesc, mode int) int {
    ...
    for !netpollblock(pd, int32(mode), false) {
        ...
    }
}

netpollblock() 内部调用了 gopark() 阻塞当前协程,它是 go 内部用于 goroutinue 调度的函数。这里可以认为是将当前协程加入了 socket fd 的等待队列,当 fd 有新数据到来时,runtime 会将阻塞在该 fd 上的 goroutine 唤醒。可以看到,虽然 accept 系统调用本身不阻塞,但 go 内部还是会将协程挂起,以便 runtime 进行调度。

func netpollblock(pd *pollDesc, mode int32, waitio bool) bool {
    ...
    if waitio || netpollcheckerr(pd, mode) == pollNoError {
        gopark(netpollblockcommit, unsafe.Pointer(gpp), waitReasonIOWait, traceEvGoBlockNet, 5)
    }
}

go runtime 内部会调用 epoll_wait 来等待 fd 上的事件,当调度 goroutine 时,会获取到产生事件的 fd,然后唤醒阻塞在 fd 的协程,这里属于 go 内部的运行逻辑了,这里了解一下即可。

当有客户端请求过来时,accept 系统调用会成功返回已连接 fd,并使用 newFD()netfd.init() 交由 epoll 管理,也就是会执行到之前提到的 poll_runtime_pollOpen() 函数。

Read

接下来看看从一个连接中读取数据的过程,在 Accept() 返回后,通常我们会开启一个 goroutine 来处理这个连接:

func main() {
    listen, _ := net.Listen("tcp", ":40000")
    defer listen.Close()

    for {
        conn, _ := listen.Accept()
        // 处理连接
        go process(conn)
    }
}

func process(conn net.Conn) {
    var buf [1024]byte
    // 从连接中读取数据
    conn.Read(buf[:])
}

Read() 最终会执行到 FD.Read() ,其内部流程几乎之前的 FD.Accept() 一致,首先进行一个 read() 系统调用,如果连接的接收队列中存在消息,则直接读取到 []byte 中并返回;在接收队列没有消息可读的情况下,read() 系统调用会立刻返回一个 EAGAIN 错误,这是因为在 Accept 中新的连接描述符都会被设置为 non-block 模式,捕获到 EAGAIN 后调用 fd.pd.waitRead 将当前 goroutine 挂起,等待 runtime 调度。

func (fd *FD) Read(p []byte) (int, error) {
    ...
    for {
        // syscall.Read 系统调用
        n, err := ignoringEINTRIO(syscall.Read, fd.Sysfd, p)
        if err != nil {
            // 没有数据的情况返回 EAGAIN 并阻塞当前 goroutine
            if err == syscall.EAGAIN && fd.pd.pollable() {
                if err = fd.pd.waitRead(fd.isFile); err == nil {
                    continue
                }
            }
        }
        return n, err
    }
}

Write

既然我们了解了 Read() 的流程,那么 Write() 自然也能够大致猜出来了,它的流程与 Read() 是类似的。首先进行 write 系统调用,如果连接的发送队列中没有足够的空间或某些原因导致的无法发送,系统调用会立刻返回一个错误,如果是 EAGAIN 则将当前 goroutine 挂起等待调度。

func (fd *FD) Write(p []byte) (int, error) {
    ...
    for {
        // Write 系统调用
        n, err := ignoringEINTRIO(syscall.Write, fd.Sysfd, p[nn:max])
        ...
        // 捕获 EAGAIN 并挂起 goroutine
        if err == syscall.EAGAIN && fd.pd.pollable() {
            if err = fd.pd.waitWrite(fd.isFile); err == nil {
                continue
            }
        }
    }
}

唤醒 goroutine

前面已经知道在 AcceptReadWrite 产生 EAGAIN 错误时,与当前 fd 关联的 goroutine 会被阻塞,避免一直占用 CPU,而将这些 goroutine 唤醒的操作是由 runtime 内部来进行的。runtime 会在一些过程中(例如进行 goroutine 调度时)调用 netpoll 来不断地执行 epoll_wait 来查看哪些 fd 有准备就绪的待处理事件,epoll 的 netpoll 函数源码位于 src/runtime/netpoll_epoll.go

func netpoll(delay int64) gList {
retry:
    // epoll_wait 系统调用
    n, errno := syscall.EpollWait(epfd, events[:], int32(len(events)), waitms)
    if errno != 0 {
        ...
        // 没有已就绪的 fd,继续尝试
        goto retry
    }
    var toRun gList
    for i := int32(0); i < n; i++ {
        ev := events[i]
        ...

        // 判断事件类型读或写
        var mode int32
        if ev.Events&(syscall.EPOLLIN|syscall.EPOLLRDHUP|syscall.EPOLLHUP|syscall.EPOLLERR) != 0 {
            mode += 'r'
        }
        if ev.Events&(syscall.EPOLLOUT|syscall.EPOLLHUP|syscall.EPOLLERR) != 0 {
            mode += 'w'
        }
        if mode != 0 {
            ...
            // 唤醒并重新调度阻塞的 goroutinue
            netpollready(&toRun, pd, mode)
        }
    }
    return toRun
}

总结

最后针对我们简易的 TCP 服务大致总结一下。在 Listen 阶段,流程为:

  1. network 以及地址解析。
  2. socket 系统调用,获取到监听描述符并包装成 netFD 对象。
  3. bindlisten 系统调用,标准的 socket 编程 api。
  4. 初始化监听描述符对象,epoll 系统调用,创建 epoll 实例并将监听描述符给到其管理。

Accept 阶段,流程为:

  1. accept 系统调用。
  2. 没有客户端连接时,挂起 goroutine,等待 runtime 调度。
  3. 有客户端连接时,将已连接 fd 包装成 netFD 并加入 epoll 管理。

对连接进行 Read / Write 时,会先进行相应的系统调用,如果返回 EAGAIN 则挂起 goroutine,等待 runtime 调度。在整个模型中,需要 runtime 配合调用 epoll_wait 等待网络事件并唤醒阻塞的 goroutine。

go 对网络编程有着高度封装,利用了 epoll 和 goroutine 结合的方式,使其有这非常不俗的性能,go 的 http 和 rpc 标准库也都依赖这一套网络编程模型,可能这也是 go 比较适合网络编程方面的原因之一。