go 高并发 TCP 网络编程
什么是非阻塞 I/O
下图是四层网络分层,其中数据链路层和网络层都是不可靠的,到了传输层就是可靠的了,机器和机器才能进行可靠的传输,RESP
协议是属于应用层的
TCP
通信过程也就是通过三次握手建立连接,建立连接之后就可以进行数据传输,之后就可以通过四次挥手来通信
操作系统给我们提供了 socket
,作为开发者不需要关心底层的网络通信,只需要关心应用层的通信
在 linux
中有一个 Internet domain socket
,它的底层是 SOCK_STREAM
在 linux
中每个 socket
都有一个 id
,这个 id
叫做“文件描述符”,用 FD
作为标识
我们处理 TCP
实际上就是在处理 socket
,socket
内部的过程如下图所示:
server
会新建一个 socket
,用来监听心的连接,这是 server socket
处理 listen
状态
这时如果 client1
建立了一个 socket
,它会连接 listen
状态的 server socket
,然后进行三次握手,握手结束后 server
会再次新建一个 socket
用于和 client1 socket
进行通信,这时这两个 socket
状态是 established
,也就是说此时 server
会有两个 socket
,一个用于监听新的连接,一个用于和 client
通信
如果又来了一个 client2
,server
会再次新建一个 socket
用于和 client2
进行通信
这时 server
会有 3
个 socket
,当 client
越来越多时,server
会有越来越多的 socket
这时就诞生了 I/O
模型,I/O
指的是同时操作 socket
的方案,那 I/O
用什么方案同时操作多个 socket
呢?
三个方案:
- 阻塞
- 非阻塞
- 多路复用
阻塞 I/O
阻塞方案是建立三个线程,这三个线程的业务都是类似的,调用 socket
是阻塞式,如果没有新的数据会卡住,也就是说线程会卡住,直到外面有新的数据过来,才会返回新的数据,业务会根据新的数据进行处理,然后把药返回的数据在写回 socket
中
阻塞 I/O
的特点是:
- 同步读取
socket
时,线程陷入内核态 - 当读写成功后,切换会用户态,继续执行
- 优点:开发难度小,代码简单
- 缺点:内核态切换开销大
非阻塞 I/O
业务要处理三个 socket
,目的是要不断是去这三个 socket
中有没有新数据过来,有新数据就处理新数据,处理后再把数据写回去
非阻塞的意思是,不会卡住,而是会去询问,你有没有新数据,没有新数据就直接返回,在去询问下一个 socket
,一直循环下去,不会卡在任何一个 socket
中
那么理论上来说所有的 socket
可以通过一个线程来处理
非阻塞 I/O
的特点是:
- 如何暂时无法收发数据,会返回错误
- 应用会不断的轮询,直到某个
socket
可以读写 - 优点:不会陷入内核态,自由度高
- 缺点:需要自旋轮询
多路复用
在 linux
中多路复用叫做 epoll
,全称 event poll
,是事件池的意思
event poll
中的事件是 各个 scoket
可读/可写事件,就可以让操作系统来监控各个 socket
是否可以操作
还是和之前一样,一个 socket
负责监听,两个 socket
负责和 client
通信,然后把这三个 socket
可读事件注册到 linux epoll
中
然后非阻塞的去调用 linux epoll
,去询问这三个事件发生了哪些,如果某个 socket
事件发生了,epoll
会返回一个发生事件的列表,然后业务直接调用对应 socket
背后的业务处理
多路复用 epoll
的特点:
- 注册多个
socket
事件 - 调用
epoll
,当有时间发生时,返回发生事件的列表 - 优点:提供了事件列表,不需要查询各个
socket
- 缺点:开发难度大,逻辑复杂
- 不同操作系统的多路复用实现不同:
linux
:epoll
mac
:kqueue
windows
:IOCP
go 是如何抽象 epoll
- 在底层使用使用操作系统的多路复用
I/O
- 在协程层次使用阻塞模型
- 阻塞协程时,休眠协程
go
将各个系统对 epoll
的操作做了一层封装,抹平各平台的差异
多路复用器在各个系统都有以下功能:
- 新建多路复用器
epoll_create()
- 往多路复用器里插入需要监听的事件
epoll_ctl()
- 查询发生了什么事件
epoll_wait()
在 go sdk
中搜索 epoll_create
,会出现很多结果,我们选择 /cmd/vendor/golang.org/x/sys/unix/zsysnum_linux_amd64.go
在 go sdk
中有一个 netpoll
的文件
它一进来就写了很多注释,第一行注释写了这个工具叫做 network poller
,下面有很多被注释掉的方法,这些方法是在不同平台的文件中单独去实现的,netpoll
这个文件是一个总的文件
比如 linux
平台是在 netpoll_epoll
中实现,mac
平台是在 netpoll_kqueue
中实现,windows
平台是在 netpoll_windows
中实现
newtpollinit
:初始化多路复用器netpollopen
:往多路复用器里插入需要监听的事件netpoll
:看下有什么事件发生
// Integrated network poller (platform-independent part).
// A particular implementation (epoll/kqueue/port/AIX/Windows)
// must define the following functions:
//
// func netpollinit()
// Initialize the poller. Only called once.
//
// func netpollopen(fd uintptr, pd *pollDesc) int32
// Arm edge-triggered notifications for fd. The pd argument is to pass
// back to netpollready when fd is ready. Return an errno value.
//
// func netpollclose(fd uintptr) int32
// Disable notifications for fd. Return an errno value.
//
// func netpoll(delta int64) gList
// Poll the network. If delta < 0, block indefinitely. If delta == 0,
// poll without blocking. If delta > 0, block for up to delta nanoseconds.
// Return a list of goroutines built by calling netpollready.
//
// func netpollBreak()
// Wake up the network poller, assumed to be blocked in netpoll.
//
// func netpollIsPollDescriptor(fd uintptr) bool
// Reports whether fd is a file descriptor used by the poller.
也就说 linux
中的 epoll
对应 go
中的 netpoll
epoll_create
->netpollinit
epoll_ctl
->netpollopen
epoll_wait
->netpoll
netpollinit
作用是创建多路复用器:
- 新建
epoll
- 新建一个
pipe
管道,用于中断epoll
- 将“管道有数据到达”注册在
epoll
中
func netpollinit() {
var errno uintptr
// 新建 epoll
epfd, errno = syscall.EpollCreate1(syscall.EPOLL_CLOEXEC)
if errno != 0 {
println("runtime: epollcreate failed with", errno)
throw("runtime: netpollinit failed")
}
// 创建一个管道,linux 的管道,用来关闭 epoll
r, w, errpipe := nonblockingPipe()
if errpipe != 0 {
println("runtime: pipe failed with", -errpipe)
throw("runtime: pipe failed")
}
ev := syscall.EpollEvent{
Events: syscall.EPOLLIN,
}
*(**uintptr)(unsafe.Pointer(&ev.Data)) = &netpollBreakRd
// 将管道的读事件加入 epoll
errno = syscall.EpollCtl(epfd, syscall.EPOLL_CTL_ADD, r, &ev)
if errno != 0 {
println("runtime: epollctl failed with", errno)
throw("runtime: epollctl failed")
}
netpollBreakRd = uintptr(r)
netpollBreakWr = uintptr(w)
}
netpollopen
作用是插入事件
- 传入一个
socket
的fd
和pollDesc
指针 pollDesc
指针是socket
相关详细信息pollDesc
中记录了哪个协程休眠在等待此socket
- 将
socket
可读,可写,端开事件注册到epoll
中
func netpollopen(fd uintptr, pd *pollDesc) uintptr {
var ev syscall.EpollEvent
// 设置事件
ev.Events = syscall.EPOLLIN | syscall.EPOLLOUT | syscall.EPOLLRDHUP | syscall.EPOLLET
tp := taggedPointerPack(unsafe.Pointer(pd), pd.fdseq.Load())
*(*taggedPointer)(unsafe.Pointer(&ev.Data)) = tp
// 调用底层的 epoll_ctl
return syscall.EpollCtl(epfd, syscall.EPOLL_CTL_ADD, int32(fd), &ev)
}
pollDesc
是 go
网络层对 socket
的描述
fd
:socket id
rg
:等待读取的协程wg
:等待写入的协程
type pollDesc struct {
fd uintptr // constant for pollDesc usage lifetime
rg atomic.Uintptr // pdReady, pdWait, G waiting for read or pdNil
wg atomic.Uintptr // pdReady, pdWait, G waiting for write or pdNil
}
netpoll
查询发生了什么事件
- 调用
epoll_wait
查询有哪些事件发生 - 根据
socket
相关的pollDesc
信息,返回哪些协程可以唤醒
// netpoll checks for ready network connections.
// Returns list of goroutines that become runnable.
// delay < 0: blocks indefinitely
// delay == 0: does not block, just polls
// delay > 0: block for up to that many nanoseconds
func netpoll(delay int64) gList {
var events [128]syscall.EpollEvent
retry:
// epfd 是 epoll 的 id
n, errno := syscall.EpollWait(epfd, events[:], int32(len(events)), waitms)
if errno != 0 {
if errno != _EINTR {
println("runtime: epollwait on fd", epfd, "failed with", errno)
throw("runtime: netpoll failed")
}
// If a timed sleep was interrupted, just return to
// recalculate how long we should sleep now.
if waitms > 0 {
return gList{}
}
goto retry
}
var toRun gList
// 多少个事件就有多少 n
for i := int32(0); i < n; i++ {
ev := events[i]
if ev.Events == 0 {
continue
}
if *(**uintptr)(unsafe.Pointer(&ev.Data)) == &netpollBreakRd {
if ev.Events != syscall.EPOLLIN {
println("runtime: netpoll: break fd ready for", ev.Events)
throw("runtime: netpoll: break fd ready for something unexpected")
}
if delay != 0 {
// netpollBreak could be picked up by a
// nonblocking poll. Only read the byte
// if blocking.
var tmp [16]byte
read(int32(netpollBreakRd), noescape(unsafe.Pointer(&tmp[0])), int32(len(tmp)))
netpollWakeSig.Store(0)
}
continue
}
var mode int32
// EPOLLIN:可读事件
if ev.Events&(syscall.EPOLLIN|syscall.EPOLLRDHUP|syscall.EPOLLHUP|syscall.EPOLLERR) != 0 {
mode += 'r'
}
// EPOLLOUT:可写事件
if ev.Events&(syscall.EPOLLOUT|syscall.EPOLLHUP|syscall.EPOLLERR) != 0 {
mode += 'w'
}
if mode != 0 {
tp := *(*taggedPointer)(unsafe.Pointer(&ev.Data))
pd := (*pollDesc)(tp.pointer())
tag := tp.tag()
if pd.fdseq.Load() == tag {
pd.setEventErr(ev.Events == syscall.EPOLLERR, tag)
netpollready(&toRun, pd, mode)
}
}
}
// 返回需要唤醒的协程列表
return toRun
}
Network Poller 是如何工作的
Network Poller
底层有一个多路复用抽象层,用来屏蔽不同平台对多路复用器的实现
Network Poller 初始化
Network Poller
初始化是 poll_runtime_pollServerInit
,使用原子操作,保证一个 go
程序只会初始化一次 netpoll
//go:linkname poll_runtime_pollServerInit internal/poll.runtime_pollServerInit
func poll_runtime_pollServerInit() {
netpollGenericInit()
}
func netpollGenericInit() {
// 用原子操作初始化,保证只初始化一次
if netpollInited.Load() == 0 {
lockInit(&netpollInitLock, lockRankNetpollInit)
lock(&netpollInitLock)
if netpollInited.Load() == 0 {
// 初始化多路复用器
netpollinit()
netpollInited.Store(1)
}
unlock(&netpollInitLock)
}
}
pollCache
和 pollDesc
pollCache
:是一个带锁的链表头pollDesc
:链表的成员pollDesc
是runtime
包对socket
的详细描述
pollCache
结构体如下
pollCache
实际上是一个链表头,它记录了 pollDesc
中第一个成员的指针,pollCache
的作用是为了放置一个锁,用来锁住 pollDesc
type pollCache struct {
lock mutex // 锁
first *pollDesc
// PollDesc objects must be type-stable,
// because we can get ready notification from epoll/kqueue
// after the descriptor is closed/reused.
// Stale notifications are detected using seq variable,
// seq is incremented when deadlines are changed or descriptor is reused.
}
pollDesc
是一个链表成员,link
属性指向了下一个 pollDesc
rg
:可能是pdReady
、pdWait
、等待读取这个socket
的地址wg
:可能是pdReady
、pdWait
、等待写这个socket
的地址- 写为什么也需要等待,是因为底层的网卡是有发送能力的,写数据时可能需要等待前面的数据发送完毕
type pollDesc struct {
link *pollDesc // in pollcache, protected by pollcache.lock
fd uintptr // constant for pollDesc usage lifetime
// rg, wg are accessed atomically and hold g pointers.
// (Using atomic.Uintptr here is similar to using guintptr elsewhere.)
rg atomic.Uintptr // pdReady, pdWait, G waiting for read or pdNil
wg atomic.Uintptr // pdReady, pdWait, G waiting for write or pdNil
}
Network Poller 新增监听 socket
Network Poller
新增监听 socket
是 poll_runtime_pollOpen
方法,在 pollCache
中分配一个新的 pollDesc
,初始化 pollDesc
(rg
、wg
为 0
),调用 netpollopen
将 fd
和 pd
插入 epoll
go:linkname
是一个go
语言的关键字,用来将一个函数的实现指向另一个函数poll_runtime_pollOpen
和internal/poll.runtime_pollOpen
是同一个函数
//go:linkname poll_runtime_pollOpen internal/poll.runtime_pollOpen
func poll_runtime_pollOpen(fd uintptr) (*pollDesc, int) {
// 分配一个新的 pollDesc
pd := pollcache.alloc()
lock(&pd.lock)
wg := pd.wg.Load()
if wg != pdNil && wg != pdReady {
throw("runtime: blocked write on free polldesc")
}
rg := pd.rg.Load()
if rg != pdNil && rg != pdReady {
throw("runtime: blocked read on free polldesc")
}
pd.fd = fd
if pd.fdseq.Load() == 0 {
// The value 0 is special in setEventErr, so don't use it.
pd.fdseq.Store(1)
}
// 初始化 pollDesc
pd.closing = false
pd.setEventErr(false, 0)
pd.rseq++
pd.rg.Store(pdNil)
pd.rd = 0
pd.wseq++
pd.wg.Store(pdNil)
pd.wd = 0
pd.self = pd
pd.publishInfo()
unlock(&pd.lock)
// 将 fd 和 pd 插入 epoll
errno := netpollopen(fd, pd)
if errno != 0 {
pollcache.free(pd)
return nil, int(errno)
}
return pd, 0
}
Network Poller 收发数据
Network Poller
是怎么做到一个协程对应一个 socket
收发数据分为两个场景:
- 协程需要收发数据时,
socket
已经可读可写runtime
循环调用netpoll
方法(g0
协程)netpoll
被startTheWorldWithSema
调用,startTheWorldWithSema
被gcStart
调用。为什么事件循环会被gcStart
调用呢?因为只是gcStart
是周期性不断的在调用
- 发现
socket
可读可写时,给对应的rg
或者wg
设置为pdReady(1)
- 协程调用
poll_runtime_pollWait
方法 - 判断
rg
或者wg
已经被置为pdReady(1)
,返回0
- 协程需要收发数据时,
socket
暂时无法读写runtime
循环调用netpoll
方法(g0
协程)- 协程调用
poll_runtime_pollWait
方法 - 发现对应的
rg
或者wg
为0
- 给对应的
rg
或者wg
置为协程地址 - 休眠等待
发现 socket
可读可写时,查看对应的 rg
和 wg
,如果rg
或者 wg
是协程地址的话(不是 0
、1
、2
),说明有协程在休眠监听,将协程地址返回给 runtime
,然后调度器开始调度对应的协程
go 是如何抽象 socket
go
中 socket
是 net
提供的
net
包抽象了TCP
网络操作- 使用
net.Listen()
得到TCPListener
(listen
状态的socket
) - 使用
listen.Accept()
得到TCPConn
(established
状态的socket
) TCPConn.Read/Write
进行读写socket
操作Network poll
作为上述功能的底层支撑
我们在前面知道了 server
监听新连接,会新建一个 socket
,新建新连接是由 net.listen
方法完成的
net.listen
方法返回的是一个 net.Listener
接口,net.Listener
接口中有一个 Accept
方法,用来接收新的连接
func Listen(network, address string) (Listener, error) {
var lc ListenConfig
return lc.Listen(context.Background(), network, address)
}
Listen
方法中调用了 ListenConfig
的 Listen
方法,ListenConfig
是一个结构体,它的 Listen
方法中根据不同的网络类型,调用不同的 listenTCP
、listenUnix
方法
// Listen announces on the local network address.
//
// See func Listen for a description of the network and address
// parameters.
func (lc *ListenConfig) Listen(ctx context.Context, network, address string) (Listener, error) {
// 解析地址
addrs, err := DefaultResolver.resolveAddrList(ctx, "listen", network, address, nil)
if err != nil {
return nil, &OpError{Op: "listen", Net: network, Source: nil, Addr: nil, Err: err}
}
sl := &sysListener{
ListenConfig: *lc,
network: network,
address: address,
}
var l Listener
la := addrs.first(isIPv4)
switch la := la.(type) {
case *TCPAddr:
if sl.MultipathTCP() {
l, err = sl.listenMPTCP(ctx, la)
} else {
// 调用 listenTCP 方法
l, err = sl.listenTCP(ctx, la)
}
case *UnixAddr:
l, err = sl.listenUnix(ctx, la)
default:
return nil, &OpError{Op: "listen", Net: sl.network, Source: nil, Addr: la, Err: &AddrError{Err: "unexpected address type", Addr: address}}
}
if err != nil {
return nil, &OpError{Op: "listen", Net: sl.network, Source: nil, Addr: la, Err: err} // l is non-nil interface containing nil pointer
}
return l, nil
}
listenTCP
方法内部会调用 socket
方法
net
包中对 socket
详细的描述是 netFD
netFD
中有一个 pfd
属性,pfd
是 poll.FD
类型,poll.FD
就是上面说的 pollDesc
// Network file descriptor.
type netFD struct {
pfd poll.FD
// immutable until Close
family int
sotype int
isConnected bool // handshake completed or use of association with peer
net string
laddr Addr
raddr Addr
}
net.Listen
:
- 新建
socket
,并执行bind
操作 - 新建一个
FD
(net
包对socket
详细描述) - 返回一个
TCPListener
对象 - 将
TCPListener
的FD
信息加入监听 TCPListener
对象本质上是一个LISTEN
状态的socket
ls.Accept
方法是用来接收新连接的,返回的时候 Conn
接口`
// Accept implements the Accept method in the Listener interface; it
// waits for the next call and returns a generic Conn.
func (l *TCPListener) Accept() (Conn, error) {
if !l.ok() {
return nil, syscall.EINVAL
}
c, err := l.accept()
if err != nil {
return nil, &OpError{Op: "accept", Net: l.fd.net, Source: nil, Addr: l.fd.laddr, Err: err}
}
return c, nil
}
- 直接调用
socket
的accept
方法 - 如果失败,休眠等待新的连接
- 将新的
socket
包装为TCPConn
变量返回 - 将
TCPConn
的FD
信息加入监听 TCPConn
本质上是一个ESTABLISHED
状态的socket
conn.Read
方法是用来读取数据
// Read implements the Conn Read method.
func (c *conn) Read(b []byte) (int, error) {
if !c.ok() {
return 0, syscall.EINVAL
}
n, err := c.fd.Read(b)
if err != nil && err != io.EOF {
err = &OpError{Op: "read", Net: c.fd.net, Source: c.fd.laddr, Addr: c.fd.raddr, Err: err}
}
return n, err
}
conn.Write
方法是用来写入数据
// Write implements the Conn Write method.
func (c *conn) Write(b []byte) (int, error) {
if !c.ok() {
return 0, syscall.EINVAL
}
n, err := c.fd.Write(b)
if err != nil {
err = &OpError{Op: "write", Net: c.fd.net, Source: c.fd.laddr, Addr: c.fd.raddr, Err: err}
}
return n, err
}
TCPConn.Read
和 TCPConn.Write
方法内部调用了 netFD.Read
和 netFD.Write
方法
netFD.Read
和 netFD.Write
方法内部调用了 poll.FD.Read
和 poll.FD.Write
方法
poll.FD.Read
和 poll.FD.Write
方法内部调用了 netpoll
方法
- 调用
socket
原生读写方法 - 如果调用失败,休眠等待可读/可写
- 被唤醒后调用系统
socket
转载自:https://juejin.cn/post/7392537225629941772