likes
comments
collection
share

sync包(一)——Mutex/RWMutex

作者站长头像
站长
· 阅读数 33

sync,为synchronize的缩写,意为同步、并发协同,由此可见,sync包中大多为用于解决并发问题的API。本系列主要讲述MutexRWMutexOncePool以及WaitGroup部分内容。

Mutex实现

结构体

type Mutex struct {
   state int32
   sema  uint32
}

Mutex的结构只有两个字段:

  • state:用于标识当前锁的状态,所谓加锁,就是把该字段修改为某个值;解锁亦然
  • sema:用于处理沉睡、唤醒的信号量,依赖于runtime的函数调用

加锁

锁的实现一般都是依赖于以下两点:

  • 自旋作为快路径
  • 等待队列作为慢路径
func (m *Mutex) Lock() {
   // 一次性自旋,如果锁处于释放状态,则加锁成功,直接返回
   if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
      if race.Enabled {
         race.Acquire(unsafe.Pointer(m))
      }
      return
   }
   // 加锁失败,则进入慢路径
   m.lockSlow()
}

func (m *Mutex) lockSlow() {
   ...
   
   for {
      // 真正的自旋部分(自旋判断锁的状态,只有锁处于非饥饿状态且已被其他人持有的时候,才会执行)
      if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
         // 设置Woken标志位,以此来让Unlock方法不去唤醒其他goroutine
         if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
            atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
            awoke = true
         }
         runtime_doSpin()
         iter++
         old = m.state
         continue
      }
      
      // 锁的状态计算...
      
      // 尝试占用锁
      if atomic.CompareAndSwapInt32(&m.state, old, new) {
         // 跳出自旋后的一些处理
         if old&(mutexLocked|mutexStarving) == 0 {
             break // 通过上面的自旋,CAS已经抢到锁
         }
         ...
         // 没有抢到锁,将goroutine放入等待队列并执行相应的调度,会在这里进行阻塞
         runtime_SemacquireMutex(&m.sema, queueLifo, 1)
         // 更新饥饿标志位
         starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
         old = m.state
         // 锁处于饥饿模式的一些处理
         if old&mutexStarving != 0 {
            if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
               throw("sync: inconsistent mutex state")
            }
            delta := int32(mutexLocked - 1<<mutexWaiterShift)
            if !starving || old>>mutexWaiterShift == 1 {
               delta -= mutexStarving
            }
            atomic.AddInt32(&m.state, delta)
            break
         }
         // 正常模式,重置迭代次数,重新执行自旋获取锁
         awoke = true
         iter = 0
      } else {
         old = m.state
      }
   }

   if race.Enabled {
      race.Acquire(unsafe.Pointer(m))
   }
}

Lock方法中,首先会尝试直接加锁,如果没有人持有锁,也没有人抢占,那么一个CAS操作就能成功(这里为一次性自旋),否则进入慢路径——lockSlow方法。 在lockSlow方法中,会进入一个真正的自旋状态:首先会判断当前锁的模式,如果可以自旋,则进入在for循环中不断尝试获取锁。虽然这里是慢路径,但是这里的自旋依然执行的很快,因为这里并没有加入队列中。

值得一提的是,Go的锁有两种模式:正常模式饥饿模式

sync包(一)——Mutex/RWMutex

如上图所示,一个锁已经本地持有一个goroutine队列,如果锁进行释放时,刚好又来了一个新的goroutine —— G1,那么,Mutex该如何调度呢?

  • 从公平性讲,让G2持有锁,因为G2排在等待队列的队头,等待的时间最长
  • 从执行效率上讲,让G1G2竞争,但大概率G1会持有锁,因为G1作为刚刚申请锁的goroutine,还没有进入休眠状态,此时它是占有CPU的,而G2已经进入休眠队列并不占有CPU,唤醒G2让它与G1竞争会有goroutine的调度过程

go的正常模式采用的就是G1G2竞争锁的方式,但是如果每次G2G1竞争锁,锁都被新来的G1持有的话,G2就会陷入饥饿状态,所以,Mutex有了饥饿模式: 如果等待时间超过1ms,锁进入该模式,在该模式下,锁会由等待队列中的goroutine持有。

解锁

互斥锁的解锁过程 Mutex.Unlock 与加锁过程相比就很简单:

  • 没有其他goroutine竞争锁,直接解锁成功
  • 有其他goroutine竞争锁,进入慢路径解锁
func (m *Mutex) Unlock() {
   if race.Enabled {
      _ = m.state
      race.Release(unsafe.Pointer(m))
   }

   // 直接一个原子操作进行解锁(atomic),达到的效果与CAS是一样的
   new := atomic.AddInt32(&m.state, -mutexLocked)
   if new != 0 {
      // 正常来讲,如果该锁只被一个goroutine持有,且等待队列为空,new的值会为0
      // new不为0,表示锁的等待队列中存在等待的goroutine或正在有goroutine自旋等待锁
      // 此时进入慢路径解锁(即对goroutine进行调度)
      m.unlockSlow(new)
   }
}

func (m *Mutex) unlockSlow(new int32) {
   if (new+mutexLocked)&mutexLocked == 0 {
      throw("sync: unlock of unlocked mutex")
   }
   // 此处进行判断是否处于饥饿模式
   if new&mutexStarving == 0 {
      old := new
      for {
         // 此处进行判断—— 锁的等待队列是否为空 || 锁是否被锁住(因为锁已经在Unlock方法中进行了释
         // 放,在这期间,极有可能已经有新来的goroutine重新持有了锁) || 是否已有唤醒的
         // goroutine(在上面的lock方法中,在自旋过程中,新来的goroutine会将唤醒标志位激活) || 是
         // 否处于饥饿模式(同样的如果唤醒的goroutine在竞争过程中产生了饥饿,则优先给当前goroutine)
         if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
            return
         }
         // 唤醒goroutine
         new = (old - 1<<mutexWaiterShift) | mutexWoken
         if atomic.CompareAndSwapInt32(&m.state, old, new) {
            runtime_Semrelease(&m.sema, false, 1)
            return
         }
         old = m.state
      }
   } else {
      // 处于饥饿模式,唤醒队头goroutine
      runtime_Semrelease(&m.sema, true, 1)
   }
}

RWMutex实现

结构体

type RWMutex struct {
   w           Mutex  
   writerSem   uint32 
   readerSem   uint32 
   readerCount int32  
   readerWait  int32  
}
  • w — 复用互斥锁提供的能力;
  • writerSem 和 readerSem — 分别用于写等待读和读等待写:
  • readerCount 存储了当前正在执行的读操作数量;
  • readerWait 表示当写操作被阻塞时等待的读操作个数

读锁

func (rw *RWMutex) RLock() {
   if race.Enabled {
      _ = rw.w.state
      race.Disable()
   }
   if atomic.AddInt32(&rw.readerCount, 1) < 0 {
      runtime_SemacquireMutex(&rw.readerSem, false, 0)
   }
   if race.Enabled {
      race.Enable()
      race.Acquire(unsafe.Pointer(&rw.readerSem))
   }
}

可以看到,读锁的加锁非常简单,直接对readerCount进行加法,如果返回负数表示已经有其他 goroutine 获得了写锁,当前 goroutine 就会陷入休眠等待锁的释放;如果结果为非负数表示没有 goroutine 获得写锁,当前方法会成功返回。

func (rw *RWMutex) RUnlock() {
   if race.Enabled {
      _ = rw.w.state
      race.ReleaseMerge(unsafe.Pointer(&rw.writerSem))
      race.Disable()
   }
   if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 {
      rw.rUnlockSlow(r)
   }
   if race.Enabled {
      race.Enable()
   }
}

func (rw *RWMutex) rUnlockSlow(r int32) {
   if r+1 == 0 || r+1 == -rwmutexMaxReaders {
      race.Enable()
      throw("sync: RUnlock of unlocked RWMutex")
   }
   // 对写锁之前的读锁数量减一,当数量为0的时候,出发信号量唤醒写锁的 goroutine
   if atomic.AddInt32(&rw.readerWait, -1) == 0 {
      runtime_Semrelease(&rw.writerSem, false, 1)
   }
}

读锁的解锁也较为简单,与加锁类似,解锁就是对readerCount做减法,返回负数表示已有其他 goroutine 获得写锁,调用 rUnlockSlow 方法作进一步处理;返回的结果为正数,则正常解锁

写锁

func (rw *RWMutex) Lock() {
   if race.Enabled {
      _ = rw.w.state
      race.Disable()
   }
   // 加写锁,防止其他 goroutine 抢占
   rw.w.Lock()
   // 通知后续的读锁已经有写锁进入,后续读锁需要休眠
   r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders
   // 等待写锁前面的读锁完全释放,没有全部释放则写锁的 goroutine 陷入休眠
   if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {
      runtime_SemacquireMutex(&rw.writerSem, false, 0)
   }
   if race.Enabled {
      race.Enable()
      race.Acquire(unsafe.Pointer(&rw.readerSem))
      race.Acquire(unsafe.Pointer(&rw.writerSem))
   }
}

写锁的加锁主要复用Mutex的加锁逻辑,同时兼顾对读锁的调度

func (rw *RWMutex) Unlock() {
   if race.Enabled {
      _ = rw.w.state
      race.Release(unsafe.Pointer(&rw.readerSem))
      race.Disable()
   }

   // 计算写锁后面读锁等待的数量
   r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders)
   if r >= rwmutexMaxReaders {
      race.Enable()
      throw("sync: Unlock of unlocked RWMutex")
   }
   // 将后面等待的读锁全部唤醒
   for i := 0; i < int(r); i++ {
      runtime_Semrelease(&rw.readerSem, false, 0)
   }
   // 解锁
   rw.w.Unlock()
   if race.Enabled {
      race.Enable()
   }
}

写锁的解锁与加锁正好相反,解锁之后,在 for 循环中将后面的读锁全部唤醒。 获取写锁时会先阻塞写锁的获取,后阻塞读锁的获取,这种策略能够保证读操作不会被连续的写操作饿死。

锁的应用

Mutex可以看做是锁,RWMutex是读写锁,一般的用法是将Mutex或者RWMutex和需要被保护住的资源封装在一个结构体中,当多个goroutine同时访问该资源时,利用Mutex或者RWMutex锁住。当然,使用锁的时候,优先使用RWMutex。 例如,下面是使用读写锁,对map的一个封装:

type SafeMap struct {
   m     map[string]string
   mutex sync.RWMutex
}

func (s *SafeMap) LoadOrStore(key string, newVale string) (val string, loaded bool) {
   // 取值
   s.mutex.RLock()
   val, ok := s.m[key]
   s.mutex.RUnlock()
   if ok {
      return val, true
   }
   // 存值
   s.mutex.Lock()
   defer s.mutex.Unlock()
   // DoubleCheck
   val, ok = s.m[key]
   if ok {
      return val, true
   }
   s.m[key] = newVale
   return newVale, false
}

在上面的代码中,将读写锁与map放在了一起,组合成了SafeMap,实现了LoadOrStore方法,在该方法中,先加读锁用于判断map中是否存在key,根据返回结果进行进一步处理:存在则返回值,不存在则将新的值存入map。

需要注意的是,LoadOrStotre中采用了DoubleCheck的策略,即在存值之前,再次检查一遍map中是否有存入key,这样做的目的是为了处理并发问题:在并发情况下,如果map中没有key的值,有可能有多个goroutine都进入了存值阶段,因锁竞争阻塞在存值阶段,最终顺序执行map的写入操作(即使前面已经有goroutine写入成功,后面的goroutine也会对其进行覆盖);因此引入DoubleCheck策略,防止map的重复写入问题。

参考资料

draveness.me/golang/docs…