sync包(一)——Mutex/RWMutex
sync
,为synchronize
的缩写,意为同步、并发协同,由此可见,sync
包中大多为用于解决并发问题的API。本系列主要讲述Mutex
、RWMutex
、Once
、Pool
以及WaitGroup
部分内容。
Mutex实现
结构体
type Mutex struct {
state int32
sema uint32
}
Mutex
的结构只有两个字段:
state
:用于标识当前锁的状态,所谓加锁,就是把该字段修改为某个值;解锁亦然sema
:用于处理沉睡、唤醒的信号量,依赖于runtime的函数调用
加锁
锁的实现一般都是依赖于以下两点:
- 自旋作为快路径
- 等待队列作为慢路径
func (m *Mutex) Lock() {
// 一次性自旋,如果锁处于释放状态,则加锁成功,直接返回
if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
if race.Enabled {
race.Acquire(unsafe.Pointer(m))
}
return
}
// 加锁失败,则进入慢路径
m.lockSlow()
}
func (m *Mutex) lockSlow() {
...
for {
// 真正的自旋部分(自旋判断锁的状态,只有锁处于非饥饿状态且已被其他人持有的时候,才会执行)
if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
// 设置Woken标志位,以此来让Unlock方法不去唤醒其他goroutine
if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
awoke = true
}
runtime_doSpin()
iter++
old = m.state
continue
}
// 锁的状态计算...
// 尝试占用锁
if atomic.CompareAndSwapInt32(&m.state, old, new) {
// 跳出自旋后的一些处理
if old&(mutexLocked|mutexStarving) == 0 {
break // 通过上面的自旋,CAS已经抢到锁
}
...
// 没有抢到锁,将goroutine放入等待队列并执行相应的调度,会在这里进行阻塞
runtime_SemacquireMutex(&m.sema, queueLifo, 1)
// 更新饥饿标志位
starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
old = m.state
// 锁处于饥饿模式的一些处理
if old&mutexStarving != 0 {
if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
throw("sync: inconsistent mutex state")
}
delta := int32(mutexLocked - 1<<mutexWaiterShift)
if !starving || old>>mutexWaiterShift == 1 {
delta -= mutexStarving
}
atomic.AddInt32(&m.state, delta)
break
}
// 正常模式,重置迭代次数,重新执行自旋获取锁
awoke = true
iter = 0
} else {
old = m.state
}
}
if race.Enabled {
race.Acquire(unsafe.Pointer(m))
}
}
在Lock
方法中,首先会尝试直接加锁,如果没有人持有锁,也没有人抢占,那么一个CAS操作就能成功(这里为一次性自旋),否则进入慢路径——lockSlow
方法。
在lockSlow
方法中,会进入一个真正的自旋状态:首先会判断当前锁的模式,如果可以自旋,则进入在for循环中不断尝试获取锁。虽然这里是慢路径,但是这里的自旋依然执行的很快,因为这里并没有加入队列中。
值得一提的是,Go的锁有两种模式:正常模式
和饥饿模式
:
如上图所示,一个锁已经本地持有一个goroutine队列,如果锁进行释放时,刚好又来了一个新的goroutine —— G1,那么,Mutex
该如何调度呢?
- 从公平性讲,让
G2
持有锁,因为G2
排在等待队列的队头,等待的时间最长- 从执行效率上讲,让
G1
与G2
竞争,但大概率G1
会持有锁,因为G1
作为刚刚申请锁的goroutine,还没有进入休眠状态,此时它是占有CPU的,而G2
已经进入休眠队列并不占有CPU,唤醒G2
让它与G1
竞争会有goroutine的调度过程
go的正常模式
采用的就是G1
与G2
竞争锁的方式,但是如果每次G2
与G1
竞争锁,锁都被新来的G1
持有的话,G2
就会陷入饥饿状态,所以,Mutex
有了饥饿模式
:
如果等待时间超过1ms,锁进入该模式,在该模式下,锁会由等待队列中的goroutine持有。
解锁
互斥锁的解锁过程 Mutex.Unlock
与加锁过程相比就很简单:
- 没有其他goroutine竞争锁,直接解锁成功
- 有其他goroutine竞争锁,进入慢路径解锁
func (m *Mutex) Unlock() {
if race.Enabled {
_ = m.state
race.Release(unsafe.Pointer(m))
}
// 直接一个原子操作进行解锁(atomic),达到的效果与CAS是一样的
new := atomic.AddInt32(&m.state, -mutexLocked)
if new != 0 {
// 正常来讲,如果该锁只被一个goroutine持有,且等待队列为空,new的值会为0
// new不为0,表示锁的等待队列中存在等待的goroutine或正在有goroutine自旋等待锁
// 此时进入慢路径解锁(即对goroutine进行调度)
m.unlockSlow(new)
}
}
func (m *Mutex) unlockSlow(new int32) {
if (new+mutexLocked)&mutexLocked == 0 {
throw("sync: unlock of unlocked mutex")
}
// 此处进行判断是否处于饥饿模式
if new&mutexStarving == 0 {
old := new
for {
// 此处进行判断—— 锁的等待队列是否为空 || 锁是否被锁住(因为锁已经在Unlock方法中进行了释
// 放,在这期间,极有可能已经有新来的goroutine重新持有了锁) || 是否已有唤醒的
// goroutine(在上面的lock方法中,在自旋过程中,新来的goroutine会将唤醒标志位激活) || 是
// 否处于饥饿模式(同样的如果唤醒的goroutine在竞争过程中产生了饥饿,则优先给当前goroutine)
if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
return
}
// 唤醒goroutine
new = (old - 1<<mutexWaiterShift) | mutexWoken
if atomic.CompareAndSwapInt32(&m.state, old, new) {
runtime_Semrelease(&m.sema, false, 1)
return
}
old = m.state
}
} else {
// 处于饥饿模式,唤醒队头goroutine
runtime_Semrelease(&m.sema, true, 1)
}
}
RWMutex实现
结构体
type RWMutex struct {
w Mutex
writerSem uint32
readerSem uint32
readerCount int32
readerWait int32
}
w
— 复用互斥锁提供的能力;writerSem
和readerSem
— 分别用于写等待读和读等待写:readerCount
存储了当前正在执行的读操作数量;readerWait
表示当写操作被阻塞时等待的读操作个数
读锁
func (rw *RWMutex) RLock() {
if race.Enabled {
_ = rw.w.state
race.Disable()
}
if atomic.AddInt32(&rw.readerCount, 1) < 0 {
runtime_SemacquireMutex(&rw.readerSem, false, 0)
}
if race.Enabled {
race.Enable()
race.Acquire(unsafe.Pointer(&rw.readerSem))
}
}
可以看到,读锁的加锁非常简单,直接对readerCount
进行加法,如果返回负数表示已经有其他 goroutine 获得了写锁,当前 goroutine 就会陷入休眠等待锁的释放;如果结果为非负数表示没有 goroutine 获得写锁,当前方法会成功返回。
func (rw *RWMutex) RUnlock() {
if race.Enabled {
_ = rw.w.state
race.ReleaseMerge(unsafe.Pointer(&rw.writerSem))
race.Disable()
}
if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 {
rw.rUnlockSlow(r)
}
if race.Enabled {
race.Enable()
}
}
func (rw *RWMutex) rUnlockSlow(r int32) {
if r+1 == 0 || r+1 == -rwmutexMaxReaders {
race.Enable()
throw("sync: RUnlock of unlocked RWMutex")
}
// 对写锁之前的读锁数量减一,当数量为0的时候,出发信号量唤醒写锁的 goroutine
if atomic.AddInt32(&rw.readerWait, -1) == 0 {
runtime_Semrelease(&rw.writerSem, false, 1)
}
}
读锁的解锁也较为简单,与加锁类似,解锁就是对readerCount
做减法,返回负数表示已有其他 goroutine 获得写锁,调用 rUnlockSlow
方法作进一步处理;返回的结果为正数,则正常解锁
写锁
func (rw *RWMutex) Lock() {
if race.Enabled {
_ = rw.w.state
race.Disable()
}
// 加写锁,防止其他 goroutine 抢占
rw.w.Lock()
// 通知后续的读锁已经有写锁进入,后续读锁需要休眠
r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders
// 等待写锁前面的读锁完全释放,没有全部释放则写锁的 goroutine 陷入休眠
if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {
runtime_SemacquireMutex(&rw.writerSem, false, 0)
}
if race.Enabled {
race.Enable()
race.Acquire(unsafe.Pointer(&rw.readerSem))
race.Acquire(unsafe.Pointer(&rw.writerSem))
}
}
写锁的加锁主要复用Mutex
的加锁逻辑,同时兼顾对读锁的调度
func (rw *RWMutex) Unlock() {
if race.Enabled {
_ = rw.w.state
race.Release(unsafe.Pointer(&rw.readerSem))
race.Disable()
}
// 计算写锁后面读锁等待的数量
r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders)
if r >= rwmutexMaxReaders {
race.Enable()
throw("sync: Unlock of unlocked RWMutex")
}
// 将后面等待的读锁全部唤醒
for i := 0; i < int(r); i++ {
runtime_Semrelease(&rw.readerSem, false, 0)
}
// 解锁
rw.w.Unlock()
if race.Enabled {
race.Enable()
}
}
写锁的解锁与加锁正好相反,解锁之后,在 for 循环中将后面的读锁全部唤醒。 获取写锁时会先阻塞写锁的获取,后阻塞读锁的获取,这种策略能够保证读操作不会被连续的写操作饿死。
锁的应用
Mutex可以看做是锁,RWMutex是读写锁,一般的用法是将Mutex或者RWMutex和需要被保护住的资源封装在一个结构体中,当多个goroutine同时访问该资源时,利用Mutex或者RWMutex
锁住。当然,使用锁的时候,优先使用RWMutex
。
例如,下面是使用读写锁,对map的一个封装:
type SafeMap struct {
m map[string]string
mutex sync.RWMutex
}
func (s *SafeMap) LoadOrStore(key string, newVale string) (val string, loaded bool) {
// 取值
s.mutex.RLock()
val, ok := s.m[key]
s.mutex.RUnlock()
if ok {
return val, true
}
// 存值
s.mutex.Lock()
defer s.mutex.Unlock()
// DoubleCheck
val, ok = s.m[key]
if ok {
return val, true
}
s.m[key] = newVale
return newVale, false
}
在上面的代码中,将读写锁与map放在了一起,组合成了SafeMap,实现了LoadOrStore
方法,在该方法中,先加读锁用于判断map中是否存在key
,根据返回结果进行进一步处理:存在则返回值,不存在则将新的值存入map。
需要注意的是,LoadOrStotre
中采用了DoubleCheck的策略,即在存值之前,再次检查一遍map中是否有存入key
,这样做的目的是为了处理并发问题:在并发情况下,如果map中没有key
的值,有可能有多个goroutine都进入了存值阶段,因锁竞争阻塞在存值阶段,最终顺序执行map的写入操作(即使前面已经有goroutine写入成功,后面的goroutine也会对其进行覆盖);因此引入DoubleCheck策略,防止map的重复写入问题。
参考资料
转载自:https://juejin.cn/post/7146025646521581575