Golang并发编码时绕不开的几大重要组件之——Goroutinue
并发指的是程序由若干个独立运行的代码组成,主要依赖于多核CPU的并行运算和调度能力。 Golang在并发方面的能力较为突出,通过Goroutinue实现了典型的协程的概念。Golang的并发理念是:通过通信来共享内存,而不是通过共享内存来通信。
goroutinue与传统线程的区别
主要体现在四个方面:
- 内存占用不同:Goroutinue的创建消耗2kb内存,并且在栈空间不足时会自动扩容;而线程默认会占用较大的栈空间(1-8MB),且栈空间大小不变,会有溢出风险
- 开销不同:goroutinue创建和销毁消耗都非常小,是用户态线程;而线程的创建和销毁都会有巨大的消耗,是内核级交互
- 调度切换不同:goroutinue切换消耗200ns,在1.4后优化至20ns;而线程切换消耗1000-15000纳秒
- 复杂性不同:goroutinue简单易用,M个线程托管N个goroutinue;线程创建和退出复杂,多个线程间通讯复杂,使用网络多路复用,应用服务线程门槛高
如果想实现一个并发程序,要考虑几个方面:
程序代码如何独立运行? 独立运行的代码如何进行通信? 如何做到数据同步、调度同步?
这就引出了Golang并发编程中的几个重要组件:Goroutinue、Channel、Context、Sync
Goroutinue
Golang中并发执行的单元称为Goroutinue,也就是Go协程
使用方法非常简单,使用go
关键字即可启动新的Goroutinue
示例代码:
func main() {
// 输出奇数
printOdd := func() {
for i := 1; i <= 10; i += 2 {
fmt.Println(i)
time.Sleep(100 * time.Millisecond)
}
}
// 输出偶数
printEven := func() {
for i := 2; i <= 10; i += 2 {
fmt.Println(i)
time.Sleep(100 * time.Millisecond)
}
}
go printOdd()
go printEven()
// 阻塞等待
time.Sleep(time.Second)
}
执行结果:
1 2 4 3 6 5 7 8 10 9
我们只需要一个go关键字就可以非常简便的启动一个Goroutinue协程。最后程序睡眠1秒,原因是主Goroutinue(main函数)需要等待内部Goroutinue运行结束才能结束,否则子Goroutinue程序可能执行一半会被强制停止。
调度的随机性 通过结果可以看到数字的输出顺序并不是按照一定顺序,因为Goroutinue的调度执行是随机的。
Goroutinue的并发规模
Goroutinue本身的数量是无上限的,但是一定会受到栈内存空间以及操作系统的资源限制,可以通过函数 runtime.NumGoroutine()
获取当前Goroutinue数量。前面也提到过,一个Goroutinue初始的栈内存只有2KB,用于保存Goroutinue中的执行数据,且栈内存可以扩容,按需增大或缩小,单个Goroutinue最大可以扩展到1GB。
上面通过time sleep的方法太傻了,我们可以通过官方提供的 sync.WaitGroup 来实现Goroutinue的协同调度。
sync.WaitGroup
sync.WaitGroup用于等待一组Goroutinue执行完毕,其实是一个计数器思想的实现方案,它的核心方法有三个:
- Add():调用此函数用于增加等待的Goroutinue数量,原子操作保证并发安全
- Done():调用此函数用于减去一个计数,原子操作保证并发安全
- Wait():调用此函数用于阻塞,直到所有的Goroutinue完成,也就是计数器归0时,才会解除阻塞状态
现在我们将上面的代码最后一行的 time.Sleep(time.Second)
去掉,再次执行会得到空的输出,原因就是主Goroutinue直接结束,两个子Goroutinue没来得及执行就已经退出了,让我们用 WaitGroup 来改造一下
示例代码:
func main() {
wg := sync.WaitGroup{}
// 输出奇数
printOdd := func() {
defer wg.Done()
for i := 1; i <= 10; i += 2 {
fmt.Printf("%d ", i)
time.Sleep(100 * time.Millisecond)
}
}
// 输出偶数
printEven := func() {
defer wg.Done()
for i := 2; i <= 10; i += 2 {
fmt.Printf("%d ", i)
time.Sleep(100 * time.Millisecond)
}
}
wg.Add(2)
go printOdd()
go printEven()
// 阻塞等待
fmt.Println("waiting...")
wg.Wait()
fmt.Println("\nfinish...")
}
执行结果:
waiting...
2 1 3 4 6 5 7 8 9 10
finish...
这个简单的例子可以比较直观的展示waitGroup的基础用法。waitGroup 适用于一个主Goroutinue需要等待其他Goroutinue全部运行结束后才结束的这种场景,不适用于主Goroutinue需要结束,而通知其他Goroutinue结束的情景。
在这里有个使用上的注意事项,那就是 waitGroup 不要复制使用,因为内部维护的计数器不能修改,否则会造成Goroutinue的泄露,在传值时需要用指针类型来进行传递。
waitGroup的内部结构
可以进入源码查看内部结构:
type WaitGroup struct {
noCopy noCopy
// 64-bit value: high 32 bits are counter, low 32 bits are waiter count.
// 64-bit atomic operations require 64-bit alignment, but 32-bit
// compilers only guarantee that 64-bit fields are 32-bit aligned.
// For this reason on 32 bit architectures we need to check in state()
// if state1 is aligned or not, and dynamically "swap" the field order if
// needed.
state1 uint64
state2 uint32
}
可以看到并不是一个复杂的结构,其中含义:
- noCopy: 用于保证不会被复制
- state1: 以64bit计算机为例,高32bit是计数器
- state2: 以64bit计算机为例,低32bit是等待的Goroutinue
三大关键函数核心代码:
func (wg *WaitGroup) Add(delta int) {
...
state := atomic.AddUint64(statep, uint64(delta)<<32)
...
}
func (wg *WaitGroup) Done() {
wg.Add(-1)
}
func (wg *WaitGroup) Wait() {
...
for {
state := atomic.LoadUint64(statep)
v := int32(state >> 32)
w := uint32(state)
if v == 0 {
// Counter is 0, no need to wait.
if race.Enabled {
race.Enable()
race.Acquire(unsafe.Pointer(wg))
}
return
}
// Increment waiters count.
if atomic.CompareAndSwapUint64(statep, state, state+1) {
if race.Enabled && w == 0 {
// Wait must be synchronized with the first Add.
// Need to model this is as a write to race with the read in Add.
// As a consequence, can do the write only for the first waiter,
// otherwise concurrent Waits will race with each other.
race.Write(unsafe.Pointer(semap))
}
runtime_Semacquire(semap)
if *statep != 0 {
panic("sync: WaitGroup is reused before previous Wait has returned")
}
if race.Enabled {
race.Enable()
race.Acquire(unsafe.Pointer(wg))
}
return
}
}
}
转载自:https://juejin.cn/post/7239256600750293052