从源码剖析角度浅谈Golang Channel
从源码剖析角度浅谈Golang Channel
〇、前言
这是《让我们一起Golang》专栏的第45篇文章,本文企图从源码角度剖析Go的管道,不过由于笔者水平和工作经验限制,可能文章存在许多不足或错误,烦请指出斧正!
本专栏的其他文章:
一、通道的底层数据结构
真正了解一个东西需要了解它是怎么实现的,下面是channel的底层数据结构hchan
的源码:
type hchan struct {
qcount uint // total data in the queue
dataqsiz uint // size of the circular queue
buf unsafe.Pointer // points to an array of dataqsiz elements
elemsize uint16
closed uint32
elemtype *_type // element type
sendx uint // send index
recvx uint // receive index
recvq waitq // list of recv waiters
sendq waitq // list of send waiters
// lock protects all fields in hchan, as well as several
// fields in sudogs blocked on this channel.
//
// Do not change another G's status while holding this lock
// (in particular, do not ready a G), as this can deadlock
// with stack shrinking.
lock mutex
}
其中,qcount是通道队列里面元素的数量,dataqsiz是底层循环队列的大小,buf是unsafe.Pointer类型,也就是指向底层循环队列的指针,elemsize指的是通道中的元素大小,closed是表示通道是否被关闭,注意它是uint32类型,elemtype是描述channel中元素类型的,sendx和recvx都指向循环队列,分别指向当前可发送和接收的元素在底层循环队列中的索引,recvq和sendq是waitq类型的,分别表示因为尝试读取而阻塞等待接收的goroutine队列、因为尝试写入而阻塞等待发送的goroutine队列。lock是用来确保channel的读写操作都是原子操作。
二、通道Channel的创建过程
考虑有的大佬时间很宝贵,为了节省大佬们的时间,makechan的源码我贴到下面了:
可以看到,返回的就是上面的hchan类型的指针。
func makechan(t *chantype, size int) *hchan {
elem := t.elem
// compiler checks this but be safe.
if elem.size >= 1<<16 {
throw("makechan: invalid channel element type")
}
if hchanSize%maxAlign != 0 || elem.align > maxAlign {
throw("makechan: bad alignment")
}
mem, overflow := math.MulUintptr(elem.size, uintptr(size))
if overflow || mem > maxAlloc-hchanSize || size < 0 {
panic(plainError("makechan: size out of range"))
}
// Hchan does not contain pointers interesting for GC when elements stored in buf do not contain pointers.
// buf points into the same allocation, elemtype is persistent.
// SudoG's are referenced from their owning thread so they can't be collected.
// TODO(dvyukov,rlh): Rethink when collector can move allocated objects.
var c *hchan
switch {
case mem == 0:
// Queue or element size is zero.
c = (*hchan)(mallocgc(hchanSize, nil, true))
// Race detector uses this location for synchronization.
c.buf = c.raceaddr()
case elem.ptrdata == 0:
// Elements do not contain pointers.
// Allocate hchan and buf in one call.
c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
c.buf = add(unsafe.Pointer(c), hchanSize)
default:
// Elements contain pointers.
c = new(hchan)
c.buf = mallocgc(mem, elem, true)
}
c.elemsize = uint16(elem.size)
c.elemtype = elem
c.dataqsiz = uint(size)
lockInit(&c.lock, lockRankHchan)
if debugChan {
print("makechan: chan=", c, "; elemsize=", elem.size, "; dataqsiz=", size, "\n")
}
return c
}
这一部分是用来检查elem.size、elem.align是否合法,并且计算创建该channel需要的内存大小,判断是否超出最大值。
这里定义了一个hchan的指针c,然后分三种情况分配内存:
- 如果Channel容量或者元素大小为0。
- hchan结构体中不包含指针。
- hchan结构体中包含指针。
第一种表示当前Channel中不存在缓冲区,那么只会给底层队列buf分配一段内存空间。
第二种表示当前的Channel中存储的类型不是指针类型,那么会为Channel和其底层数组分配一段连续的空间。
第三种会单独为runtime.hchan和缓冲区分配内存。
三、通道的接收过程
负责管道的接收操作的chanrecv
代码长度比较长,这里就不一次性全部拿出来了。
if debugChan {
print("chanrecv: chan=", c, "\n")
}
if c == nil {
if !block {
return
}
gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
throw("unreachable")
}
如果我们c==nil,即我们从一个空Channel接收数据时会调用gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
让出处理器。
if !block && empty(c) {
if atomic.Load(&c.closed) == 0 {
return
}
if empty(c) {
if raceenabled {
raceacquire(c.raceaddr())
}
if ep != nil {
typedmemclr(c.elemtype, ep)
}
return true, false
}
}
var t0 int64
if blockprofilerate > 0 {
t0 = cputicks()
}
然后判断是否为非阻塞调用、Channel是否为空。如果Channel为空,且后来观察到的管道没有关闭,因为管道不能再打开,表示之前的管道没有关闭,就会直接return false,表示接收失败。
接下来的if empty(c)
进入此条件,说明管道已经被关闭,此时返回零值即可。
lock(&c.lock)
if c.closed != 0 && c.qcount == 0 {
if raceenabled {
raceacquire(c.raceaddr())
}
unlock(&c.lock)
if ep != nil {
typedmemclr(c.elemtype, ep)
}
return true, false
}
此处是管道已经关闭且循环队列中没有元素的情况:
- 非缓存管道关闭
- 有缓存管道关闭and底层数组中没有元素
答案是接收零值。
if sg := c.sendq.dequeue(); sg != nil {
// Found a waiting sender. If buffer is size 0, receive value
// directly from sender. Otherwise, receive from head of queue
// and add sender's value to the tail of the queue (both map to
// the same buffer slot because the queue is full).
recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true, true
}
这段代码处理发送队列里面有goroutine的两种情况:
- 非缓存的管道
- 有缓存的管道底层数组满了
if c.qcount > 0 {
// Receive directly from queue
qp := chanbuf(c, c.recvx)
if raceenabled {
racenotify(c, c.recvx, nil)
}
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
typedmemclr(c.elemtype, qp)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.qcount--
unlock(&c.lock)
return true, true
}
有缓存的通道正常接收数据,有底层数组索引的变化。
接下来是处理非阻塞接收和阻塞的情况了,不再细讲。
四、通道的发送过程
if c == nil {
if !block {
return false
}
gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
throw("unreachable")
}
当管道为nil,如果没有阻塞,直接返回false。如果阻塞了,就交出处理器资源。
if !block && c.closed == 0 && full(c) {
return false
}
var t0 int64
if blockprofilerate > 0 {
t0 = cputicks()
}
lock(&c.lock)
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("send on closed channel"))
}
如果进行不阻塞的发送操作,管道没有关闭,且缓存空间满:
- 非缓存管道,且接收队列中没有goroutine
- 有缓存管道,底层数组已满
if sg := c.recvq.dequeue(); sg != nil {
// Found a waiting receiver. We pass the value we want to send
// directly to the receiver, bypassing the channel buffer (if any).
send(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true
}
接收队列有人要取,直接给!
if c.qcount < c.dataqsiz {
// Space is available in the channel buffer. Enqueue the element to send.
qp := chanbuf(c, c.sendx)
if raceenabled {
racenotify(c, c.sendx, nil)
}
typedmemmove(c.elemtype, qp, ep)
c.sendx++
if c.sendx == c.dataqsiz {
c.sendx = 0
}
c.qcount++
unlock(&c.lock)
return true
}
if !block {
unlock(&c.lock)
return false
}
有缓存的管道,还能存,直接存。
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
// No stack splits between assigning elem and enqueuing mysg
// on gp.waiting where copystack can find it.
mysg.elem = ep
mysg.waitlink = nil
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.waiting = mysg
gp.param = nil
c.sendq.enqueue(mysg)
// Signal to anyone trying to shrink our stack that we're about
// to park on a channel. The window between when this G's status
// changes and when we set gp.activeStackChans is not safe for
// stack shrinking.
atomic.Store8(&gp.parkingOnChan, 1)
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
// Ensure the value being sent is kept alive until the
// receiver copies it out. The sudog has a pointer to the
// stack object, but sudogs aren't considered as roots of the
// stack tracer.
KeepAlive(ep)
管道满了,发送goroutine被阻塞。
// someone woke us up.
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
gp.waiting = nil
gp.activeStackChans = false
closed := !mysg.success
gp.param = nil
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
mysg.c = nil
releaseSudog(mysg)
if closed {
if c.closed == 0 {
throw("chansend: spurious wakeup")
}
panic(plainError("send on closed channel"))
}
return true
经过一通操作,被唤醒了,继续发送。
大名鼎鼎的panic : panic(plainError("send on closed channel"))
就在那,被唤醒后,管道已经关闭了,就报panic。
参考文献
- Go 语言 Channel 实现原理精要 | Go 语言设计与实现 draveness.me/golang/docs…
- golang channel 最详细的源码剖析 - 知乎 zhuanlan.zhihu.com/p/297053654
- Channel · Go语言中文文档 www.topgoer.com/%E5%B9%B6%E…
本文正在参加「金石计划 . 瓜分6万现金大奖」