likes
comments
collection
share

Go 同步原语 WaitGroup 详解

作者站长头像
站长
· 阅读数 16

WaitGroup 概述

WaitGroup在go语言中,用于线程同步,单从字面意思理解,wait等待的意思,group组、团队的意思,WaitGroup就是指等待一组,等待一个系列执行完成后才会继续向下执行。

WatiGroupsync包中的一个struct类型,用来收集需要等待执行完成的goroutine。下面是它的定义:

// WaitGroup用于等待一组线程的结束。 
// 父线程调用Add方法来设定应等待的线程的数量。 
// 每个被等待的线程在结束时应调用Done方法。同时,主线程里可以调用Wait方法阻塞至所有线程结束。 
type WaitGroup struct { 
// 包含隐藏或非导出字段 
} 
// Add方法向内部计数加上delta,delta可以是负数; 
// 如果内部计数器变为0,Wait方法阻塞等待的所有线程都会释放,如果计数器小于0,方法panic。 
// 注意Add加上正数的调用应在Wait之前,否则Wait可能只会等待很少的线程。 
// 一般来说本方法应在创建新的线程或者其他应等待的事件之前调用。 
func (wg *WaitGroup) Add(delta int) 

// Done方法减少WaitGroup计数器的值,应在线程的最后执行。 
func (wg *WaitGroup) Done() 

// Wait方法阻塞直到WaitGroup计数器减为0。 
func (wg *WaitGroup) Wait()

sync.WaitGroup3 个方法

  • Add():每次激活想要被等待完成的goroutine之前,先调用Add(),用来设置或添加要等待完成的goroutine数量

例如Add(2) 或者两次调用Add(1) 都会设置等待计数器的值为2,表示要等待2个goroutine完成

  • Done():每次需要等待的goroutine在真正完成之前,应该调用该方法来人为表示goroutine完成了,该方法会对等待计数器减1

  • Wait():在等待计数器减为0之前,Wait() 会一直阻塞当前的goroutine

综上所述:

Add() 用来增加要等待的goroutine的数量

Done() 用来表示goroutine已经完成了,减少一次计数器

Wait() 用来等待所有需要等待的goroutine完成。

使用示例

一个常见的使用场景是:批量发出 RPC 或者 HTTP 请求:

requests := []*Request{...}
wg := sync.WaitGroup{}
wg.Add(len(requests))

for _, request := range requests {
    go func(r *Request) {
        defer wg.Done()
        // res, err := service.call(r)
    }(request)
}
wg.Wait()

我们可以通过 sync.WaitGroup 将原本顺序执行的代码在多个Goroutine中并发,如下图:

Go 同步原语 WaitGroup 详解

结构体

sync.WaitGroup 结构体中只包含两个成员:

type WaitGroup struct {
    noCopy noCopy
    state1 [3]uint32
}
  • noCopy —— 保证 sync.WaitGroup 不会被开发者通过再赋值的方式赋值

  • state1 —— 存储状态和信号量

sync.noCopy 是一个特殊的私有结构体,源码包中的分析器会在编译期间检查被复制的变量中是否含有 sync.noCopy 或者实现了 LockUnlock 方法,如果包含有该结构体或者实现了对应的方法,就会抛出错误:

func main(){
    wg := sync.WaitGorup{}
    yawg := wg
    fmt.Println(wg, yawg)
}tGorp

$ go vet proc.go
./proc.go:10:10: assignment copies lock value to yawg: sync.WaitGroup

除了 sync.noCopysync.WaitGroup 结构体中还包含一个总共占用 12 字节的数组,该数组会存储当前结构体的状态,在 64 位与 32 位机器中表现也不同,如下图:

Go 同步原语 WaitGroup 详解

sync.WaitGroup 提供的私有方法 sync.WaitGroup.state 能够帮我们从 state1 字段中取出对应的状态和信号量。

接口

sync.WaitGroup 对外总共暴露3个方法:

  • sync.WaitGroup.Add
  • sync.WaitGroup.Wait
  • sync.WaitGroup.Done

sync.WaitGroup.Done 只是向 sync.WaitGroup.Add 方法中传入了 -1,因此咱们主要分析另外两个方法:sync.WaitGroup.Addsync.WaitGroup.Wait

func (wg *sync.WaitGroup) Add (delta int){
    statep, semap := wg.state()
    state := atomic.AddUint64(statep, uint64(delta) << 32)
    v := int32(state >> 32)
    w := uint32(state)
    if v < 0 {
        panic("sync: negative WaitGroup counter")
    }
    if V > 0 || w == 0 {
        return
    }
    *statep = 0
    for ; w != 0; w-- {
        runtime_Semrelease(semap, false, 0)
    }
}

sync.WaitGroup.Add 可以更新 sync.WaitGroup 中的计数器 counter

虽然 sync.WaitGroup.Add 方法传入的参数可以为负数,但是计数器只能是非负数,一旦出现负数程序就会崩溃。

当调用的计数器归零,即所有任务都执行完成时,才会通过 runtime_Semrelease 唤醒处于等待状态的 Goroutine

sync.WaitGroup 的另一个方法 sync.WaitGroup.Wait 会在计数器大于0 并且不存在等待的Goroutine时,调用runtime.sync_runtime_Semacquire 陷入睡眠状态:

func (wg *sync.WaitGroup) Wait () {
    statep, semap := wg.state()
    for {
        state := atomic.LoadUint64(statep)
        v := int32(state >> 32)
        if v == 0 {
            return
        }
        if atomic.CompareAndSwapUint64(statep, state, state + 1) {
            runtime_Semacquire(semap)
            if +statep != 0 {
                panic("sync: WaitGroup is resused before previous Wait has returned")
            }
            return
        }
    }
}

sync.WaitGroup的计数器归零时,陷入睡眠状态的Goroutine会被唤醒,sync.WaitGroup.Wait方法也会立刻放回。

小结

通过 sync.WaitGroup 的分析和研究,可以得出以下结论:

  • sync.WaitGroup 必须在 sync.WaitGroup.Wait 方法返回之后才能重新使用
  • sync.WaitGroup.Done 只是对 sync.WaitGroup.Add 方法的简单封装,我们可以通过 sync.WaitGroup.Add 方法传入任意负数(需要保证计数器非负),快速将计数器归零以唤醒等待的Goroutine
  • 可以同时有多个Goroutine等待当前的sync.WaitGroup计数器归零,这些Goroutine会被同时唤醒