
Golang Concurrent Programming: Locks


0. Introduction

In an earlier post we walked through the implementation of Go channels; in this post we look at how the Mutex family of locks is implemented.

1. Concurrency Primitives

Go provides a set of basic synchronization primitives in the sync package, including the commonly used sync.Mutex, sync.RWMutex, sync.WaitGroup, sync.Once, and sync.Cond. This post covers sync.Mutex and sync.RWMutex.
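
Before digging into the internals, here is a minimal, self-contained usage sketch of sync.Mutex protecting a shared counter (the counter and the loop bounds are arbitrary, illustrative choices):

package main

import (
   "fmt"
   "sync"
)

func main() {
   var (
      mu      sync.Mutex
      counter int
      wg      sync.WaitGroup
   )

   for i := 0; i < 10; i++ {
      wg.Add(1)
      go func() {
         defer wg.Done()
         for j := 0; j < 1000; j++ {
            mu.Lock()
            counter++ // the critical section is guarded by mu
            mu.Unlock()
         }
      }()
   }

   wg.Wait()
   fmt.Println(counter) // always prints 10000
}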

2. Mutex

In Go, a mutex is represented by the following struct, which takes up only 8 bytes in total:

type Mutex struct {
   state int32
   sema  uint32
}

Here:

  • state holds the lock's state; different bits encode different kinds of state;
  • sema is the semaphore used to block and wake goroutines contending for the lock;

2.1 Lock State

(Figure: bit layout of the Mutex state field; the constants behind these bits are shown after the list below.)
  • waitersCount: the number of goroutines currently waiting for this lock;
  • starving: whether the lock is in starvation mode; 0 - normal, 1 - starving;
  • woken: whether a goroutine has been woken for this lock; 0 - no, 1 - yes;
  • locked: whether the lock is currently held; 0 - no, 1 - yes;
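
For reference, these flag bits correspond to the following constants in sync/mutex.go (quoted from Go 1.18-era sources; later versions may differ slightly):

const (
   mutexLocked = 1 << iota // mutex is locked
   mutexWoken
   mutexStarving
   mutexWaiterShift = iota // the waiter count occupies the remaining high bits

   starvationThresholdNs = 1e6 // 1ms: wait longer than this and the mutex switches to starvation mode
)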

2.2 Locking: sync.Mutex.Lock()

Let's walk through the locking path:

// Lock locks m.
// If the lock is already in use, the calling goroutine
// blocks until the mutex is available.
func (m *Mutex) Lock() {
   // Fast path: grab unlocked mutex.
   if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
      if race.Enabled {
         race.Acquire(unsafe.Pointer(m))
      }
      return
   }
   // Slow path (outlined so that the fast path can be inlined)
   m.lockSlow()
}

Ignoring the race.Enabled blocks (they support Go's race detector), the first step is a CAS (CompareAndSwapInt32, compare and swap, which is an atomic operation): it compares m.state with 0 and, if they are equal, sets m.state to mutexLocked (i.e. 1) and returns immediately, meaning the lock was acquired; otherwise m.lockSlow() is executed.
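
To make the CAS semantics concrete, here is a tiny standalone sketch (not taken from sync; the local state variable merely mimics m.state):

package main

import (
   "fmt"
   "sync/atomic"
)

func main() {
   var state int32 // 0 = unlocked, 1 = locked, mimicking m.state

   // The first CAS sees 0 and atomically swaps it to 1: the "lock" is acquired.
   fmt.Println(atomic.CompareAndSwapInt32(&state, 0, 1)) // true

   // The second CAS sees 1, so the comparison fails and nothing is written.
   fmt.Println(atomic.CompareAndSwapInt32(&state, 0, 1)) // false
}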

If the fast path fails to acquire the lock, m.lockSlow() takes over; we will analyze it in segments:

func (m *Mutex) lockSlow() {
   var waitStartTime int64
   starving := false
   awoke := false
   iter := 0
   old := m.state
   for {
      // Don't spin in starvation mode, ownership is handed off to waiters
      // so we won't be able to acquire the mutex anyway.
      if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
         // Active spinning makes sense.
         // Try to set mutexWoken flag to inform Unlock
         // to not wake other blocked goroutines.
         if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
            atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
            awoke = true
         }
         runtime_doSpin()
         iter++
         old = m.state
         continue
      }
      ...
}

The code above shows the current goroutine trying to acquire the lock by spinning. Spinning is a synchronization technique in which the spinning goroutine keeps hold of the CPU and repeatedly checks whether some condition has become true. On a multi-core CPU, spinning avoids a goroutine switch; used well it can yield a large performance gain, but used badly it drags the whole program down, so the conditions for a goroutine to enter the spin loop are strict:

  • spinning only happens in normal mode, never in starvation mode;
  • runtime_canSpin must return true; its implementation is shown below:
// Active spinning for sync.Mutex.
//go:linkname sync_runtime_canSpin sync.runtime_canSpin
//go:nosplit
func sync_runtime_canSpin(i int) bool {
   // sync.Mutex is cooperative, so we are conservative with spinning.
   // Spin only few times and only if running on a multicore machine and
   // GOMAXPROCS>1 and there is at least one other running P and local runq is empty.
   // As opposed to runtime mutex we don't do passive spinning here,
   // because there can be work on global runq or on other Ps.
   if i >= active_spin || ncpu <= 1 || gomaxprocs <= int32(sched.npidle+sched.nmspinning)+1 {
      return false
   }
   if p := getg().m.p.ptr(); !runqempty(p) {
      return false
   }
   return true
}

Via go:linkname, sync's runtime_canSpin ends up calling runtime.sync_runtime_canSpin, which returns true only when all of the following hold (a quick way to inspect the relevant runtime knobs is sketched after the list):

  • the machine has more than one CPU core;
  • the current goroutine has spun fewer than four times trying to acquire this lock;
  • GOMAXPROCS is greater than 1, at least one other P is running, and the local run queue of the current P is empty;
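
Whether these conditions hold depends on the machine and on GOMAXPROCS; a quick, purely illustrative way to inspect the relevant knobs on your own box:

package main

import (
   "fmt"
   "runtime"
)

func main() {
   fmt.Println("CPU cores: ", runtime.NumCPU())      // corresponds to ncpu in the check above
   fmt.Println("GOMAXPROCS:", runtime.GOMAXPROCS(0)) // passing 0 only reads the current value
}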

Once the current goroutine is allowed to spin, it calls runtime_doSpin to spin and burn CPU time; this function simply executes the PAUSE instruction 30 times:

func sync_runtime_doSpin() {
	procyield(active_spin_cnt)
}
TEXT runtime·procyield(SB),NOSPLIT,$0-0
	MOVL	cycles+0(FP), AX
again:
	PAUSE
	SUBL	$1, AX
	JNZ	again
	RET

Next, if the lock is currently not held, the lock is in starvation mode, or the spin limit has been reached, execution proceeds to the following code:

func (m *Mutex) lockSlow() {
   var waitStartTime int64
   starving := false
   awoke := false
   iter := 0
   old := m.state
   for {
      ...
      new := old
      // Don't try to acquire starving mutex, new arriving goroutines must queue.
      if old&mutexStarving == 0 {
         new |= mutexLocked
      }
      if old&(mutexLocked|mutexStarving) != 0 {
         new += 1 << mutexWaiterShift
      }
      // The current goroutine switches mutex to starvation mode.
      // But if the mutex is currently unlocked, don't do the switch.
      // Unlock expects that starving mutex has waiters, which will not
      // be true in this case.
      if starving && old&mutexLocked != 0 {
         new |= mutexStarving
      }
      if awoke {
         // The goroutine has been woken from sleep,
         // so we need to reset the flag in either case.
         if new&mutexWoken == 0 {
            throw("sync: inconsistent mutex state")
         }
         new &^= mutexWoken
      }
      ...
   }

   ...
}

So for the first check, if old&mutexStarving == 0: as long as the lock is not in starvation mode (even if it is currently held), we set new |= mutexLocked, i.e. the mutexLocked bit of new is set to 1.

If old is in starvation mode or already locked, waitersCount is incremented by 1 (new += 1 << mutexWaiterShift), meaning this goroutine is about to wait.

If this goroutine considers the lock starving and the lock is still held, new is marked with mutexStarving as well.

If awoke was set successfully during the spin phase, the mutexWoken bit has to be cleared here via new &^= mutexWoken (&^ is Go's bit-clear operator: bits that are set in the right operand are cleared in the left operand). The reason is that this goroutine is very likely to be parked below and will depend on some unlocking goroutine to wake it; if Unlock ever saw the mutexWoken bit still set, it would skip waking anyone, and this goroutine could never wake up to take the lock.
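
A quick illustration of the bit-clear operator, using hypothetical copies of the flag values:

package main

import "fmt"

func main() {
   const (
      mutexLocked = 1 << iota // hypothetical copies of sync's flag values
      mutexWoken
   )

   state := mutexLocked | mutexWoken // 011 in binary
   state &^= mutexWoken              // clear only the woken bit
   fmt.Printf("%03b\n", state)       // prints 001: the locked bit survives
}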

Continuing:

func (m *Mutex) lockSlow() {
   var waitStartTime int64
   starving := false
   awoke := false
   iter := 0
   old := m.state
   for {
      ...
      if atomic.CompareAndSwapInt32(&m.state, old, new) {
         if old&(mutexLocked|mutexStarving) == 0 {
            break // locked the mutex with CAS
         }
         // If we were already waiting before, queue at the front of the queue.
         queueLifo := waitStartTime != 0
         if waitStartTime == 0 {
            waitStartTime = runtime_nanotime()
         }
         runtime_SemacquireMutex(&m.sema, queueLifo, 1)
         starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
         old = m.state
         if old&mutexStarving != 0 {
            // If this goroutine was woken and mutex is in starvation mode,
            // ownership was handed off to us but mutex is in somewhat
            // inconsistent state: mutexLocked is not set and we are still
            // accounted as waiter. Fix that.
            if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
               throw("sync: inconsistent mutex state")
            }
            delta := int32(mutexLocked - 1<<mutexWaiterShift)
            if !starving || old>>mutexWaiterShift == 1 {
               // Exit starvation mode.
               // Critical to do it here and consider wait time.
               // Starvation mode is so inefficient, that two goroutines
               // can go lock-step infinitely once they switch mutex
               // to starvation mode.
               delta -= mutexStarving
            }
            atomic.AddInt32(&m.state, delta)
            break
         }
         awoke = true
         iter = 0
      } else {
         old = m.state
      }
   }

   if race.Enabled {
      race.Acquire(unsafe.Pointer(m))
   }
}

As you can see, atomic.CompareAndSwapInt32 is then used to try to install new into m.state. If the CAS fails, the state changed in the meantime (some other goroutine touched it), and we simply loop and spin again. Otherwise:

  1. If the CAS fails, we keep looping and competing for the lock; if it succeeds and the old state was neither locked nor starving, the lock has been acquired and we break out.
  2. If the wait start time is non-zero, this is not the first trip through the loop, i.e. this is not a newly arriving goroutine, so queueLifo = true and the goroutine will be placed at the head of the sleep queue; a newly arriving goroutine goes to the tail. If the wait start time is zero, it is initialized to the current time.
  3. Then runtime_SemacquireMutex(&m.sema, queueLifo, 1) is called to block and wait; it eventually reaches the following function:
//go:linkname sync_runtime_SemacquireMutex sync.runtime_SemacquireMutex
func sync_runtime_SemacquireMutex(addr *uint32, lifo bool, skipframes int) {
   semacquire1(addr, lifo, semaBlockProfile|semaMutexProfile, skipframes)
}

This ultimately calls runtime.semacquire1 in the runtime's sema module: after one more attempt to acquire the semaphore, the current goroutine is enqueued (at the head or the tail, depending on step 2), and goparkunlock parks the goroutine until some lock-releasing goroutine wakes it up.

func semacquire1(addr *uint32, lifo bool, profile semaProfileFlags, skipframes int) {
   gp := getg()
   if gp != gp.m.curg {
      throw("semacquire not on the G stack")
   }

   // Easy case.
   if cansemacquire(addr) {
      return
   }

   // Harder case:
   // increment waiter count
   // try cansemacquire one more time, return if succeeded
   // enqueue itself as a waiter
   // sleep
   // (waiter descriptor is dequeued by signaler)
   s := acquireSudog()
   root := semroot(addr)
   t0 := int64(0)
   s.releasetime = 0
   s.acquiretime = 0
   s.ticket = 0
   if profile&semaBlockProfile != 0 && blockprofilerate > 0 {
      t0 = cputicks()
      s.releasetime = -1
   }
   if profile&semaMutexProfile != 0 && mutexprofilerate > 0 {
      if t0 == 0 {
         t0 = cputicks()
      }
      s.acquiretime = t0
   }
   for {
      lockWithRank(&root.lock, lockRankRoot)
      // Add ourselves to nwait to disable "easy case" in semrelease.
      atomic.Xadd(&root.nwait, 1)
      // Check cansemacquire to avoid missed wakeup.
      if cansemacquire(addr) {
         atomic.Xadd(&root.nwait, -1)
         unlock(&root.lock)
         break
      }
      // Any semrelease after the cansemacquire knows we're waiting
      // (we set nwait above), so go to sleep.
      root.queue(addr, s, lifo)
      goparkunlock(&root.lock, waitReasonSemacquire, traceEvGoBlockSync, 4+skipframes)
      if s.ticket != 0 || cansemacquire(addr) {
         break
      }
   }
   if s.releasetime > 0 {
      blockevent(s.releasetime-t0, 3+skipframes)
   }
   releaseSudog(s)
}
  4. After being woken, the goroutine checks whether the lock is in starvation mode and whether its wait has exceeded the threshold; either condition causes the starving variable to be set;
  5. If the lock is in starvation mode when the goroutine wakes, ownership is handed to it directly; and if it is the only waiter left in the queue, or it waited for less than 1ms, it also takes the mutex out of starvation mode;
  6. If the lock is not in starvation mode and the wait exceeded 1ms, the starving flag is set and the goroutine goes around the loop again to compete for the lock.

As you can see, in normal mode a spinning goroutine is very likely to beat a freshly woken one, so some goroutines may repeatedly fail to get the lock, which is exactly starvation; in starvation mode the woken goroutine is guaranteed to get the lock, and newly arriving goroutines do not spin but queue up obediently at the back, which keeps tail latency bounded.

2.3 Unlocking: sync.Mutex.Unlock()

func (m *Mutex) Unlock() {
   if race.Enabled {
      _ = m.state
      race.Release(unsafe.Pointer(m))
   }

   // Fast path: drop lock bit.
   new := atomic.AddInt32(&m.state, -mutexLocked)
   if new != 0 {
      // Outlined slow path to allow inlining the fast path.
      // To hide unlockSlow during tracing we skip one extra frame when tracing GoUnblock.
      m.unlockSlow(new)
   }
}

This uses atomic.AddInt32 to update m.state by subtracting the locked bit. If every bit of the result is 0, nothing more needs to be done and Unlock simply returns; otherwise unlockSlow performs the slow unlock:

func (m *Mutex) unlockSlow(new int32) {
   if (new+mutexLocked)&mutexLocked == 0 {
      throw("sync: unlock of unlocked mutex")
   }
   if new&mutexStarving == 0 {
      old := new
      for {
         // If there are no waiters or a goroutine has already
         // been woken or grabbed the lock, no need to wake anyone.
         // In starvation mode ownership is directly handed off from unlocking
         // goroutine to the next waiter. We are not part of this chain,
         // since we did not observe mutexStarving when we unlocked the mutex above.
         // So get off the way.
         if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
            return
         }
         // Grab the right to wake someone.
         new = (old - 1<<mutexWaiterShift) | mutexWoken
         if atomic.CompareAndSwapInt32(&m.state, old, new) {
            runtime_Semrelease(&m.sema, false, 1)
            return
         }
         old = m.state
      }
   } else {
      // Starving mode: handoff mutex ownership to the next waiter, and yield
      // our time slice so that the next waiter can start to run immediately.
      // Note: mutexLocked is not set, the waiter will set it after wakeup.
      // But mutex is still considered locked if mutexStarving is set,
      // so new coming goroutines won't acquire it.
      runtime_Semrelease(&m.sema, true, 1)
   }
}

As you can see, in starvation mode unlockSlow simply calls runtime_Semrelease(&m.sema, true, 1) to hand the lock straight to the waiting goroutine at the head of the queue.

In normal (non-starving) mode, if there is no waiting goroutine, or one of mutexLocked, mutexStarving, mutexWoken is already set (meaning some other goroutine is already dealing with the lock), Unlock returns immediately.

Otherwise the waiter count is decremented by one, the mutexWoken flag is set, and runtime_Semrelease(&m.sema, false, 1) wakes a goroutine from the wait queue.
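
To make the fast-path arithmetic concrete: subtracting mutexLocked removes exactly the lock bit, and whatever remains is the waiter count plus mode bits that unlockSlow inspects. A standalone sketch, with the constants assumed to copy sync's unexported values:

package main

import "fmt"

func main() {
   const (
      mutexLocked = 1 << iota // assumed copies of sync's unexported flag values
      mutexWoken
      mutexStarving
      mutexWaiterShift = iota
   )

   // Locked with nobody waiting: dropping the lock bit leaves 0, so Unlock's fast path is done.
   fmt.Println(mutexLocked - mutexLocked) // 0

   // Locked with two waiters: dropping the lock bit leaves a non-zero state,
   // so Unlock must take the slow path and wake somebody up.
   state := mutexLocked | 2<<mutexWaiterShift
   fmt.Println(state - mutexLocked) // 16, i.e. the two waiters are still encoded
}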

2.4 sync.Mutex.TryLock()

In Go 1.18, the Go team, with considerable reluctance, added the TryLock method:

func (m *Mutex) TryLock() bool {
   old := m.state
   if old&(mutexLocked|mutexStarving) != 0 {
      return false
   }

   // There may be a goroutine waiting for the mutex, but we are
   // running now and can try to grab the mutex before that
   // goroutine wakes up.
   if !atomic.CompareAndSwapInt32(&m.state, old, old|mutexLocked) {
      return false
   }

   if race.Enabled {
      race.Acquire(unsafe.Pointer(m))
   }
   return true
}

The implementation of sync.Mutex.TryLock() is fairly simple (a usage sketch follows the list):

  1. If the mutex is locked or in starvation mode, return false immediately;
  2. Otherwise try to lock with a CAS; if that fails, return false;
  3. On success, return true.
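
A minimal usage sketch (requires Go 1.18 or newer; the printed strings are placeholders):

package main

import (
   "fmt"
   "sync"
)

func main() {
   var mu sync.Mutex
   mu.Lock() // somebody already holds the lock

   // TryLock never blocks: it only reports whether the lock was grabbed.
   if mu.TryLock() {
      fmt.Println("acquired")
      mu.Unlock()
   } else {
      fmt.Println("busy, doing something else") // this branch runs here
   }

   mu.Unlock()
}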

2.5 Summary

The locking path of the mutex is fairly involved:

  1. If the mutex is in its initial state, the mutexLocked bit is set with a single atomic operation and locking succeeds immediately;
  2. If the mutex is in the mutexLocked state and running in normal mode, the goroutine spins, executing the PAUSE instruction 30 times per spin to burn CPU time while waiting for the lock to be released;
  3. In normal mode, if the goroutine obtains the lock, locking succeeds directly; if the mutex is in starvation mode or the lock was not obtained, runtime_SemacquireMutex puts the acquiring goroutine to sleep until the lock holder wakes it;
  4. If a sleeping goroutine waits for more than 1ms, the mutex enters starvation mode; in starvation mode, newly arriving goroutines are appended to the tail of the semaphore wait queue, goroutines that were woken but failed to get the lock are placed at the head, and the lock is handed out in queue order, which prevents starvation;
  5. If the current goroutine is the last waiter on the mutex, or it waited for less than 1ms (this also depends on whether the starving flag was set in its previous iteration), starvation mode is exited.

Unlocking the mutex is comparatively simple:

  1. When the mutex is in starvation mode, ownership is handed directly to the next waiter in the queue via runtime_Semrelease(&m.sema, true, 1);
  2. When the mutex is in normal mode, if no goroutine is waiting for the lock, or the lock has already been re-acquired, a goroutine has already been woken, or the lock has been switched to starvation mode by another goroutine, then some other goroutine is already handling things and Unlock returns directly; otherwise the waiter count is decremented, mutexWoken is set, and runtime_Semrelease(&m.sema, false, 1) wakes another goroutine to compete for the lock.

3. Read-Write Lock: RWMutex

Unlike a mutex, a read-write lock is suited to read-heavy, write-light workloads. Which lock requests can be granted concurrently is summarized below:

          Read    Write
Read       Y        N
Write      N        N

Its struct is as follows (a brief usage sketch follows the field descriptions):

type RWMutex struct {
   w           Mutex  // held if there are pending writers
   writerSem   uint32 // semaphore for writers to wait for completing readers
   readerSem   uint32 // semaphore for readers to wait for completing writers
   readerCount int32  // number of pending readers
   readerWait  int32  // number of departing readers
}
  • w: reuses the capabilities of the mutex to serialize writers;
  • writerSem and readerSem: semaphores on which writers wait for readers and readers wait for writers, respectively;
  • readerCount: the number of read operations currently in progress;
  • readerWait: the number of readers a blocked write operation still has to wait for;
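
Before looking at the internals, a minimal usage sketch: a read-mostly cache guarded by an RWMutex (the cache type and its methods are made up for illustration):

package main

import (
   "fmt"
   "sync"
)

// cache is a read-mostly map guarded by an RWMutex (illustrative only).
type cache struct {
   mu   sync.RWMutex
   data map[string]string
}

func (c *cache) get(k string) (string, bool) {
   c.mu.RLock() // many readers may hold the read lock at the same time
   defer c.mu.RUnlock()
   v, ok := c.data[k]
   return v, ok
}

func (c *cache) set(k, v string) {
   c.mu.Lock() // a writer excludes both readers and other writers
   defer c.mu.Unlock()
   c.data[k] = v
}

func main() {
   c := &cache{data: make(map[string]string)}
   c.set("lang", "Go")
   if v, ok := c.get("lang"); ok {
      fmt.Println(v)
   }
}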

3.1 Write Lock: sync.RWMutex.Lock

// Lock locks rw for writing.
// If the lock is already locked for reading or writing,
// Lock blocks until the lock is available.
func (rw *RWMutex) Lock() {
   if race.Enabled {
      _ = rw.w.state
      race.Disable()
   }
   // First, resolve competition with other writers.
   rw.w.Lock()
   // Announce to readers there is a pending writer.
   r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders
   // Wait for active readers.
   if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {
      runtime_SemacquireMutex(&rw.writerSem, false, 0)
   }
   if race.Enabled {
      race.Enable()
      race.Acquire(unsafe.Pointer(&rw.readerSem))
      race.Acquire(unsafe.Pointer(&rw.writerSem))
   }
}

As shown above, when taking the write lock, the first step is rw.w.Lock(), which guarantees mutual exclusion among writers.

Then the line below subtracts a huge constant (rwmutexMaxReaders = 1 << 30) from readerCount and adds that constant back to the returned value; if the resulting r == 0, readerCount was originally 0 and no readers are active.

r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders

If r != 0, some readers definitely hold the lock right now, so r is added to readerWait atomically, and runtime_SemacquireMutex(&rw.writerSem, false, 0) puts the writer goroutine to sleep until it is woken.
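
The readerCount trick can be watched in isolation with the following standalone sketch, where rwmutexMaxReaders is assumed to mirror sync's unexported constant:

package main

import (
   "fmt"
   "sync/atomic"
)

// Assumed to mirror the unexported constant in sync/rwmutex.go.
const rwmutexMaxReaders = 1 << 30

func main() {
   var readerCount int32 = 2 // pretend two readers currently hold the lock

   // The writer announces itself: readerCount becomes hugely negative...
   r := atomic.AddInt32(&readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders
   fmt.Println(r) // ...yet r recovers the real number of active readers: 2

   // Any reader arriving from now on sees a negative value and goes to sleep.
   fmt.Println(atomic.AddInt32(&readerCount, 1) < 0) // true
}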

3.2 Write Unlock: sync.RWMutex.Unlock

func (rw *RWMutex) Unlock() {
   if race.Enabled {
      _ = rw.w.state
      race.Release(unsafe.Pointer(&rw.readerSem))
      race.Disable()
   }

   // Announce to readers there is no active writer.
   r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders)
   if r >= rwmutexMaxReaders {
      race.Enable()
      throw("sync: Unlock of unlocked RWMutex")
   }
   // Unblock blocked readers, if any.
   for i := 0; i < int(r); i++ {
      runtime_Semrelease(&rw.readerSem, false, 0)
   }
   // Allow other writers to proceed.
   rw.w.Unlock()
   if race.Enabled {
      race.Enable()
   }
}

First, atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders) adds back the huge value subtracted at Lock time. If the result satisfies r >= rwmutexMaxReaders, the RWMutex is being unlocked without ever having been write-locked, and the runtime throws a fatal error.

Then a for loop wakes each waiting reader goroutine in turn via runtime_Semrelease(&rw.readerSem, false, 0).

Finally, rw.w.Unlock() releases the underlying mutex.

3.3 Read Lock: sync.RWMutex.RLock

func (rw *RWMutex) RLock() {
   if race.Enabled {
      _ = rw.w.state
      race.Disable()
   }
   if atomic.AddInt32(&rw.readerCount, 1) < 0 {
      // A writer is pending, wait for it.
      runtime_SemacquireMutex(&rw.readerSem, false, 0)
   }
   if race.Enabled {
      race.Enable()
      race.Acquire(unsafe.Pointer(&rw.readerSem))
   }
}

Taking the read lock is straightforward: atomic.AddInt32(&rw.readerCount, 1) increments readerCount by one; if the result is negative, a writer has already announced itself, so the reader goroutine chooses to sleep and wait to be woken.

Note that if a write-lock request arrived before this read lock, the later reader is blocked and parked even if, at this very moment, the writer has not actually acquired the lock yet.
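
A small runnable sketch makes that ordering visible; the sleeps exist only to force a deterministic interleaving for the demo:

package main

import (
   "fmt"
   "sync"
   "time"
)

func main() {
   var rw sync.RWMutex

   rw.RLock() // reader #1 holds the read lock

   go func() {
      rw.Lock() // a writer arrives and blocks behind reader #1
      fmt.Println("writer acquired")
      rw.Unlock()
   }()
   time.Sleep(100 * time.Millisecond) // give the writer time to announce itself

   done := make(chan struct{})
   go func() {
      rw.RLock() // reader #2 arrives after the pending writer...
      fmt.Println("reader #2 acquired")
      rw.RUnlock()
      close(done)
   }()
   time.Sleep(100 * time.Millisecond)
   fmt.Println("reader #2 is still blocked, even though only a read lock is held")

   rw.RUnlock() // reader #1 leaves: the writer runs first, then reader #2
   <-done
}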

3.4 Read Unlock: sync.RWMutex.RUnlock

func (rw *RWMutex) RUnlock() {
   if race.Enabled {
      _ = rw.w.state
      race.ReleaseMerge(unsafe.Pointer(&rw.writerSem))
      race.Disable()
   }
   if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 {
      // Outlined slow-path to allow the fast-path to be inlined
      rw.rUnlockSlow(r)
   }
   if race.Enabled {
      race.Enable()
   }
}

When releasing the read lock, readerCount is simply decremented by one; if the result is negative, a write-lock request is waiting, so rUnlockSlow handles the slow path:

func (rw *RWMutex) rUnlockSlow(r int32) {
   if r+1 == 0 || r+1 == -rwmutexMaxReaders {
      race.Enable()
      throw("sync: RUnlock of unlocked RWMutex")
   }
   // A writer is pending.
   if atomic.AddInt32(&rw.readerWait, -1) == 0 {
      // The last reader unblocks the writer.
      runtime_Semrelease(&rw.writerSem, false, 1)
   }
}

First, the sanity checks:

  • If r+1 == 0, i.e. r == -1, then readerCount was 0 before the decrement; clearly no read lock was held, so this is an RUnlock of an RWMutex that was never read-locked;
  • If r+1 == -rwmutexMaxReaders, i.e. readerCount was -rwmutexMaxReaders before the decrement, a writer had announced itself but no read lock was held, so again this is an RUnlock without a matching RLock.

Then readerWait, the number of readers the pending writer still has to wait for, is decremented by one; when it reaches 0, every reader the writer was waiting on has finished, and runtime_Semrelease(&rw.writerSem, false, 1) wakes the writer goroutine.

3.5 Summary

Although the read-write lock is functionally richer than the mutex, it is built on top of the mutex and its implementation turns out to be simpler:

  • Writers exclude one another through the embedded mutex, so at any moment only one writer competes with the readers;
  • A reader simply increments readerCount atomically; as long as the result is non-negative (zero essentially never occurs in practice) the read lock is acquired immediately, otherwise the reader sleeps on the readerSem semaphore;
  • A writer subtracts a huge constant from readerCount, guaranteeing that every later reader's increment comes out negative and sends it to sleep, while the number of readers that got in before the writer is recorded in readerWait;
  • When a reader unlocks, it decrements readerCount and, after the checks, decrements readerWait; once readerWait reaches 0, the waiting writer can finally be granted the lock;
  • When a writer unlocks, it first adds the huge constant back to readerCount; the result tells it how many readers were blocked while it held the lock, and it wakes them one by one before finally unlocking the embedded mutex.

In short, apart from the logic delegated to the mutex, the design of the read-write lock is just atomic operations plus simple addition and subtraction.

4. Conclusion

This post summarized the implementation of the mutex and the read-write lock in Go. Note that these locks operate at the granularity of goroutines; they have nothing to do with the thread-level locks you may have met elsewhere, which target OS threads.
