12. 揭秘 Golang Channel 高性能背后的故事:图解源码
前言
在现代并发编程中,Channel 作为传递数据的管道,扮演着至关重要的角色。它允许生产者和消费者在不同的 goroutine 中安全地传递数据,从而实现解耦和并发控制。本文将深入探讨 Channel 的源码实现,以帮助读者更好地理解其内部机制和工作原理。通过了解 Channel 的源码,我们可以更好地理解并发编程中的关键概念,并掌握如何优化和改进我们的代码。
本文源码版本:1.20.7
地址:src/runtime/chan.go
1.Channel 是什么
很多人都看到过这样一段话:
Do not communicate by sharing memory; instead, share memory by communicating.
不要通过共享内存来通信,而要通过通信来实现内存共享。
Go 是第一个将 CSP(Communicating Sequential Processes) 的这些思想引入,并且发扬光大的语言,所以并发编程成为 Go 的一个独特的优势。而 Go 的并发哲学主要靠使用 Goroutine 和 Channel 实现。那 Channel 到底是什么呢?
Channel 可以理解为一种基于通信的并发编程模型,它主要用于在多个 goroutine 之间传递数据。
Channel 的设计理念旨在通过提供一个安全、高效、灵活的通信机制,来简化并发编程中的数据传递和同步问题。
- Channel 的设计理念强调了通信的重要性。在并发编程中,不同的 goroutine 之间需要安全地传递数据,以实现协作和同步。Channel 作为一种通信管道,提供了发送和接收数据的功能,使得不同的 goroutine 可以通过 Channel 进行通信,从而实现了数据的传递和共享。
- Channel 的设计理念注重简洁性和易用性。在 Go 语言中,使用 Channel 非常简单,只需要通过声明一个类型为
chan T
的变量即可。发送和接收操作也非常直观,可以使用<-
运算符进行发送和接收数据。这种简洁的设计使得使用 Channel 进行并发编程变得非常容易,降低了学习和使用的门槛。 - Channel 的设计理念还强调了并发安全性和性能优化。在并发编程中,数据的安全传递和同步是非常重要的。Channel 内部实现了同步机制,确保在并发环境中安全地传递数据。此外,Channel 还支持缓冲和非阻塞操作,以适应不同的并发场景和需求。这些优化措施使得 Channel 在性能方面表现优异,能够满足各种并发编程的需求。
了解了 Channel 是什么,也明白其在并发编程中扮演着重要的角色,这里简单列举一下 Channel 在并发编程中的一些主要作用:
- 实现同步:通过使用 Channel,可以在多个 goroutine 之间实现同步。当一个 goroutine 需要等待另一个 goroutine 完成某个任务后才能继续执行时,可以使用 Channel 来实现这种同步。
- 传递数据:Channel 可以用于在 goroutine 之间传递数据。一个 goroutine 可以将数据发送到Channel,而另一个 goroutine 可以从 Channel 接收数据。这使得在不同 goroutine 之间共享和传递数据变得非常容易。
- 解耦生产者和消费者:Channel 可以作为生产者和消费者之间的桥梁。生产者将数据发送到 Channel,而消费者从 Channel 接收数据。这样生产者和消费者可以独立地运行,无需直接相互调用,从而实现了解耦。
- 支持异步操作:通过使用 Channel,可以实现异步操作。一个 goroutine 可以启动一个异步任务,将结果发送到 Channel,而另一个 goroutine 可以在稍后的时间从 Channel 接收结果。这使得并发编程更加灵活和高效。
- 确保并发安全:Channel 内部实现了同步机制,确保在并发环境中安全地传递数据。这避免了常见的并发问题,如数据竞争和死锁。
聊了这么多,是不是对 Channel 很感兴趣了呢?接下来就让我们进入 Channel 的源码世界一探究竟吧!
2.Channel 源码概述
本节对 Channel 中的重要结构和函数、方法进行简单的介绍,为后续理解代码打好基础。
2.1 Channel 重要数据结构
2.1.1 hchan 结构
hchan
对象表示运行时的 channel
。
hchan.buf
循环数组,用于有缓冲区的 channel 存储。
hchan.recvq
接收 goroutine 等待队列 (数据结构是双向链表)。
hchan.sendq
发送 goroutine 等待队列 (数据结构是双向链表)。
hchan
源码:
type hchan struct {
qcount uint // channel 元素数量
dataqsiz uint // channel 缓冲区环形队列长度
buf unsafe.Pointer // 指向缓冲区的底层数组 (针对有缓冲的 channel)
elemsize uint16 // channel 元素大小
closed uint32 // channel 是否关闭
elemtype *_type // channel 元素类型
sendx uint // 当前已发送元素在队列中的索引
recvx uint // 当前已接收元素在队列中的索引
recvq waitq // 接收 goroutine 等待队列 (数据结构是链表)
sendq waitq // 发送 goroutine 等待队列 (数据结构是链表)
// lock 保护结构体中的所有字段,以及 sudogs 对象中被当前 channel 阻塞的几个字段
lock mutex
}
2.1.2 waitq
waitq 为 goroutine 等待队列,底层是由 sudog 数据结构的双向链表实现的。first 为队列头部,last 为队列尾部,图中也画出了相应的数据结构形式,很好理解。
type waitq struct {
first *sudog
last *sudog
}
sudog 是对 goroutine 和 channel 对应关系的一层封装抽象,以便于 goroutine 可以同时阻塞在不同的 channel 上(比如:select 没有 default,且所有 channel 阻塞),其中 elem 用于读取/写入 channel 的数据的容器。sudog 中所有字段都受 hchan.lock
保护。
type sudog struct {
g *g // 绑定 goroutine
next *sudog // 下一个节点
prev *sudog // 上一个节点
elem unsafe.Pointer // data element (may point to stack)
// isSelect = true 表示 g(协程) 正在参与 select,阻塞在多个 chan 中
// select 只能通过一个 case,所以需要使用 g.selectDone 标志已经有 case 通过了
// 其余 case 可以从等待列表中删除了,g.selectDone 必须经过 CAS 才能执行成功。
isSelect bool
success bool // 表示通道c通信是否成功。
c *hchan // 绑定 channel
}
2.1.3 常量
const (
// 内存对齐的最大值,这个等于 64 位 CPU 下的 cacheline 的大小
maxAlign = 8
// 计算 unsafe.Sizeof(hchan{}) 最接近的 8 的倍数(对齐内存使用)
hchanSize = unsafe.Sizeof(hchan{}) + uintptr(-int(unsafe.Sizeof(hchan{}))&(maxAlign-1))
// 是否开启 debug 模式
debugChan = false
)
2.2 Channel 重要函数和方法
本文将主要介绍以下重要函数和方法的具体实现代码,这里整体预览一下。
3.创建 Channel
编译器会将应用层代码中的 make(chan type, N)
语句转换为 makechan
函数调用。源码中主要对不同类型的 channel 申请内存的方式做了区分:
hchan.buf
大小为 0 的 channel- 无缓冲区型 chan(没有 buf,因此不用分配内存)
- 元素类型为
struct{}
的 chan(只使用游标 sendx 或 recvx,不会拷贝元素到缓冲区,因此 buf 也不用分配内存)
hchan.buf
大小不为 0- 元素类型不含指针的 channel,此时为 hchan 结构体和 buf 字段分配一段连续的内存,GC 不会扫描 buf 内的元素。
- 元素类型包含指针的 channel,分别为 hchan 结构体和 buf 字段单独分配内存空间。
func makechan(t *chantype, size int) *hchan {
elem := t.elem
// 由编译器检查保证元素大小不能大于等于 64K
if elem.size >= 1<<16 {
throw("makechan: invalid channel element type")
}
// 检测 hchan 结构体大小是否是 maxAlign = 8 的整数倍
// 并且元素的对齐单位不能超过最大对齐单位
if hchanSize%maxAlign != 0 || elem.align > maxAlign {
throw("makechan: bad alignment")
}
// 判断申请内存空间大小是否越界
// mem = elem.size * size(内存大小 = 元素大小 * 元素个数)
mem, overflow := math.MulUintptr(elem.size, uintptr(size))
if overflow || mem > maxAlloc-hchanSize || size < 0 {
panic(plainError("makechan: size out of range"))
}
// 当存储在 buf 中的元素不包含指针时,可以消除 GC 扫描
var c *hchan
switch {
case mem == 0:
// 1. 无缓冲型 chan
// 2. 元素类型:struct{} 的 chan(只使用游标,不会拷贝元素)
// 只分配 hchan 结构体大小内存
// 因为以上两种情况都用不到 c.buf
c = (*hchan)(mallocgc(hchanSize, nil, true))
c.buf = c.raceaddr()
case elem.ptrdata == 0:
// 元素类型不含指针 - GC 就不会扫描 chan 中的元素
// 只进行一次内存分配,分配大小 hchanSize+mem
// chan 结构体和循环数组内存地址连续
c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
// c.buf 指向循环数组起始位置
c.buf = add(unsafe.Pointer(c), hchanSize)
default:
// 元素类型含指针
// 分别申请 chan 和 buf 的空间(两者内存地址不连续)
c = new(hchan)
c.buf = mallocgc(mem, elem, true)
}
// 初始化其余字段
c.elemsize = uint16(elem.size)
c.elemtype = elem
c.dataqsiz = uint(size)
lockInit(&c.lock, lockRankHchan)
return c
}
4.Channel 发送数据
编译器会根据 channel 发送数据的使用方式转换为 chansend
函数调用:
c <- x
语句会被转换为chansend1
函数。 block = true (阻塞发送)select + case c <- v + default
语句会被转换为 selectnbsend 函数。block = false(非阻塞发送)
// entry point for c <- x from compiled code.
//
//go:nosplit
func chansend1(c *hchan, elem unsafe.Pointer) {
chansend(c, elem, true, getcallerpc())
}
// compiler implements
//
// select {
// case c <- v:
// ... foo
// default:
// ... bar
// }
//
// as
//
// if selectnbsend(c, v) {
// ... foo
// } else {
// ... bar
// }
func selectnbsend(c *hchan, elem unsafe.Pointer) (selected bool) {
return chansend(c, elem, false, getcallerpc())
}
4.1 chansend 函数
chansend
函数向 channel 发送数据,并返回是否发送成功。
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
// chan 为 nil
if c == nil {
// 非阻塞
if !block {
return false
}
// 挂起当前 goroutine,永久阻塞
// 调用 gopark 时传入的 unlockf 为 nil,会被一直休眠
gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
throw("unreachable")
}
// 非阻塞 && 未关闭 && 未做好发送准备 -> 直接返回
if !block && c.closed == 0 && full(c) {
return false
}
// 加锁
lock(&c.lock)
// channel 关闭
if c.closed != 0 {
// 解锁
unlock(&c.lock)
// 抛出 panic - 向一个已经关闭的 channel 发送数据会导致 panic
panic(plainError("send on closed channel"))
}
// 出队一个等待接收的 goroutine
if sg := c.recvq.dequeue(); sg != nil {
// 将数据发送给等待接收的 sudog,绕过 buf 缓冲区
// 唤醒 goroutine
send(c, sg, ep, func() { unlock(&c.lock) }, 3)
// 返回成功
return true
}
// qcount 是队列当前元素数量
// dataqsiz 是队列总长度
// 当前元素数量小于队列总长度时,说明 buf 还有空闲空间可供使用
if c.qcount < c.dataqsiz {
// 获取下一个可放置缓冲数据的 buf 地址
qp := chanbuf(c, c.sendx)
// 将发送的数据拷贝到缓冲区
typedmemmove(c.elemtype, qp, ep)
// 发送索引 +1
c.sendx++
// 循环队列,当 sendx = 队列长度时,重置为 0
if c.sendx == c.dataqsiz {
c.sendx = 0
}
// 缓冲区元素数量 +1
c.qcount++
// 解锁
unlock(&c.lock)
// 返回成功
return true
}
if !block {
// 非阻塞 + 缓冲区没有空闲空间可以使用 - 解锁、返回失败
unlock(&c.lock)
return false
}
// 以下为阻塞发送情况代码:阻塞发送 && buf 没有空闲空间
// 获取当前发送数据的 goroutine
gp := getg()
// 从 sudogcache 中获取 sudog
mysg := acquireSudog()
// 构造封装当前 goroutine 的 sudog 对象
// 建立 sudog、goroutine、channel 之间的指向关系
mysg.elem = ep
mysg.waitlink = nil
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.waiting = mysg
gp.param = nil
// 把 sudog 添加到当前 channel 的发送队列
c.sendq.enqueue(mysg)
// 挂起协程
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
// 其他协程通过 c 接收数据,唤醒该协程 gp
// 确保正在发送的值保持活动状态,直到接收者将其复制出来。
KeepAlive(ep)
// 此时被唤醒 gp.waiting 不是当前的 mysg 直接 panic(说明遭受破坏)
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
gp.waiting = nil // 等待任务结束,重置为 nil
gp.activeStackChans = false
// success 表示通道c通信是否成功。
// 如果 goroutine 因通过通道 c 传递值而被唤醒,则为 true;
// 如果因为 c 被关闭而被唤醒,则为 false。
// closed 为其相反的值
closed := !mysg.success
gp.param = nil // 重置唤醒时传递的参数(传递为 sudog)
// 取消和 channel 的绑定关系
mysg.c = nil
// 释放 Sudog
releaseSudog(mysg)
// 通道 c 通信失败,抛出失败原因
if closed {
// channel 发送失败 -- 抛出 panic
if c.closed == 0 {
throw("chansend: spurious wakeup")
}
panic(plainError("send on closed channel"))
}
// 返回发送成功
return true
}
4.2 send 函数
send
函数用于处理 channel
数据的发送操作,发送到阻塞接收的 receiver 中,分为两步:
- 调用
sendDirect
直接将发送方的数据(ep)拷贝到接收方持有的目标内存地址上(sg.elem 所指的地方) ,这里看代码或许会存在割裂感,需要配合chanrecv
函数内容进行理解,等阅读完全部代码,我们再整体回顾一下(在接收函数chanrecv
中,ep 参数指向接收目的地址,被赋值到 mysg.elem = ep )。 - 将等待接收的
goroutine
唤醒。
sendDirect
函数用于 channel 具体的发送数据操作,将发送方的数据直接写入到接收方的目的地址中。
// sg 表示接收方 goroutine
// ep 表示要发送的数据
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
// 接收数据的地址不为空,则拷贝数据 (sg.elem 用来接收数据)
if sg.elem != nil {
// 直接拷贝数据到接收方 sudog.elem 所指向的地址
// 接收方会从这个地址获取 chan 传递的内容
sendDirect(c.elemtype, sg, ep)
// sudog 中 elem 置为 nil(elem 的任务结束了,可以重置为 nil)
// 这里置为 nil,不会影响上一步数据的拷贝,只是指针换了指向
sg.elem = nil
}
// 从 sudog 获取 goroutine
gp := sg.g
// 解锁 hchan (结合 chansend 函数加锁)
unlockf()
gp.param = unsafe.Pointer(sg)
// 设置接收成功
sg.success = true
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
// 调用 goready 函数将接收方 goroutine 唤醒并标记为可运行状态
goready(gp, skip+1)
}
func sendDirect(t *_type, sg *sudog, src unsafe.Pointer) {
dst := sg.elem
// ...
// 拷贝数据
memmove(dst, src, t.size)
}
4.3 小结
读完代码,简单的画个图加深一下印象吧!chanrecv 和 closechan 函数暂时还没有讲到,等看完相关源码,再返回来看这个图,应该会一目了然。
5.Channel 接收数据
编译器会根据 channel 接收数据的使用方式转换为 chanrecv
函数调用:
<- ch
语句转换为chanrecv1
函数调用。(阻塞接收)x, ok <- ch
语句转换为chanrecv2
函数调用。(阻塞接收)select + v, ok = <-c + default
语句会被转换为 selectnbrecv 函数。block = false(非阻塞接收)
// entry points for <- c from compiled code.
//
//go:nosplit
func chanrecv1(c *hchan, elem unsafe.Pointer) {
chanrecv(c, elem, true)
}
//go:nosplit
func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) {
_, received = chanrecv(c, elem, true)
return
}
// compiler implements
//
// select {
// case v, ok = <-c:
// ... foo
// default:
// ... bar
// }
//
// as
//
// if selected, ok = selectnbrecv(&v, c); selected {
// ... foo
// } else {
// ... bar
// }
func selectnbrecv(elem unsafe.Pointer, c *hchan) (selected, received bool) {
return chanrecv(c, elem, false)
}
5.1 chanrecv 函数
chanrecv
函数用于在 channel 上接收数据,并将接收到的数据写入参数 ep (ep 可以设置为 nil,这种情况下接收到的数据将会被忽略,本函数和 4.2 小节中 send 函数都有使用),并有两个返回值(selected, received bool)
- selected:select 中是否会选择该分支
- received:是否接收到了数据
chanrecv
函数中需要注意,即使 channel 已经关闭,仍然可以继续接收数据,直到 buf 为空为止。而且读取已经关闭的 channel 中的数据,也不会 panic。
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
if c == nil {
// 非阻塞直接返回 false,false
if !block {
return
}
// 阻塞情况:在 nil channel 接收数据,永久阻塞
gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
throw("unreachable")
}
// 非阻塞 && c 数据为空
if !block && empty(c) {
// 判断 channel 是否已经关闭
if atomic.Load(&c.closed) == 0 {
// chan 未关闭,还没有数据,返回 false,false
return
}
// chan 已关闭,二次判断是否有待接收的数据数据
// 后边就不可能再有数据进来
if empty(c) {
// chan 为空 && 已关闭
if ep != nil {
// 清理 ep 的内存
typedmemclr(c.elemtype, ep)
}
// 读一个已关闭&&数据为空的 chan,返回 true, false
return true, false
}
}
// 加锁
lock(&c.lock)
if c.closed != 0 {
// chan 已关闭 && 缓冲区也没有数据了,返回 true, false
// 并发环境下,随时可能关闭 chan
if c.qcount == 0 {
unlock(&c.lock)
if ep != nil {
typedmemclr(c.elemtype, ep)
}
return true, false
}
} else {
// 判断有阻塞在等待发送的 goroutine
if sg := c.sendq.dequeue(); sg != nil {
// 拿到第一个等待的 sender
// 如果缓冲区大小为 0,那第一个 sender 数据可以直接拷贝到 ep
// 如果缓冲区大小不为 0,则表明缓冲区中有更早的数据,ep 应该取缓冲区头部数据
// 而该 sender 数据应该写入缓冲区尾部
recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true, true
}
}
// 缓冲区有数据
if c.qcount > 0 {
// 获取数据地址
qp := chanbuf(c, c.recvx)
if ep != nil {
// 拷贝数据,ep 为 nil 不用动
typedmemmove(c.elemtype, ep, qp)
}
// 清理缓冲区数据
typedmemclr(c.elemtype, qp)
// 循环数组更新下一个 recvx 索引
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.qcount-- // 缓冲区元素计数 -1
unlock(&c.lock) // 解锁
return true, true
}
// 缓冲区没有数据 && 非阻塞
if !block {
// 解锁,直接返回 false, false
unlock(&c.lock)
return false, false
}
// 代码执行到这里表示:没有可用的 sender && 缓冲区没有数据 && 阻塞
gp := getg()
mysg := acquireSudog()
// 目标地址保存到 mysg.elem 中
mysg.elem = ep
mysg.waitlink = nil
gp.waiting = mysg
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.param = nil
// 塞入等待接收队列
c.recvq.enqueue(mysg)
// 阻塞协程
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)
// someone woke us up
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
gp.waiting = nil
success := mysg.success
gp.param = nil
mysg.c = nil
releaseSudog(mysg) // 释放 mysg
return true, success
}
5.2 recv 函数
recv
函数用于处理 channel 的数据接收操作,接收一个阻塞发送的 sender 的数据到 buf 或者 receiver 中。
- 当缓冲区大小为 0:直接拷贝阻塞发送的数据到 ep 中;
- 当缓冲区大小不为 0(缓冲区已塞满,否则不会阻塞发送协程):本着队列先进先出的原则,接收方获取 buf 的头部数据,而发送方发送数据到 buf 的尾部,同时更新 c.sendx = c.recvx;
- 唤醒阻塞的 sender 协程。
recvDirect 函数用于 channel 具体的接收数据操作,将发送方的数据直接写入到接收方的目的地址中。
func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
if c.dataqsiz == 0 {
if ep != nil {
// copy data from sender
recvDirect(c.elemtype, sg, ep)
}
} else {
// 获取队列首元素
qp := chanbuf(c, c.recvx)
// copy data from queue to receiver
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
// copy data from sender to queue
typedmemmove(c.elemtype, qp, sg.elem)
// 更新 recvx 索引
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
// 队列是满的,所以 c.sendx = c.recvx
// 或者使用 c.sendx = (c.sendx+1) % c.dataqsiz 更新 sendx 也行
c.sendx = c.recvx
}
sg.elem = nil
gp := sg.g
unlockf() // 解锁
gp.param = unsafe.Pointer(sg)
sg.success = true
// 唤醒阻塞的 sender 协程
goready(gp, skip+1)
}
func recvDirect(t *_type, sg *sudog, dst unsafe.Pointer) {
src := sg.elem
memmove(dst, src, t.size)
}
5.3 小结
看到这里,发送和接收数据已经看完了,这个图应该可以看懂 90% 了。
6.关闭 Channel
clsoe(chan)
语句对应转换为 closechan 函数调用。关闭 channel 时需要注意,如果 sendq 中还存在 sender 协程阻塞等待,当唤醒 sender 协程后,会发生 panic。
func closechan(c *hchan) {
// 关闭一个 nil channel, 抛出 panic
if c == nil {
panic(plainError("close of nil channel"))
}
lock(&c.lock) // 加锁
if c.closed != 0 {
unlock(&c.lock)
// 关闭一个已经关闭的 channel, 抛出 panic
panic(plainError("close of closed channel"))
}
// chan 关闭标志
c.closed = 1
// goroutine 列表
// 用于存放发送+接收队列中的所有 goroutine
// gList 为栈结构
var glist gList
// 释放所有 readers
for {
// readers 出队
sg := c.recvq.dequeue()
// 说明接收队列为空,直接跳出循环
if sg == nil {
break
}
// 清理接收数据的地址内存
if sg.elem != nil {
typedmemclr(c.elemtype, sg.elem)
sg.elem = nil
}
gp := sg.g
gp.param = unsafe.Pointer(sg)
sg.success = false
// 将绑定的 goroutine 存入 glist
glist.push(gp)
}
// 释放所有 writers (they will panic)
// 发送 goroutine 被唤醒后会导致 panic
for {
sg := c.sendq.dequeue()
if sg == nil {
break
}
sg.elem = nil
gp := sg.g
gp.param = unsafe.Pointer(sg)
sg.success = false
glist.push(gp)
}
// 解锁
unlock(&c.lock)
// 将所有 goroutine 唤醒
for !glist.empty() {
gp := glist.pop()
gp.schedlink = 0
goready(gp, 3)
}
}
closechan 函数的流程就相对简单很多了:
7.其他函数和方法
7.1 full 和 empty 函数
full 函数检测 sender 向 channel 发送数据是否应该阻塞,或者说是否做好了发送数据的准备:
- 如果 channel 没有缓冲区,查看等待接收队列是否存在接收者等待,不存在则 sender 应该阻塞;
- 如果 channel 有缓冲区,比较元素数量和缓冲区长度是否一致,一致表明缓冲区已满,sender 应该阻塞。
func full(c *hchan) bool {
if c.dataqsiz == 0 {
return c.recvq.first == nil
}
return c.qcount == c.dataqsiz
}
empty 函数检测 receiver 从 channel 读取数据是否应该阻塞,或者说是否做好了读取数据的准备:
- 如果 channel 没有缓冲区,查看等待发送队列是否存在发送者等待,不存在则 receiver 应该阻塞。
- 如果 channel 有缓冲区,检查缓冲区元素数量是否等于 0,等于 0 表示没有数据等待接收,receiver 应该阻塞。
func empty(c *hchan) bool {
if c.dataqsiz == 0 {
return atomic.Loadp(unsafe.Pointer(&c.sendq.first)) == nil
}
return atomic.Loaduint(&c.qcount) == 0
}
chanrecv 函数进行快速检查时,检查中的状态不能发生变化,否则容易导致返回结果错误。因此,为了防止错误的检查结果,c.closed 和 empty() 都必须使用原子检查。
// 非阻塞 && c 数据为空
if !block && empty(c) {
// 判断 channel 是否已经关闭
if atomic.Load(&c.closed) == 0 {
// chan 未关闭,还没有数据,返回 false,false
return
}
// chan 已关闭,二次判断是否有待接收的数据数据
// 后边就不可能再有数据进来
if empty(c) {
// chan 为空 && 已关闭
if ep != nil {
// 清理 ep 的内存
typedmemclr(c.elemtype, ep)
}
// 读一个已关闭&&数据为空的 chan,返回 true, false
return true, false
}
}
这里总共检查了 2 次 empty()。因为第一次检查时,channel 可能还没有关闭,但是第二次检查的时候关闭了,在 2 次检查之间可能有待接收的数据到达了,所以需要 2 次 empty() 检查。
7.2 enqueue 和 dequeue 方法
enqueue 入队 enqueue 方法用于将 sudog 放入 channel 的发送/接收队列,内部实现就是双向链表操作。
func (q *waitq) enqueue(sgp *sudog) {
sgp.next = nil
x := q.last // 链表尾部
if x == nil {
// 尾部为 nil,表示只有一个 sudog 节点
sgp.prev = nil
q.first = sgp
q.last = sgp
return
}
// 在队列尾部插入一个新的 sudog
sgp.prev = x
x.next = sgp
// 更新尾部指向
q.last = sgp
}
dequeue 出队 dequeue 方法用于出队 channel 的发送/接收队列的一个元素 sudog,内部实现就是双向链表操作。
func (q *waitq) dequeue() *sudog {
for {
// 取队列头部元素
sgp := q.first
if sgp == nil {
return nil
}
// 头部元素不为 nil
// 设置队列中的 first\last 指向
y := sgp.next
if y == nil {
// 队列只有头部一个元素
q.first = nil
q.last = nil
} else {
// 头部后边最少还有一个元素
// 设置 y 为新的头部
y.prev = nil
q.first = y
sgp.next = nil // 标记为已删除(头部被取走)
}
// 如果一个 goroutine 因为在 select 阻塞在多个 chan 等待队列中,
// select 中只有一个 case 可以顺利执行,所以需要使用 g.selectDone 标志是否已经有
// case 赢得了比赛;其他 case 只执行删除操作,不返回 sudog;
// 只有 g.selectDone 经过 CAS 才能获取到执行权,从 0 写到 1
// 否则都 continue 掉,即可从等待列表中删除
// isSelect = true 代码在 selectgo 中
if sgp.isSelect && !sgp.g.selectDone.CompareAndSwap(0, 1) {
continue
}
return sgp
}
}
为什么要用自旋的方式获取 waitq 中的 sudog?
因为 sudog 进入 waitq 队列分为两种情况:
- 在 chansend 和 chanrecv 函数中因为 读取/写入 阻塞进入 waitq 队列中;
- 在 select 的 case 中阻塞进入 waitq,因此 goroutine 会同时阻塞在不同的 channel 中。源码在:
src/runtime/select.go
中,select 的实现函数是 selectgo,这里不详细进行展开,我们看一个代码片段,在代码中陷入阻塞的 select 会把阻塞的 goroutine 同时塞入每一个 case 下的 channel 中,从而进入等待队列中。
针对因为 select 陷入阻塞的情况,sudog 中使用字段IsSelect = true
进行表示,由于 select 使用中只有一个 case 能够通过阻塞并唤醒 goroutine 最终执行,因此使用 g.selectDone
对此进行标识,标志已经有 case 通过竞争获取了执行权,其余等待的 sudog 可以自行消亡了,于是就出现了 dequeue 方法的自旋销毁方式,没有更改 g.selectDone 值成功的 sudog 都进行链表删除操作。
总结
通过阅读源码,我们可以总结下面这一张 Channel 的操作规则:
操作 | nil | 已关闭 channel | 未关闭有缓冲区的 channel | 未关闭无缓冲区的 channel |
---|---|---|---|---|
关闭 | panic | panic | 成功关闭,然后可以读取缓冲区的值,读取完之后,继续读取到的是 channel 类型的默认值 | 成功关闭,之后读取到的是 channel 类型的默认值 |
接收 | 阻塞 | 不阻塞,读取到的是 channel 类型的默认值 | 不阻塞,正常读取值 | 阻塞 |
发送 | 阻塞 | panic | 不阻塞,正常写入值 | 阻塞 |
Channel 是 Go 语言中用于实现并发安全通道的一种数据结构,具有以下优点:
- 线程安全:Channel 提供了一种并发安全的通信机制,可以在多个 goroutine 之间进行安全的消息传递。
- 支持多种类型的消息传递:Channel 可以传递任意类型的数据,包括基本类型、结构体、接口等。
- 阻塞和非阻塞操作:Channel 支持阻塞和非阻塞操作,可以根据需要选择合适的操作方式。
- 缓冲和非缓冲:Channel 可以配置为缓冲或非缓冲的,以满足不同的需求。
- 解耦生产者和消费者:使用 Channel 可以解耦生产者和消费者,使得生产者和消费者之间的交互更加灵活和可配置。
- 支持select语句:在 Go 语言中,可以使用 select 语句来同时监听多个 Channel 的消息,从而实现多路复用。
Channel 和锁在并发编程中都可以用来解决并发问题,但它们各自有不同的侧重点和适用场景。
- 关注点:Channel 关注的是并发问题的数据流动,适用于数据在多个协程中流动的场景。而锁关注的是某一场景下只给一个协程访问数据的权限,适用于数据位置固定的场景。
- 解决问题的方式:Channel 通过数据流动来解决并发问题,可以在协程之间传递数据,实现协程之间的通信。而锁则是通过互斥访问来解决并发问题,同一时间只允许一个协程访问共享数据,避免出现数据竞争的情况。
Channel 和锁都是解决并发问题的有效工具,但应根据具体的场景和需求来选择使用哪种方式,下边具体列举一下:
锁的使用场景
- 访问共享数据结构中的缓存信息
- 保存应用程序上下文和状态信息
- 保护某个结构内部状态和完整性
- 高性能要求的临界区代码
channel 的使用场景
- 线程 (goroutine) 通信
- 并发通信
- 异步操作
- 任务分发
- 传递数据所有权
- 数据逻辑组合 (如 Pipeline, FanIn FanOut 等并发模式)
Channel 在 Go 语言中有许多应用场景,以下是其中一些常见的应用场景:
- 生产者-消费者模式:Channel 可以用于实现生产者-消费者模式。生产者将数据发送到 Channel,消费者从 Channel 接收数据。这种模式可以有效地解耦生产者和消费者,使得它们可以独立地运行,无需直接相互调用。
- 管道模式:Channel 可以作为管道,用于在不同 goroutine 之间传递数据。例如,可以使用 Channel 来将数据从一个 goroutine 传递到另一个 goroutine,从而实现数据的流动和处理。
- 超时处理:通过使用带有超时的 Channel,可以实现超时处理。例如,可以使用 select 语句和
time.After()
或time.Tick()
函数,来设置超时时间,并在超时后执行相应的操作。 - 控制并发数:在需要控制并发数的场景中,可以使用 Channel 来控制并发规模。例如,可以使用一个带有缓冲的 Channel 来限制同时进行的任务数量。当 Channel 中有足够的空间时,可以启动新的任务;当 Channel 已满时,可以等待已有的任务完成后再启动新的任务。
- 同步和协调:Channel 可以用于实现 goroutine 之间的同步和协调。例如,可以使用带有缓冲的 Channel 来确保多个 goroutine 在某个点上达到同步。当所有的 goroutine 都向 Channel 发送了数据后,可以从 Channel 接收数据,从而确保所有 goroutine 都完成了相应的操作。
Channel 在 Go 语言中具有广泛的应用场景,可以用于实现各种并发模式和同步机制,通过合理地使用Channel,可以提高并发编程的效率和可靠性。
以上就是本文的全部内容,如果觉得还不错的话欢迎点赞,转发和关注,感谢支持。
转载自:https://juejin.cn/post/7312241785542393871