从 net 标准库源码探索 go 网络编程高效的秘密
go net
golang 官方 net
标准库提供一套网络编程的接口,一个简单的网络编程范式如下,该程序会开启一个监听在 40000 端口的 TCP 服务,在主循环中接收用户的请求,请求到来后开启一个 goroutine 进行处理。
func main() {
listen, _ := net.Listen("tcp", ":40000")
defer listen.Close()
for {
conn, _ := listen.Accept()
// 处理连接
go process(conn)
}
}
将这段代码对应到 c 的网络编程,看起来就像这样:
int main() {
server_fd = socket(AF_INET, SOCK_STREAM, 0)
bind(server_fd...)
listen(server_fd...)
while (1) {
client_fd = accept(server_fd...)
// 创建线程处理请求
pthread_create(...)
}
return 0;
}
虽然二者的代码看来起十分相似,但实际行为却大不一样。golang 对 socket 编程有着更高层的封装,go 在内部实际封装了epoll
多路复用机制,在一次 net.Listen
方法调用中,相当于 c 语言中调用了 socket、bind、listen、epoll_create、epoll_ctl 等方法,虽然上层调用看起来相似,但实际 go 的这套范式的运行机制和上面 c 代码是不一样的。
net.Listen
以我们的 TCP 服务为例,net.Listen()
调用了 ListenConfig.Listen()
,方法会根据用户传入的 network
和 address
地址解析出一个可用的地址列表,然后在合适的地址上开始监听服务:
func (lc *ListenConfig) Listen(ctx context.Context, network, address string) (Listener, error) {
// 解析地址列表
addrs, err := DefaultResolver.resolveAddrList(ctx, "listen", network, address, nil)
if err != nil {
return nil, &OpError{Op: "listen", Net: network, Source: nil, Addr: nil, Err: err}
}
sl := &sysListener{
ListenConfig: *lc,
network: network,
address: address,
}
var l Listener
// 从地址列表中选出第一个 ipv4 地址,如何没有,就选择第一个地址
la := addrs.first(isIPv4)
switch la := la.(type) {
case *TCPAddr:
// 开始监听服务
l, err = sl.listenTCP(ctx, la)
...
return l, nil
}
listenTCP()
方法创建一个 TCP 服务,它主要就是调用了内部的 socket 入口方法 internetSocket()
,再经过一系列调用,最终到达 socket()
函数。
func (sl *sysListener) listenTCP(ctx context.Context, laddr *TCPAddr) (*TCPListener, error) {
fd, err := internetSocket(ctx, sl.network, laddr, nil, syscall.SOCK_STREAM, 0, "listen", ctrlCtxFn)
...
}
func internetSocket(ctx context.Context, net string, laddr, raddr sockaddr, sotype, proto int, mode string, ctrlCtxFn func(context.Context, string, string, syscall.RawConn) error) (fd *netFD, err error) {
return socket(ctx, net, family, sotype, proto, ipv6only, laddr, raddr, ctrlCtxFn)
}
socket()
函数位于源码 src/net/sock_posix.go
文件,函数首先会进行 socket()
系统调用,拿到监听描述符,然后设置一些默认的 socketopt
,将 fd 包装起来,随后调用 listenStream()
进行 bind 和 listen。
func socket(ctx context.Context, net string, family, sotype, proto int, ipv6only bool, laddr, raddr sockaddr, ctrlCtxFn func(context.Context, string, string, syscall.RawConn) error) (fd *netFD, err error) {
// socket 系统调用,返回 socket fd
s, err := sysSocket(family, sotype, proto)
...
// 一些 socketopt
if err = setDefaultSockopts(s, family, sotype, ipv6only); err != nil {
poll.CloseFunc(s)
return nil, err
}
// 封装 fd
if fd, err = newFD(s, family, sotype, net); err != nil {
poll.CloseFunc(s)
return nil, err
}
// laddr: local addr
// raddr: remote addr
if laddr != nil && raddr == nil {
switch sotype {
case syscall.SOCK_STREAM, syscall.SOCK_SEQPACKET:
// bind 和 listen 操作
if err := fd.listenStream(ctx, laddr, listenerBacklog(), ctrlCtxFn); err != nil {
fd.Close()
return nil, err
}
return fd, nil
}
...
}
syscall socket
使用 syscall 进行了 socket
系统调用,并将返回的描述符设置为 non-block
非阻塞模式,非阻塞模式的作用是使程序在等待数据到达时不会被阻塞,而是立即返回,这样可以让程序在等待数据到达的同时继续执行其他任务,提高程序的并发性和响应性。
func sysSocket(family, sotype, proto int) (int, error) {
...
s, err := socketFunc(family, sotype, proto)
if err = syscall.SetNonblock(s, true); err != nil {
poll.CloseFunc(s)
return -1, os.NewSyscallError("setnonblock", err)
}
return s, nil
}
// src/net/hook_unix.go
var (
// 对应的一些系统调用
socketFunc func(int, int, int) (int, error) = syscall.Socket
connectFunc func(int, syscall.Sockaddr) error = syscall.Connect
listenFunc func(int, int) error = syscall.Listen
getsockoptIntFunc func(int, int, int) (int, error) = syscall.GetsockoptInt
)
syscall bind & listen
在 listenStream()
中对 socket 调用了 bind
和 listen
,这里的行为和 c 中的一致,最关键的地方是 fd.init()
,这里初始化了 socket 与 epoll 相关的内容。
func (fd *netFD) listenStream(ctx context.Context, laddr sockaddr, backlog int, ctrlCtxFn func(context.Context, string, string, syscall.RawConn) error) error {
...
// bind 系统调用
if err = syscall.Bind(fd.pfd.Sysfd, lsa); err != nil {
return os.NewSyscallError("bind", err)
}
// listen 系统调用
if err = listenFunc(fd.pfd.Sysfd, backlog); err != nil {
return os.NewSyscallError("listen", err)
}
// 初始化 fd
if err = fd.init(); err != nil {
return err
}
lsa, _ = syscall.Getsockname(fd.pfd.Sysfd)
fd.setAddr(fd.addrFunc()(lsa), nil)
return nil
}
fd.init()
最终会执行到 pollDesc.init()
方法,它执行了 runtime_pollServerInit()
和 runtime_pollOpen()
两个运行时的函数,并使用 sync
包保证了只初始化一次,两个函数的函数体都定义在源码的 runtime
包下,前者对应 runtime
包下的 poll_runtime_pollServerInit()
函数,后者对应 poll_runtime_pollOpen()
函数:
// src/internal/poll/fd_poll_runtime.go
var serverInit sync.Once
func (pd *pollDesc) init(fd *FD) error {
serverInit.Do(runtime_pollServerInit)
ctx, errno := runtime_pollOpen(uintptr(fd.Sysfd))
...
return nil
}
// src/runtime/netpoll.go
//go:linkname poll_runtime_pollServerInit internal/poll.runtime_pollServerInit
func poll_runtime_pollServerInit() {
...
}
// src/runtime/netpoll.go
//go:linkname poll_runtime_pollOpen internal/poll.runtime_pollOpen
func poll_runtime_pollOpen(fd uintptr) (*pollDesc, int) {
...
}
syscall epoll
在 runtime 包的实际函数定义针对不同操作系统以文件的形式进行了区分,如在 linux 中使用 epoll,对应 runtime/netpoll_epoll.go
;在 darwin/macOS 中使用 kqueue,对应 runtime/netpoll_kqueue.go
。
初始化的主要工作就是创建了一个 epoll
实例,poll_runtime_pollServerInit()
函数最终会调用到 netpollinit()
,在这里会进行 epoll
系统调用创建一个 epoll
实例(在 MacOS 中为 kqueue),这里与 c 语言中的 epoll api 行为也是一致的。
func poll_runtime_pollServerInit() {
netpollGenericInit()
}
func netpollGenericInit() {
...
netpollinit()
}
// src/runtime/netpoll_epoll.go
func netpollinit() {
// 创建 epoll 实例
epfd, errno = syscall.EpollCreate1(syscall.EPOLL_CLOEXEC)
...
ev := syscall.EpollEvent{
Events: syscall.EPOLLIN,
}
errno = syscall.EpollCtl(epfd, syscall.EPOLL_CTL_ADD, r, &ev)
...
}
初始化完成之后,继续调用 poll_runtime_pollOpen()
,它主要调用了 netpollopen()
函数将 fd 交由 epoll 进行管理,这里也是我们熟悉的 epoll api,给 fd (这里是我们的 listen fd)赋予要监听的事件然后加入到 epoll 中。
func poll_runtime_pollOpen(fd uintptr) (*pollDesc, int) {
...
errno := netpollopen(fd, pd)
}
// src/runtime/netpoll_epoll.go
func netpollopen(fd uintptr, pd *pollDesc) uintptr {
var ev syscall.EpollEvent
ev.Events = syscall.EPOLLIN | syscall.EPOLLOUT | syscall.EPOLLRDHUP | syscall.EPOLLET
*(**pollDesc)(unsafe.Pointer(&ev.Data)) = pd
return syscall.EpollCtl(epfd, syscall.EPOLL_CTL_ADD, int32(fd), &ev)
}
Accept
在我们的 TCP 服务示例中,Accept()
会调用到 TCPListener
的 Accept
实现,当有客户端请求时,会返回一个已连接对象 Conn
。
func (l *TCPListener) Accept() (Conn, error) {
...
c, err := l.accept()
return c, nil
}
Accept()
会一路执行到 netFD
的 accept()
方法,首先会调用 fd.pfd.Accept()
尝试获取接收一个客户端的连接,如果没有客户端请求,会阻塞当前 goroutine。接收到客户端的请求后,流程与之前 socket()
方法一样,创建一个 netFD
对象并进行初始化,即将新的已连接 fd 加入到 epoll。
func (fd *netFD) accept() (netfd *netFD, err error) {
d, rsa, errcall, err := fd.pfd.Accept()
...
// 创建 netFD
if netfd, err = newFD(d, fd.family, fd.sotype, fd.net); err != nil {
...
}
// 初始化 netFD,加入 epoll
if err = netfd.init(); err != nil {
...
}
return netfd, nil
}
fd.pfd.Accept()
方法是 Accept
流程的核心内容,它在一个无限循环中调用 accept()
,它的流程就是进行 accept
系统调用,在系统调用成功获取到一个已连接 fd 后,会将其变成 non-block
非阻塞模式,这里就不展开看了。
还记得我们之前在 socket()
中将监听描述也设置成了 non-block
模式,在没有客户端连接的情况下,accept
系统调用会立刻返回一个 EAGAIN
错误,不再阻塞线程/协程。捕获到 EAGAIN
错误后,会进入到 fd.pd.waitRead()
。
func (fd *FD) Accept() (int, syscall.Sockaddr, string, error) {
...
for {
// accept 系统调用以及设置 non-block 模式
s, rsa, errcall, err := accept(fd.Sysfd)
if err == nil {
return s, rsa, "", err
}
switch err {
// 由于监听 fd 是 non-block 模式,accept 在没有客户请求的时候会立刻返回
case syscall.EAGAIN:
if fd.pd.pollable() {
if err = fd.pd.waitRead(fd.isFile); err == nil {
continue
}
}
...
}
return -1, nil, errcall, err
}
}
fd.pd.waitRead()
最终会执行到 runtime_pollWait()
。
func (pd *pollDesc) wait(mode int, isFile bool) error {
...
res := runtime_pollWait(pd.runtimeCtx, mode)
}
与之前的 runtime_pollOpen()
一样,我们可以在 runtime/netpoll.go
下看到 runtime_pollWait()
具体的函数定义,其会根据 mode 参数调用 netpollblock()
阻塞当前协程等待一个描述符准备好读或写。
func poll_runtime_pollWait(pd *pollDesc, mode int) int {
...
for !netpollblock(pd, int32(mode), false) {
...
}
}
netpollblock()
内部调用了 gopark()
阻塞当前协程,它是 go 内部用于 goroutinue 调度的函数。这里可以认为是将当前协程加入了 socket fd 的等待队列,当 fd 有新数据到来时,runtime 会将阻塞在该 fd 上的 goroutine 唤醒。可以看到,虽然 accept
系统调用本身不阻塞,但 go 内部还是会将协程挂起,以便 runtime 进行调度。
func netpollblock(pd *pollDesc, mode int32, waitio bool) bool {
...
if waitio || netpollcheckerr(pd, mode) == pollNoError {
gopark(netpollblockcommit, unsafe.Pointer(gpp), waitReasonIOWait, traceEvGoBlockNet, 5)
}
}
go runtime 内部会调用 epoll_wait
来等待 fd 上的事件,当调度 goroutine 时,会获取到产生事件的 fd,然后唤醒阻塞在 fd 的协程,这里属于 go 内部的运行逻辑了,这里了解一下即可。
当有客户端请求过来时,accept
系统调用会成功返回已连接 fd,并使用 newFD()
和 netfd.init()
交由 epoll 管理,也就是会执行到之前提到的 poll_runtime_pollOpen()
函数。
Read
接下来看看从一个连接中读取数据的过程,在 Accept()
返回后,通常我们会开启一个 goroutine 来处理这个连接:
func main() {
listen, _ := net.Listen("tcp", ":40000")
defer listen.Close()
for {
conn, _ := listen.Accept()
// 处理连接
go process(conn)
}
}
func process(conn net.Conn) {
var buf [1024]byte
// 从连接中读取数据
conn.Read(buf[:])
}
Read()
最终会执行到 FD.Read()
,其内部流程几乎之前的 FD.Accept()
一致,首先进行一个 read()
系统调用,如果连接的接收队列中存在消息,则直接读取到 []byte 中并返回;在接收队列没有消息可读的情况下,read()
系统调用会立刻返回一个 EAGAIN
错误,这是因为在 Accept
中新的连接描述符都会被设置为 non-block 模式,捕获到 EAGAIN
后调用 fd.pd.waitRead
将当前 goroutine 挂起,等待 runtime 调度。
func (fd *FD) Read(p []byte) (int, error) {
...
for {
// syscall.Read 系统调用
n, err := ignoringEINTRIO(syscall.Read, fd.Sysfd, p)
if err != nil {
// 没有数据的情况返回 EAGAIN 并阻塞当前 goroutine
if err == syscall.EAGAIN && fd.pd.pollable() {
if err = fd.pd.waitRead(fd.isFile); err == nil {
continue
}
}
}
return n, err
}
}
Write
既然我们了解了 Read()
的流程,那么 Write()
自然也能够大致猜出来了,它的流程与 Read()
是类似的。首先进行 write
系统调用,如果连接的发送队列中没有足够的空间或某些原因导致的无法发送,系统调用会立刻返回一个错误,如果是 EAGAIN
则将当前 goroutine 挂起等待调度。
func (fd *FD) Write(p []byte) (int, error) {
...
for {
// Write 系统调用
n, err := ignoringEINTRIO(syscall.Write, fd.Sysfd, p[nn:max])
...
// 捕获 EAGAIN 并挂起 goroutine
if err == syscall.EAGAIN && fd.pd.pollable() {
if err = fd.pd.waitWrite(fd.isFile); err == nil {
continue
}
}
}
}
唤醒 goroutine
前面已经知道在 Accept
、 Read
、Write
产生 EAGAIN
错误时,与当前 fd 关联的 goroutine 会被阻塞,避免一直占用 CPU,而将这些 goroutine 唤醒的操作是由 runtime 内部来进行的。runtime 会在一些过程中(例如进行 goroutine 调度时)调用 netpoll
来不断地执行 epoll_wait
来查看哪些 fd 有准备就绪的待处理事件,epoll 的 netpoll
函数源码位于 src/runtime/netpoll_epoll.go
。
func netpoll(delay int64) gList {
retry:
// epoll_wait 系统调用
n, errno := syscall.EpollWait(epfd, events[:], int32(len(events)), waitms)
if errno != 0 {
...
// 没有已就绪的 fd,继续尝试
goto retry
}
var toRun gList
for i := int32(0); i < n; i++ {
ev := events[i]
...
// 判断事件类型读或写
var mode int32
if ev.Events&(syscall.EPOLLIN|syscall.EPOLLRDHUP|syscall.EPOLLHUP|syscall.EPOLLERR) != 0 {
mode += 'r'
}
if ev.Events&(syscall.EPOLLOUT|syscall.EPOLLHUP|syscall.EPOLLERR) != 0 {
mode += 'w'
}
if mode != 0 {
...
// 唤醒并重新调度阻塞的 goroutinue
netpollready(&toRun, pd, mode)
}
}
return toRun
}
总结
最后针对我们简易的 TCP 服务大致总结一下。在 Listen
阶段,流程为:
- network 以及地址解析。
socket
系统调用,获取到监听描述符并包装成netFD
对象。bind
、listen
系统调用,标准的 socket 编程 api。- 初始化监听描述符对象,
epoll
系统调用,创建 epoll 实例并将监听描述符给到其管理。
在 Accept
阶段,流程为:
accept
系统调用。- 没有客户端连接时,挂起 goroutine,等待 runtime 调度。
- 有客户端连接时,将已连接 fd 包装成
netFD
并加入 epoll 管理。
对连接进行 Read
/ Write
时,会先进行相应的系统调用,如果返回 EAGAIN
则挂起 goroutine,等待 runtime 调度。在整个模型中,需要 runtime 配合调用 epoll_wait
等待网络事件并唤醒阻塞的 goroutine。
go 对网络编程有着高度封装,利用了 epoll 和 goroutine 结合的方式,使其有这非常不俗的性能,go 的 http 和 rpc 标准库也都依赖这一套网络编程模型,可能这也是 go 比较适合网络编程方面的原因之一。