likes
comments
collection
share

从源码剖析角度浅谈Golang Channel

作者站长头像
站长
· 阅读数 40

从源码剖析角度浅谈Golang Channel

〇、前言

这是《让我们一起Golang》专栏的第45篇文章,本文企图从源码角度剖析Go的管道,不过由于笔者水平和工作经验限制,可能文章存在许多不足或错误,烦请指出斧正!

本专栏的其他文章:

一、通道的底层数据结构

真正了解一个东西需要了解它是怎么实现的,下面是channel的底层数据结构hchan的源码:

type hchan struct {
    qcount   uint           // total data in the queue
    dataqsiz uint           // size of the circular queue
    buf      unsafe.Pointer // points to an array of dataqsiz elements
    elemsize uint16
    closed   uint32
    elemtype *_type // element type
    sendx    uint   // send index
    recvx    uint   // receive index
    recvq    waitq  // list of recv waiters
    sendq    waitq  // list of send waiters// lock protects all fields in hchan, as well as several
    // fields in sudogs blocked on this channel.
    //
    // Do not change another G's status while holding this lock
    // (in particular, do not ready a G), as this can deadlock
    // with stack shrinking.
    lock mutex
}

其中,qcount是通道队列里面元素的数量,dataqsiz是底层循环队列的大小,buf是unsafe.Pointer类型,也就是指向底层循环队列的指针,elemsize指的是通道中的元素大小,closed是表示通道是否被关闭,注意它是uint32类型,elemtype是描述channel中元素类型的,sendx和recvx都指向循环队列,分别指向当前可发送和接收的元素在底层循环队列中的索引,recvq和sendq是waitq类型的,分别表示因为尝试读取而阻塞等待接收的goroutine队列、因为尝试写入而阻塞等待发送的goroutine队列。lock是用来确保channel的读写操作都是原子操作。

从源码剖析角度浅谈Golang Channel

二、通道Channel的创建过程

考虑有的大佬时间很宝贵,为了节省大佬们的时间,makechan的源码我贴到下面了:

可以看到,返回的就是上面的hchan类型的指针。

func makechan(t *chantype, size int) *hchan {
    elem := t.elem
​
    // compiler checks this but be safe.
    if elem.size >= 1<<16 {
        throw("makechan: invalid channel element type")
    }
    if hchanSize%maxAlign != 0 || elem.align > maxAlign {
        throw("makechan: bad alignment")
    }
​
    mem, overflow := math.MulUintptr(elem.size, uintptr(size))
    if overflow || mem > maxAlloc-hchanSize || size < 0 {
        panic(plainError("makechan: size out of range"))
    }
​
    // Hchan does not contain pointers interesting for GC when elements stored in buf do not contain pointers.
    // buf points into the same allocation, elemtype is persistent.
    // SudoG's are referenced from their owning thread so they can't be collected.
    // TODO(dvyukov,rlh): Rethink when collector can move allocated objects.
    var c *hchan
    switch {
    case mem == 0:
        // Queue or element size is zero.
        c = (*hchan)(mallocgc(hchanSize, nil, true))
        // Race detector uses this location for synchronization.
        c.buf = c.raceaddr()
    case elem.ptrdata == 0:
        // Elements do not contain pointers.
        // Allocate hchan and buf in one call.
        c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
        c.buf = add(unsafe.Pointer(c), hchanSize)
    default:
        // Elements contain pointers.
        c = new(hchan)
        c.buf = mallocgc(mem, elem, true)
    }
​
    c.elemsize = uint16(elem.size)
    c.elemtype = elem
    c.dataqsiz = uint(size)
    lockInit(&c.lock, lockRankHchan)
​
    if debugChan {
        print("makechan: chan=", c, "; elemsize=", elem.size, "; dataqsiz=", size, "\n")
    }
    return c
}

从源码剖析角度浅谈Golang Channel

这一部分是用来检查elem.size、elem.align是否合法,并且计算创建该channel需要的内存大小,判断是否超出最大值。

从源码剖析角度浅谈Golang Channel

这里定义了一个hchan的指针c,然后分三种情况分配内存:

  • 如果Channel容量或者元素大小为0。
  • hchan结构体中不包含指针。
  • hchan结构体中包含指针。

第一种表示当前Channel中不存在缓冲区,那么只会给底层队列buf分配一段内存空间。

从源码剖析角度浅谈Golang Channel

第二种表示当前的Channel中存储的类型不是指针类型,那么会为Channel和其底层数组分配一段连续的空间。

第三种会单独为runtime.hchan和缓冲区分配内存。

三、通道的接收过程

负责管道的接收操作的chanrecv代码长度比较长,这里就不一次性全部拿出来了。

    if debugChan {
        print("chanrecv: chan=", c, "\n")
    }
    if c == nil {
        if !block {
            return
        }
        gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
        throw("unreachable")
    }

如果我们c==nil,即我们从一个空Channel接收数据时会调用gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)让出处理器。

    if !block && empty(c) {
        if atomic.Load(&c.closed) == 0 {
            return
        }
        if empty(c) {
            if raceenabled {
                raceacquire(c.raceaddr())
            }
            if ep != nil {
                typedmemclr(c.elemtype, ep)
            }
            return true, false
        }
    }
    var t0 int64
    if blockprofilerate > 0 {
        t0 = cputicks()
    }
​

然后判断是否为非阻塞调用、Channel是否为空。如果Channel为空,且后来观察到的管道没有关闭,因为管道不能再打开,表示之前的管道没有关闭,就会直接return false,表示接收失败。

接下来的if empty(c)进入此条件,说明管道已经被关闭,此时返回零值即可。

    lock(&c.lock)
    if c.closed != 0 && c.qcount == 0 {
        if raceenabled {
            raceacquire(c.raceaddr())
        }
        unlock(&c.lock)
        if ep != nil {
            typedmemclr(c.elemtype, ep)
        }
        return true, false
    }

此处是管道已经关闭且循环队列中没有元素的情况:

  • 非缓存管道关闭
  • 有缓存管道关闭and底层数组中没有元素

答案是接收零值。

    if sg := c.sendq.dequeue(); sg != nil {
        // Found a waiting sender. If buffer is size 0, receive value
        // directly from sender. Otherwise, receive from head of queue
        // and add sender's value to the tail of the queue (both map to
        // the same buffer slot because the queue is full).
        recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
        return true, true
    }
​

这段代码处理发送队列里面有goroutine的两种情况:

  • 非缓存的管道
  • 有缓存的管道底层数组满了
    if c.qcount > 0 {
        // Receive directly from queue
        qp := chanbuf(c, c.recvx)
        if raceenabled {
            racenotify(c, c.recvx, nil)
        }
        if ep != nil {
            typedmemmove(c.elemtype, ep, qp)
        }
        typedmemclr(c.elemtype, qp)
        c.recvx++
        if c.recvx == c.dataqsiz {
            c.recvx = 0
        }
        c.qcount--
        unlock(&c.lock)
        return true, true
    }

有缓存的通道正常接收数据,有底层数组索引的变化。

接下来是处理非阻塞接收和阻塞的情况了,不再细讲。

四、通道的发送过程

    if c == nil {
        if !block {
            return false
        }
        gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
        throw("unreachable")
    }

当管道为nil,如果没有阻塞,直接返回false。如果阻塞了,就交出处理器资源。

    if !block && c.closed == 0 && full(c) {
        return false
    }
​
    var t0 int64
    if blockprofilerate > 0 {
        t0 = cputicks()
    }
​
    lock(&c.lock)
​
    if c.closed != 0 {
        unlock(&c.lock)
        panic(plainError("send on closed channel"))
    }
​

如果进行不阻塞的发送操作,管道没有关闭,且缓存空间满:

  • 非缓存管道,且接收队列中没有goroutine
  • 有缓存管道,底层数组已满
    if sg := c.recvq.dequeue(); sg != nil {
        // Found a waiting receiver. We pass the value we want to send
        // directly to the receiver, bypassing the channel buffer (if any).
        send(c, sg, ep, func() { unlock(&c.lock) }, 3)
        return true
    }

接收队列有人要取,直接给!

    if c.qcount < c.dataqsiz {
        // Space is available in the channel buffer. Enqueue the element to send.
        qp := chanbuf(c, c.sendx)
        if raceenabled {
            racenotify(c, c.sendx, nil)
        }
        typedmemmove(c.elemtype, qp, ep)
        c.sendx++
        if c.sendx == c.dataqsiz {
            c.sendx = 0
        }
        c.qcount++
        unlock(&c.lock)
        return true
    }
​
    if !block {
        unlock(&c.lock)
        return false
    }
​

有缓存的管道,还能存,直接存。

    gp := getg()
    mysg := acquireSudog()
    mysg.releasetime = 0
    if t0 != 0 {
        mysg.releasetime = -1
    }
    // No stack splits between assigning elem and enqueuing mysg
    // on gp.waiting where copystack can find it.
    mysg.elem = ep
    mysg.waitlink = nil
    mysg.g = gp
    mysg.isSelect = false
    mysg.c = c
    gp.waiting = mysg
    gp.param = nil
    c.sendq.enqueue(mysg)
    // Signal to anyone trying to shrink our stack that we're about
    // to park on a channel. The window between when this G's status
    // changes and when we set gp.activeStackChans is not safe for
    // stack shrinking.
    atomic.Store8(&gp.parkingOnChan, 1)
    gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
    // Ensure the value being sent is kept alive until the
    // receiver copies it out. The sudog has a pointer to the
    // stack object, but sudogs aren't considered as roots of the
    // stack tracer.
    KeepAlive(ep)

管道满了,发送goroutine被阻塞。

    // someone woke us up.
    if mysg != gp.waiting {
        throw("G waiting list is corrupted")
    }
    gp.waiting = nil
    gp.activeStackChans = false
    closed := !mysg.success
    gp.param = nil
    if mysg.releasetime > 0 {
        blockevent(mysg.releasetime-t0, 2)
    }
    mysg.c = nil
    releaseSudog(mysg)
    if closed {
        if c.closed == 0 {
            throw("chansend: spurious wakeup")
        }
        panic(plainError("send on closed channel"))
    }
    return true

经过一通操作,被唤醒了,继续发送。

大名鼎鼎的panic : panic(plainError("send on closed channel"))就在那,被唤醒后,管道已经关闭了,就报panic。

参考文献

  1. Go 语言 Channel 实现原理精要 | Go 语言设计与实现 draveness.me/golang/docs…
  1. golang channel 最详细的源码剖析 - 知乎 zhuanlan.zhihu.com/p/297053654
  2. Channel · Go语言中文文档 www.topgoer.com/%E5%B9%B6%E…

本文正在参加「金石计划 . 瓜分6万现金大奖」