likes
comments
collection
share

深入理解Golang之channel

作者站长头像
站长
· 阅读数 57

前言

Golang在并发编程上有两大利器,分别是channelgoroutine,这篇文章我们先聊聊channel。熟悉Golang的人都知道一句名言:“使用通信来共享内存,而不是通过共享内存来通信”。这句话有两层意思,Go语言确实在sync包中提供了传统的锁机制,但更推荐使用channel来解决并发问题。这篇文章会先从channel的用法、channel的原理两部分对channel做一个较为深入的探究。

channel用法

什么是channel

从字面上看,channel的意思大概就是管道的意思。channel是一种go协程用以接收或发送消息的安全的消息队列,channel就像两个go协程之间的导管,来实现各种资源的同步。可以用下图示意:

深入理解Golang之channel

channel的用法很简单:

func main() {    ch := make(chan int, 1// 创建一个类型为int,缓冲区大小为1的channel    ch <- 2 // 将2发送到ch    n, ok := <- ch // n接收从ch发出的值    if ok {        fmt.Println(n) // 2    }    close(ch) // 关闭channel}

使用channel时有几个注意点:

  • 向一个nil channel发送消息,会一直阻塞;
  • 向一个已经关闭的channel发送消息,会引发运行时恐慌(panic)
  • channel关闭后不可以继续向channel发送消息,但可以继续从channel接收消息;
  • channel关闭并且缓冲区为空时,继续从从channel接收消息会得到一个对应类型的零值。

Unbuffered channels与Buffered channels

Unbuffered channels是指缓冲区大小为0的channel,这种channel的接收者会阻塞直至接收到消息,发送者会阻塞直至接收者接收到消息,这种机制可以用于两个goroutine进行状态同步;Buffered channels拥有缓冲区,当缓冲区已满时,发送者会阻塞;当缓冲区为空时,接收者会阻塞。

引用The Nature Of Channels In Go中的两张图来说明Unbuffered channelsBuffered channels, 非常形象,读者可自行体会一下:

Unbuffered channels

深入理解Golang之channelUnbuffered channels

Buffered channels

深入理解Golang之channelBuffered channels

channel的遍历

for range

channel支持 for range 的方式进行遍历:

package main  import "fmt"  func main() {      ci := make(chan int5)      for i := 1; i <= 5; i++ {        ci <- i    }        close(ci)      for i := range ci {          fmt.Println(i)      }  }  

值得注意的是,在遍历时,如果channel 没有关闭,那么会一直等待下去,出现 deadlock 的错误;如果在遍历时channel已经关闭,那么在遍历完数据后自动退出遍历。也就是说,for range 的遍历方式时阻塞型的遍历方式。

for select

select可以处理非阻塞式消息发送、接收及多路选择。

package main  import "fmt"  func main() {      ci := make(chan int2)    for i := 1; i <= 2; i++ {        ci <- i    }    close(ci)    cs := make(chan string2)    cs <- "hi"    cs <- "golang"    close(cs)    ciClosed, csClosed := falsefalse    for {        if ciClosed && csClosed {            return        }        select {        case i, ok := <-ci:            if ok {                fmt.Println(i)            } else {                ciClosed = true                fmt.Println("ci closed")            }        case s, ok := <-cs:            if ok {                fmt.Println(s)            } else {                csClosed = true                fmt.Println("cs closed")            }        default:            fmt.Println("waiting...")        }    }}  

select中有case代码块,用于channel发送或接收消息,任意一个case代码块准备好时,执行其对应内容;多个case代码块准备好时,随机选择一个case代码块并执行;所有case代码块都没有准备好,则等待;还可以有一个default代码块,所有case代码块都没有准备好时执行default代码块。

channel原理

先贴一下channel源码地址,读者可以对照来看。

数据结构

先看channel的结构体:

type hchan struct {    qcount   uint           // total data in the queue    dataqsiz uint           // size of the circular queue    buf      unsafe.Pointer // points to an array of dataqsiz elements    // channel中元素大小    elemsize uint16     // 是否已关闭    closed   uint32    // channel中元素类型    elemtype *_type // element type    sendx    uint   // send index    recvx    uint   // receive index    recvq    waitq  // list of recv waiters    sendq    waitq  // list of send waiters    // lock protects all fields in hchan, as well as several    // fields in sudogs blocked on this channel.    //    // Do not change another G's status while holding this lock    // (in particular, do not ready a G), as this can deadlock    // with stack shrinking.    lock mutex}

channel的缓冲区其实是一个环形队列,qcount表示队列中元素的数量,dataqsiz表示环形队列的总大小,buf表示一个指向循环数组的指针;sendxrecvx分别用来标识当前发送和接收的元素在循环队列中的位置;recvqsendq都是一个列表,分别用于存储当前处于等待接收和等待发送的Goroutine

再看一下waitq的数据结构:

type waitq struct {    first *sudog    last  *sudog}type sudog struct {    // 当前goroutine    g *g    // isSelect indicates g is participating in a select, so    // g.selectDone must be CAS'd to win the wake-up race.    isSelect bool    next     *sudog    prev     *sudog    elem     unsafe.Pointer // data element (may point to stack)    // The following fields are never accessed concurrently.    // For channels, waitlink is only accessed by g.    // For semaphores, all fields (including the ones above)    // are only accessed when holding a semaRoot lock.    acquiretime int64    releasetime int64    ticket      uint32    parent      *sudog // semaRoot binary tree    waitlink    *sudog // g.waiting list or semaRoot    waittail    *sudog // semaRoot    c           *hchan // channel}

其中sudog表示处于等待列表中的Goroutine封装,包含了一些上下文信息,firstlast分别指向等待列表的首位的Goroutine

编译分析

在分析channel的原理之前,我们先使用go tool分析以下代码,看看channel的各种操作在底层调用了什么运行时方法:

ch := make(chan int2)ch <- 2ch <- 1<-chn, ok := <-chif ok {    fmt.Println(n)}close(ch)

编译

go build test.gogo tool objdump -s "main\.main" test | grep CALL

CALL过滤出来:

  test.go:118           0x1092f55               e81612f7ff              CALL runtime.makechan(SB)  test.go:119           0x1092f74               e82714f7ff              CALL runtime.chansend1(SB)  test.go:120           0x1092f8e               e80d14f7ff              CALL runtime.chansend1(SB)  test.go:121           0x1092fa5               e8361ff7ff              CALL runtime.chanrecv1(SB)  test.go:122           0x1092fbd               e85e1ff7ff              CALL runtime.chanrecv2(SB)  test.go:126           0x1092fd7               e8841cf7ff              CALL runtime.closechan(SB)  test.go:124           0x1092fea               e8b156f7ff              CALL runtime.convT64(SB)  print.go:275          0x1093041               e88a98ffff              CALL fmt.Fprintln(SB)  test.go:47            0x1093055               e896c1fbff              CALL runtime.morestack_noctxt(SB)

创建

从上面的编译分析可以看出在创建channel时调用了运行时方法makechan:

func makechan(t *chantype, size int) *hchan {    elem := t.elem    // compiler checks this but be safe.    if elem.size >= 1<<16 {        throw("makechan: invalid channel element type")    }    if hchanSize%maxAlign != 0 || elem.align > maxAlign {        throw("makechan: bad alignment")    }    // 计算缓冲区需要的总大小(缓冲区大小*元素大小),并判断是否超出最大可分配范围    mem, overflow := math.MulUintptr(elem.size, uintptr(size))    if overflow || mem > maxAlloc-hchanSize || size < 0 {        panic(plainError("makechan: size out of range"))    }    // Hchan does not contain pointers interesting for GC when elements stored in buf do not contain pointers.    // buf points into the same allocation, elemtype is persistent.    // SudoG's are referenced from their owning thread so they can't be collected.    // TODO(dvyukov,rlh): Rethink when collector can move allocated objects.    var c *hchan    switch {    case mem == 0:        // 缓冲区大小为0,或者channel中元素大小为0(struct{}{})时,只需分配channel必需的空间即可        // Queue or element size is zero.        c = (*hchan)(mallocgc(hchanSize, nil, true))        // Race detector uses this location for synchronization.        c.buf = c.raceaddr()    case elem.kind&kindNoPointers != 0:        // 通过位运算知道channel中元素类型不是指针,分配一片连续内存空间,所需空间等于 缓冲区数组空间 + hchan必需的空间。        // Elements do not contain pointers.        // Allocate hchan and buf in one call.        c = (*hchan)(mallocgc(hchanSize+mem, nil, true))        c.buf = add(unsafe.Pointer(c), hchanSize)    default:        // 元素中包含指针,为hchan和缓冲区分别分配空间        // Elements contain pointers.        c = new(hchan)        c.buf = mallocgc(mem, elem, true)    }    c.elemsize = uint16(elem.size)    c.elemtype = elem    c.dataqsiz = uint(size)    if debugChan {        print("makechan: chan=", c, "; elemsize=", elem.size, "; elemalg=", elem.alg, "; dataqsiz=", size, "\n")    }    return c}

makechan的代码逻辑还是比较简单的,首先校验元素类型和缓冲区空间大小,然后创建hchan,分配所需空间。这里有三种情况:当缓冲区大小为0,或者channel中元素大小为0时,只需分配channel必需的空间即可;当channel元素类型不是指针时,则只需要为hchan和缓冲区分配一片连续内存空间,空间大小为缓冲区数组空间加上hchan必需的空间;默认情况,缓冲区包含指针,则需要为hchan和缓冲区分别分配内存。最后更新hchan的其他字段,包括elemsizeelemtypedataqsiz

发送

channel的发送操作调用了运行时方法chansend1, 在chansend1内部又调用了chansend,直接来看chansend的实现:

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {    // channel为nil    if c == nil {        // 如果是非阻塞,直接返回发送不成功        if !block {            return false        }        // 否则,当前Goroutine阻塞挂起        gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)        throw("unreachable")    }    if debugChan {        print("chansend: chan=", c, "\n")    }    if raceenabled {        racereadpc(c.raceaddr(), callerpc, funcPC(chansend))    }    // Fast path: check for failed non-blocking operation without acquiring the lock.    // 对于非阻塞且channel未关闭,如果无缓冲区且没有等待接收的Goroutine,或者有缓冲区且缓冲区已满,那么都直接返回发送不成功    if !block && c.closed == 0 && ((c.dataqsiz == 0 && c.recvq.first == nil) ||        (c.dataqsiz > 0 && c.qcount == c.dataqsiz)) {        return false    }    var t0 int64    if blockprofilerate > 0 {        t0 = cputicks()    }    // 加锁    lock(&c.lock)    // 如果channel已关闭    if c.closed != 0 {        // 解锁,直接panic        unlock(&c.lock)        panic(plainError("send on closed channel"))    }    // 除了以上情况,当channel未关闭时,就有以下几种情况:    // 1、当存在等待接收的Goroutine    if sg := c.recvq.dequeue(); sg != nil {        // Found a waiting receiver. We pass the value we want to send        // directly to the receiver, bypassing the channel buffer (if any).        // 那么直接把正在发送的值发送给等待接收的Goroutine        send(c, sg, ep, func() { unlock(&c.lock) }, 3)        return true    }    // 2、当缓冲区未满时    if c.qcount < c.dataqsiz {        // Space is available in the channel buffer. Enqueue the element to send.        // 获取指向缓冲区数组中位于sendx位置的元素的指针        qp := chanbuf(c, c.sendx)        if raceenabled {            raceacquire(qp)            racerelease(qp)        }        // 将当前发送的值拷贝到缓冲区        typedmemmove(c.elemtype, qp, ep)        // sendx索引加一        c.sendx++        // 因为是循环队列,sendx等于队列长度时置为0        if c.sendx == c.dataqsiz {            c.sendx = 0        }        // 队列中元素总数加一,并解锁,返回发送成功        c.qcount++        unlock(&c.lock)        return true    }    // 3、当既没有等待接收的Goroutine,缓冲区也没有剩余空间,如果是非阻塞的发送,那么直接解锁,返回发送失败    if !block {        unlock(&c.lock)        return false    }    // Block on the channel. Some receiver will complete our operation for us.    // 4、如果是阻塞发送,那么就将当前的Goroutine打包成一个sudog结构体,并加入到channel的发送队列sendq里    gp := getg()    mysg := acquireSudog()    mysg.releasetime = 0    if t0 != 0 {        mysg.releasetime = -1    }    // No stack splits between assigning elem and enqueuing mysg    // on gp.waiting where copystack can find it.    mysg.elem = ep    mysg.waitlink = nil    mysg.g = gp    mysg.isSelect = false    mysg.c = c    gp.waiting = mysg    gp.param = nil    c.sendq.enqueue(mysg)    // 调用goparkunlock将当前Goroutine设置为等待状态并解锁,进入休眠等待被唤醒    goparkunlock(&c.lock, waitReasonChanSend, traceEvGoBlockSend, 3)    // Ensure the value being sent is kept alive until the    // receiver copies it out. The sudog has a pointer to the    // stack object, but sudogs aren't considered as roots of the    // stack tracer.    KeepAlive(ep)    // someone woke us up.    // 被唤醒之后执行清理工作并释放sudog结构体    if mysg != gp.waiting {        throw("G waiting list is corrupted")    }    gp.waiting = nil    if gp.param == nil {        if c.closed == 0 {            throw("chansend: spurious wakeup")        }        panic(plainError("send on closed channel"))    }    gp.param = nil    if mysg.releasetime > 0 {        blockevent(mysg.releasetime-t0, 2)    }    mysg.c = nil    releaseSudog(mysg)    return true}

chansend的执行逻辑,上面的注释已经写得很清楚了,我们再来梳理一下。对于非阻塞发送或者channel已经关闭条件下的几种发送失败的情况,处理逻辑比较简单,读者可以对照注释来看;这里我们重点关注channel未关闭时几种常规情况:

存在等待接收的Goroutine

如果等待接收的队列recvq中存在Goroutine,那么直接把正在发送的值发送给等待接收的Goroutine。示意图如下:

深入理解Golang之channel具体看一下send方法:

func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {    ...    if sg.elem != nil {        // 将发送的值直接拷贝到接收值(比如v = <-ch 中的v)的内存地址        sendDirect(c.elemtype, sg, ep)        sg.elem = nil    }    // 获取等待接收数据的Goroutine    gp := sg.g    unlockf()    gp.param = unsafe.Pointer(sg)    if sg.releasetime != 0 {        sg.releasetime = cputicks()    }    // 唤醒之前等待接收数据的Goroutine    goready(gp, skip+1)}

这里有必要说明一下Goroutine在调度过程中的几种状态:

_Gidle = iota // goroutine刚刚分配,还没有初始化_Grunnable // goroutine处于运行队列中, 还没有运行,没有自己的栈_Grunning // goroutine在运行中,拥有自己的栈,被分配了M(线程)和P(调度上下文)_Gsyscall // goroutine在执行系统调用_Gwaiting // goroutine被阻塞_Gdead // goroutine没有被使用,可能是刚刚退出,或者正在初始化中_Gcopystack // 表示g当前的栈正在被移除并分配新栈

当调用goready时,将Goroutine的状态从 _Gwaiting置为_Grunnable,等待下一次调度再次执行。

当缓冲区未满时

当缓冲区未满时,找到sendx所指向的缓冲区数组的位置,将正在发送的值拷贝到该位置,并增加sendx索引以及释放锁,示意图如下:

深入理解Golang之channel
阻塞发送

如果是阻塞发送,那么就将当前的Goroutine打包成一个sudog结构体,并加入到channel的发送队列sendq里。示意图如下:

深入理解Golang之channel

之后则调用goparkunlock将当前Goroutine设置为_Gwaiting状态并解锁,进入阻塞状态等待被唤醒(调用goready);如果被调度器唤醒,执行清理工作并最终释放对应的sudog结构体。

接收

channel的接收有两种形式:

<-chn, ok := <-ch

这两种方式分别调用运行时方法chanrecv1chanrecv2:

func chanrecv1(c *hchan, elem unsafe.Pointer) {    chanrecv(c, elem, true)}func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) {    _, received = chanrecv(c, elem, true)    return}

这两个方法最终都会调用chanrecv方法:

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {    if debugChan {        print("chanrecv: chan=", c, "\n")    }    // channel为nil    if c == nil {        // 非阻塞直接返回(falsefalse)        if !block {            return        }        // 阻塞接收,则当前Goroutine阻塞挂起        gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)        throw("unreachable")    }    // Fast path: check for failed non-blocking operation without acquiring the lock.    // 非阻塞模式,对于以下两种情况:    // 1、无缓冲区且等待发送队列也为空    // 2、有缓冲区但缓冲区数组为空且channel未关闭    // 这两种情况都是接收失败, 直接返回(falsefalse)    if !block && (c.dataqsiz == 0 && c.sendq.first == nil ||        c.dataqsiz > 0 && atomic.Loaduint(&c.qcount) == 0) &&        atomic.Load(&c.closed) == 0 {        return    }    var t0 int64    if blockprofilerate > 0 {        t0 = cputicks()    }    // 加锁    lock(&c.lock)    // 如果channel已关闭,并且缓冲区无元素    if c.closed != 0 && c.qcount == 0 {        if raceenabled {            raceacquire(c.raceaddr())        }        unlock(&c.lock)        // 有等待接收的变量(即 v = <-ch中的v)        if ep != nil {            //根据channel元素的类型清理ep对应地址的内存,即ep接收了channel元素类型的零值            typedmemclr(c.elemtype, ep)        }        // 返回(truefalse),即接收到值,但不是从channel中接收的有效值        return truefalse    }    // 除了以上非常规情况,还有有以下几种常见情况:    // 1、等待发送的队列sendq里存在Goroutine,那么有两种情况:当前channel无缓冲区,或者当前channel已满    if sg := c.sendq.dequeue(); sg != nil {        // Found a waiting sender. If buffer is size 0, receive value        // directly from sender. Otherwise, receive from head of queue        // and add sender's value to the tail of the queue (both map to        // the same buffer slot because the queue is full).        // 如果无缓冲区,那么直接从sender接收数据;否则,从buf队列的头部接收数据,并把sender的数据加到buf队列的尾部        recv(c, sg, ep, func() { unlock(&c.lock) }, 3)        // 接收成功        return truetrue    }    // 2、缓冲区buf中有元素    if c.qcount > 0 {        // Receive directly from queue        // 从recvx指向的位置获取元素        qp := chanbuf(c, c.recvx)        if raceenabled {            raceacquire(qp)            racerelease(qp)        }        if ep != nil {            // 将从buf中取出的元素拷贝到当前协程            typedmemmove(c.elemtype, ep, qp)        }        // 同时将取出的数据所在的内存清空        typedmemclr(c.elemtype, qp)        // 接收索引 +1        c.recvx++        if c.recvx == c.dataqsiz {            c.recvx = 0        }        // buf元素总数 -1        c.qcount--        // 解锁,返回接收成功        unlock(&c.lock)        return truetrue    }    // 3、非阻塞模式,且没有数据可以接受    if !block {        // 解锁,直接返回接收失败        unlock(&c.lock)        return falsefalse    }    // no sender available: block on this channel.    // 4、阻塞模式,获取当前Goroutine,打包一个sudog    gp := getg()    mysg := acquireSudog()    mysg.releasetime = 0    if t0 != 0 {        mysg.releasetime = -1    }    // No stack splits between assigning elem and enqueuing mysg    // on gp.waiting where copystack can find it.    mysg.elem = ep    mysg.waitlink = nil    gp.waiting = mysg    mysg.g = gp    mysg.isSelect = false    mysg.c = c    gp.param = nil    // 加入到channel的等待接收队列recvq中    c.recvq.enqueue(mysg)    // 挂起当前Goroutine,设置为_Gwaiting状态并解锁,进入休眠等待被唤醒    goparkunlock(&c.lock, waitReasonChanReceive, traceEvGoBlockRecv, 3)    // someone woke us up    // 被唤醒之后执行清理工作并释放sudog结构体    if mysg != gp.waiting {        throw("G waiting list is corrupted")    }    gp.waiting = nil    if mysg.releasetime > 0 {        blockevent(mysg.releasetime-t0, 2)    }    closed := gp.param == nil    gp.param = nil    mysg.c = nil    releaseSudog(mysg)    return true, !closed}

chanrecv方法的处理逻辑与chansend非常类似,我们这里仍然只分析几种常见情况,其他情况上述注释也解释得比较清楚了,读者可对照相应代码和注释查看。

存在等待发送的Goroutine

如果等待发送的队列sendq里存在挂起的Goroutine,那么有两种情况:当前channel无缓冲区,或者当前channel已满。从sendq中取出最先阻塞的Goroutine,然后调用recv方法:

func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {    if c.dataqsiz == 0 {        // 无缓冲区        if raceenabled {            racesync(c, sg)        }        if ep != nil {            // copy data from sender            recvDirect(c.elemtype, sg, ep)        }    } else {        // 缓冲区已满        // Queue is full. Take the item at the        // head of the queue. Make the sender enqueue        // its item at the tail of the queue. Since the        // queue is full, those are both the same slot.        qp := chanbuf(c, c.recvx)        if raceenabled {            raceacquire(qp)            racerelease(qp)            raceacquireg(sg.g, qp)            racereleaseg(sg.g, qp)        }        // copy data from queue to receiver        if ep != nil {            typedmemmove(c.elemtype, ep, qp)        }        // copy data from sender to queue        typedmemmove(c.elemtype, qp, sg.elem)        c.recvx++        if c.recvx == c.dataqsiz {            c.recvx = 0        }        c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz    }    sg.elem = nil    gp := sg.g    unlockf()    gp.param = unsafe.Pointer(sg)    if sg.releasetime != 0 {        sg.releasetime = cputicks()    }    // 将等待发送数据的Goroutine的状态从_Gwaiting置为 _Grunnable,等待下一次调度。    goready(gp, skip+1)}

1、如果无缓冲区,那么直接从sender接收数据;2、如果缓冲区已满,从buf队列的头部接收数据,并把sender的数据加到buf队列的尾部;3、最后调用goready函数将等待发送数据的Goroutine的状态从_Gwaiting置为_Grunnable,等待下一次调度。

下图示意了当缓冲区已满时的处理过程:

深入理解Golang之channel
缓冲区buf中还有数据

如果缓冲区buf中还有元素,那么就走正常的接收,将从buf中取出的元素拷贝到当前协程的接收数据目标内存地址中。值得注意的是,即使此时channel已经关闭,仍然可以正常地从缓冲区buf中接收数据。这种情况比较简单,示意图就不画了。

阻塞接收

如果是阻塞模式,且当前没有数据可以接收,那么就需要将当前Goroutine打包成一个sudog加入到channel的等待接收队列recvq中,将当前Goroutine的状态置为_Gwaiting,等待唤醒。示意图如下:

深入理解Golang之channel

如果之后当前Goroutine被调度器唤醒,则执行清理工作并最终释放对应的sudog结构体。

关闭

说完收发数据,最后就是关闭channel了:

func closechan(c *hchan) {    // nil channel检查    if c == nil {        panic(plainError("close of nil channel"))    }    lock(&c.lock)    // 已关闭的channel不能再次关闭    if c.closed != 0 {        unlock(&c.lock)        panic(plainError("close of closed channel"))    }    if raceenabled {        callerpc := getcallerpc()        racewritepc(c.raceaddr(), callerpc, funcPC(closechan))        racerelease(c.raceaddr())    }    // 设置关闭状态为1    c.closed = 1    var glist glist    // release all readers    // 遍历recvq,清除sudog的数据,取出其中处于_Gwaiting状态的Goroutine加入到glist中    for {        sg := c.recvq.dequeue()        if sg == nil {            break        }        if sg.elem != nil {            typedmemclr(c.elemtype, sg.elem)            sg.elem = nil        }        if sg.releasetime != 0 {            sg.releasetime = cputicks()        }        gp := sg.g        gp.param = nil        if raceenabled {            raceacquireg(gp, c.raceaddr())        }        glist.push(gp)    }    // release all writers (they will panic)    // 遍历sendq,清除sudog的数据,取出其中处于_Gwaiting状态的Goroutine加入到glist中    for {        sg := c.sendq.dequeue()        if sg == nil {            break        }        sg.elem = nil        if sg.releasetime != 0 {            sg.releasetime = cputicks()        }        gp := sg.g        gp.param = nil        if raceenabled {            raceacquireg(gp, c.raceaddr())        }        glist.push(gp)    }    unlock(&c.lock)    // Ready all Gs now that we've dropped the channel lock.    将glist中所有Goroutine的状态置为_Grunnable,等待调度器进行调度    for !glist.empty() {        gp := glist.pop()        gp.schedlink = 0        goready(gp, 3)    }}

1、关闭channel时,会遍历recvqsendq(实际只有recvq或者sendq),取出sudog中挂起的Goroutine加入到glist列表中,并清除sudog上的一些信息和状态。

2、然后遍历glist列表,为每个Goroutine调用goready函数,将所有Goroutine置为_Grunnable状态,等待调度。

3、当Goroutine被唤醒之后,会继续执行chansendchanrecv函数中当前Goroutine被唤醒后的剩余逻辑。

总结

总结一下,本文先通过channel的基本用法对channel的定义、用法细节进行了介绍,然后对channel的基本操作包括发送、接收和关闭进行了较为详细和深入的探究。细心的读者应该也会发现channel的操作跟协程的调度关系密切,不过这篇文章关于goroutine的调度只是一笔带过,后续时机成熟会对这部分内容再作探究。

参考资料

1、The Nature Of Channels In Go2、Concurrency in Golang

深入理解Golang之channel

转载自:https://juejin.cn/post/6844904016254599176
评论
请登录