Golang select source code analysis

Author: 站长

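Background

Go promotes communication over channels. Communication over a single channel involves one goroutine sending data to the channel and another goroutine receiving from it. The operation blocks, because it can only complete once the channel is ready. A channel is ready under the following conditions: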

  1. Send
    • The buffer has free space (for a buffered channel)
    • A goroutine is waiting to receive
  2. Receive
    • The buffer holds data (for a buffered channel)
    • A goroutine is waiting to send

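In practice, channels come with two further requirements, and this is where select comes in.

  1. Listen on multiple channels at the same time.
  2. Continue executing even when no channel is ready.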
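
The two requirements above can be illustrated with a small sketch. The helper name firstReady is made up for this example; it listens on two channels at once and uses a default branch so that it returns immediately when neither is ready.

```go
package main

import "fmt"

// firstReady is a hypothetical helper. It reports which of the two channels
// delivered a value, or "none" when neither is ready. The default branch is
// what keeps the select from blocking.
func firstReady(a, b <-chan int) string {
	select {
	case v := <-a:
		return fmt.Sprintf("a:%d", v)
	case v := <-b:
		return fmt.Sprintf("b:%d", v)
	default:
		return "none"
	}
}

func main() {
	a := make(chan int, 1)
	b := make(chan int, 1)

	fmt.Println(firstReady(a, b)) // prints "none": neither channel has data

	b <- 7
	fmt.Println(firstReady(a, b)) // prints "b:7": only b is ready
}
```

Without the default branch, the first call would block until some goroutine sent on a or b.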

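select flow

  1. Empty select (select {}). It blocks the current goroutine forever. Do not block a goroutine with for {}, because a busy loop burns CPU. select {} does not, because the current goroutine is parked and never scheduled again.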

     if len(cases) == 0 {
             block()
     }
    
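  2. Set up the poll order. Because select watches sends or receives on several channels at once, it has to check the channels in some order to find one that is ready. If it always checked them in the order they appear in the select statement, then whenever an earlier channel became ready quickly enough, a later channel might never run even when it is ready. The order is therefore randomized with a shuffle algorithm, as shown below. Note that cases whose channel is nil are filtered out during this step.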

     // generate permuted order
     norder := 0
     for i := range scases {
             cas := &scases[i]
    
             // Omit cases without channels from the poll and lock orders.
             if cas.c == nil {
                     cas.elem = nil // allow GC
                     continue
             }
    
             j := fastrandn(uint32(norder + 1))
             pollorder[norder] = pollorder[j]
             pollorder[j] = uint16(i)
             norder++
     }
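
The shuffle in the excerpt is an incremental ("inside-out") Fisher-Yates shuffle. The following is a minimal user-level sketch of the same idea, not the runtime's code: permuteNonNil is a hypothetical helper, and math/rand stands in for the runtime's fastrandn.

```go
package main

import (
	"fmt"
	"math/rand"
)

// permuteNonNil is a hypothetical helper. It returns the indices of the
// non-nil channels in random order, building the permutation one element
// at a time like the pollorder loop.
func permuteNonNil(chans []chan int) []int {
	order := make([]int, 0, len(chans))
	for i, c := range chans {
		if c == nil {
			continue // a nil channel can never become ready, so leave it out
		}
		j := rand.Intn(len(order) + 1) // random slot in [0, len(order)]
		order = append(order, 0)       // grow the permutation by one slot
		order[len(order)-1] = order[j] // move the old occupant of slot j to the end
		order[j] = i                   // put the new index into slot j
	}
	return order
}

func main() {
	c := make(chan int)
	// Index 1 is nil and never appears in the result.
	fmt.Println(permuteNonNil([]chan int{c, nil, c, c}))
}
```

Each new index lands in a uniformly chosen slot, so every ordering of the non-nil cases is equally likely and no case is favored by its position in the source.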
    
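  3. Set up the lock order. Sending to or receiving from a channel may modify its data, so the channel must be locked first. A channel can be watched by several selects. If two selects lock two channels A and B in the orders A, B and B, A respectively, they can deadlock, and neither select can make progress.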

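    The order in which select locks channels is therefore critical. The solution is to lock channels in order of their addresses: when two selects share some channels, both lock the shared channels in the same address order.

    The actual sorting code follows. It uses heap sort to order the channels by address in ascending order.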

     // sort the cases by Hchan address to get the locking order.
     // simple heap sort, to guarantee n log n time and constant stack footprint.
     for i := range lockorder {
             j := i
             // Start with the pollorder to permute cases on the same channel.
             c := scases[pollorder[i]].c
             for j > 0 && scases[lockorder[(j-1)/2]].c.sortkey() < c.sortkey() {
                     k := (j - 1) / 2
                     lockorder[j] = lockorder[k]
                     j = k
             }
             lockorder[j] = pollorder[i]
     }
     for i := len(lockorder) - 1; i >= 0; i-- {
             o := lockorder[i]
             c := scases[o].c
             lockorder[i] = lockorder[0]
             j := 0
             for {
                     k := j*2 + 1
                     if k >= i {
                             break
                     }
                     if k+1 < i && scases[lockorder[k]].c.sortkey() < scases[lockorder[k+1]].c.sortkey() {
                             k++
                     }
                     if c.sortkey() < scases[lockorder[k]].c.sortkey() {
                             lockorder[j] = lockorder[k]
                             j = k
                             continue
                     }
                     break
             }
             lockorder[j] = o
     }
    
  4. Lock all channels in the select, because the next step inspects their state.

     // lock all the channels involved in the select
     sellock(scases, lockorder)
    
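  5. Pass 1: check whether any channel is already ready. If one is, send to it or receive from it right away. Note that in the select's case slice, the first nsends entries are the send cases and the remaining entries are the receive cases.

    Channels are checked in pollorder.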

     for _, casei := range pollorder {
             casi = int(casei)
             cas = &scases[casi]
             c = cas.c
             if casi >= nsends {
                     sg = c.sendq.dequeue()
                     if sg != nil {
                             goto recv
                     }
                     if c.qcount > 0 {
                             goto bufrecv
                     }
                     if c.closed != 0 {
                             goto rclose
                     }
             } else {
                     if raceenabled {
                             racereadpc(c.raceaddr(), casePC(casi), chansendpc)
                     }
                     if c.closed != 0 {
                             goto sclose
                     }
                     sg = c.recvq.dequeue()
                     if sg != nil {
                             goto send
                     }
                     if c.qcount < c.dataqsiz {
                             goto bufsend
                     }
             }
     }
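
The effect of polling in a shuffled order can be observed from user code. The sketch below is only an illustration, with the hypothetical helper countPicks: it keeps two channels ready on every iteration, adds a nil channel as a third case, and counts which case runs.

```go
package main

import "fmt"

// countPicks is a hypothetical helper. It runs a select n times with both a
// and b ready, plus a nil channel, and counts which case was chosen.
func countPicks(n int) (fromA, fromB, fromNil int) {
	a := make(chan int, 1)
	b := make(chan int, 1)
	var never chan int // nil channel: its case is left out of the poll order

	for i := 0; i < n; i++ {
		a <- 1
		b <- 2
		select {
		case <-a:
			fromA++
			<-b // drain the other channel so the next round starts clean
		case <-b:
			fromB++
			<-a
		case <-never:
			fromNil++
		}
	}
	return
}

func main() {
	fromA, fromB, fromNil := countPicks(1000)
	// The exact split between a and b varies from run to run.
	fmt.Println(fromA, fromB, fromNil)
}
```

Both ready cases should get picked a substantial number of times, while the nil case never runs.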
    
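  6. If no channel is ready and the select has a default branch (block is false), unlock the channels and run the default branch directly.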

     if !block {
             selunlock(scases, lockorder)
             casi = -1
             goto retc
     }
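
A common use of this path is a non-blocking send. The sketch below uses a hypothetical helper, trySend, which takes the default branch whenever the channel cannot accept the value right away.

```go
package main

import "fmt"

// trySend is a hypothetical helper. It sends v only if the send can proceed
// immediately, and otherwise falls through to default and returns false.
func trySend(ch chan<- int, v int) bool {
	select {
	case ch <- v:
		return true
	default:
		return false
	}
}

func main() {
	ch := make(chan int, 1)
	fmt.Println(trySend(ch, 1)) // prints true: the buffer has free space
	fmt.Println(trySend(ch, 2)) // prints false: the buffer is full, default runs
}
```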
    
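  7. Pass 2: walk the channels again. Create a sudog for each case and put the current goroutine on every channel's wait queue (sendq for send cases, recvq for receive cases), so that it is woken once a channel becomes ready.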

     // pass 2 - enqueue on all chans
     gp = getg()
     if gp.waiting != nil {
             throw("gp.waiting != nil")
     }
     nextp = &gp.waiting
     for _, casei := range lockorder {
             casi = int(casei)
             cas = &scases[casi]
             c = cas.c
             sg := acquireSudog()
             sg.g = gp
             sg.isSelect = true
             // No stack splits between assigning elem and enqueuing
             // sg on gp.waiting where copystack can find it.
             sg.elem = cas.elem
             sg.releasetime = 0
             if t0 != 0 {
                     sg.releasetime = -1
             }
             sg.c = c
             // Construct waiting list in lock order.
             *nextp = sg
             nextp = &sg.waitlink
    
             if casi < nsends {
                     c.sendq.enqueue(sg)
             } else {
                     c.recvq.enqueue(sg)
             }
     }
    
  8. Wait to be woken. During gopark, the selparkcommit callback releases the locks held on all channels.

     // wait for someone to wake us up
     gp.param = nil
     // Signal to anyone trying to shrink our stack that we're about
     // to park on a channel. The window between when this G's status
     // changes and when we set gp.activeStackChans is not safe for
     // stack shrinking.
     atomic.Store8(&gp.parkingOnChan, 1)
     gopark(selparkcommit, nil, waitReasonSelect, traceEvGoBlockSelect, 1)
     gp.activeStackChans = false
    
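  9. After being woken:

    1. Lock all channels.
    2. Clear the current goroutine's list of waiting sudogs.
    3. Find the channel that woke the goroutine, and remove the goroutine's sudog from every other channel's wait queue.
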
     sellock(scases, lockorder)
    
     gp.selectDone = 0
     sg = (*sudog)(gp.param)
     gp.param = nil
    
     // pass 3 - dequeue from unsuccessful chans
     // otherwise they stack up on quiet channels
     // record the successful case, if any.
     // We singly-linked up the SudoGs in lock order.
     casi = -1
     cas = nil
     caseSuccess = false
     sglist = gp.waiting
     // Clear all elem before unlinking from gp.waiting.
     for sg1 := gp.waiting; sg1 != nil; sg1 = sg1.waitlink {
             sg1.isSelect = false
             sg1.elem = nil
             sg1.c = nil
     }
     gp.waiting = nil
    
     for _, casei := range lockorder {
             k = &scases[casei]
             if sg == sglist {
                     // sg has already been dequeued by the G that woke us up.
                     casi = int(casei)
                     cas = k
                     caseSuccess = sglist.success
                     if sglist.releasetime > 0 {
                             caseReleaseTime = sglist.releasetime
                     }
             } else {
                     c = k.c
                     if int(casei) < nsends {
                             c.sendq.dequeueSudoG(sglist)
                     } else {
                             c.recvq.dequeueSudoG(sglist)
                     }
             }
             sgnext = sglist.waitlink
             sglist.waitlink = nil
             releaseSudog(sglist)
             sglist = sgnext
     }
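
The visible consequence of steps 7 to 9 is that a select leaves no stale waiter behind on the channels that did not fire. The sketch below illustrates this from user code; waitEither, trySend, and demo are hypothetical names. Whether the goroutine actually parks before the send arrives depends on scheduling, so the example shows the outcome rather than proving which runtime path was taken.

```go
package main

import "fmt"

// waitEither blocks in a select on two unbuffered channels and reports which
// one delivered a value.
func waitEither(a, b <-chan int, result chan<- string) {
	select {
	case v := <-a:
		result <- fmt.Sprintf("a:%d", v)
	case v := <-b:
		result <- fmt.Sprintf("b:%d", v)
	}
}

// trySend sends v only if a receiver is ready right now.
func trySend(ch chan<- int, v int) bool {
	select {
	case ch <- v:
		return true
	default:
		return false
	}
}

// demo wakes the select through channel a, then checks that nothing is still
// waiting to receive on channel b.
func demo() (string, bool) {
	a := make(chan int)
	b := make(chan int)
	result := make(chan string)

	go waitEither(a, b, result)

	a <- 1                // completes only when the select receives the value
	got := <-result       // the select has finished by this point
	sent := trySend(b, 2) // fails: the select no longer waits on b
	return got, sent
}

func main() {
	fmt.Println(demo()) // prints "a:1 false"
}
```

If the sudog enqueued on b during pass 2 were not removed in pass 3, a later sender on b could be paired with a goroutine that had already moved on.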
    
Reposted from: https://juejin.cn/post/7182844543551668284