go 高并发 TCP 网络编程
什么是非阻塞 I/O
下图是四层网络分层,其中数据链路层和网络层都是不可靠的,到了传输层就是可靠的了,机器和机器才能进行可靠的传输,RESP 协议是属于应用层的

TCP 通信过程也就是通过三次握手建立连接,建立连接之后就可以进行数据传输,之后就可以通过四次挥手来通信

操作系统给我们提供了 socket,作为开发者不需要关心底层的网络通信,只需要关心应用层的通信
在 linux 中有一个 Internet domain socket,它的底层是 SOCK_STREAM
在 linux 中每个 socket 都有一个 id,这个 id 叫做“文件描述符”,用 FD 作为标识
我们处理 TCP 实际上就是在处理 socket,socket 内部的过程如下图所示:

server 会新建一个 socket,用来监听心的连接,这是 server socket 处理 listen 状态
这时如果 client1 建立了一个 socket,它会连接 listen 状态的 server socket,然后进行三次握手,握手结束后 server 会再次新建一个 socket 用于和 client1 socket 进行通信,这时这两个 socket 状态是 established,也就是说此时 server 会有两个 socket,一个用于监听新的连接,一个用于和 client 通信
如果又来了一个 client2,server 会再次新建一个 socket 用于和 client2 进行通信

这时 server 会有 3 个 socket,当 client 越来越多时,server 会有越来越多的 socket
这时就诞生了 I/O 模型,I/O 指的是同时操作 socket 的方案,那 I/O 用什么方案同时操作多个 socket 呢?
三个方案:
- 阻塞
- 非阻塞
- 多路复用
阻塞 I/O
阻塞方案是建立三个线程,这三个线程的业务都是类似的,调用 socket 是阻塞式,如果没有新的数据会卡住,也就是说线程会卡住,直到外面有新的数据过来,才会返回新的数据,业务会根据新的数据进行处理,然后把药返回的数据在写回 socket 中

阻塞 I/O 的特点是:
- 同步读取
socket时,线程陷入内核态 - 当读写成功后,切换会用户态,继续执行
- 优点:开发难度小,代码简单
- 缺点:内核态切换开销大
非阻塞 I/O
业务要处理三个 socket,目的是要不断是去这三个 socket 中有没有新数据过来,有新数据就处理新数据,处理后再把数据写回去
非阻塞的意思是,不会卡住,而是会去询问,你有没有新数据,没有新数据就直接返回,在去询问下一个 socket,一直循环下去,不会卡在任何一个 socket 中

那么理论上来说所有的 socket 可以通过一个线程来处理
非阻塞 I/O 的特点是:
- 如何暂时无法收发数据,会返回错误
- 应用会不断的轮询,直到某个
socket可以读写 - 优点:不会陷入内核态,自由度高
- 缺点:需要自旋轮询
多路复用
在 linux 中多路复用叫做 epoll,全称 event poll,是事件池的意思
event poll 中的事件是 各个 scoket 可读/可写事件,就可以让操作系统来监控各个 socket 是否可以操作

还是和之前一样,一个 socket 负责监听,两个 socket 负责和 client 通信,然后把这三个 socket 可读事件注册到 linux epoll 中
然后非阻塞的去调用 linux epoll,去询问这三个事件发生了哪些,如果某个 socket 事件发生了,epoll 会返回一个发生事件的列表,然后业务直接调用对应 socket 背后的业务处理
多路复用 epoll 的特点:
- 注册多个
socket事件 - 调用
epoll,当有时间发生时,返回发生事件的列表 - 优点:提供了事件列表,不需要查询各个
socket - 缺点:开发难度大,逻辑复杂
- 不同操作系统的多路复用实现不同:
linux:epollmac:kqueuewindows:IOCP
go 是如何抽象 epoll
- 在底层使用使用操作系统的多路复用
I/O - 在协程层次使用阻塞模型
- 阻塞协程时,休眠协程
go 将各个系统对 epoll 的操作做了一层封装,抹平各平台的差异
多路复用器在各个系统都有以下功能:
- 新建多路复用器
epoll_create() - 往多路复用器里插入需要监听的事件
epoll_ctl() - 查询发生了什么事件
epoll_wait()
在 go sdk 中搜索 epoll_create,会出现很多结果,我们选择 /cmd/vendor/golang.org/x/sys/unix/zsysnum_linux_amd64.go
在 go sdk 中有一个 netpoll 的文件
它一进来就写了很多注释,第一行注释写了这个工具叫做 network poller,下面有很多被注释掉的方法,这些方法是在不同平台的文件中单独去实现的,netpoll 这个文件是一个总的文件
比如 linux 平台是在 netpoll_epoll 中实现,mac 平台是在 netpoll_kqueue 中实现,windows 平台是在 netpoll_windows 中实现
newtpollinit:初始化多路复用器netpollopen:往多路复用器里插入需要监听的事件netpoll:看下有什么事件发生
// Integrated network poller (platform-independent part).
// A particular implementation (epoll/kqueue/port/AIX/Windows)
// must define the following functions:
//
// func netpollinit()
// Initialize the poller. Only called once.
//
// func netpollopen(fd uintptr, pd *pollDesc) int32
// Arm edge-triggered notifications for fd. The pd argument is to pass
// back to netpollready when fd is ready. Return an errno value.
//
// func netpollclose(fd uintptr) int32
// Disable notifications for fd. Return an errno value.
//
// func netpoll(delta int64) gList
// Poll the network. If delta < 0, block indefinitely. If delta == 0,
// poll without blocking. If delta > 0, block for up to delta nanoseconds.
// Return a list of goroutines built by calling netpollready.
//
// func netpollBreak()
// Wake up the network poller, assumed to be blocked in netpoll.
//
// func netpollIsPollDescriptor(fd uintptr) bool
// Reports whether fd is a file descriptor used by the poller.
也就说 linux 中的 epoll 对应 go 中的 netpoll
epoll_create->netpollinitepoll_ctl->netpollopenepoll_wait->netpoll
netpollinit 作用是创建多路复用器:
- 新建
epoll - 新建一个
pipe管道,用于中断epoll - 将“管道有数据到达”注册在
epoll中
func netpollinit() {
var errno uintptr
// 新建 epoll
epfd, errno = syscall.EpollCreate1(syscall.EPOLL_CLOEXEC)
if errno != 0 {
println("runtime: epollcreate failed with", errno)
throw("runtime: netpollinit failed")
}
// 创建一个管道,linux 的管道,用来关闭 epoll
r, w, errpipe := nonblockingPipe()
if errpipe != 0 {
println("runtime: pipe failed with", -errpipe)
throw("runtime: pipe failed")
}
ev := syscall.EpollEvent{
Events: syscall.EPOLLIN,
}
*(**uintptr)(unsafe.Pointer(&ev.Data)) = &netpollBreakRd
// 将管道的读事件加入 epoll
errno = syscall.EpollCtl(epfd, syscall.EPOLL_CTL_ADD, r, &ev)
if errno != 0 {
println("runtime: epollctl failed with", errno)
throw("runtime: epollctl failed")
}
netpollBreakRd = uintptr(r)
netpollBreakWr = uintptr(w)
}
netpollopen 作用是插入事件
- 传入一个
socket的fd和pollDesc指针 pollDesc指针是socket相关详细信息pollDesc中记录了哪个协程休眠在等待此socket- 将
socket可读,可写,端开事件注册到epoll中
func netpollopen(fd uintptr, pd *pollDesc) uintptr {
var ev syscall.EpollEvent
// 设置事件
ev.Events = syscall.EPOLLIN | syscall.EPOLLOUT | syscall.EPOLLRDHUP | syscall.EPOLLET
tp := taggedPointerPack(unsafe.Pointer(pd), pd.fdseq.Load())
*(*taggedPointer)(unsafe.Pointer(&ev.Data)) = tp
// 调用底层的 epoll_ctl
return syscall.EpollCtl(epfd, syscall.EPOLL_CTL_ADD, int32(fd), &ev)
}
pollDesc 是 go 网络层对 socket 的描述
fd:socket idrg:等待读取的协程wg:等待写入的协程
type pollDesc struct {
fd uintptr // constant for pollDesc usage lifetime
rg atomic.Uintptr // pdReady, pdWait, G waiting for read or pdNil
wg atomic.Uintptr // pdReady, pdWait, G waiting for write or pdNil
}
netpoll 查询发生了什么事件
- 调用
epoll_wait查询有哪些事件发生 - 根据
socket相关的pollDesc信息,返回哪些协程可以唤醒
// netpoll checks for ready network connections.
// Returns list of goroutines that become runnable.
// delay < 0: blocks indefinitely
// delay == 0: does not block, just polls
// delay > 0: block for up to that many nanoseconds
func netpoll(delay int64) gList {
var events [128]syscall.EpollEvent
retry:
// epfd 是 epoll 的 id
n, errno := syscall.EpollWait(epfd, events[:], int32(len(events)), waitms)
if errno != 0 {
if errno != _EINTR {
println("runtime: epollwait on fd", epfd, "failed with", errno)
throw("runtime: netpoll failed")
}
// If a timed sleep was interrupted, just return to
// recalculate how long we should sleep now.
if waitms > 0 {
return gList{}
}
goto retry
}
var toRun gList
// 多少个事件就有多少 n
for i := int32(0); i < n; i++ {
ev := events[i]
if ev.Events == 0 {
continue
}
if *(**uintptr)(unsafe.Pointer(&ev.Data)) == &netpollBreakRd {
if ev.Events != syscall.EPOLLIN {
println("runtime: netpoll: break fd ready for", ev.Events)
throw("runtime: netpoll: break fd ready for something unexpected")
}
if delay != 0 {
// netpollBreak could be picked up by a
// nonblocking poll. Only read the byte
// if blocking.
var tmp [16]byte
read(int32(netpollBreakRd), noescape(unsafe.Pointer(&tmp[0])), int32(len(tmp)))
netpollWakeSig.Store(0)
}
continue
}
var mode int32
// EPOLLIN:可读事件
if ev.Events&(syscall.EPOLLIN|syscall.EPOLLRDHUP|syscall.EPOLLHUP|syscall.EPOLLERR) != 0 {
mode += 'r'
}
// EPOLLOUT:可写事件
if ev.Events&(syscall.EPOLLOUT|syscall.EPOLLHUP|syscall.EPOLLERR) != 0 {
mode += 'w'
}
if mode != 0 {
tp := *(*taggedPointer)(unsafe.Pointer(&ev.Data))
pd := (*pollDesc)(tp.pointer())
tag := tp.tag()
if pd.fdseq.Load() == tag {
pd.setEventErr(ev.Events == syscall.EPOLLERR, tag)
netpollready(&toRun, pd, mode)
}
}
}
// 返回需要唤醒的协程列表
return toRun
}
Network Poller 是如何工作的
Network Poller 底层有一个多路复用抽象层,用来屏蔽不同平台对多路复用器的实现
Network Poller 初始化
Network Poller 初始化是 poll_runtime_pollServerInit,使用原子操作,保证一个 go 程序只会初始化一次 netpoll
//go:linkname poll_runtime_pollServerInit internal/poll.runtime_pollServerInit
func poll_runtime_pollServerInit() {
netpollGenericInit()
}
func netpollGenericInit() {
// 用原子操作初始化,保证只初始化一次
if netpollInited.Load() == 0 {
lockInit(&netpollInitLock, lockRankNetpollInit)
lock(&netpollInitLock)
if netpollInited.Load() == 0 {
// 初始化多路复用器
netpollinit()
netpollInited.Store(1)
}
unlock(&netpollInitLock)
}
}
pollCache 和 pollDesc
pollCache:是一个带锁的链表头pollDesc:链表的成员pollDesc是runtime包对socket的详细描述
pollCache 结构体如下
pollCache 实际上是一个链表头,它记录了 pollDesc 中第一个成员的指针,pollCache 的作用是为了放置一个锁,用来锁住 pollDesc
type pollCache struct {
lock mutex // 锁
first *pollDesc
// PollDesc objects must be type-stable,
// because we can get ready notification from epoll/kqueue
// after the descriptor is closed/reused.
// Stale notifications are detected using seq variable,
// seq is incremented when deadlines are changed or descriptor is reused.
}
pollDesc 是一个链表成员,link 属性指向了下一个 pollDesc
rg:可能是pdReady、pdWait、等待读取这个socket的地址wg:可能是pdReady、pdWait、等待写这个socket的地址- 写为什么也需要等待,是因为底层的网卡是有发送能力的,写数据时可能需要等待前面的数据发送完毕
type pollDesc struct {
link *pollDesc // in pollcache, protected by pollcache.lock
fd uintptr // constant for pollDesc usage lifetime
// rg, wg are accessed atomically and hold g pointers.
// (Using atomic.Uintptr here is similar to using guintptr elsewhere.)
rg atomic.Uintptr // pdReady, pdWait, G waiting for read or pdNil
wg atomic.Uintptr // pdReady, pdWait, G waiting for write or pdNil
}
Network Poller 新增监听 socket
Network Poller 新增监听 socket 是 poll_runtime_pollOpen 方法,在 pollCache 中分配一个新的 pollDesc,初始化 pollDesc(rg、wg 为 0),调用 netpollopen 将 fd 和 pd 插入 epoll
go:linkname是一个go语言的关键字,用来将一个函数的实现指向另一个函数poll_runtime_pollOpen和internal/poll.runtime_pollOpen是同一个函数
//go:linkname poll_runtime_pollOpen internal/poll.runtime_pollOpen
func poll_runtime_pollOpen(fd uintptr) (*pollDesc, int) {
// 分配一个新的 pollDesc
pd := pollcache.alloc()
lock(&pd.lock)
wg := pd.wg.Load()
if wg != pdNil && wg != pdReady {
throw("runtime: blocked write on free polldesc")
}
rg := pd.rg.Load()
if rg != pdNil && rg != pdReady {
throw("runtime: blocked read on free polldesc")
}
pd.fd = fd
if pd.fdseq.Load() == 0 {
// The value 0 is special in setEventErr, so don't use it.
pd.fdseq.Store(1)
}
// 初始化 pollDesc
pd.closing = false
pd.setEventErr(false, 0)
pd.rseq++
pd.rg.Store(pdNil)
pd.rd = 0
pd.wseq++
pd.wg.Store(pdNil)
pd.wd = 0
pd.self = pd
pd.publishInfo()
unlock(&pd.lock)
// 将 fd 和 pd 插入 epoll
errno := netpollopen(fd, pd)
if errno != 0 {
pollcache.free(pd)
return nil, int(errno)
}
return pd, 0
}
Network Poller 收发数据
Network Poller 是怎么做到一个协程对应一个 socket
收发数据分为两个场景:
- 协程需要收发数据时,
socket已经可读可写runtime循环调用netpoll方法(g0协程)netpoll被startTheWorldWithSema调用,startTheWorldWithSema被gcStart调用。为什么事件循环会被gcStart调用呢?因为只是gcStart是周期性不断的在调用
- 发现
socket可读可写时,给对应的rg或者wg设置为pdReady(1) - 协程调用
poll_runtime_pollWait方法 - 判断
rg或者wg已经被置为pdReady(1),返回0
- 协程需要收发数据时,
socket暂时无法读写runtime循环调用netpoll方法(g0协程)- 协程调用
poll_runtime_pollWait方法 - 发现对应的
rg或者wg为0 - 给对应的
rg或者wg置为协程地址 - 休眠等待
发现 socket 可读可写时,查看对应的 rg 和 wg,如果rg 或者 wg 是协程地址的话(不是 0、1、2),说明有协程在休眠监听,将协程地址返回给 runtime,然后调度器开始调度对应的协程
go 是如何抽象 socket
go 中 socket 是 net 提供的
net包抽象了TCP网络操作- 使用
net.Listen()得到TCPListener(listen状态的socket) - 使用
listen.Accept()得到TCPConn(established状态的socket) TCPConn.Read/Write进行读写socket操作Network poll作为上述功能的底层支撑
我们在前面知道了 server 监听新连接,会新建一个 socket,新建新连接是由 net.listen 方法完成的
net.listen 方法返回的是一个 net.Listener 接口,net.Listener 接口中有一个 Accept 方法,用来接收新的连接
func Listen(network, address string) (Listener, error) {
var lc ListenConfig
return lc.Listen(context.Background(), network, address)
}
Listen 方法中调用了 ListenConfig 的 Listen 方法,ListenConfig 是一个结构体,它的 Listen 方法中根据不同的网络类型,调用不同的 listenTCP、listenUnix 方法
// Listen announces on the local network address.
//
// See func Listen for a description of the network and address
// parameters.
func (lc *ListenConfig) Listen(ctx context.Context, network, address string) (Listener, error) {
// 解析地址
addrs, err := DefaultResolver.resolveAddrList(ctx, "listen", network, address, nil)
if err != nil {
return nil, &OpError{Op: "listen", Net: network, Source: nil, Addr: nil, Err: err}
}
sl := &sysListener{
ListenConfig: *lc,
network: network,
address: address,
}
var l Listener
la := addrs.first(isIPv4)
switch la := la.(type) {
case *TCPAddr:
if sl.MultipathTCP() {
l, err = sl.listenMPTCP(ctx, la)
} else {
// 调用 listenTCP 方法
l, err = sl.listenTCP(ctx, la)
}
case *UnixAddr:
l, err = sl.listenUnix(ctx, la)
default:
return nil, &OpError{Op: "listen", Net: sl.network, Source: nil, Addr: la, Err: &AddrError{Err: "unexpected address type", Addr: address}}
}
if err != nil {
return nil, &OpError{Op: "listen", Net: sl.network, Source: nil, Addr: la, Err: err} // l is non-nil interface containing nil pointer
}
return l, nil
}
listenTCP 方法内部会调用 socket 方法
net 包中对 socket 详细的描述是 netFD
netFD 中有一个 pfd 属性,pfd 是 poll.FD 类型,poll.FD 就是上面说的 pollDesc
// Network file descriptor.
type netFD struct {
pfd poll.FD
// immutable until Close
family int
sotype int
isConnected bool // handshake completed or use of association with peer
net string
laddr Addr
raddr Addr
}
net.Listen:
- 新建
socket,并执行bind操作 - 新建一个
FD(net包对socket详细描述) - 返回一个
TCPListener对象 - 将
TCPListener的FD信息加入监听 TCPListener对象本质上是一个LISTEN状态的socket
ls.Accept 方法是用来接收新连接的,返回的时候 Conn 接口`
// Accept implements the Accept method in the Listener interface; it
// waits for the next call and returns a generic Conn.
func (l *TCPListener) Accept() (Conn, error) {
if !l.ok() {
return nil, syscall.EINVAL
}
c, err := l.accept()
if err != nil {
return nil, &OpError{Op: "accept", Net: l.fd.net, Source: nil, Addr: l.fd.laddr, Err: err}
}
return c, nil
}
- 直接调用
socket的accept方法 - 如果失败,休眠等待新的连接
- 将新的
socket包装为TCPConn变量返回 - 将
TCPConn的FD信息加入监听 TCPConn本质上是一个ESTABLISHED状态的socket
conn.Read 方法是用来读取数据
// Read implements the Conn Read method.
func (c *conn) Read(b []byte) (int, error) {
if !c.ok() {
return 0, syscall.EINVAL
}
n, err := c.fd.Read(b)
if err != nil && err != io.EOF {
err = &OpError{Op: "read", Net: c.fd.net, Source: c.fd.laddr, Addr: c.fd.raddr, Err: err}
}
return n, err
}
conn.Write 方法是用来写入数据
// Write implements the Conn Write method.
func (c *conn) Write(b []byte) (int, error) {
if !c.ok() {
return 0, syscall.EINVAL
}
n, err := c.fd.Write(b)
if err != nil {
err = &OpError{Op: "write", Net: c.fd.net, Source: c.fd.laddr, Addr: c.fd.raddr, Err: err}
}
return n, err
}
TCPConn.Read 和 TCPConn.Write 方法内部调用了 netFD.Read 和 netFD.Write 方法
netFD.Read 和 netFD.Write 方法内部调用了 poll.FD.Read 和 poll.FD.Write 方法
poll.FD.Read 和 poll.FD.Write 方法内部调用了 netpoll 方法
- 调用
socket原生读写方法 - 如果调用失败,休眠等待可读/可写
- 被唤醒后调用系统
socket
转载自:https://juejin.cn/post/7392537225629941772