深入理解 go channel
什么是channel
Go语言的并发模型是CSP(Communicating Sequential Processes),提倡通过通信共享内存而不是通过共享内存而实现通信。
channel提供了一种同步的机制,确保在数据发送和接收之间的正确顺序和时机。通过使用channel,我们可以避免在多个goroutine之间共享数据时出现的竞争条件和其他并发问题。
func main() {
ch := make(chan string)
go func() {
ch <- "test"
}()
msg := <-ch
fmt.Println(msg)
}
Channel 可以看做goroutine之间的桥梁:
channel的特性
单向通道
有的时候我们会将通道作为参数在多个任务函数间传递,很多时候我们在不同的任务函数中使用通道都会对其进行限制,比如限制通道在函数中只能发送或只能接收。
Go语言中提供了单向通道来处理这种情况:
// 单向发送 out 通道
func cter(out chan <- int){
for i := 0; i < 10; i++ {
out <- i
}
close(out)
}
// 单向发送 out 通道, 单向接收 in 通道
func sqer(out chan <- int , in <- chan int){
for i := range in{
out <- i * i
}
close(out)
}
// 单向接收 in 通道
func prter(in <-chan int){
for i := range in {
fmt.Println(i)
}
}
func main(){
out := make(chan int)
in := make(chan int)
go cter(out)
go sqer(out, in)
prter(in)
}
chan<- int 是一个只能发送的通道,可以发送但是不能接收;
<-chan int 是一个只能接收的通道,可以接收但是不能发送。
在函数传参及任何赋值操作中将双向通道转换为单向通道是可以的,但反过来是不可以的。
无缓冲的通道(阻塞)
无缓冲的通道又被称为阻塞的通道。 我们看一下示例:
func main() {
ch := make(chan int)
ch <- 10
fmt.Println("发送成功")
}
上面这段代码能够通过编译,但是执行时会出现一下错误:
fatal error: all goroutines are asleep - deadlock!
为什么会出现deadlock错误呢?
因为我们使用ch := make(chan int)创建的是无缓冲的通道,无缓冲的通道只有在有人接收值的时候才能发送值。就像你住的小区没有快递柜和代收点,快递员给你打电话必须要把这个物品送到你的手中,简单来说就是无缓冲的通道必须有接收才能发送。
上面的代码会阻塞在ch <- 10这一行代码形成死锁,那如何解决这个问题呢?
一种方法是启用一个goroutine去接收值,例如:
func recv(c chan int) {
ret := <-c
fmt.Println("接收成功", ret)
}
func main() {
ch := make(chan int)
go recv(ch) // 启用goroutine从通道接收值
ch <- 10
fmt.Println("发送成功")
}
无缓冲通道上的发送操作会阻塞,直到另一个goroutine在该通道上执行接收操作,这时值才能发送成功,两个goroutine将继续执行。相反,如果接收操作先执行,接收方的goroutine将阻塞,直到另一个goroutine在该通道上发送一个值。
使用无缓冲通道进行通信将导致发送和接收的goroutine同步化。因此,无缓冲通道也被称为同步通道。
有缓冲的通道(非阻塞)
解决无缓冲通道(阻塞)死锁的问题,就是使用有缓冲的通道。
通过缓存的使用,可以尽量避免阻塞,提供应用的性能。
我们使用 make 函数在初始化的时候为其指定通道的容量(缓冲大小):
func main(){
ch := make(chan int ,1) // 创建一个容量为 1 的有缓冲区的通道
ch <- 10
fmt.Println("发送成功")
}
只要通道的容量大于零,那么该通道就是有缓冲的通道,通道的容量表示通道中能存放元素的数量。就像你小区的快递柜只有那么个多格子,格子满了就装不下了,就阻塞了,等到别人取走一个快递员就能往里面放一个。
我们可以使用内置的len函数获取通道内��素的数量,使用cap函数获取通道的容量,虽然我们很少会这么做。
Channel 底层结构
实现 Channel 的结构并不神秘,本质上就是一个 mutex 锁加上一个环状缓存、 一个发送方队列和一个接收方队列:
// src/runtime/chan.go
type hchan struct {
qcount uint // 队列中的所有数据数
dataqsiz uint // 环形队列的大小
buf unsafe.Pointer // 指向大小为 dataqsiz 的数组
elemsize uint16 // 元素大小
closed uint32 // 是否关闭
elemtype *_type // 元素类型
sendx uint // 发送索引
recvx uint // 接收索引
recvq waitq // recv 等待列表,即( <-ch )
sendq waitq // send 等待列表,即( ch<- )
lock mutex
}
重点字段:
-
buf 指向底层循环数组,只有缓冲型的 channel 才有;
-
sendx,recvx 均指向底层循环数组,表示当前可以发送和接收的元素位置索引值(相对于底层数组)。
-
sendq,recvq 分别表示被阻塞的 goroutine,这些 goroutine 由于尝试读取 channel 或向 channel 发送数据而被阻塞。
-
waitq 是 sudog 的一个双向链表,而 sudog 实际上是对 goroutine 的一个封装:
type waitq struct {
first *sudog
last *sudog
}
type sudog struct {
g *g // 等待发送队列或者等待接收队列的goroutine
next *sudog
prev *sudog
elem unsafe.Pointer // goroutine的栈(发送或者接收的数据的地址)
// ...
}
- lock 用来保证每个读 channel 或写 channel 的操作都是原子的。
例如,创建一个容量为 6 的,元素为 int 型的 channel 数据结构如下 :
Channel 的创建
Channel 的创建语句由编译器完成如下翻译工作:
make(chan type, n) => makechan(type, n)
或者
make(chan type, n) => makechan64(type, n)
Channel 并不严格支持 int64 大小的缓冲,当 make(chan type, n) 中 n 为 int64 类型时, 运行时的实现仅仅只是将其强转为 int,提供了对 int 转型是否成功的检查:
// src/runtime/chan.go
func makechan64(t *chantype, size int64) *hchan {
if int64(int(size)) != size {
panic("makechan: size out of range")
}
return makechan(t, int(size))
}
而具体的 makechan 实现的本质是根据需要创建的元素大小, 对 mallocgc 进行封装, 因此,Channel 总是在堆上进行分配,它们会被垃圾回收器进行回收, 这也是为什么 Channel 不一定总是需要调用 close(ch) 进行显式地关闭。
// src/runtime/chan.go
// 将 hchan 的大小对齐
const hchanSize = unsafe.Sizeof(hchan{}) + uintptr(-int(unsafe.Sizeof(hchan{}))&7)
func makechan(t *chantype, size int) *hchan {
elem := t.elem
...
// 检查确认 channel 的容量不会溢出
mem, overflow := math.MulUintptr(elem.size, uintptr(size))
if overflow || mem > maxAlloc-hchanSize || size < 0 {
panic("makechan: size out of range")
}
var c *hchan
switch {
case mem == 0:
// 队列或元素大小为零
c = (*hchan)(mallocgc(hchanSize, nil, true))
...
case elem.ptrdata == 0:
// 元素不包含指针
// 在一个调用中分配 hchan 和 buf
c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
c.buf = add(unsafe.Pointer(c), hchanSize)
default:
// 元素包含指针
c = new(hchan)
c.buf = mallocgc(mem, elem, true)
}
c.elemsize = uint16(elem.size)
c.elemtype = elem
c.dataqsiz = uint(size)
...
return c
}
创建 channel 的逻辑主要分为三大块:
-
当前 channel 不存在缓冲区或者元素大小为 0 的情况下,就会调用 mallocgc 方法分配一段连续的内存空间。
-
当前 channel 存储的类型不存指针时,就会分配一段大小为hchanSize+mem连续的内存空间。
-
包含指针的情况下,会为channel 和 底层数组分别分配内存空间。
向channel发送数据
发送数据完成的是如下的翻译过程:
ch <- v => chansend1(ch, v)
而本质上它会去调用更为通用的 chansend:
// go:nosplit
func chansend1(c *hchan, elem unsafe.Pointer) {
chansend(c, elem, true)
}
向nil channel发送数据
func chansend(c *hchan, ep unsafe.Pointer, block bool) bool {
// 当向 nil channel 发送数据时,会调用 gopark
// 而 gopark 会将当前的 Goroutine 休眠,从而发生死锁崩溃
if c == nil {
if !block {
return false
}
gopark(nil, nil, waitReasonChanSendNilChan)
throw("unreachable")
}
...
}
在这个部分中,我们可以看到,如果一个 Channel 为零值(比如没有初始化),这时候的发送操作会暂止当前的 Goroutine(gopark)。 而 gopark 会将当前的 Goroutine 休眠,从而发生死锁崩溃。
向关闭的channel发送数据
func chansend(c *hchan, ep unsafe.Pointer, block bool) bool {
...
lock(&c.lock)
// 持有锁之前我们已经检查了锁的状态,
// 但这个状态可能在持有锁之前、该检查之后发生变化,
// 因此还需要再检查一次 channel 的状态
if c.closed != 0 { // 不允许向已经 close 的 channel 发送数据
unlock(&c.lock)
panic(plainError("send on closed channel"))
}
向关闭的channel发送数据会触发panic。
直接发送到阻塞的goroutine
// 1. channel 上有阻塞的接收方,直接发送
if sg := c.recvq.dequeue(); sg != nil {
send(c, sg, ep, func() { unlock(&c.lock) })
return true
}
发送时如果recvq有阻塞的goroutine,则直接将数据复制到goroutine的栈上。
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func()) {
...
if sg.elem != nil {
sendDirect(c.elemtype, sg, ep)
sg.elem = nil
}
gp := sg.g
unlockf() // unlock(&c.lock)
gp.param = unsafe.Pointer(sg)
...
// 复始一个 Goroutine,放入调度队列等待被后续调度
goready(gp) // 将 gp 作为下一个立即被执行的 Goroutine
}
func sendDirect(t *_type, sg *sudog, src unsafe.Pointer) {
dst := sg.elem
... // 为了确保发送的数据能够被立刻观察到,需要写屏障支持,执行写屏障,保证代码正确性
memmove(dst, src, t.size) // 直接写入接收方的执行栈!
}
sg.elem指向goroutine的栈空间,sendDirect函数直接将数据复制到接收方的栈上。goready作用是唤醒阻塞的goroutine。
发送到channel缓存中
// 2. 判断 channel 中缓存是否有剩余空间
if c.qcount < c.dataqsiz {
// 有剩余空间,存入 c.buf
qp := chanbuf(c, c.sendx)
...
typedmemmove(c.elemtype, qp, ep) // 将要发送的数据拷贝到 buf 中
c.sendx++
if c.sendx == c.dataqsiz { // 如果 sendx 索引越界则设为 0
c.sendx = 0
}
c.qcount++ // 完成存入,记录增加的数据,解锁
unlock(&c.lock)
return true
}
if !block {
unlock(&c.lock)
return false
}
...
如果channel的buf中还有空间,那么会将数据发送到buf中。
阻塞添加到sendq
// 3. 阻塞在 channel 上,等待接收方接收数据
gp := getg()
mysg := acquireSudog()
...
c.sendq.enqueue(mysg)
gopark(chanparkcommit, unsafe.Pointer(&c.lock)) // 将当前的 g 从调度队列移出
// 因为调度器在停止当前 g 的时候会记录运行现场,当恢复阻塞的发送操作时候,会从此处继续开始执行
...
gp.waiting = nil
gp.activeStackChans = false
if gp.param == nil {
if c.closed == 0 { // 正常唤醒状态,Goroutine 应该包含需要传递的参数,
// 但如果没有唤醒时的参数,且 channel 没有被关闭,则为虚假唤醒
throw("chansend: spurious wakeup")
}
panic(plainError("send on closed channel"))
}
gp.param = nil
...
mysg.c = nil // 取消与之前阻塞的 channel 的关联
releaseSudog(mysg) // 从 sudog 中移除
return true
}
如果发生阻塞则将当前goroutine添加到阻塞队列中,并将挂起goroutine。当goroutine唤起后还要判断channel是否被关闭,如果关闭后则panic。
发送数据流程图
向一个channel中写数据简单过程如下:
-
如果等待接收队列recvq不为空,说明缓冲区中没有数据或者没有缓冲区,此时直接从recvq取出G,并把数据写入,最后把该G唤醒,结束发送过程;
-
如果缓冲区中有空余位置,将数据写入缓冲区,结束发送过程;
-
如果缓冲区中没有空余位置,将待发送数据写入G,将当前G加入sendq,进入睡眠,等待被读goroutine唤醒;
简单流程图如下:
从channel接收数据
接收数据主要是完成以下翻译工作:
v <- ch => chanrecv1(ch, v)
v, ok <- ch => ok := chanrecv2(ch, v)
他们的本质都是调用 chanrecv:
// go:nosplit
func chanrecv1(c *hchan, elem unsafe.Pointer) {
chanrecv(c, elem, true)
}
// go:nosplit
func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) {
_, received = chanrecv(c, elem, true)
return
}
接收nil channel
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
...
// nil channel,同 send,会导致两个 Goroutine 的死锁
if c == nil {
if !block {
return
}
gopark(nil, nil, waitReasonChanReceiveNilChan)
throw("unreachable")
}
// ...
接收 nil channel 会将当前G挂起,永远阻塞,造成死锁。
接收关闭的channel
lock(&c.lock)
// 1. channel 已经 close,且 channel 中没有数据,则直接返回
if c.closed != 0 && c.qcount == 0 {
...
unlock(&c.lock)
if ep != nil {
typedmemclr(c.elemtype, ep)
}
return true, false
}
接收关闭的channel,并且缓存里面没有数据,这时只会接收到零值。
接收发送队列里的数据
// 2. channel 上有阻塞的发送方,直接接收
if sg := c.sendq.dequeue(); sg != nil {
recv(c, sg, ep, func() { unlock(&c.lock) })
return true, true
}
func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
if c.dataqsiz == 0 {
...
if ep != nil {
// 直接从对方的栈进行拷贝
recvDirect(c.elemtype, sg, ep)
}
} else {
// 从缓存队列拷贝
qp := chanbuf(c, c.recvx)
...
// 从队列拷贝数据到接收方
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
// 从发送方拷贝数据到队列
typedmemmove(c.elemtype, qp, sg.elem)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
}
sg.elem = nil
gp := sg.g
unlockf()
gp.param = unsafe.Pointer(sg)
...
goready(gp, skip+1)
}
-
如果等待发送队列sendq不为空,且没有缓冲区,直接从sendq中取出G,把G中数据读出,最后把G唤醒,结束读取过程;
-
如果等待发送队列sendq不为空,此时说明缓冲区已满,从缓冲区中首部读出数据,把G中数据写入缓冲区尾部,把G唤醒,结束读取过程;
接收buf里的数据
// 3. channel 的 buf 不空
if c.qcount > 0 {
// 直接从队列中接收
qp := chanbuf(c, c.recvx)
...
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
typedmemclr(c.elemtype, qp)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.qcount--
unlock(&c.lock)
return true, true
}
if !block {
unlock(&c.lock)
return false, false
}
ep是接收数据的目标地址,如果ep不为nil说明没有忽略接收的数据,接收时需要将数据复制到目标地址。
没有数据挂起
// 4. 没有数据可以接收,阻塞当前 Goroutine
gp := getg()
mysg := acquireSudog()
...
c.recvq.enqueue(mysg) // 加入到recvq队列中
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive) // 挂起
唤起时逻辑
...
// 被唤醒
gp.waiting = nil
...
closed := gp.param == nil
gp.param = nil
mysg.c = nil
releaseSudog(mysg)
return true, !closed
}
读取流程
从一个channel读数据简单过程如下:
-
如果等待发送队列sendq不为空,且没有缓冲区,直接从sendq中取出G,把G中数据读出,最后把G唤醒,结束读取过程;
-
如果等待发送队列sendq不为空,此时说明缓冲区已满,从缓冲区中首部读出数据,把G中数据写入缓冲区尾部,把G唤醒,结束读取过程;
-
如果缓冲区中有数据,则从缓冲区取出数据,结束读取过程;
-
将当前goroutine加入recvq,进入睡眠,等待被写goroutine唤醒;
简单流程图如下:
关闭channel
关闭 Channel 主要是完成以下翻译工作:
close(ch) => closechan(ch)
具体的实现中:
-
首先对 Channel 上锁
-
而后依次将阻塞在 Channel 的 g 添加到一个 gList 中
-
当所有的 g 均从 Channel 上移除时
-
可释放锁,并唤醒 gList 中的所有接收方和发送方
加锁关闭
func closechan(c *hchan) {
if c == nil { // close 一个空的 channel 会 panic
panic(plainError("close of nil channel"))
}
lock(&c.lock)
if c.closed != 0 { // close 一个已经关闭的的 channel 会 panic
unlock(&c.lock)
panic(plainError("close of closed channel"))
}
...
c.closed = 1
释放recvq
var glist gList
// 释放所有的接收方
for {
sg := c.recvq.dequeue()
if sg == nil { // 队列已空
break
}
if sg.elem != nil {
typedmemclr(c.elemtype, sg.elem) // 清零
sg.elem = nil
}
...
gp := sg.g
gp.param = nil
...
glist.push(gp)
}
channel 关闭后所有的接受者都会收到相应的零值,并且被唤起。
释放sendq
// 释放所有的发送方
for {
sg := c.sendq.dequeue()
if sg == nil { // 队列已空
break
}
sg.elem = nil
...
gp := sg.g
gp.param = nil
...
glist.push(gp)
}
// 释放 channel 的锁
unlock(&c.lock)
唤起所有G
// 就绪所有的 G
for !glist.empty() {
gp := glist.pop()
gp.schedlink = 0
goready(gp, 3)
}
}
关闭 channel 后,对于等待接收者而言,会收到一个相应类型的零值。对于等待发送者,会直接 panic。所以,在不了解 channel 还有没有接收者的情况下,不能贸然关闭 channel。
Channel 操作总结
总结一下操作 channel 的结果:
操作 | nil channel | closed channel | not nil, not closed channel |
---|---|---|---|
close | panic | panic | 正常关闭 |
读 <- ch | 阻塞 | 读到对应类型的零值 | 阻塞或正常读取数据。缓冲型 channel 为空或非缓冲型 channel 没有等待发送者时会阻塞 |
写 ch <- | 阻塞 | panic | 阻塞或正常写入数据。非缓冲型 channel 没有等待接收者或缓冲型 channel buf 满时会被阻塞 |
发生 panic 的三种情
-
向一个关闭的 channel 进行写操作;
-
关闭一个 nil 的 channel;
-
重复关闭一个 channel。
发生阻塞的三种情况
-
读一个 nil channel 都会被阻塞。
-
写一个 nil channel 都会被阻塞。
-
读写缓冲为空或者已经满了且没有挂起的接收者和发送者。
channel的应用
停止信号
channel 用于停止信号的场景还是挺多的,经常是关闭某个 channel 或者向 channel 发送一个元素,使得接收 channel 的那一方获知道此信息,进而做一些其他的操作。
package main
import (
"time"
"math/rand"
"sync"
"log"
)
func main() {
rand.Seed(time.Now().UnixNano())
log.SetFlags(0)
// ...
const MaxRandomNumber = 100000
const NumSenders = 1000
wgReceivers := sync.WaitGroup{}
wgReceivers.Add(1)
// ...
dataCh := make(chan int, 100)
stopCh := make(chan struct{})
// stopCh is an additional signal channel.
// Its sender is the receiver of channel dataCh.
// Its reveivers are the senders of channel dataCh.
// senders
for i := 0; i < NumSenders; i++ {
go func() {
for {
value := rand.Intn(MaxRandomNumber)
select {
case <- stopCh:
return
case dataCh <- value:
}
}
}()
}
// the receiver
go func() {
defer wgReceivers.Done()
for value := range dataCh {
if value == MaxRandomNumber-1 {
// the receiver of the dataCh channel is
// also the sender of the stopCh cahnnel.
// It is safe to close the stop channel here.
close(stopCh)
return
}
log.Println(value)
}
}()
// ...
wgReceivers.Wait()
}
任务定时
与 timer 结合,一般有两种玩法:实现超时控制,实现定期执行某个任务。
有时候,需要执行某项操作,但又不想它耗费太长时间,上一个定时器就可以搞定:
select {
case <-time.After(100 * time.Millisecond):
case <-s.stopc:
return false
}
等待 100 ms 后,如果 s.stopc 还没有读出数据或者被关闭,就直接结束。这是来自 etcd 源码里的一个例子,这样的写法随处可见。
定时执行某个任务,也比较简单:
func worker() {
ticker := time.Tick(1 * time.Second)
for {
select {
case <- ticker:
// 执行定时任务
fmt.Println("执行 1s 定时任务")
}
}
}
每隔 1 秒种,执行一次定时任务。
解耦生产方和消费方
服务启动时,启动 n 个 worker,作为工作协程池,这些协程工作在一个 for {} 无限循环里,从某个 channel 消费工作任务并执行:
func main() {
taskCh := make(chan int, 100)
go worker(taskCh)
// 塞任务
for i := 0; i < 10; i++ {
taskCh <- i
}
// 等待 1 小时
select {
case <-time.After(time.Hour):
}
}
func worker(taskCh <-chan int) {
const N = 5
// 启动 5 个工作协程
for i := 0; i < N; i++ {
go func(id int) {
for {
task := <- taskCh
fmt.Printf("finish task: %d by worker %d\n", task, id)
time.Sleep(time.Second)
}
}(i)
}
}
5 个工作协程在不断地从工作队列里取任务,生产方只管往 channel 发送任务即可,解耦生产方和消费方。
程序输出:
finish task: 1 by worker 4
finish task: 2 by worker 2
finish task: 4 by worker 3
finish task: 3 by worker 1
finish task: 0 by worker 0
finish task: 6 by worker 0
finish task: 8 by worker 3
finish task: 9 by worker 1
finish task: 7 by worker 4
finish task: 5 by worker 2
控制并发数
有时需要定时执行几百个任务,例如每天定时按城市来执行一些离线计算的任务。但是并发数又不能太高,因为任务执行过程依赖第三方的一些资源,对请求的速率有限制。这时就可以通过 channel 来控制并发数。
var limit = make(chan int, 3)
func main() {
// …………
for _, w := range work {
go func() {
limit <- 1
w()
<-limit
}()
}
// …………
}
构建一个缓冲型的 channel,容量为 3。接着遍历任务列表,每个任务启动一个 goroutine 去完成。真正执行任务,访问第三方的动作在 w() 中完成,在执行 w() 之前,先要从 limit 中拿“许可证”,拿到许可证之后,才能执行 w(),并且在执行完任务,要将“许可证”归还。这样就可以控制同时运行的 goroutine 数。
参考
转载自:https://juejin.cn/post/7386514632726380578