likes
comments
collection
share

并发编程(二) - Mutex 互斥锁

作者站长头像
站长
· 阅读数 45

在上篇文档中介绍了原子操作相关的包 sync/atomic。然而,atomic 仅支持单个变量的原子操作「Load, Store, Swap, CompareAndSwap」。如果需要一块程序片段在任意时刻最多只能有一个 CPU 执行,需要怎么做呢?我们将这块程序片段叫做临界区。也就是说,临界区内的代码在同一时间只能被一个 CPU 执行,且不会被打断。

什么是 Mutex

Mutex(Mutual exclusion) 又叫互斥锁,用于保护临界区,防止多个 goroutine 同时访问临界区。仍然以计数器为例,现在使用 Mutex 创建一个计数器。其中 count 就是临界区,使用 Mutex 的 Lock() 和 Unloack() 方法 保证了任意时刻至多只有一个 goroutine 执行 c.count++ 操作,从而保证了计数的准确性。WaitGroup 后续会详细介绍,这里可以先忽略。

type Counter struct {
    m     sync.Mutex
    count int
}

func (c *Counter) Incr() {
    c.m.Lock()
    c.count++
    c.m.Unlock()
}

func main() {
    c := Counter{}
    wg := &sync.WaitGroup{}
    for i := 0; i < 1000; i++ {
       wg.Add(1)
       go func() {
          defer wg.Done()
          c.Incr()
       }()
    }
    // 等待所有 goroutine 执行完毕
    wg.Wait()
    fmt.Println(c.count)
}

在 Mutex 中,如果临界区正在被访问,则后续的请求会被阻塞,直到资源被释放,获得锁。

func main() {
    var m sync.Mutex
    var wg sync.WaitGroup
    for i := 0; i < 10; i++ {
       wg.Add(1)
       go func(i int) {
          // 后续的 goroutine 会一直阻塞
          m.Lock()
          time.Sleep(time.Second)
          fmt.Printf("%d get lock\n", i)
          m.Unlock()
          wg.Done()
       }(i)
    }
    wg.Wait()
}

对于有些场景,如果临界区正在被访问,则放弃访问临界区,转而执行其他操作。这时就可以使用 mutex 提供的 TryLock() 方法,如果临界区正在被访问则返回 false。

func main() {
    var m sync.Mutex
    var wg sync.WaitGroup
    for i := 0; i < 10; i++ {
       wg.Add(1)
       go func(i int) {
          defer wg.Done()
          // 拿不到锁直接执行 else
          if m.TryLock() {
             fmt.Printf("%d get lock\n", i)
             time.Sleep(time.Second * 3)
             m.Unlock()
          } else {
             fmt.Printf("%d not get lock\n", i)
          }
       }(i)
    }
    wg.Wait()
}

Mutex 实现

上节说到,Lock() 方法会一直阻塞,直到上一个 goroutine 释放资源。如果有多个 goroutine 同时等待释放临界区资源,那谁会先获得资源?goroutine 阻塞期间是否会持续占用 cpu 资源?这一切都需要看 Mutex 的具体实现。

整体来说,go 中的 mutex 的演进共经历了四个阶段。

并发编程(二) - Mutex 互斥锁

初版

在第一版实现中,Mutex 共有两个字段组成。key: 标识锁是否被持有,值 0 表示锁处于空闲状态,值 1 表示锁被持有且没有等待者,值 n「n>1」表示有 n-1 个 goroutine 等待获取锁。sema 等待者队列使用的信号量,用于阻塞和唤醒 goroutine,后面会详细介绍信号量。

Lock():获取锁,执行原子操作「+1」;如果原子操作返回的结果为 1 则抢锁成功;如果原子操作返回的结果不为 1 则表示已经有其他 goroutine 抢锁成功,则调用 semacquire() 让当前的 goroutine 进入等待状态。

Unlock(): 释放锁,执行原子操作「-1」;如果原子操作返回结果为 0 则没有其他等待者,直接返回;否则即有其他 goroutine 在阻塞等待锁释放,调用 semrelease() 唤醒阻塞的 goroutine。

   // CAS操作,当时还没有抽象出atomic包
    func cas(val *int32, old, new int32) bool
    func semacquire(*int32)
    func semrelease(*int32)
    // 互斥锁的结构,包含两个字段
    type Mutex struct {
        key  int32 // 锁是否被持有的标识
        sema int32 // 信号量专用,用以阻塞/唤醒goroutine
    }
    
    // 保证成功在val上增加delta的值
    func xadd(val *int32, delta int32) (new int32) {
        for {
            v := *val
            if cas(val, v, v+delta) {
                return v + delta
            }
        }
        panic("unreached")
    }
    
    // 请求锁
    func (m *Mutex) Lock() {
        if xadd(&m.key, 1) == 1 { //标识加1,如果等于1,成功获取到锁
            return
        }
        semacquire(&m.sema) // 否则阻塞等待,当信号量大于 0 时,信号量 -1, return
    }
    
    func (m *Mutex) Unlock() {
        if xadd(&m.key, -1) == 0 { // 将标识减去1,如果等于0,则没有其它等待者
            return
        }
        semrelease(&m.sema) // 唤醒其它阻塞的goroutine,信号量 +1
    }    

给新人机会

在初版 Mutex 实现中,如果锁已经被持有,后续的 Lock 操作将会让 goroutine 阻塞。goroutine 的阻塞和唤醒是利用信号量来实现,采用的是先进先出的机制。也就是先被阻塞 goroutine 最先被唤醒。然而,在高并发场景,将阻塞的 goroutine 唤醒涉及一定的资源开销。那有没有方法让正在运行的 goroutine 优先获取锁,从而避免将正在运行的 gorotine 切换到阻塞状态造成额外开销。

Go 开发者在 2011 年 6 月 30 日的 commit 中对 Mutex 做了一次大的调整,调整后的 Mutex 实现如下:

   type Mutex struct {
        state int32
        sema  uint32
    }


    const (
        mutexLocked = 1 << iota // mutex is locked
        mutexWoken
        mutexWaiterShift = iota
    )

在这个版本中,state 一个字段表示了三个数据,第一位(最小的一位)来表示这个锁是否被持有,第二位代表是否有唤醒的 goroutine,剩余的位数代表的是等待此锁的 goroutine 数。

并发编程(二) - Mutex 互斥锁

下面看一下 Lock 方法。如果 state 为零,表示锁处于空闲状态,通过 CAS 操作设置为持有锁,成功直接返回。如果锁已经被持有,在需要通过 for 循环不断检查是否可以获取锁。和初版的区别在于,初版直接通过 semacquire 进入阻塞等待。在这个版本中,新创建的 goroutine 和被唤醒的 goroutine 同时抢锁。从而有一定的几率让正在运行的 goroutine 直接获取到锁,而不用进入阻塞等待。

   func (m *Mutex) Lock() {
        // Fast path: 幸运case,能够直接获取到锁
        if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
            return
        }
        // 新建的 goroutine 抢锁
        awoke := false
        for { // 新建的 goroutine 和唤醒的 goroutine 同时抢锁
            old := m.state // 获取 state
            new := old | mutexLocked // 新状态加锁
            if old&mutexLocked != 0 {// 旧状态为锁已经被持有
                new = old + 1<<mutexWaiterShift //等待者数量加一
            }
            if awoke {
                // goroutine是被唤醒的,
                // 新状态清除唤醒标志
                new &^= mutexWoken
            }
            // 原子操作,保证在此次操作之间没有其他 goroutine 更改状态
            if atomic.CompareAndSwapInt32(&m.state, old, new) {//设置新状态
                if old&mutexLocked == 0 { // 锁原状态未加锁,抢锁成功
                    break
                }
                runtime.Semacquire(&m.sema) // 请求信号量,进入阻塞等待
                awoke = true
            }
        }
    }

看一下 Unlock 方法。通过 -1 来去掉锁标志,这里是因为 state 的其他位置的数据是不确定的,所以只能通过 -1 来去掉锁标记。如果本来就没有加锁,调用 Unlock 会直接 panic。除了将锁标记位置位 0 之外,还需要唤醒阻塞的 goroutine 并设置唤醒标记位。

   func (m *Mutex) Unlock() {
        // Fast path: drop lock bit.
        new := atomic.AddInt32(&m.state, -mutexLocked) //去掉锁标志
        if (new+mutexLocked)&mutexLocked == 0 { //本来就没有加锁
            panic("sync: unlock of unlocked mutex")
        }
    
        old := new
        for {
            if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken) != 0 { // 没有等待者,或者有唤醒的waiter,或者锁原来已加锁
                return
            }
            new = (old - 1<<mutexWaiterShift) | mutexWoken // 新状态,准备唤醒goroutine,并设置唤醒标志
            if atomic.CompareAndSwapInt32(&m.state, old, new) {
                runtime.Semrelease(&m.sema)
                return
            }
            old = m.state
        }
    }

多给些机会

在 2015 年 2 月的改动中,如果新来的 goroutine 或者是被唤醒的 goroutine 首次获取不到锁,它们就会通过自旋(spin,通过循环不断尝试,spin 的逻辑是在runtime 实现的)的方式,尝试检查锁是否被释放。在尝试一定的自旋次数后,再执行原来的逻辑。

因为临界区的代码耗时很短,锁很快就能释放,而抢夺锁的 goroutine 不用通过休眠唤醒方式等待调度,直接 spin 几次,可能就获得了锁。

   func (m *Mutex) Lock() {
        // Fast path: 幸运之路,正好获取到锁
        if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
            return
        }

        awoke := false
        iter := 0
        for { // 不管是新来的请求锁的goroutine, 还是被唤醒的goroutine,都不断尝试请求锁
            old := m.state // 先保存当前锁的状态
            new := old | mutexLocked // 新状态设置加锁标志
            if old&mutexLocked != 0 { // 锁还没被释放
                if runtime_canSpin(iter) { // 还可以自旋
                    if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
                        atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
                        awoke = true
                    }
                    runtime_doSpin()
                    iter++
                    continue // 自旋,再次尝试请求锁
                }
                new = old + 1<<mutexWaiterShift
            }
            if awoke { // 唤醒状态
                if new&mutexWoken == 0 {
                    panic("sync: inconsistent mutex state")
                }
                new &^= mutexWoken // 新状态清除唤醒标记
            }
            if atomic.CompareAndSwapInt32(&m.state, old, new) {
                if old&mutexLocked == 0 { // 旧状态锁已释放,新状态成功持有了锁,直接返回
                    break
                }
                runtime_Semacquire(&m.sema) // 阻塞等待
                awoke = true // 被唤醒
                iter = 0
            }
        }
    }

解决饥饿

在 go 的 Mutex 设计中,既要考虑锁的性能,又要考虑锁的公平性。在上面的版本中,可能会导致阻塞队列中的 goroutine 一致获取不到锁,也就是饥饿问题。

2016 年 Go 1.9 中 Mutex 增加了饥饿模式,让锁变得更公平,不公平的等待时间限制在 1 毫秒,并且修复了一个大 Bug:总是把唤醒的 goroutine 放在等待队列的尾部,会导致更加不公平的等待时间。这里就不对源码进行分析了。

   type Mutex struct {
        state int32
        sema  uint32
    }
    
    const (
        mutexLocked = 1 << iota // mutex is locked
        mutexWoken
        mutexStarving // 从state字段中分出一个饥饿标记
        mutexWaiterShift = iota
    
        starvationThresholdNs = 1e6
    )
    
    func (m *Mutex) Lock() {
        // Fast path: 幸运之路,一下就获取到了锁
        if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
            return
        }
        // Slow path:缓慢之路,尝试自旋竞争或饥饿状态下饥饿goroutine竞争
        m.lockSlow()
    }
    
    func (m *Mutex) lockSlow() {
        var waitStartTime int64
        starving := false // 此goroutine的饥饿标记
        awoke := false // 唤醒标记
        iter := 0 // 自旋次数
        old := m.state // 当前的锁的状态
        for {
            // 锁是非饥饿状态,锁还没被释放,尝试自旋
            if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
                if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
                    atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
                    awoke = true
                }
                runtime_doSpin()
                iter++
                old = m.state // 再次获取锁的状态,之后会检查是否锁被释放了
                continue
            }
            new := old
            if old&mutexStarving == 0 {
                new |= mutexLocked // 非饥饿状态,加锁
            }
            if old&(mutexLocked|mutexStarving) != 0 {
                new += 1 << mutexWaiterShift // waiter数量加1
            }
            if starving && old&mutexLocked != 0 {
                new |= mutexStarving // 设置饥饿状态
            }
            if awoke {
                if new&mutexWoken == 0 {
                    throw("sync: inconsistent mutex state")
                }
                new &^= mutexWoken // 新状态清除唤醒标记
            }
            // 成功设置新状态
            if atomic.CompareAndSwapInt32(&m.state, old, new) {
                // 原来锁的状态已释放,并且不是饥饿状态,正常请求到了锁,返回
                if old&(mutexLocked|mutexStarving) == 0 {
                    break // locked the mutex with CAS
                }
                // 处理饥饿状态

                // 如果以前就在队列里面,加入到队列头
                queueLifo := waitStartTime != 0
                if waitStartTime == 0 {
                    waitStartTime = runtime_nanotime()
                }
                // 阻塞等待
                runtime_SemacquireMutex(&m.sema, queueLifo, 1)
                // 唤醒之后检查锁是否应该处于饥饿状态
                starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
                old = m.state
                // 如果锁已经处于饥饿状态,直接抢到锁,返回
                if old&mutexStarving != 0 {
                    if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
                        throw("sync: inconsistent mutex state")
                    }
                    // 有点绕,加锁并且将waiter数减1
                    delta := int32(mutexLocked - 1<<mutexWaiterShift)
                    if !starving || old>>mutexWaiterShift == 1 {
                        delta -= mutexStarving // 最后一个waiter或者已经不饥饿了,清除饥饿标记
                    }
                    atomic.AddInt32(&m.state, delta)
                    break
                }
                awoke = true
                iter = 0
            } else {
                old = m.state
            }
        }
    }
    
    func (m *Mutex) Unlock() {
        // Fast path: drop lock bit.
        new := atomic.AddInt32(&m.state, -mutexLocked)
        if new != 0 {
            m.unlockSlow(new)
        }
    }
    
    func (m *Mutex) unlockSlow(new int32) {
        if (new+mutexLocked)&mutexLocked == 0 {
            throw("sync: unlock of unlocked mutex")
        }
        if new&mutexStarving == 0 {
            old := new
            for {
                if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
                    return
                }
                new = (old - 1<<mutexWaiterShift) | mutexWoken
                if atomic.CompareAndSwapInt32(&m.state, old, new) {
                    runtime_Semrelease(&m.sema, false, 1)
                    return
                }
                old = m.state
            }
        } else {
            runtime_Semrelease(&m.sema, true, 1)
        }
    }

总结

可以看到,为了同时保证锁的性能和公平性。Mutex 一共经历了四次大的变更。第一次是最简单的实现,goroutine 按照队列的形式抢锁,如果不能获取到锁则需要进入阻塞队列。由于正在运行的 goroutine 有更多的本地缓存,执行速度快。因此,在第二次版本中让新创建的 goroutine 和唤醒的 goroutine 同时抢锁。在第三次变更中,引入自旋操作,让正在运行的 goroutine 有更多的机会获取锁。然后,正在运行的 goroutine 获取锁的几率越大,意味着阻塞的 goroutine 会一致在等待。为了保证锁的公平性,对于等待时长大于 1ms,优先让等待者获取锁。

注意事项

Mutex 对使用者屏蔽了复杂的实现细节,只暴露了 Lock() 和 Unlcok() 方法。然后,在具体使用的过程中也需要注意以下问题。

Lock / Unlcok 不是成对出现

最常见的问题就是 Lock 和 Unlock 没有成对的出现。这就会导致互斥锁无法释放/重复释放导致程序异常。例如下面的例子,忘记写 Unlock,程序运行时会检测出死锁。

func main() {
    var sum = 0
    var m sync.Mutex
    var wg sync.WaitGroup
    for i := 0; i < 2; i++ {
       wg.Add(1)
       go func() {
          defer wg.Done()
          m.Lock()
          sum++
          //m.Unlock()
}()
    }

    wg.Wait()
    fmt.Println(sum)
}

//fatal error: all goroutines are asleep - deadlock!

//goroutine 1 [semacquire]:
//sync.runtime_Semacquire(0x140000021a0?)
//        /Users/bytedance/go/go1.21.4/src/runtime/sema.go:62 +0x2c
//sync.(*WaitGroup).Wait(0x1400009c030)
//        /Users/bytedance/go/go1.21.4/src/sync/waitgroup.go:116 +0x74
//main.main()
//        /Users/bytedance/Desktop/code/tetris/demo/c1/d.go:33 +0xd8

//goroutine 19 [sync.Mutex.Lock]:
//sync.runtime_SemacquireMutex(0x0?, 0x0?, 0x0?)
//        /Users/bytedance/go/go1.21.4/src/runtime/sema.go:77 +0x28
//sync.(*Mutex).lockSlow(0x1400009c020)
//        /Users/bytedance/go/go1.21.4/src/sync/mutex.go:171 +0x174
//sync.(*Mutex).Lock(...)
//        /Users/bytedance/go/go1.21.4/src/sync/mutex.go:90
//main.main.func1()
//        /Users/bytedance/Desktop/code/tetris/demo/c1/d.go:27 +0xac
//created by main.main in goroutine 1
//        /Users/bytedance/Desktop/code/tetris/demo/c1/d.go:25 +0x60
//exit status 2

如果重复进行 Unlock 则直接 panic,在分析源码的时候就已经看到了 panic 代码。

func main() {
    var sum = 0
    var m sync.Mutex
    var wg sync.WaitGroup
    for i := 0; i < 2; i++ {
       wg.Add(1)
       go func() {
          defer wg.Done()
          m.Lock()
          sum++
          m.Unlock()
          // fatal error: sync: unlock of unlocked mutex
m.Unlock()
       }()
    }

    wg.Wait()
    fmt.Println(sum)
}

Copy 已经使用 Mutex

通过 Mutex 的源码分析得知,互斥锁的实现是依赖 Mutex 变量的内部状态「state 和 sema」控制的。这就表示在使用 Mutex 时,在多个 goroutine 中只能使用同一个 Mutex 变量,而不能使用变量的值拷贝。

在下面的例子中,在对 sum++ 加锁时使用的是 Mutex 变量的值 copy 而导致锁逻辑失败。

func main() {
    var sum = 0
    var m sync.Mutex
    var wg sync.WaitGroup
    for i := 0; i < 1000; i++ {
       wg.Add(1)
       go func(m sync.Mutex) {
          defer wg.Done()
          m.Lock()
          sum++
          m.Unlock()
       }(m)
    }

    wg.Wait()
    fmt.Println(sum)
}

那么最直接的修复方法就是传递一个指针,不同的 goroutine 使用是同一个 Mutex。或者使用闭包的形式,直接访问匿名函数的外部变量 m。

func main() {
    var sum = 0
    var m sync.Mutex
    var wg sync.WaitGroup
    for i := 0; i < 1000; i++ {
       wg.Add(1)
       go func(m *sync.Mutex) {
          defer wg.Done()
          m.Lock()
          sum++
          m.Unlock()
       }(&m)
    }

    wg.Wait()
    fmt.Println(sum)
}

此外,go 提供了 vet 工具,可以帮助我们检查程序中 copy Mutex 的问题。运行 go vet d.go,会直接告诉我们函数在传递 Mutex 时,使用了一个值,而不是指针。

# command-line-arguments
./d.go:24:5: call of func(m sync.Mutex) {
        defer wg.Done()
        m.Lock()
        sum++
        m.Unlock()
} copies lock value: sync.Mutex
./d.go:19:13: func passes lock by value: sync.Mutex

go vet 是 Go 语言的一个静态分析工具,它可以检查 Go 语言源码中可能存在的错误,这些错误通常是编译器无法检查出来的。不仅可以检查语法错误,更重要的是可以检查出一些逻辑错误,比如常见的变量未使用、函数调用参数个数及类型错误、格式化字符串与参数不匹配、可能的空指针解引用等问题。可以通过 go vet ./... 命令运行 vet 检查当前目录及其子目录下的所有 Go 文件。

不可重入

可重入锁,又称为递归锁,是指在同一个线程内,外层函数获得锁之后,内层递归函数仍然有获取该锁的权限,这避免了递归调用时的死锁现象。

在一些多线程的编程环境中,可重入锁是必要的,因为它可以避免一个线程在获取了一个已经由它自己持有的锁时产生死锁。例如,一个线程在调用某个需要获取锁的函数后,由于某些原因(如满足某个条件或者发生了异常等),需要再次调用这个函数,如果这个锁是不可重入的,那么这个线程就会阻塞,等待它自己释放这个锁,这显然是不可能的,结果就会导致死锁。

而可重入锁在这种情况下就可以避免死锁,因为它允许一个线程多次获取同一把锁。不过,每获取一次,锁的计数器就会增加一次,只有当计数器的值降为0时,其他线程才能获取该锁。

需要注意的是,虽然可重入锁解决了上述问题,但是也带来了一些新的问题,比如可能会导致一个线程长时间持有锁,从而降低系统的并发性能。

毫无疑问,Mutex 是不可重入锁。它本身并没有存储 goroutine 的任何信息,更别说支持可重入了。

功能扩展

原生的 Mutex 体现了 go 的设计哲学:大道至简。只提供 Lock 和 Unlock 方法,极大的降低了使用者的学习成本。对于一些否则的功能则需要开发者自己去实现。这里将实现可重入锁和并发安全的队列,来巩固学习 Mutex。

可重入锁

可重入锁实现的关键是在 Mutex 中保存 goroutine 信息,用于区分不同的 goroutine,从而实现可重入的能力。在具体实现时,有两种方案,一种是通过 hack 的方式获取 goroutine id;另一种就是调用 Lock 和 Unlock 显性的传入一个 token,用于标识不同的 goroutine。这里使用第一种方式实现一个可重入锁。

封装一个 struct RecursiveMutex,里面包含一个互斥锁 Mutex 用于不同 goroutine 获取锁;owner 记录获取锁的 goroutine 的信息;rec 记录重入的次数。这里需要注意的是,对于 owner 的操作是不同 goroutine 的并发读写,因此需要使用原子操作。

 // RecursiveMutex 可重入锁
type RecursiveMutex struct {
    sync.Mutex
    owner int64 // 持有锁的 goroutine id
rec   int32 // 重入次数
}

func (rm *RecursiveMutex) Lock() {
    // goroutine 重复获取锁
if atomic.LoadInt64(&rm.owner) == goid.Get() {
       // 增加重入次数,只有持有锁的 goroutine 才会写 rec, 不需要进行原子操作
rm.rec++
       return
    }
    // goroutine 第一次获取锁,设置 owner 和 rec
rm.Mutex.Lock()
    atomic.StoreInt64(&rm.owner, goid.Get())
    rm.rec = 1
}

func (rm *RecursiveMutex) Unlock() {
    if atomic.LoadInt64(&rm.owner) == goid.Get() {
       // 先减少重入次数
rm.rec--
       if rm.rec != 0 {
          return
       }
       // 可重入锁全部释放,重置 owner
atomic.StoreInt64(&rm.owner, -1)
       rm.Mutex.Unlock()
       return
    }
    // 非持有 mutex 的 goroutine 释放锁
panic("wrong owner")
}

func main() {
    rl := RecursiveMutex{}
    wg := sync.WaitGroup{}
    signal := make(chan struct{})
    wg.Add(2)
    // 第一个 goroutine 重复获取锁
go func() {
       defer wg.Done()
       rl.Lock()
       signal <- struct{}{}
       fmt.Println("goroutine 1 get lock with rec = ", rl.rec)
       rl.Lock()
       fmt.Println("goroutine 1 get lock with rec = ", rl.rec)
       rl.Unlock()
       rl.Unlock()
    }()
    go func() {
       defer wg.Done()
       <-signal
       rl.Lock()
       fmt.Println("goroutine 2 get lock with rec = ", rl.rec)
       rl.Unlock()
    }()
    wg.Wait()
}

并发安全的队列

在 go 的源码中,并没有提供并发安全的队列。我们可以基于 Mutex 实现一个并发安全的队列。

type SyncQueue[T any] struct {
    data []T
    m    sync.Mutex
}

func NewSyncQueue[T any](cap int) *SyncQueue[T] {
    return &SyncQueue[T]{
       data: make([]T, 0, cap),
       m:    sync.Mutex{},
    }

}

func (q *SyncQueue[T]) Enqueue(v T) {
    q.m.Lock()
    defer q.m.Unlock()
    q.data = append(q.data, v)
}

func (q *SyncQueue[T]) Dequeue() *T {
    q.m.Lock()
    defer q.m.Unlock()
    if len(q.data) == 0 {
       return nil
    }
    r := q.data[0]
    q.data = q.data[1:]
    return &r
}

func main() {
    q := NewSyncQueue[int](10)
    wg := sync.WaitGroup{}
    for i := 0; i < 10; i++ {
       wg.Add(1)
       go func(i int) {
          defer wg.Done()
          q.Enqueue(i)
       }(i)
    }
    wg.Wait()
    for i := 0; i < 10; i++ {
       fmt.Println(*q.Dequeue())
    }
}
转载自:https://juejin.cn/post/7337153453896794123
评论
请登录