Golang Concurrent Programming: Locks
0. Introduction
In a previous post we walked through the implementation of Go's channel. In this post, let's look at how the Mutex family of locks is implemented.
1. Concurrency Primitives
The Go sync package provides the basic synchronization primitives, including the commonly used sync.Mutex, sync.RWMutex, sync.WaitGroup, sync.Once and sync.Cond. This chapter covers sync.Mutex and sync.RWMutex.
2. Mutex
In Go, a mutex is represented by the following struct, which occupies only 8 bytes in total:
type Mutex struct {
    state int32
    sema  uint32
}
Where:

- state holds the lock's state, with different bits encoding different flags;
- sema is the semaphore used to control the lock state.
2.1 Lock State

The bits of state are used as follows:

- waitersCount: the number of goroutines currently waiting for this lock;
- starving: whether the lock is in starvation mode; 0 = normal, 1 = starving;
- woken: whether a goroutine has been woken for this lock; 0 = no, 1 = yes;
- locked: whether the lock is currently held; 0 = no, 1 = yes.
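The low three bits are the locked, woken and starving flags, and the remaining high bits hold the waiter count. These fields correspond to the constants defined in sync/mutex.go, which look roughly like this:

const (
    mutexLocked = 1 << iota // bit 0: mutex is locked
    mutexWoken              // bit 1: a goroutine has been woken
    mutexStarving           // bit 2: mutex is in starvation mode
    mutexWaiterShift = iota // waiter count lives in state >> mutexWaiterShift

    starvationThresholdNs = 1e6 // 1ms: waiting longer than this switches the mutex to starvation mode
)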
2.2 Locking: sync.Mutex.Lock()
Let's walk through the locking process:
// Lock locks m.
// If the lock is already in use, the calling goroutine
// blocks until the mutex is available.
func (m *Mutex) Lock() {
    // Fast path: grab unlocked mutex.
    if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
        if race.Enabled {
            race.Acquire(unsafe.Pointer(m))
        }
        return
    }
    // Slow path (outlined so that the fast path can be inlined)
    m.lockSlow()
}
We can ignore the race.Enabled blocks; they are only used by Go's race detector. The first step is a CAS (CompareAndSwapInt32, compare-and-swap, an atomic operation): if m.state equals 0, it is atomically set to mutexLocked (i.e. 1) and the function returns immediately, meaning the lock has been acquired; otherwise m.lockSlow() is executed.
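As a quick refresher on the CAS semantics used by the fast path, here is a standalone toy example (not part of the sync package):

package main

import (
    "fmt"
    "sync/atomic"
)

func main() {
    var state int32

    // Succeeds: state is 0, so it is swapped to 1 and true is returned.
    fmt.Println(atomic.CompareAndSwapInt32(&state, 0, 1)) // true

    // Fails: state is now 1, not 0, so nothing changes and false is returned.
    fmt.Println(atomic.CompareAndSwapInt32(&state, 0, 1)) // false
}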
If the fast path fails to take the lock, m.lockSlow() runs; let's go through it a segment at a time:
func (m *Mutex) lockSlow() {
    var waitStartTime int64
    starving := false
    awoke := false
    iter := 0
    old := m.state
    for {
        // Don't spin in starvation mode, ownership is handed off to waiters
        // so we won't be able to acquire the mutex anyway.
        if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
            // Active spinning makes sense.
            // Try to set mutexWoken flag to inform Unlock
            // to not wake other blocked goroutines.
            if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
                atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
                awoke = true
            }
            runtime_doSpin()
            iter++
            old = m.state
            continue
        }
        ...
    }
}
The code above is the current goroutine trying to spin for the lock. Spinning is a synchronization mechanism in which the spinning goroutine keeps hold of the CPU and repeatedly checks whether some condition has become true. On a multi-core CPU, spinning avoids a goroutine switch; used appropriately it can bring a large performance gain, but used inappropriately it drags the whole program down, so the conditions for entering the spin are quite strict:

- spinning only happens in normal mode, never in starvation mode;
- runtime.runtime_canSpin must return true, as shown below:
// Active spinning for sync.Mutex.
//go:linkname sync_runtime_canSpin sync.runtime_canSpin
//go:nosplit
func sync_runtime_canSpin(i int) bool {
    // sync.Mutex is cooperative, so we are conservative with spinning.
    // Spin only few times and only if running on a multicore machine and
    // GOMAXPROCS>1 and there is at least one other running P and local runq is empty.
    // As opposed to runtime mutex we don't do passive spinning here,
    // because there can be work on global runq or on other Ps.
    if i >= active_spin || ncpu <= 1 || gomaxprocs <= int32(sched.npidle+sched.nmspinning)+1 {
        return false
    }
    if p := getg().m.p.ptr(); !runqempty(p) {
        return false
    }
    return true
}
runtime.runtime_canSpin is ultimately linked to runtime.sync_runtime_canSpin, which returns true only if all of the following hold:

- we are running on a multi-core machine, GOMAXPROCS > 1, and at least one other P is running;
- the current goroutine has spun fewer than four times trying to acquire this lock;
- the run queue of the current P is empty.
Once the current goroutine is allowed to spin, it calls runtime.runtime_doSpin to spin and burn CPU time; this function simply executes the PAUSE instruction 30 times:
func sync_runtime_doSpin() {
    procyield(active_spin_cnt)
}

TEXT runtime·procyield(SB),NOSPLIT,$0-0
    MOVL cycles+0(FP), AX
again:
    PAUSE
    SUBL $1, AX
    JNZ again
    RET
Next, if the lock is no longer held, or the spin limit has been reached, execution falls through to the following code:
func (m *Mutex) lockSlow() {
    var waitStartTime int64
    starving := false
    awoke := false
    iter := 0
    old := m.state
    for {
        ...
        new := old
        // Don't try to acquire starving mutex, new arriving goroutines must queue.
        if old&mutexStarving == 0 {
            new |= mutexLocked
        }
        if old&(mutexLocked|mutexStarving) != 0 {
            new += 1 << mutexWaiterShift
        }
        // The current goroutine switches mutex to starvation mode.
        // But if the mutex is currently unlocked, don't do the switch.
        // Unlock expects that starving mutex has waiters, which will not
        // be true in this case.
        if starving && old&mutexLocked != 0 {
            new |= mutexStarving
        }
        if awoke {
            // The goroutine has been woken from sleep,
            // so we need to reset the flag in either case.
            if new&mutexWoken == 0 {
                throw("sync: inconsistent mutex state")
            }
            new &^= mutexWoken
        }
        ...
    }
    ...
}
So for the first check, if old&mutexStarving == 0: as long as the lock is not in starvation mode (even if it is currently locked), we set new |= mutexLocked, i.e. the mutexLocked bit of new is set to 1.

If old is in starvation mode or is already locked, the waiter count has to be incremented (waitersCount+1), indicating that this goroutine is going to keep waiting.

If we are marked as starving and the lock is held, new is also switched to starvation mode.

If awoke was successfully set during the spin phase, the mutexWoken bit has to be cleared here with new &^= mutexWoken (&^ is the bit-clear operator: it clears, in the left operand, the bits that are set in the right operand). The reason is that the current goroutine will very likely be parked in the steps below and will need to be woken by whichever goroutine releases the lock; if Unlock sees the mutexWoken bit still set, it will not wake anyone, and this goroutine would never wake up to take the lock.
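A tiny illustration of the bit-clear operator; the flag value follows the bit layout assumed in section 2.1:

package main

import "fmt"

func main() {
    const mutexWoken = 1 << 1          // bit 1, as in the assumed state layout above
    state := int32(mutexWoken | 1<<2)  // woken flag plus one other bit set
    state &^= mutexWoken               // clear only the woken bit, leave the rest alone
    fmt.Printf("%03b\n", state)        // prints 100
}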
Continuing on:
func (m *Mutex) lockSlow() {
    var waitStartTime int64
    starving := false
    awoke := false
    iter := 0
    old := m.state
    for {
        ...
        if atomic.CompareAndSwapInt32(&m.state, old, new) {
            if old&(mutexLocked|mutexStarving) == 0 {
                break // locked the mutex with CAS
            }
            // If we were already waiting before, queue at the front of the queue.
            queueLifo := waitStartTime != 0
            if waitStartTime == 0 {
                waitStartTime = runtime_nanotime()
            }
            runtime_SemacquireMutex(&m.sema, queueLifo, 1)
            starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
            old = m.state
            if old&mutexStarving != 0 {
                // If this goroutine was woken and mutex is in starvation mode,
                // ownership was handed off to us but mutex is in somewhat
                // inconsistent state: mutexLocked is not set and we are still
                // accounted as waiter. Fix that.
                if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
                    throw("sync: inconsistent mutex state")
                }
                delta := int32(mutexLocked - 1<<mutexWaiterShift)
                if !starving || old>>mutexWaiterShift == 1 {
                    // Exit starvation mode.
                    // Critical to do it here and consider wait time.
                    // Starvation mode is so inefficient, that two goroutines
                    // can go lock-step infinitely once they switch mutex
                    // to starvation mode.
                    delta -= mutexStarving
                }
                atomic.AddInt32(&m.state, delta)
                break
            }
            awoke = true
            iter = 0
        } else {
            old = m.state
        }
    }
    if race.Enabled {
        race.Acquire(unsafe.Pointer(m))
    }
}
As you can see, atomic.CompareAndSwapInt32 is used again to try to switch m.state from old to new. If it fails, the state has changed in the meantime (some other goroutine has touched it), so old is reloaded and the loop continues. Otherwise:

- If the CAS fails, we simply go around the loop again and keep competing for the lock; if it succeeds and the old state was neither locked nor starving, the lock has been acquired and we break out.
- If the wait time is non-zero, this is not the first pass through the loop, i.e. this is not a freshly arrived goroutine, so queueLifo = true and the goroutine will be placed at the head of the sleep queue; a new goroutine is placed at the tail. If the wait time is zero, it is initialized to the current time.
- Then runtime_SemacquireMutex(&m.sema, queueLifo, 1) is called to block and wait. runtime_SemacquireMutex eventually ends up in the following function:
//go:linkname sync_runtime_SemacquireMutex sync.runtime_SemacquireMutex
func sync_runtime_SemacquireMutex(addr *uint32, lifo bool, skipframes int) {
    semacquire1(addr, lifo, semaBlockProfile|semaMutexProfile, skipframes)
}
In other words, it eventually reaches runtime.semacquire1 in the sema module of the runtime package. After a failed acquire attempt, the current goroutine is put into the wait queue (at the head or the tail, as decided in step 2 above), and goparkunlock parks the goroutine until some lock-releasing goroutine wakes it up.
func semacquire1(addr *uint32, lifo bool, profile semaProfileFlags, skipframes int) {
    gp := getg()
    if gp != gp.m.curg {
        throw("semacquire not on the G stack")
    }
    // Easy case.
    if cansemacquire(addr) {
        return
    }
    // Harder case:
    // increment waiter count
    // try cansemacquire one more time, return if succeeded
    // enqueue itself as a waiter
    // sleep
    // (waiter descriptor is dequeued by signaler)
    s := acquireSudog()
    root := semroot(addr)
    t0 := int64(0)
    s.releasetime = 0
    s.acquiretime = 0
    s.ticket = 0
    if profile&semaBlockProfile != 0 && blockprofilerate > 0 {
        t0 = cputicks()
        s.releasetime = -1
    }
    if profile&semaMutexProfile != 0 && mutexprofilerate > 0 {
        if t0 == 0 {
            t0 = cputicks()
        }
        s.acquiretime = t0
    }
    for {
        lockWithRank(&root.lock, lockRankRoot)
        // Add ourselves to nwait to disable "easy case" in semrelease.
        atomic.Xadd(&root.nwait, 1)
        // Check cansemacquire to avoid missed wakeup.
        if cansemacquire(addr) {
            atomic.Xadd(&root.nwait, -1)
            unlock(&root.lock)
            break
        }
        // Any semrelease after the cansemacquire knows we're waiting
        // (we set nwait above), so go to sleep.
        root.queue(addr, s, lifo)
        goparkunlock(&root.lock, waitReasonSemacquire, traceEvGoBlockSync, 4+skipframes)
        if s.ticket != 0 || cansemacquire(addr) {
            break
        }
    }
    if s.releasetime > 0 {
        blockevent(s.releasetime-t0, 3+skipframes)
    }
    releaseSudog(s)
}
- After being woken, the goroutine checks whether the lock is in starvation mode and whether it has been waiting longer than the threshold; if either holds, the starving variable is set.
- If the lock is in starvation mode when the goroutine is woken, the current goroutine is handed the mutex directly; and if it is the only goroutine left in the wait queue, or it has waited for less than 1ms, the mutex also exits starvation mode.
- If the lock is not in starvation mode, the starving flag is set when the goroutine has waited for more than 1ms, and the goroutine goes around the loop again to compete for the lock.
As you can see, in normal mode a spinning goroutine is quite likely to beat a freshly woken goroutine to the lock, so some goroutines may never manage to grab it, which is exactly the starvation problem; in starvation mode, the woken goroutine is guaranteed to get the lock, and newly arriving goroutines do not spin but dutifully queue up at the back, which keeps tail latency bounded.
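Before moving on to unlocking, here is a minimal usage sketch of the mutex itself; the counter and the number of goroutines are just illustrative:

package main

import (
    "fmt"
    "sync"
)

func main() {
    var (
        mu      sync.Mutex
        counter int
        wg      sync.WaitGroup
    )
    for i := 0; i < 100; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            mu.Lock() // fast path under low contention, lockSlow otherwise
            counter++
            mu.Unlock()
        }()
    }
    wg.Wait()
    fmt.Println(counter) // always 100
}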
2.3 Unlocking: sync.Mutex.Unlock()
func (m *Mutex) Unlock() {
    if race.Enabled {
        _ = m.state
        race.Release(unsafe.Pointer(m))
    }
    // Fast path: drop lock bit.
    new := atomic.AddInt32(&m.state, -mutexLocked)
    if new != 0 {
        // Outlined slow path to allow inlining the fast path.
        // To hide unlockSlow during tracing we skip one extra frame when tracing GoUnblock.
        m.unlockSlow(new)
    }
}
Unlock mainly uses atomic.AddInt32 to subtract mutexLocked from m.state. If every bit of the new m.state is 0, there is nothing left to do and the function simply returns; otherwise the unlockSlow method handles the slow unlock path:
func (m *Mutex) unlockSlow(new int32) {
    if (new+mutexLocked)&mutexLocked == 0 {
        throw("sync: unlock of unlocked mutex")
    }
    if new&mutexStarving == 0 {
        old := new
        for {
            // If there are no waiters or a goroutine has already
            // been woken or grabbed the lock, no need to wake anyone.
            // In starvation mode ownership is directly handed off from unlocking
            // goroutine to the next waiter. We are not part of this chain,
            // since we did not observe mutexStarving when we unlocked the mutex above.
            // So get off the way.
            if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
                return
            }
            // Grab the right to wake someone.
            new = (old - 1<<mutexWaiterShift) | mutexWoken
            if atomic.CompareAndSwapInt32(&m.state, old, new) {
                runtime_Semrelease(&m.sema, false, 1)
                return
            }
            old = m.state
        }
    } else {
        // Starving mode: handoff mutex ownership to the next waiter, and yield
        // our time slice so that the next waiter can start to run immediately.
        // Note: mutexLocked is not set, the waiter will set it after wakeup.
        // But mutex is still considered locked if mutexStarving is set,
        // so new coming goroutines won't acquire it.
        runtime_Semrelease(&m.sema, true, 1)
    }
}
As you can see, in starvation mode the unlocker simply calls runtime_Semrelease(&m.sema, true, 1) to hand the mutex directly to the waiting goroutine it wakes.

In normal mode, if there are no waiting goroutines, or one of mutexLocked, mutexStarving or mutexWoken is already set (meaning some other goroutine is already dealing with the lock), the function returns immediately.

Otherwise the waiter count is decremented by one, the mutexWoken flag is set, and runtime_Semrelease(&m.sema, false, 1) wakes one goroutine from the wait queue.
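As a rough illustration of that normal-mode state transition, here is the arithmetic with concrete numbers, assuming the bit layout from section 2.1 (two waiters, lock just released); the variable names mirror the ones in unlockSlow:

package main

import "fmt"

const (
    mutexLocked = 1 << iota
    mutexWoken
    mutexStarving
    mutexWaiterShift = iota
)

func main() {
    old := int32(2 << mutexWaiterShift)             // 2 waiters, unlocked, normal mode
    new := (old - 1<<mutexWaiterShift) | mutexWoken // 1 waiter left, woken flag set
    fmt.Printf("old=%08b new=%08b\n", old, new)     // old=00010000 new=00001010
}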
2.4 sync.Mutex.TryLock()
In Go 1.18, the Go team (rather reluctantly) added the TryLock method:
func (m *Mutex) TryLock() bool {
    old := m.state
    if old&(mutexLocked|mutexStarving) != 0 {
        return false
    }
    // There may be a goroutine waiting for the mutex, but we are
    // running now and can try to grab the mutex before that
    // goroutine wakes up.
    if !atomic.CompareAndSwapInt32(&m.state, old, old|mutexLocked) {
        return false
    }
    if race.Enabled {
        race.Acquire(unsafe.Pointer(m))
    }
    return true
}
The implementation of sync.Mutex.TryLock() is fairly simple:

- if the lock is in starvation mode or already locked, return false immediately;
- try to take the lock with a CAS; if that fails, return false;
- on success, return true.
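A minimal usage sketch (TryLock requires Go 1.18 or later); the "lock held elsewhere" setup here is invented for illustration:

package main

import (
    "fmt"
    "sync"
)

func main() {
    var mu sync.Mutex
    mu.Lock() // simulate the lock being held by some other work

    if mu.TryLock() {
        fmt.Println("got the lock")
        mu.Unlock()
    } else {
        // Do not block; fall back to some other strategy instead.
        fmt.Println("lock busy, skipping")
    }

    mu.Unlock()
}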
2.5 Summary
Locking a mutex is fairly involved:

- If the mutex is in its initial state, the mutexLocked bit is set by a single atomic operation and locking succeeds immediately.
- If the mutex is in the mutexLocked state and working in normal mode, the goroutine spins, executing the PAUSE instruction 30 times per spin to burn CPU time while waiting for the lock to be released.
- In normal mode, if the goroutine gets the lock, locking simply succeeds; if the mutex is in starvation mode, or the lock was not obtained, runtime_SemacquireMutex puts the goroutine to sleep until the lock holder wakes it.
- If a sleeping goroutine has been waiting for more than 1ms, the mutex switches to starvation mode. In starvation mode, goroutines newly arriving to compete for the lock are placed at the tail of the semaphore wait queue, goroutines that were woken but failed to get the lock go to the head, and the lock is handed out in queue order, which guarantees no goroutine starves.
- If the current goroutine is the last waiter on the mutex, or it has waited for less than 1ms (this also depends on whether it set the starving flag in its previous pass through the loop), the mutex exits starvation mode.
Unlocking a mutex is comparatively simple:

- When the mutex is in starvation mode, ownership is handed directly to the goroutine at the head of the wait queue via runtime_Semrelease(&m.sema, true, 1).
- When the mutex is in normal mode, if no goroutine is waiting for the lock, or the lock is already locked, woken, or marked starving by another goroutine (meaning someone else is handling it), the function returns immediately; otherwise the waiter count is decremented, the mutexWoken flag is set, and runtime_Semrelease(&m.sema, false, 1) wakes another goroutine to compete for the lock.
3. Read-Write Lock: RWMutex
Unlike the mutex, the read-write lock is intended for read-heavy, write-light workloads. Its compatibility matrix is:

|       | Read | Write |
| ----- | ---- | ----- |
| Read  | Y    | N     |
| Write | N    | N     |
Its struct looks like this:
type RWMutex struct {
    w           Mutex  // held if there are pending writers
    writerSem   uint32 // semaphore for writers to wait for completing readers
    readerSem   uint32 // semaphore for readers to wait for completing writers
    readerCount int32  // number of pending readers
    readerWait  int32  // number of departing readers
}
- w: reuses the capabilities of the mutex;
- writerSem and readerSem: semaphores for writers waiting on readers, and readers waiting on writers, respectively;
- readerCount: the number of read operations currently in progress;
- readerWait: the number of readers a blocked writer still has to wait for.
3.1 Write Lock: sync.RWMutex.Lock
// Lock locks rw for writing.
// If the lock is already locked for reading or writing,
// Lock blocks until the lock is available.
func (rw *RWMutex) Lock() {
    if race.Enabled {
        _ = rw.w.state
        race.Disable()
    }
    // First, resolve competition with other writers.
    rw.w.Lock()
    // Announce to readers there is a pending writer.
    r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders
    // Wait for active readers.
    if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {
        runtime_SemacquireMutex(&rw.writerSem, false, 0)
    }
    if race.Enabled {
        race.Enable()
        race.Acquire(unsafe.Pointer(&rw.readerSem))
        race.Acquire(unsafe.Pointer(&rw.writerSem))
    }
}
As you can see, write locking first calls rw.w.Lock() so that writers are serialized against each other.

The following line then subtracts a huge constant (rwmutexMaxReaders = 1 << 30) from readerCount and adds the same constant back to the returned value. If the resulting r == 0, readerCount was 0 to begin with, so there are no active readers.

r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders

If r != 0, some readers still hold the lock, so r is added to readerWait atomically, and runtime_SemacquireMutex(&rw.writerSem, false, 0) puts the writing goroutine to sleep until it is woken.
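A quick numeric sketch of that readerCount trick; the constant matches the value quoted above, and the reader count of 3 is made up:

package main

import "fmt"

const rwmutexMaxReaders = 1 << 30

func main() {
    readerCount := int32(3) // 3 readers currently hold the lock

    // The writer announces itself: readerCount goes hugely negative, so any
    // later reader sees a negative value and knows a writer is pending.
    readerCount -= rwmutexMaxReaders
    r := readerCount + rwmutexMaxReaders // recover the number of active readers

    fmt.Println(readerCount, r) // -1073741821 3
}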
3.2 Write Unlock: sync.RWMutex.Unlock
func (rw *RWMutex) Unlock() {
    if race.Enabled {
        _ = rw.w.state
        race.Release(unsafe.Pointer(&rw.readerSem))
        race.Disable()
    }
    // Announce to readers there is no active writer.
    r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders)
    if r >= rwmutexMaxReaders {
        race.Enable()
        throw("sync: Unlock of unlocked RWMutex")
    }
    // Unblock blocked readers, if any.
    for i := 0; i < int(r); i++ {
        runtime_Semrelease(&rw.readerSem, false, 0)
    }
    // Allow other writers to proceed.
    rw.w.Unlock()
    if race.Enabled {
        race.Enable()
    }
}
First, atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders) adds back the huge value that was subtracted when the write lock was taken. If the result r >= rwmutexMaxReaders, the RWMutex is being unlocked without having been write-locked, and the code panics (throw).

Then a for loop wakes the waiting reader goroutines one by one via runtime_Semrelease(&rw.readerSem, false, 0).

Finally, rw.w.Unlock() releases the underlying mutex.
3.3 Read Lock: sync.RWMutex.RLock
func (rw *RWMutex) RLock() {
    if race.Enabled {
        _ = rw.w.state
        race.Disable()
    }
    if atomic.AddInt32(&rw.readerCount, 1) < 0 {
        // A writer is pending, wait for it.
        runtime_SemacquireMutex(&rw.readerSem, false, 0)
    }
    if race.Enabled {
        race.Enable()
        race.Acquire(unsafe.Pointer(&rw.readerSem))
    }
}
Read locking is quite simple: atomic.AddInt32(&rw.readerCount, 1) atomically increments readerCount. If the result is negative, a writer is already in play, so the reading goroutine goes to sleep and waits to be woken.

Notice that if a writer arrived before this read lock, the later reader is blocked and parked even if, at this moment, the writer has not actually obtained the lock yet.
3.4 Read Unlock: sync.RWMutex.RUnlock
func (rw *RWMutex) RUnlock() {
    if race.Enabled {
        _ = rw.w.state
        race.ReleaseMerge(unsafe.Pointer(&rw.writerSem))
        race.Disable()
    }
    if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 {
        // Outlined slow-path to allow the fast-path to be inlined
        rw.rUnlockSlow(r)
    }
    if race.Enabled {
        race.Enable()
    }
}
When releasing a read lock, readerCount is simply decremented by one. If the result is negative, a writer is waiting for the lock, so rUnlockSlow is called for the slow unlock path:
func (rw *RWMutex) rUnlockSlow(r int32) {
    if r+1 == 0 || r+1 == -rwmutexMaxReaders {
        race.Enable()
        throw("sync: RUnlock of unlocked RWMutex")
    }
    // A writer is pending.
    if atomic.AddInt32(&rw.readerWait, -1) == 0 {
        // The last reader unblocks the writer.
        runtime_Semrelease(&rw.writerSem, false, 1)
    }
}
First the sanity checks:

- If r+1 == 0, i.e. r == -1, then readerCount was 0 before the decrement; clearly no read lock was held, so this is an RUnlock of an RWMutex that was never read-locked.
- If r+1 == -rwmutexMaxReaders, i.e. r == -rwmutexMaxReaders - 1, then readerCount was -rwmutexMaxReaders before the decrement; a writer had announced itself but there were definitely no readers, so again this is an RUnlock of an RWMutex with no read lock held.
Then readerWait, the number of readers the pending writer must still wait for, is decremented. When it reaches 0, all the readers the writer was waiting on have finished, and runtime_Semrelease(&rw.writerSem, false, 1) wakes the writing goroutine.
3.5 Summary
Although the read-write lock does more than the mutex, it is built on top of the mutex, and its implementation is actually simpler:

- Writers are serialized against each other through the embedded mutex, which guarantees that at any time only one write operation is competing with the readers.
- To take a read lock, readerCount is simply incremented atomically; as long as the result is greater than or equal to zero (exactly zero essentially never occurs), the read lock is acquired directly; otherwise the reader is put to sleep on the readerSem semaphore.
- To take a write lock, a huge value is subtracted from readerCount, so every later reader's increment yields a negative result and that reader goes to sleep; the number of readers that acquired the read lock before the writer is recorded in readerWait.
- When a read lock is released, readerCount is decremented, and after the sanity checks readerWait is decremented as well; once readerWait drops to 0, the waiting writer can be granted the lock.
- When a write lock is released, the huge value is first added back to readerCount, and the returned result is exactly the number of readers that blocked while the writer held the lock; each of them is woken in turn, and finally the embedded mutex is unlocked.

In short, apart from the logic it borrows from the mutex, the design of the read-write lock boils down to atomic operations plus simple add/subtract arithmetic.
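To wrap up the section, here is a minimal usage sketch of RWMutex; the cache type and its methods are invented for illustration:

package main

import (
    "fmt"
    "sync"
)

// cache is a tiny read-mostly map protected by an RWMutex.
type cache struct {
    mu sync.RWMutex
    m  map[string]string
}

func (c *cache) get(k string) (string, bool) {
    c.mu.RLock() // many readers may hold this at the same time
    defer c.mu.RUnlock()
    v, ok := c.m[k]
    return v, ok
}

func (c *cache) set(k, v string) {
    c.mu.Lock() // a writer excludes both readers and other writers
    defer c.mu.Unlock()
    c.m[k] = v
}

func main() {
    c := &cache{m: make(map[string]string)}
    c.set("lang", "Go")

    var wg sync.WaitGroup
    for i := 0; i < 4; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            if v, ok := c.get("lang"); ok {
                fmt.Println(v)
            }
        }()
    }
    wg.Wait()
}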
4. Conclusion
This post summarized the implementation of the mutex and the read-write lock in Go. Keep in mind that the locks discussed here operate at goroutine granularity; they are not the thread locks, targeting OS threads, that you may have run into elsewhere.
5. References
Reposted from: https://juejin.cn/post/7232550589963731002