likes
comments
collection
share

Go-Channel的使用和底层原理(上)

作者站长头像
站长
· 阅读数 19

前言

channel是Go语言的核心类型,可以理解为管道,通过channel并发核心单元就可以发送或者接收数据进行通讯。Go的Goroutine是实际并发执行的实体,Goroutine通过channel来实现通信。通道的特性像队列,遵循先进先出(FIFO)规则,保证收发数据的顺序。

写之前在列举关于channel的章节时,发现一篇文章出来的话会篇幅过长,所以打算分为上下两篇进行整理,上篇主要是一些使用和底层结构以及channel的创建,下篇主要在收发以及关闭channel的代码和逻辑上面。

1:声明和初始化

通道的声明如下:

var 通道变量 chan 通道类型
var ch1 chan int

声明后的通道类型是空值nil,给一个nil channel发送数据,造成永远阻塞、从一个nil channel接收数据,造成永远阻塞,就像 map 和 slice 数据类型一样, channel必须进行make后才能使用

//通道实例 = make(chan 数据类型, 容量)
ch1 := make(chan int, 10)

容量代表Channel容纳的最多的元素的数量,代表Channel的缓存的大小,如果没有设置容量,或者容量设置为0, 说明Channel没有缓存,在后面的channel的类型中会说到不同类型channel的区别。

2:基本操作

channel的操作符是<-,箭头的方向就是数据流向

// 发送值v到Channel ch1中
ch1 <- v    

// 从Channel ch1中接收数据,并将数据赋值给v
v := <-ch1 
 
// 关闭channel
close(ch1)

3:channel的类型

3.1:无缓冲channel

无缓冲channel表示发送者必须等待数据被接收者接收才会继续发送到channel中

// make(chan int) 创建的就是无缓冲通道
func main() {
 ch := make(chan int)
 ch <- 1
}
//上面的代码运行会报错:fatal error: all goroutines are asleep - deadlock!

deadlock表示程序中的 goroutine 都被挂起导致程序死锁了,无缓冲通道必须至少有一个接收方才能发送成功,同理至少有一个发送放才能接收成功,可以将上面代码稍加改造就可以解决这个问题了,如下:

// 解决无缓冲通道死锁问题
// 这里也是常考题,不懂channel的特新很容易出错
func recv(c chan int) {
 ret := <-c
}

func main() {
 ch := make(chan int)
 // 创建一个 goroutine 从通道接收值
 go recv(ch) 
 ch <- 10
}

3.2:有缓冲channel

有缓冲通道顾名思义,就是有缓冲区接收发送者的数据,除非缓冲区已满,继续往缓冲区发送才会阻塞

 func main() {
 // 创建一个容量为3的有缓冲channel
 ch := make(chan int, 2) 
 // 此时有缓冲channel容量为3,当前元素个数为1,可用存放元素个数是2
 ch <- 1
}

3.3:单向通道

// 只能接收(只能往外边取,只读)
<- chan int 
// 只能发送(只能往里写,只写)
chan <- int 

4:底层数据结构hchan

我一开始在看关于channel的时候总是不知道它的底层数据结构是什么样,在哪里。后面看了Go的编译链接过程才知道,make是一个内置函数,在编译的时候会将一些关键字和内建函数转换成函数调用,比如make(chan int)会转为 makechan64或者makechan(在src/runtime/chan.go文件中)【makechan64()函数是处理缓冲区大小大于 2 的 32 次方的情况】

type hchan struct {
    qcount   uint           // 当前队列中剩余元素个数
    dataqsiz uint           // 环形队列长度,即可以存放的元素个数
    buf      unsafe.Pointer // 环形队列指针
    elemsize uint16         // 每个元素的大小
    closed   uint32         // 标识关闭状态
    elemtype *_type         // 元素类型
    sendx    uint           // 队列下标,指示元素写入时存放到队列中的位置
    recvx    uint           // 队列下标,指示元素从队列的该位置读出
    recvq    waitq          // 等待读消息的goroutine队列,即等待接收队列
    sendq    waitq          // 等待写消息的goroutine队列,即等待发送队列
    lock     mutex          // 互斥锁,chan不允许并发读写
}

并且 makechan是返回一个channel的指针 *chan,这就是为什么channel是指针类型

func makechan64(t *chantype, size int64) *hchan {
 if int64(int(size)) != size {
  panic(plainError("makechan: size out of range"))
 }

 return makechan(t, int(size)) //调用的也是makechan
}

func makechan(t *chantype, size int) *hchan {
 ...
}

4.1:图解hchan

怎么理解上面这个结构体呢,画个图来看看吧,理解下hchan结构体在channel初始化时字段的意思,如何组成一个环形队列的。

Go-Channel的使用和底层原理(上)

通过图进一步说明:

  1. qcount = 4表示channel中队列剩余4个元素
  2. datasiz = 8表示channel的容量是8,可以存放8个元素
  3. buf 是指向队列的内存 closed: 用来标识channel的状态,0:关闭,!0 标识已关闭,如果关闭,那就不能发送数据

4.2:什么是 recvq 和 sendq

为什么会出现等待读消息的 recvq 队列和等待写消息的 sendq队列呢?通过源码我们知道这两个元素都是双向链表 waitq,链表中的元素都是sudog结构

type waitq struct {
 first *sudog
 last  *sudog
}

1:向channel发送数据的时候,如果缓冲区为满了,并且没有任何接收者等待,当前goroutine会被阻塞,被阻塞的goroutine会被挂起到 hchan的 sendq,等待从channel读数据的goroutine唤醒

2:从channel的缓冲队列读数据时,如果缓冲队列为空,当前goroutine会被阻塞,被阻塞的goroutine会被挂起到 hchan的 recvq,等待向channel写数据的 goroutine 唤醒

这样写可能理解起来思路不够清晰,在后面的channel的读写原理中将结合代码将具体的读写流程解释清楚

5:channel是怎么创建的

Go在编译的时候,会将一些关键字和内建函数转换成函数调用,channel的创建是调用了makechan 函数,最终返回一个hchan指针类型的对象,方法在

src/runtime/chan.go。

func makechan(t *chantype, size int) *hchan {
 elem := t.elem

 // 检查hchan的size大小
 if elem.size >= 1<<16 {
  throw("makechan: invalid channel element type")
 }
 //检查对齐
 if hchanSize%maxAlign != 0 || elem.align > maxAlign {
  throw("makechan: bad alignment")
 }

 mem, overflow := math.MulUintptr(elem.size, uintptr(size))
 if overflow || mem > maxAlloc-hchanSize || size < 0 {
  panic(plainError("makechan: size out of range"))
 }
 var c *hchan
 switch {
 case mem == 0:
  //  队列或者元素大小为 zero 时,
  c = (*hchan)(mallocgc(hchanSize, nil, true))
  // 用这个地址进行同步操作
  c.buf = c.raceaddr()
 case elem.ptrdata == 0:
  // hchan后面在内存里紧跟着就是buf环形队列
  c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
  c.buf = add(unsafe.Pointer(c), hchanSize)
 default:
  // 元素包含指针,分配环形队列地址给 hchan.buf
  c = new(hchan)
  c.buf = mallocgc(mem, elem, true)
 }
 // 设置 hchan的元素个数、类型、容量
 c.elemsize = uint16(elem.size)
 c.elemtype = elem
 c.dataqsiz = uint(size)
 lockInit(&c.lock, lockRankHchan)

 if debugChan {
  print("makechan: chan=", c, "; elemsize=", elem.size, "; dataqsiz=", size, "\n")
 }
 return c
}

总结

上篇就介绍这些,通过对channel的使用和有缓冲和无缓冲的接收,大家对channel的使用有比较清楚的印象,下篇文章将会更加精彩,对收发流程和代码实现进行更深层的探讨