likes
comments
collection
share

Golang并发编码时绕不开的几大重要组件之——Channel

作者站长头像
站长
· 阅读数 65

在上一篇文章中有介绍Golang实现并发的重要关键字 go,通过这个我们可以方便快速地启动Goroutinue协程。协程之间一定会有通信的需求,而Golang的核心设计思想为:不通过共享内存的方式进行通信,而应该通过通信来共享内存。与其他通过共享内存来进行数据传递的编程语言略有差异,而实现这一方案的正是 Channel。

Channel是一个提供可接收和发送特定类型值的用于并发函数通信的数据类型,满足FIFO(先进先出)原则的队列类型。FIFO在数据类型与操作上都有体现:

  1. Channel类型的元素是先进先出的,先发送到Channel的元素会先被接收
  2. 先向channel发送数据的Goroutinue会优先执行
  3. 先从channel接收数据的Goroutinue会优先执行

Channel使用

语法

channel是Golang中的一种数据类型,相关语法也非常简单

ChannelType = ( "chan" | "chan" "<-" | "<-" "chan" ) ElementType 
  • chan为channel类型关键字
  • <- 操作符用于channel中数据的收发,在声明时用于表示channel数据流动的方向
    • chan 默认为双向传递,即channel既可以接收数据也可以发送数据
    • chan<- 仅可以发送数据的channel
    • <-chan 仅可以接受数据的channel
  • ElementType 代表元素类型,例如 int、string...

初始化

channel数据类型是一种引用类型,类似于map和slice,所以channel的初始化需要使用内建函数make():

make(ChannelType, Capacity)

ch := make(chan int) 
var ch = make(chan int) 
ch := make(chan int, 10) 
ch := make(<-chan int) 
ch := make(chan<- int, 10)
  • ChannelType就是前面介绍的类型
  • Capacity代表缓冲容量。省略时就是为默认0,表示无缓冲的Channel

如果不使用make()函数来初始化channel,则不能执行收发通信操作,并且会造成阻塞,进而造成Goroutinue泄露,示例:

func main() {
	defer func() {
		fmt.Println("goroutines: ", runtime.NumGoroutine())
	}()

	var ch chan int
	go func() {
		<-ch
	}()

	time.Sleep(time.Second)
}

代码执行结果为:

goroutines:  2

可以看到,直到程序退出,Goroutinue数量仍然为2,原因就是channel没有正确的使用make()进行初始化,channel变量实际为nil,进而造成了内存泄露。

数据的接收与发送

channel中数据的接收与发送是通过操作符 <- 来进行操作的:

// 接收数据
ch <- Expression
ch <- 111
ch <- struct{}{}

// 发送数据
<- ch
v := <- ch
f(<-ch)

除了操作符 <- 外,我们还可以使用 for range 持续地从channel中接收数据:

for e := range ch {
    // e逐个读取ch中的元素值
}

持续接收操作与 <- 没有很大区别:

  • 如果ch为nil channel就会阻塞
  • 如果ch没有发送元素也会阻塞

for 会持续读取直到channel执行关闭,关闭后for会将剩余元素全部读取之后结束。那么对已经关闭的channel进行数据的收发会怎样呢?

channel的关闭

channel使用过后要使用内置函数close()来关闭channel。关闭channel的意思是记录该Channel不能再被发送任何元素了,而不是销毁该Channel的意思。也就意味着关闭的Channel是可以继续接收值的

  • 如果向已经关闭的channel发送数据会引发panic
  • 关闭 nil channel 会引发panic
  • 关闭已经关闭的 channel 会引发panic
  • 如果读取已经关闭的channel值,可以接收关闭前发送的全部值;关闭前的值接收完会返回类型的零值和一个false,不会阻塞及panic

以上几种情况可以自己编写一个简单的代码来测试一下。

channel分类

前面有提到,在make一个channel时,第二个参数就代表了缓冲区大小,如果没有第二个参数就为默认的无缓冲channel。具体用法:

  • 缓冲Channel,make(chan T, cap),cap是大于0的值。
  • 无缓冲Channel, make(chan T), make(chan T, 0)

无缓冲channel

无缓冲的channel也称为同步Channel,只有当发送方和接收方都准备就绪时,通信才会成功。

同步操作示例:

func ChannelSync() {
// 初始化数据
ch := make(chan int)
wg := sync.WaitGroup{}

    // 间隔发送
    wg.Add(1)
    go func() {
        defer wg.Done()
        for i := 0; i < 5; i++ {
            ch <- i
            println("Send ", i, ".\tNow:", time.Now().Format("15:04:05.999999999"))
            // 间隔时间
            time.Sleep(1 * time.Second)
        }
        close(ch)
    }()

    // 间隔接收
    wg.Add(1)
    go func() {
        defer wg.Done()
        for v := range ch {
            println("Received ", v, ".\tNow:", time.Now().Format("15:04:05.999999999"))
            // 间隔时间,注意与send的间隔时间不同
            time.Sleep(3 * time.Second)
        }
    }()

    wg.Wait()

}

执行结果:

Send  0 .       Now: 17:54:27.772773
Received  0 .   Now: 17:54:27.772795
Received  1 .   Now: 17:54:30.773878
Send  1 .       Now: 17:54:30.773959
Received  2 .   Now: 17:54:33.775132
Send  2 .       Now: 17:54:33.775208
Received  3 .   Now: 17:54:36.775816
Send  3 .       Now: 17:54:36.775902
Received  4 .   Now: 17:54:39.776408
Send  4 .       Now: 17:54:39.776456

代码中,采用同步channel,使用两个goroutine完成发送和接收。每次发送和接收的时间间隔不同。我们分别打印发送和接收的值和时间。可以看到执行结果:发送和接收时间一致;间隔以长的为准,可见发送和接收操作为同步操作。因此,同步Channel适合在gotoutine间用做同步的信号

缓冲Channel

缓冲Channel也称为异步Channel,接收和发送方不用等待双方就绪即可成功。缓冲Channel会存在一个容量为cap的缓冲空间。当使用缓冲Channel通信时,接收和发送操作是在操作Channel的Buffer,是典型的队列操作:

  • 接收时,从缓冲中接收元素,只要缓冲不为空,不会阻塞。反之,缓冲为空,会阻塞,goroutine挂起
  • 发送时,向缓冲中发送元素,只要缓冲未满,不会阻塞。反之,缓冲满了,会阻塞,goroutine挂起

操作示例:

func main() {
	// 初始化数据
	ch := make(chan int, 5)
	wg := sync.WaitGroup{}

	// 间隔发送
	wg.Add(1)
	go func() {
		defer wg.Done()
		for i := 0; i < 5; i++ {
			ch <- i
			println("Send ", i, ".\tNow:", time.Now().Format("15:04:05.999999999"))
			// 间隔时间
			time.Sleep(1 * time.Second)
		}
	}()

	// 间隔接收
	wg.Add(1)
	go func() {
		defer wg.Done()
		for v := range ch {
			println("Received ", v, ".\tNow:", time.Now().Format("15:04:05.999999999"))
			// 间隔时间,注意与send的间隔时间不同
			time.Sleep(3 * time.Second)
		}
	}()

	wg.Wait()

}

执行结果:

Send  0 .       Now: 17:59:32.990698
Received  0 .   Now: 17:59:32.99071
Send  1 .       Now: 17:59:33.992127
Send  2 .       Now: 17:59:34.992832
Received  1 .   Now: 17:59:35.991488
Send  3 .       Now: 17:59:35.993155
Send  4 .       Now: 17:59:36.993445
Received  2 .   Now: 17:59:38.991663
Received  3 .   Now: 17:59:41.99184
Received  4 .   Now: 17:59:44.992214

代码中,与同步channel一致,只是采用了容量为5的缓冲channel,使用两个goroutine完成发送和接收。每次发送和接收的时间间隔不同。我们分别打印发送和接收的值和时间。可以看到执行结果:发送和接收时间不同;发送和接收操作不会阻塞,可见发送和接收操作为异步操作。因此,缓冲channel非常适合做goroutine之间的数据通信

Channel原理

源码

在源码包中的 runtime/chan.go 可以看到Channel实现源码:

type hchan struct {
    qcount   uint           // 元素个数,通过len()获取
    dataqsiz uint           // 缓冲队列的长度,即容量,通过cap()获取
    buf      unsafe.Pointer // 缓冲队列指针,无缓冲队列则为nil
    elemsize uint16         // 元素大小
    closed   uint32         // 关闭标志
    elemtype \*\_type // 元素类型
    sendx    uint   // 发送元素索引
    recvx    uint   // 接收元素索引
    recvq    waitq  // 接收Goroutinue队列
    sendq    waitq  // 发送Goroutinue队列

        // lock protects all fields in hchan, as well as several
        // fields in sudogs blocked on this channel.
        //
        // Do not change another G's status while holding this lock
        // (in particular, do not ready a G), as this can deadlock
        // with stack shrinking.
    lock mutex // 锁

}
  • buf 可以理解为一个环形数组,用来缓存Channel中的元素。为何使用环形数组而不使用普通数组呢?因为普通数组更适合指定的空间,弹出元素时,普通数组需要全部都前移,而使用环形数组+下标索引的方式可以在不移动元素的情况下实现数据的高效读写。
  • sendx与recvx 当下标超过数组容量后会回到第一个位置,所以需要有两个字段记录当前读和写的下标位置。
  • recvq与sendq 用于记录等待接收和发送的goroutine队列,当基于某channel的接收或发送的goroutine无法理解执行时,也就是需要阻塞时,会被记录到Channel的等待队列中。当channel可以完成相应的接收或发送操作时,从等待队列中唤醒goroutine进行操作。
    • 等待队列实际是一个双向链表结构

生命周期

创建策略

  • 无缓冲的直接分配内存
  • 有缓冲的不包含指针,为hchan和底层数组分配连续的地址
  • 有缓冲的channel且包含元素指针,会为hchan和底层数组分配地址

发送策略

  • 发送操作编译时转换为 runtime.chansend函数
  • 阻塞式:block=true;非阻塞式:block=false
  • 向channel中发送数据分为检查和数据发送两块,数据发送:
    • 如果channel的读等待队列存在接受者goroutinue
      • 将数据直接发送给第一个等待的goroutinue,唤醒接收的goroutinue
    • 如果channel读等待队列不存在接收者goroutinue
      • 如果循环数组buf未满,则将数据发送到循环数组buf的队尾
      • 如果循环数组buf已满,这时就会走阻塞发送的流程,将当前goroutinue加入写等待队列,并挂起等待唤醒

接收策略

  • 接收操作编译是转换为 runtime.chanrecv 函数
  • 阻塞式:block=true;非阻塞式:block=false
  • 向channel中接收数据数据接收:
    • 如果channel的写等待队列存在发送者goroutinue:
      • 如果是无缓冲channel,直接从第一个发送者goroutinue将数据拷贝给接收变量,唤醒发送的goroutinue
      • 如果是有缓冲channel(已满),将循环数组buf的队首元素拷贝给接收变量,将第一个发送者goroutinue的数据拷贝到buf循环数组,唤醒发送的goroutinue
    • 如果channel的写等待不存在发送者goroutinue
      • 如果循环数组buf非空,将循环数组buf的队首元素拷贝给接收变量
      • 如果循环数组buf为空,这个时候就会走阻塞接收的流程,将当前 goroutine 加入读等待队列,并挂起等待唤醒

关闭

  • 调用 runtime.closechan 函数

简单的对Channel一些基础用法及原理做了一个解释,可以多写一写并发代码以及阅读源码来加深对Channel的理解。

转载自:https://juejin.cn/post/7239996872005763127
评论
请登录