likes
comments
collection
share

go 高并发 TCP 网络编程

作者站长头像
站长
· 阅读数 54

什么是非阻塞 I/O

下图是四层网络分层,其中数据链路层和网络层都是不可靠的,到了传输层就是可靠的了,机器和机器才能进行可靠的传输,RESP 协议是属于应用层的

go 高并发 TCP 网络编程

TCP 通信过程也就是通过三次握手建立连接,建立连接之后就可以进行数据传输,之后就可以通过四次挥手来通信

go 高并发 TCP 网络编程

操作系统给我们提供了 socket,作为开发者不需要关心底层的网络通信,只需要关心应用层的通信

linux 中有一个 Internet domain socket,它的底层是 SOCK_STREAM

linux 中每个 socket 都有一个 id,这个 id 叫做“文件描述符”,用 FD 作为标识

我们处理 TCP 实际上就是在处理 socketsocket 内部的过程如下图所示:

go 高并发 TCP 网络编程

server 会新建一个 socket,用来监听心的连接,这是 server socket 处理 listen 状态

这时如果 client1 建立了一个 socket,它会连接 listen 状态的 server socket,然后进行三次握手,握手结束后 server 会再次新建一个 socket 用于和 client1 socket 进行通信,这时这两个 socket 状态是 established,也就是说此时 server 会有两个 socket,一个用于监听新的连接,一个用于和 client 通信

如果又来了一个 client2server 会再次新建一个 socket 用于和 client2 进行通信

go 高并发 TCP 网络编程

这时 server 会有 3socket,当 client 越来越多时,server 会有越来越多的 socket

这时就诞生了 I/O 模型,I/O 指的是同时操作 socket 的方案,那 I/O 用什么方案同时操作多个 socket 呢?

三个方案:

  • 阻塞
  • 非阻塞
  • 多路复用

阻塞 I/O

阻塞方案是建立三个线程,这三个线程的业务都是类似的,调用 socket 是阻塞式,如果没有新的数据会卡住,也就是说线程会卡住,直到外面有新的数据过来,才会返回新的数据,业务会根据新的数据进行处理,然后把药返回的数据在写回 socket

go 高并发 TCP 网络编程

阻塞 I/O 的特点是:

  • 同步读取 socket 时,线程陷入内核态
  • 当读写成功后,切换会用户态,继续执行
  • 优点:开发难度小,代码简单
  • 缺点:内核态切换开销大

非阻塞 I/O

业务要处理三个 socket,目的是要不断是去这三个 socket 中有没有新数据过来,有新数据就处理新数据,处理后再把数据写回去

非阻塞的意思是,不会卡住,而是会去询问,你有没有新数据,没有新数据就直接返回,在去询问下一个 socket,一直循环下去,不会卡在任何一个 socket

go 高并发 TCP 网络编程

那么理论上来说所有的 socket 可以通过一个线程来处理

非阻塞 I/O 的特点是:

  • 如何暂时无法收发数据,会返回错误
  • 应用会不断的轮询,直到某个 socket 可以读写
  • 优点:不会陷入内核态,自由度高
  • 缺点:需要自旋轮询

多路复用

linux 中多路复用叫做 epoll,全称 event poll,是事件池的意思

event poll 中的事件是 各个 scoket 可读/可写事件,就可以让操作系统来监控各个 socket 是否可以操作

go 高并发 TCP 网络编程

还是和之前一样,一个 socket 负责监听,两个 socket 负责和 client 通信,然后把这三个 socket 可读事件注册到 linux epoll

然后非阻塞的去调用 linux epoll,去询问这三个事件发生了哪些,如果某个 socket 事件发生了,epoll 会返回一个发生事件的列表,然后业务直接调用对应 socket 背后的业务处理

多路复用 epoll 的特点:

  • 注册多个 socket 事件
  • 调用 epoll,当有时间发生时,返回发生事件的列表
  • 优点:提供了事件列表,不需要查询各个 socket
  • 缺点:开发难度大,逻辑复杂
  • 不同操作系统的多路复用实现不同:
    • linuxepoll
    • mackqueue
    • windowsIOCP

go 是如何抽象 epoll

  • 在底层使用使用操作系统的多路复用 I/O
  • 在协程层次使用阻塞模型
  • 阻塞协程时,休眠协程

go 将各个系统对 epoll 的操作做了一层封装,抹平各平台的差异

多路复用器在各个系统都有以下功能:

  • 新建多路复用器 epoll_create()
  • 往多路复用器里插入需要监听的事件 epoll_ctl()
  • 查询发生了什么事件 epoll_wait()

go sdk 中搜索 epoll_create,会出现很多结果,我们选择 /cmd/vendor/golang.org/x/sys/unix/zsysnum_linux_amd64.go

go sdk 中有一个 netpoll 的文件

它一进来就写了很多注释,第一行注释写了这个工具叫做 network poller,下面有很多被注释掉的方法,这些方法是在不同平台的文件中单独去实现的,netpoll 这个文件是一个总的文件

比如 linux 平台是在 netpoll_epoll 中实现,mac 平台是在 netpoll_kqueue 中实现,windows 平台是在 netpoll_windows 中实现

  • newtpollinit:初始化多路复用器
  • netpollopen:往多路复用器里插入需要监听的事件
  • netpoll:看下有什么事件发生
// Integrated network poller (platform-independent part).
// A particular implementation (epoll/kqueue/port/AIX/Windows)
// must define the following functions:
//
// func netpollinit()
//     Initialize the poller. Only called once.
//
// func netpollopen(fd uintptr, pd *pollDesc) int32
//     Arm edge-triggered notifications for fd. The pd argument is to pass
//     back to netpollready when fd is ready. Return an errno value.
//
// func netpollclose(fd uintptr) int32
//     Disable notifications for fd. Return an errno value.
//
// func netpoll(delta int64) gList
//     Poll the network. If delta < 0, block indefinitely. If delta == 0,
//     poll without blocking. If delta > 0, block for up to delta nanoseconds.
//     Return a list of goroutines built by calling netpollready.
//
// func netpollBreak()
//     Wake up the network poller, assumed to be blocked in netpoll.
//
// func netpollIsPollDescriptor(fd uintptr) bool
//     Reports whether fd is a file descriptor used by the poller.

也就说 linux 中的 epoll 对应 go 中的 netpoll

  • epoll_create -> netpollinit
  • epoll_ctl -> netpollopen
  • epoll_wait -> netpoll

netpollinit 作用是创建多路复用器:

  • 新建 epoll
  • 新建一个 pipe 管道,用于中断 epoll
  • 将“管道有数据到达”注册在 epoll
func netpollinit() {
  var errno uintptr
  // 新建 epoll
  epfd, errno = syscall.EpollCreate1(syscall.EPOLL_CLOEXEC)
  if errno != 0 {
    println("runtime: epollcreate failed with", errno)
    throw("runtime: netpollinit failed")
  }
  // 创建一个管道,linux 的管道,用来关闭 epoll
  r, w, errpipe := nonblockingPipe()
  if errpipe != 0 {
    println("runtime: pipe failed with", -errpipe)
    throw("runtime: pipe failed")
  }
  ev := syscall.EpollEvent{
    Events: syscall.EPOLLIN,
  }
  *(**uintptr)(unsafe.Pointer(&ev.Data)) = &netpollBreakRd
  // 将管道的读事件加入 epoll
  errno = syscall.EpollCtl(epfd, syscall.EPOLL_CTL_ADD, r, &ev)
  if errno != 0 {
    println("runtime: epollctl failed with", errno)
    throw("runtime: epollctl failed")
  }
  netpollBreakRd = uintptr(r)
  netpollBreakWr = uintptr(w)
}

netpollopen 作用是插入事件

  • 传入一个 socketfdpollDesc 指针
  • pollDesc 指针是 socket 相关详细信息
  • pollDesc 中记录了哪个协程休眠在等待此 socket
  • socket 可读,可写,端开事件注册到 epoll
func netpollopen(fd uintptr, pd *pollDesc) uintptr {
  var ev syscall.EpollEvent
  // 设置事件
  ev.Events = syscall.EPOLLIN | syscall.EPOLLOUT | syscall.EPOLLRDHUP | syscall.EPOLLET
  tp := taggedPointerPack(unsafe.Pointer(pd), pd.fdseq.Load())
  *(*taggedPointer)(unsafe.Pointer(&ev.Data)) = tp
  // 调用底层的 epoll_ctl
  return syscall.EpollCtl(epfd, syscall.EPOLL_CTL_ADD, int32(fd), &ev)
}

pollDescgo 网络层对 socket 的描述

  • fdsocket id
  • rg:等待读取的协程
  • wg:等待写入的协程
type pollDesc struct {
  fd    uintptr        // constant for pollDesc usage lifetime
  rg atomic.Uintptr // pdReady, pdWait, G waiting for read or pdNil
  wg atomic.Uintptr // pdReady, pdWait, G waiting for write or pdNil
}

netpoll 查询发生了什么事件

  • 调用 epoll_wait 查询有哪些事件发生
  • 根据 socket 相关的 pollDesc 信息,返回哪些协程可以唤醒
// netpoll checks for ready network connections.
// Returns list of goroutines that become runnable.
// delay < 0: blocks indefinitely
// delay == 0: does not block, just polls
// delay > 0: block for up to that many nanoseconds
func netpoll(delay int64) gList {
  var events [128]syscall.EpollEvent
retry:
  // epfd 是 epoll 的 id
  n, errno := syscall.EpollWait(epfd, events[:], int32(len(events)), waitms)
  if errno != 0 {
    if errno != _EINTR {
      println("runtime: epollwait on fd", epfd, "failed with", errno)
      throw("runtime: netpoll failed")
    }
    // If a timed sleep was interrupted, just return to
    // recalculate how long we should sleep now.
    if waitms > 0 {
      return gList{}
    }
    goto retry
  }
  var toRun gList
  // 多少个事件就有多少 n
  for i := int32(0); i < n; i++ {
    ev := events[i]
    if ev.Events == 0 {
      continue
    }

    if *(**uintptr)(unsafe.Pointer(&ev.Data)) == &netpollBreakRd {
      if ev.Events != syscall.EPOLLIN {
        println("runtime: netpoll: break fd ready for", ev.Events)
        throw("runtime: netpoll: break fd ready for something unexpected")
      }
      if delay != 0 {
        // netpollBreak could be picked up by a
        // nonblocking poll. Only read the byte
        // if blocking.
        var tmp [16]byte
        read(int32(netpollBreakRd), noescape(unsafe.Pointer(&tmp[0])), int32(len(tmp)))
        netpollWakeSig.Store(0)
      }
      continue
    }

    var mode int32
    // EPOLLIN:可读事件
    if ev.Events&(syscall.EPOLLIN|syscall.EPOLLRDHUP|syscall.EPOLLHUP|syscall.EPOLLERR) != 0 {
      mode += 'r'
    }
    // EPOLLOUT:可写事件
    if ev.Events&(syscall.EPOLLOUT|syscall.EPOLLHUP|syscall.EPOLLERR) != 0 {
      mode += 'w'
    }
    if mode != 0 {
      tp := *(*taggedPointer)(unsafe.Pointer(&ev.Data))
      pd := (*pollDesc)(tp.pointer())
      tag := tp.tag()
      if pd.fdseq.Load() == tag {
        pd.setEventErr(ev.Events == syscall.EPOLLERR, tag)
        netpollready(&toRun, pd, mode)
      }
    }
  }
  // 返回需要唤醒的协程列表
  return toRun
}

Network Poller 是如何工作的

Network Poller 底层有一个多路复用抽象层,用来屏蔽不同平台对多路复用器的实现

Network Poller 初始化

Network Poller 初始化是 poll_runtime_pollServerInit,使用原子操作,保证一个 go 程序只会初始化一次 netpoll

//go:linkname poll_runtime_pollServerInit internal/poll.runtime_pollServerInit
func poll_runtime_pollServerInit() {
  netpollGenericInit()
}

func netpollGenericInit() {
  // 用原子操作初始化,保证只初始化一次
  if netpollInited.Load() == 0 {
    lockInit(&netpollInitLock, lockRankNetpollInit)
    lock(&netpollInitLock)
    if netpollInited.Load() == 0 {
      // 初始化多路复用器
      netpollinit()
      netpollInited.Store(1)
    }
    unlock(&netpollInitLock)
  }
}

pollCachepollDesc

  • pollCache:是一个带锁的链表头
  • pollDesc:链表的成员
  • pollDescruntime 包对 socket 的详细描述

pollCache 结构体如下

pollCache 实际上是一个链表头,它记录了 pollDesc 中第一个成员的指针,pollCache 的作用是为了放置一个锁,用来锁住 pollDesc

type pollCache struct {
  lock  mutex      // 锁
  first *pollDesc
  // PollDesc objects must be type-stable,
  // because we can get ready notification from epoll/kqueue
  // after the descriptor is closed/reused.
  // Stale notifications are detected using seq variable,
  // seq is incremented when deadlines are changed or descriptor is reused.
}

pollDesc 是一个链表成员,link 属性指向了下一个 pollDesc

  • rg:可能是 pdReadypdWait、等待读取这个 socket 的地址
  • wg:可能是 pdReadypdWait、等待写这个 socket 的地址
    • 写为什么也需要等待,是因为底层的网卡是有发送能力的,写数据时可能需要等待前面的数据发送完毕
type pollDesc struct {
  link  *pollDesc      // in pollcache, protected by pollcache.lock
  fd    uintptr        // constant for pollDesc usage lifetime
  // rg, wg are accessed atomically and hold g pointers.
  // (Using atomic.Uintptr here is similar to using guintptr elsewhere.)
  rg atomic.Uintptr // pdReady, pdWait, G waiting for read or pdNil
  wg atomic.Uintptr // pdReady, pdWait, G waiting for write or pdNil
}

Network Poller 新增监听 socket

Network Poller 新增监听 socketpoll_runtime_pollOpen 方法,在 pollCache 中分配一个新的 pollDesc,初始化 pollDescrgwg0),调用 netpollopenfdpd 插入 epoll

go:linkname 是一个 go 语言的关键字,用来将一个函数的实现指向另一个函数 poll_runtime_pollOpeninternal/poll.runtime_pollOpen 是同一个函数

//go:linkname poll_runtime_pollOpen internal/poll.runtime_pollOpen
func poll_runtime_pollOpen(fd uintptr) (*pollDesc, int) {
  // 分配一个新的 pollDesc
  pd := pollcache.alloc()
  lock(&pd.lock)
  wg := pd.wg.Load()
  if wg != pdNil && wg != pdReady {
    throw("runtime: blocked write on free polldesc")
  }
  rg := pd.rg.Load()
  if rg != pdNil && rg != pdReady {
    throw("runtime: blocked read on free polldesc")
  }
  pd.fd = fd
  if pd.fdseq.Load() == 0 {
    // The value 0 is special in setEventErr, so don't use it.
    pd.fdseq.Store(1)
  }
  // 初始化 pollDesc
  pd.closing = false
  pd.setEventErr(false, 0)
  pd.rseq++
  pd.rg.Store(pdNil)
  pd.rd = 0
  pd.wseq++
  pd.wg.Store(pdNil)
  pd.wd = 0
  pd.self = pd
  pd.publishInfo()
  unlock(&pd.lock)

  // 将 fd 和 pd 插入 epoll
  errno := netpollopen(fd, pd)
  if errno != 0 {
    pollcache.free(pd)
    return nil, int(errno)
  }
  return pd, 0
}

Network Poller 收发数据

Network Poller 是怎么做到一个协程对应一个 socket

收发数据分为两个场景:

  • 协程需要收发数据时,socket 已经可读可写
    • runtime 循环调用 netpoll 方法(g0 协程)
      • netpollstartTheWorldWithSema 调用,startTheWorldWithSemagcStart 调用。为什么事件循环会被 gcStart 调用呢?因为只是 gcStart 是周期性不断的在调用
    • 发现 socket 可读可写时,给对应的 rg 或者 wg 设置为 pdReady(1)
    • 协程调用 poll_runtime_pollWait 方法
    • 判断 rg 或者 wg 已经被置为 pdReady(1),返回 0
  • 协程需要收发数据时,socket 暂时无法读写
    • runtime 循环调用 netpoll 方法(g0 协程)
    • 协程调用 poll_runtime_pollWait 方法
    • 发现对应的 rg 或者 wg0
    • 给对应的 rg 或者 wg 置为协程地址
    • 休眠等待

发现 socket 可读可写时,查看对应的 rgwg,如果rg 或者 wg 是协程地址的话(不是 012),说明有协程在休眠监听,将协程地址返回给 runtime,然后调度器开始调度对应的协程

go 是如何抽象 socket

gosocketnet 提供的

  • net 包抽象了 TCP 网络操作
  • 使用 net.Listen() 得到 TCPListener(listen 状态的 socket)
  • 使用 listen.Accept() 得到 TCPConn(established 状态的 socket)
  • TCPConn.Read/Write 进行读写 socket 操作
  • Network poll 作为上述功能的底层支撑

我们在前面知道了 server 监听新连接,会新建一个 socket,新建新连接是由 net.listen 方法完成的

net.listen 方法返回的是一个 net.Listener 接口,net.Listener 接口中有一个 Accept 方法,用来接收新的连接

func Listen(network, address string) (Listener, error) {
  var lc ListenConfig
  return lc.Listen(context.Background(), network, address)
}

Listen 方法中调用了 ListenConfigListen 方法,ListenConfig 是一个结构体,它的 Listen 方法中根据不同的网络类型,调用不同的 listenTCPlistenUnix 方法

// Listen announces on the local network address.
//
// See func Listen for a description of the network and address
// parameters.
func (lc *ListenConfig) Listen(ctx context.Context, network, address string) (Listener, error) {
  // 解析地址
  addrs, err := DefaultResolver.resolveAddrList(ctx, "listen", network, address, nil)
  if err != nil {
    return nil, &OpError{Op: "listen", Net: network, Source: nil, Addr: nil, Err: err}
  }
  sl := &sysListener{
    ListenConfig: *lc,
    network:      network,
    address:      address,
  }
  var l Listener
  la := addrs.first(isIPv4)
  switch la := la.(type) {
  case *TCPAddr:
    if sl.MultipathTCP() {
      l, err = sl.listenMPTCP(ctx, la)
    } else {
      // 调用 listenTCP 方法
      l, err = sl.listenTCP(ctx, la)
    }
  case *UnixAddr:
    l, err = sl.listenUnix(ctx, la)
  default:
    return nil, &OpError{Op: "listen", Net: sl.network, Source: nil, Addr: la, Err: &AddrError{Err: "unexpected address type", Addr: address}}
  }
  if err != nil {
    return nil, &OpError{Op: "listen", Net: sl.network, Source: nil, Addr: la, Err: err} // l is non-nil interface containing nil pointer
  }
  return l, nil
}

listenTCP 方法内部会调用 socket 方法

net 包中对 socket 详细的描述是 netFD

netFD 中有一个 pfd 属性,pfdpoll.FD 类型,poll.FD 就是上面说的 pollDesc

// Network file descriptor.
type netFD struct {
  pfd poll.FD

  // immutable until Close
  family      int
  sotype      int
  isConnected bool // handshake completed or use of association with peer
  net         string
  laddr       Addr
  raddr       Addr
}

net.Listen

  • 新建 socket,并执行 bind 操作
  • 新建一个 FDnet 包对 socket 详细描述)
  • 返回一个 TCPListener 对象
  • TCPListenerFD 信息加入监听
  • TCPListener 对象本质上是一个 LISTEN 状态的 socket

ls.Accept 方法是用来接收新连接的,返回的时候 Conn 接口`

// Accept implements the Accept method in the Listener interface; it
// waits for the next call and returns a generic Conn.
func (l *TCPListener) Accept() (Conn, error) {
  if !l.ok() {
    return nil, syscall.EINVAL
  }
  c, err := l.accept()
  if err != nil {
    return nil, &OpError{Op: "accept", Net: l.fd.net, Source: nil, Addr: l.fd.laddr, Err: err}
  }
  return c, nil
}
  • 直接调用 socketaccept 方法
  • 如果失败,休眠等待新的连接
  • 将新的 socket 包装为 TCPConn 变量返回
  • TCPConnFD 信息加入监听
  • TCPConn 本质上是一个 ESTABLISHED 状态的 socket

conn.Read 方法是用来读取数据

// Read implements the Conn Read method.
func (c *conn) Read(b []byte) (int, error) {
  if !c.ok() {
    return 0, syscall.EINVAL
  }
  n, err := c.fd.Read(b)
  if err != nil && err != io.EOF {
    err = &OpError{Op: "read", Net: c.fd.net, Source: c.fd.laddr, Addr: c.fd.raddr, Err: err}
  }
  return n, err
}

conn.Write 方法是用来写入数据

// Write implements the Conn Write method.
func (c *conn) Write(b []byte) (int, error) {
  if !c.ok() {
    return 0, syscall.EINVAL
  }
  n, err := c.fd.Write(b)
  if err != nil {
    err = &OpError{Op: "write", Net: c.fd.net, Source: c.fd.laddr, Addr: c.fd.raddr, Err: err}
  }
  return n, err
}

TCPConn.ReadTCPConn.Write 方法内部调用了 netFD.ReadnetFD.Write 方法

netFD.ReadnetFD.Write 方法内部调用了 poll.FD.Readpoll.FD.Write 方法

poll.FD.Readpoll.FD.Write 方法内部调用了 netpoll 方法

  • 调用 socket 原生读写方法
  • 如果调用失败,休眠等待可读/可写
  • 被唤醒后调用系统 socket
转载自:https://juejin.cn/post/7392537225629941772
评论
请登录