likes
comments
collection
share

Go sync.Mutex

作者站长头像
站长
· 阅读数 104

基本介绍

Mutex(互斥锁) 属于悲观锁,是 Go 标准库中sync中提供的一种用于控制多个 goroutine 之间对共享资源的并发访问的机制,通常用于避免多个协程同时访问共享数据,以防止竞态条件和数据竞争问题的发生。

同时 sync.Mutex 是一个不可重入锁,不会记录哪个 goroutine 拥有这把锁,对外提供三个方法用于并发控制,分别是Mutex.TryLock()Mutex.Lock()Mutex.UnLock()

首先,对互斥锁 Mutex 的工作流程做一个整体的介绍:

  1. 在刚开始的时候,是处于 正常模式(非公平锁),请求锁的 goroutine 会自旋去尝试获取锁;

  2. 当自旋超过4次还没有获取到锁的时候,该 goroutine 会按照 FIFO(先入先出) 的顺序加入等待队列的尾部;

  3. 当锁被释放之后,会依次唤醒请求队列头部的 goroutine,但此时该 goroutine 并不会直接获取锁,而是需要和当前自旋的 goroutine 进行竞争,因为当前可能有多个 goroutine 在CPU中运行并自旋请求锁,所以从等待队列中取出来的 goroutine 大概率获取不到锁,失败之后该 goroutine 会被重新放回等待队列头部;

  4. 当一个 goroutine 超过 1ms 时仍未获取到锁,这把锁就会被切换到饥饿模式(公平锁)

  5. 在饥饿模式下,会直接把锁交给等待队列中排在第一位的 goroutine;同时新进来的 goroutine 不会参与抢锁也不会进入自旋状态,而是直接放入等待队列的尾部,

  6. 如果一个 goroutine 获得了互斥锁并且它在队列的末尾或者它等待的时间少于 1ms,那么当前的互斥锁就会切换回正常模式

源码分析

数据结构

type Mutex struct {
   state int32    // 锁的状态 
   sema  uint32   // 信号量
}


const (
   mutexLocked = 1 << iota // mutex is locked
   mutexWoken
   mutexStarving
   mutexWaiterShift = iota
   starvationThresholdNs = 1e6
)

state 是一个 32位整数,通过二进制位表示锁的状态,不同状态下会有不同的处理方式

  • Locked: 低一位,标记互斥锁是否被加锁
  • Woken : 低二位,标记互斥锁上是否有被唤醒的 goroutine
  • Starving : 低三位,标记互斥锁是否处于饥饿模式
  • Waiter : 剩下29位统计互斥锁上等待队列中 goroutine 的数量

Go sync.Mutex

sema 是一个uint32,但是实际上底层是一个 semaRoot 结构体,维持顺序的平衡二叉树,用于定位 mutex 的等待队列,从而实现 goroutine 的阻塞和唤醒

Go sync.Mutex

Lock()

func (m *Mutex) Lock() {
   // Fast path: grab unlocked mutex.
   if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
      if race.Enabled {
         race.Acquire(unsafe.Pointer(m))
      }
      return
   }
   // Slow path (outlined so that the fast path can be inlined)
   m.lockSlow()
}

通过CAS操作加锁失败后,进入 lockSlow 阶段,这里贴一段源码注释

func (m *Mutex) lockSlow() {
    var waitStartTime int64 // 等待时间
    starving := false       // 是否为饥饿模式
    awoke := false          // 是否被重新唤醒
    iter := 0               // 自旋次数
    old := m.state
    for {
        // 尝试自旋
        // 能自旋的条件:正常模式 + runtime_canSpin(iter)返回 true
        if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
            // 如果没有其他 goroutine 在等待这个互斥锁,设置 mutexWaiter 相应位为1
                if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
                    atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
                        awoke = true
                    }
                runtime_doSpin()
                iter++   // 每次自旋结束后会累计次数,会影响下一轮runtime_canSpin(iter)的返回值
                old = m.state     // 更新状态
                continue
            }
        new := old
		
        // 计算新状态
        if old&mutexStarving == 0 {  // 如果不是饥饿并且原来锁是空闲的,就直接拿锁,标记mutexLocked
            new |= mutexLocked
        }
        // 如果原来锁忙或者是饥饿状态,则让 waiterCount 加1,表示这个 goroutine 在等这个互斥锁
        if old&(mutexLocked|mutexStarving) != 0 {  
            new += 1 << mutexWaiterShift
        }
        if starving && old&mutexLocked != 0 {  // 设置了饥饿状态,并且上锁了,新状态一定是饥饿状态
            new |= mutexStarving
        }
        if awoke {  // 当前 goroutine 刚结束自旋
            if new&mutexWoken == 0 {
                throw("sync: inconsistent mutex state")
            }
        // 之前自旋的时候设置了 mutexWoken 避免其他阻塞的 goroutinue 被唤醒,现在结束了自旋,要清除 mutexWoken 位
            new &^= mutexWoken
        }
        if atomic.CompareAndSwapInt32(&m.state, old, new) {
            // 更新状态成功并且 old 中锁闲 new 中锁忙,锁被当前 goroutine 拿到,直接退出
            if old&(mutexLocked|mutexStarving) == 0 {
                break
            }
			
            // 等待时间不为0则把当前 goroutinue 放入等待队列头部
            queueLifo := waitStartTime != 0
            // 等待时间为0说明是新请求拿锁的 gouroutine,初始化等待开始时间,放入队列尾部
            if waitStartTime == 0 {
                waitStartTime = runtime_nanotime()
            }
            // 挂起
            runtime_SemacquireMutex(&m.sema, queueLifo, 1)
            // 醒来之后判断一下是否等待超过了 1ms、原来是不是饥饿模式,依据此决定接下来要不要切换为饥饿模式
            starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
            old = m.state
            // 如果现在不是饥饿状态,直接进入下一次循环去尝试抢锁
            if old&mutexStarving != 0 {
                if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
                    throw("sync: inconsistent mutex state")
                }
                // 这个位运算还是很骚的,一步完成了 置mutexLocked为1 和 waitCount - 1
                delta := int32(mutexLocked - 1<<mutexWaiterShift)
                if !starving || old>>mutexWaiterShift == 1 {
                    // 如果除了当前 goroutine 以外没有其他请求锁的 goroutine,退出饥饿模式
                    delta -= mutexStarving
                }
                atomic.AddInt32(&m.state, delta)
                break
            }
            awoke = true
            iter = 0
            } else {
                // 自己尝试更新锁的状态没成功就说明此时有其他人更新了,获取一下新的锁状态
                old = m.state
            }
        }

        if race.Enabled {
        race.Acquire(unsafe.Pointer(m))
    }
}

Unlook

func (m *Mutex) Unlock() {
   if race.Enabled {
      _ = m.state
      race.Release(unsafe.Pointer(m))
   }

   // Fast path: drop lock bit.
   new := atomic.AddInt32(&m.state, -mutexLocked)
   if new != 0 {
      // Outlined slow path to allow inlining the fast path.
      // To hide unlockSlow during tracing we skip one extra frame when tracing GoUnblock.
      m.unlockSlow(new)
   }
}

快速解锁失败则调用 m.unlockSlow(new) 慢解锁

func (m *Mutex) unlockSlow(new int32) {
   if (new+mutexLocked)&mutexLocked == 0 {
      fatal("sync: unlock of unlocked mutex")
   }
   
   // 正常模式的处理方法
   if new&mutexStarving == 0 {
      old := new
      for {
         // 没有等待者不用唤醒
         if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
            return
         }
         // 有等待者唤醒队头 goroutine 移交锁的所有权
         new = (old - 1<<mutexWaiterShift) | mutexWoken
         if atomic.CompareAndSwapInt32(&m.state, old, new) {
            runtime_Semrelease(&m.sema, false, 1)
            return
         }
         old = m.state
      }
   } else {
      runtime_Semrelease(&m.sema, true, 1)
   }
}

TryLock()

判断锁是不是空闲,是不是正常模式,如果是,则尝试拿锁,拿到锁了返回 true, 没拿到锁就返回 false, 不会造成阻塞

func (m *Mutex) TryLock() bool {
   old := m.state
   if old&(mutexLocked|mutexStarving) != 0 {
      return false
   }

   // There may be a goroutine waiting for the mutex, but we are
   // running now and can try to grab the mutex before that
   // goroutine wakes up.
   if !atomic.CompareAndSwapInt32(&m.state, old, old|mutexLocked) {
      return false
   }

   if race.Enabled {
      race.Acquire(unsafe.Pointer(m))
   }
   return true
}
转载自:https://juejin.cn/post/7287420810317496374
评论
请登录