Go sync.Mutex
基本介绍
Mutex(互斥锁)
属于悲观锁,是 Go 标准库中sync中提供的一种用于控制多个 goroutine 之间对共享资源的并发访问的机制,通常用于避免多个协程同时访问共享数据,以防止竞态条件和数据竞争问题的发生。
同时 sync.Mutex
是一个不可重入锁,不会记录哪个 goroutine 拥有这把锁,对外提供三个方法用于并发控制,分别是Mutex.TryLock()
,Mutex.Lock()
和 Mutex.UnLock()
。
首先,对互斥锁 Mutex
的工作流程做一个整体的介绍:
-
在刚开始的时候,是处于
正常模式(非公平锁)
,请求锁的 goroutine 会自旋去尝试获取锁; -
当自旋超过4次还没有获取到锁的时候,该 goroutine 会按照
FIFO(先入先出)
的顺序加入等待队列的尾部; -
当锁被释放之后,会依次唤醒请求队列头部的 goroutine,但此时该 goroutine 并不会直接获取锁,而是需要和当前自旋的 goroutine 进行竞争,因为当前可能有多个 goroutine 在CPU中运行并自旋请求锁,所以从等待队列中取出来的 goroutine 大概率获取不到锁,失败之后该 goroutine 会被重新放回等待队列头部;
-
当一个 goroutine 超过 1ms 时仍未获取到锁,这把锁就会被切换到
饥饿模式(公平锁)
; -
在饥饿模式下,会直接把锁交给等待队列中排在第一位的 goroutine;同时新进来的 goroutine 不会参与抢锁也不会进入自旋状态,而是直接放入等待队列的尾部,
-
如果一个 goroutine 获得了互斥锁并且它在队列的末尾或者它等待的时间少于 1ms,那么当前的互斥锁就会切换回
正常模式
源码分析
数据结构
type Mutex struct {
state int32 // 锁的状态
sema uint32 // 信号量
}
const (
mutexLocked = 1 << iota // mutex is locked
mutexWoken
mutexStarving
mutexWaiterShift = iota
starvationThresholdNs = 1e6
)
state
是一个 32位整数,通过二进制位表示锁的状态,不同状态下会有不同的处理方式
Locked
: 低一位,标记互斥锁是否被加锁Woken
: 低二位,标记互斥锁上是否有被唤醒的 goroutineStarving
: 低三位,标记互斥锁是否处于饥饿模式Waiter
: 剩下29位统计互斥锁上等待队列中 goroutine 的数量
sema
是一个uint32,但是实际上底层是一个 semaRoot
结构体,维持顺序的平衡二叉树,用于定位 mutex 的等待队列,从而实现 goroutine 的阻塞和唤醒
Lock()
func (m *Mutex) Lock() {
// Fast path: grab unlocked mutex.
if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
if race.Enabled {
race.Acquire(unsafe.Pointer(m))
}
return
}
// Slow path (outlined so that the fast path can be inlined)
m.lockSlow()
}
通过CAS操作加锁失败后,进入 lockSlow
阶段,这里贴一段源码注释
func (m *Mutex) lockSlow() {
var waitStartTime int64 // 等待时间
starving := false // 是否为饥饿模式
awoke := false // 是否被重新唤醒
iter := 0 // 自旋次数
old := m.state
for {
// 尝试自旋
// 能自旋的条件:正常模式 + runtime_canSpin(iter)返回 true
if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
// 如果没有其他 goroutine 在等待这个互斥锁,设置 mutexWaiter 相应位为1
if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
awoke = true
}
runtime_doSpin()
iter++ // 每次自旋结束后会累计次数,会影响下一轮runtime_canSpin(iter)的返回值
old = m.state // 更新状态
continue
}
new := old
// 计算新状态
if old&mutexStarving == 0 { // 如果不是饥饿并且原来锁是空闲的,就直接拿锁,标记mutexLocked
new |= mutexLocked
}
// 如果原来锁忙或者是饥饿状态,则让 waiterCount 加1,表示这个 goroutine 在等这个互斥锁
if old&(mutexLocked|mutexStarving) != 0 {
new += 1 << mutexWaiterShift
}
if starving && old&mutexLocked != 0 { // 设置了饥饿状态,并且上锁了,新状态一定是饥饿状态
new |= mutexStarving
}
if awoke { // 当前 goroutine 刚结束自旋
if new&mutexWoken == 0 {
throw("sync: inconsistent mutex state")
}
// 之前自旋的时候设置了 mutexWoken 避免其他阻塞的 goroutinue 被唤醒,现在结束了自旋,要清除 mutexWoken 位
new &^= mutexWoken
}
if atomic.CompareAndSwapInt32(&m.state, old, new) {
// 更新状态成功并且 old 中锁闲 new 中锁忙,锁被当前 goroutine 拿到,直接退出
if old&(mutexLocked|mutexStarving) == 0 {
break
}
// 等待时间不为0则把当前 goroutinue 放入等待队列头部
queueLifo := waitStartTime != 0
// 等待时间为0说明是新请求拿锁的 gouroutine,初始化等待开始时间,放入队列尾部
if waitStartTime == 0 {
waitStartTime = runtime_nanotime()
}
// 挂起
runtime_SemacquireMutex(&m.sema, queueLifo, 1)
// 醒来之后判断一下是否等待超过了 1ms、原来是不是饥饿模式,依据此决定接下来要不要切换为饥饿模式
starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
old = m.state
// 如果现在不是饥饿状态,直接进入下一次循环去尝试抢锁
if old&mutexStarving != 0 {
if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
throw("sync: inconsistent mutex state")
}
// 这个位运算还是很骚的,一步完成了 置mutexLocked为1 和 waitCount - 1
delta := int32(mutexLocked - 1<<mutexWaiterShift)
if !starving || old>>mutexWaiterShift == 1 {
// 如果除了当前 goroutine 以外没有其他请求锁的 goroutine,退出饥饿模式
delta -= mutexStarving
}
atomic.AddInt32(&m.state, delta)
break
}
awoke = true
iter = 0
} else {
// 自己尝试更新锁的状态没成功就说明此时有其他人更新了,获取一下新的锁状态
old = m.state
}
}
if race.Enabled {
race.Acquire(unsafe.Pointer(m))
}
}
Unlook
func (m *Mutex) Unlock() {
if race.Enabled {
_ = m.state
race.Release(unsafe.Pointer(m))
}
// Fast path: drop lock bit.
new := atomic.AddInt32(&m.state, -mutexLocked)
if new != 0 {
// Outlined slow path to allow inlining the fast path.
// To hide unlockSlow during tracing we skip one extra frame when tracing GoUnblock.
m.unlockSlow(new)
}
}
快速解锁失败则调用 m.unlockSlow(new)
慢解锁
func (m *Mutex) unlockSlow(new int32) {
if (new+mutexLocked)&mutexLocked == 0 {
fatal("sync: unlock of unlocked mutex")
}
// 正常模式的处理方法
if new&mutexStarving == 0 {
old := new
for {
// 没有等待者不用唤醒
if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
return
}
// 有等待者唤醒队头 goroutine 移交锁的所有权
new = (old - 1<<mutexWaiterShift) | mutexWoken
if atomic.CompareAndSwapInt32(&m.state, old, new) {
runtime_Semrelease(&m.sema, false, 1)
return
}
old = m.state
}
} else {
runtime_Semrelease(&m.sema, true, 1)
}
}
TryLock()
判断锁是不是空闲,是不是正常模式,如果是,则尝试拿锁,拿到锁了返回 true, 没拿到锁就返回 false, 不会造成阻塞
func (m *Mutex) TryLock() bool {
old := m.state
if old&(mutexLocked|mutexStarving) != 0 {
return false
}
// There may be a goroutine waiting for the mutex, but we are
// running now and can try to grab the mutex before that
// goroutine wakes up.
if !atomic.CompareAndSwapInt32(&m.state, old, old|mutexLocked) {
return false
}
if race.Enabled {
race.Acquire(unsafe.Pointer(m))
}
return true
}
转载自:https://juejin.cn/post/7287420810317496374