likes
comments
collection
share

简要说明go的sync.mutex是啥样的

作者站长头像
站长
· 阅读数 12

先说两个相关的东西

    第一个是信号量:     所谓信号量又叫信号标,和大众认知的信号不一样,他是一个值,用于保持在0至指定最大值之间的一个计数值。所以这玩意可以简单的理解为一个计数器,信号量的作用可以简单的当做——当计数器达到一个你预定的值的时候发出“到了”这个信号,通俗的来说,就是一个具有"信号功能"的"变量"。     第二个就是PV原语: 锁被称为一种同步原语,锁的操作也被称为PV原语操作,这啥意思呢,这其实是一个玩意衍生出来的东西。P就是 single,V 是wait,刚才说信号量是一个数值,那么P就是给这个数值加数值的操作,V自然就是减数值操作,对信号量进行加减的操作叫PV操作。     所以,简单的来说,我们用一个数值来表示一个状态,对这个数值进行操作叫PV原语操作,数值反应的这个状态是一个标志或者说信号,不同的数值代表不同的信号。     通俗的来讲,你喊你朋友出去玩,你朋友说太早了,现在才6点,等8点再叫我。用信号量和pv原语来说,就是时间是一个信号量,当P操作把6加到8,那么就到了一个你朋友设定的临界区,这时候就向你朋友发信号,8点了,走起~     而放到go的mutex锁里面,信号量的作用就是通知操作锁的协程是等待、休眠 还是 唤醒。而实现这个计数器的就是sema锁

先来看下sync.mutex的结构

type Mutex struct {
    //互斥锁的状态,一个4字节的uint32位数,不同的位表示不同的意思
	state int32
    //sema锁,底层是semaroot
	sema  uint32
}

这里面有2个元素。 第一个就是state: state 状态是一个int32的整数,具体来说,这个整数的低三位表示锁的状态,可以分别是0【000】(未加锁)、1【001】(已加锁)、2【010】(饥饿模式)或3【100】(唤醒模式)。高29位表示当前锁的等待队列中的goroutine数量。 简要说明go的sync.mutex是啥样的 从低位到高位分别是:

  • mutexLockedstate中的低1位,用二进制表示为0001,它代表该互斥锁是否被加锁。
  • mutexWoken低2位,用二进制表示为0010,它代表互斥锁上是否有被唤醒的goroutine。
  • mutexStarving低3位,用二进制表示为0100,它代表当前互斥锁是否处于饥饿模式。
  • state剩下的29位用于统计在互斥锁上的等待队列中goroutine数目(waiter)。 第二个是sema锁:

        sema锁是一个uint32,但是实际上底层是一个semaRoot结构体

type semaRoot struct {
	lock  mutex
	treap *sudog // root of balanced tree of unique waiters.
	nwait uint32 // Number of waiters. Read w/o the lock.
}

关于这个semaRoot其实也可以多说一些,主要是semaRoot并不是直接拿来用的,而是在runtime包里,项目启动的时候,会有一个全局变量semaTable用来存储 semTabSize = 251个semaRoot节点(AVL树)的数组。(为什么是251,go的文档里是做过解释的,因为要节省内存,同时go不提倡使用颗粒度太大的锁)

// Prime to not correlate with any user patterns.
const semTabSize = 251

var semtable [semTabSize]struct {
 // 跟节点
 root semaRoot
 // cpu.CacheLinePadSize - unsafe.Sizeof(semaRoot{}) 这一行的意思是处理伪共享,无需太关注伪共享,其实就是为了填充cpu的L1 L2 L3 缓存,就像内存对齐一样,cpu的缓存也需要对齐
 pad  [cpu.CacheLinePadSize - unsafe.Sizeof(semaRoot{})]byte
}

使用的时候,每次程序需要一个sema锁,sema锁里的等待的协程g会被包装成sudog,那么就会通过sema字段的地址计算映射到sematable中的某个平衡树上,找到对应的节点。

semaTable数组作为一个全局变量,所有休眠的g信息都保存在这个semaTable中,semaTable原理类似HashMap,其固定大小为251Hash方法为将addr的值右移3位再除以251,sudogvalue。所有hash到同一个槽的value则再通过Treap(树堆)数据结构保存。每个Treap节点又是一个链表,表示等待这个信号量的所有结构体。

        而映射的过程就在sema.go 的 semroot 方法里。

func semacquire1(addr *uint32, lifo bool, profile semaProfileFlags, skipframes int) {
    ...

	s := acquireSudog()
	root := semroot(addr)

    ...
}
func semroot(addr *uint32) *semaRoot {
	return &semtable[(uintptr(unsafe.Pointer(addr))>>3)%semTabSize].root
}

每次使用sema的时候,都会根据seam的内存地址去分配一个semtable里的节点给它。而所有阻塞在sema里面的队列协程g,都会存储在该sema对应semtable节点的semaroot结构体里面的treap树中。


饥饿模式

饥饿模式要单独拉出来先提一下,因为后面的各种加锁解锁都绕不开饥饿模式。 在上面的mutex状态中可以得知,mutex的一个指标是 mutexStraving,这就是锁是否处于饥饿状态。

所谓饥饿模式是针对正常模式来说的,正常模式下,锁的等待就是按照先进先出(FIFO)的顺序获取的。但是这里有个问题,就是锁进来的时候不会立刻进入到等待队列,而是先尝试拿锁,那么等待队列里面的Gorouting还需要一个唤醒的过程,那刚被唤醒的Gorouting明显要比新来的慢,大概率拿不到锁,然后就产生了锁的“饥饿”。

为了避免这种锁饥饿的情况,就产生了另一种模式,饥饿模式:当Gorouting超过1ms没有获取到锁(别小看1ms,对人来说很快,对计算机来说算慢的了,通常你感受到的电脑慢是因为网络慢或者硬盘IO堵塞导致),他就会标记mutexStaving,然后让锁进入饥饿状态,使得所有新来的G都直接进入队列,让锁进入严格的先入先出的顺序,以此来防止部分g被饿死。

这部分具体操作后面在讲加锁和解锁的时候再细说。

MUTEX 加锁

其实锁的过程是比较简单的:

简要说明go的sync.mutex是啥样的 以go1.20版本为例子(其他版本只要不是很老的,都差不多):

func (m *Mutex) Lock() {
	// 最理想的状态,当前没有任何其他goroutine持有该锁,当前协程将快速获取
	// CAS修改mutex的state的mutexLocked 位,直接上锁
	if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
		// 这是竞争检测工具,一般普通开发者无需关心
		// 他的功能就是获取锁成功后,往竞争检测器里上报这个锁已经被获取了
		if race.Enabled {
			race.Acquire(unsafe.Pointer(m))
		}
		return
	}
	// 如果快速获取失败,即有其他 Goroutine 已经获取了互斥锁,
	// 就会执行这个慢速获取,即调用 m.lockSlow() 方法
	m.lockSlow()
}

func (m *Mutex) lockSlow() {
	// 等待开始时间,用于记录互斥锁的等待时间,这个时间和饥饿模式有关
	var waitStartTime int64
	// 是否处于饥饿模式,默认值是false
	starving := false
	// 当前操作锁的goroutine是否处于唤醒状态,等待的时候会处于休眠状态
	awoke := false
	// 获取锁的时候自旋的次数
	iter := 0
	// 复制锁的当前状态
	old := m.state
	// 循环尝试获取锁
	for {
		// 检查是否可以自旋等待
		// 如果处于锁定状态(mutexLocked 被其他g通过lock方法的cas操作修改成0),且锁不处于饥饿状态 (mutexStarving不等于true),且自旋次数没有超额 runtime_canSpin(iter),则可以进行自旋等待
		if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
			// 非唤醒状态,且,锁的唤醒标志位也是非唤醒状态,
			// old>>mutexWaiterShift 是向右移3位,得到等待这把锁的g的数量不为空(!=0),
			// 然后cas设置锁的唤醒标志位为1,最后设置awoke为true
			if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
				atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
				awoke = true
			}
			// 执行自旋等待的操作
			runtime_doSpin()
			// 自旋次数+1
			iter++
			// 更新old状态
			old = m.state
			continue
		}
		// 自旋超过规定的数量后,就无法继续自旋,自旋次数是active_spin,默认值是4
		new := old
		// 检查是否处于饥饿状态,如果是非饥饿状态,给mutex上锁
		if old&mutexStarving == 0 {
			new |= mutexLocked
		}
		// 如果处于饥饿状态或者mutex已经被上锁,则当前的g进入等待队列,等待数量+1
		if old&(mutexLocked|mutexStarving) != 0 {
			new += 1 << mutexWaiterShift
		}
		// 如果处于饥饿状态,且已经被上锁 则new被设定为饥饿状态
		// 就是说,如果上一个g释放锁的时候,锁已经处于饥饿状态,
		// 这个状态会持续下去,他最终会在所有等待的g获取锁并且释放锁成功后,才能取消饥饿状态
		// 换句话说,饥饿状态只要不停的有新g进来,等待队列一直有g,那么就一直处于饥饿状态
		if starving && old&mutexLocked != 0 {
			new |= mutexStarving
		}
		// 处于唤醒状态。
		// 这个唤醒状态是指锁的唤醒状态,也可以说是当前g被唤醒
		// 但是严谨的说,这其实是锁的唤醒状态被设置成1了
		// 唤醒状态主要是说至少有一个等待的g被唤醒了,至于能不能拿到锁还得看后续操作
		if awoke {
			// The goroutine has been woken from sleep,
			// so we need to reset the flag in either case.
			if new&mutexWoken == 0 {
				throw("sync: inconsistent mutex state")
			}
			new &^= mutexWoken
		}
		// cas 更新mutex的state
		if atomic.CompareAndSwapInt32(&m.state, old, new) {
			if old&(mutexLocked|mutexStarving) == 0 {
				break 
			}
			// 判断等待时间是否为0
			queueLifo := waitStartTime != 0
			// 如果等待时间为0,则说明为第一次等待,记录当前时间为等待开始时间
			if waitStartTime == 0 {
				waitStartTime = runtime_nanotime()
			}
			// 让g阻塞休眠,休眠于sema的队列里面
			runtime_SemacquireMutex(&m.sema, queueLifo, 1)
			// 计算等待时间是否超时,starvationThresholdNs是1e6纳秒,等于1毫秒
			// 也就说当等待时间超过1ms,mutex进入starving状态,开启饥饿模式
			starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
			old = m.state
			if old&mutexStarving != 0 {
				// 这里有个判断,第一个条件是old&(mutexLocked|mutexWoken) != 0
				// mutexLocked  和  mutexWoken  是互斥锁状态的标志位。
				// 如果  old  的状态中包含这两个标志位中的任意一个,就表示互斥锁处于锁定状态或已经被唤醒。 

				// 第二个条件是:old>>mutexWaiterShift == 0 
				// 这个条件判断互斥锁的状态中等待者数是否为0。 mutexWaiterShift  是等待者数的位移值,用于在互斥锁的状态中存储等待者数。如果  old  的状态中等待者数为0,就表示当前没有任何等待者。 

				if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
					throw("sync: inconsistent mutex state")
				}
				// 这里比较抽象,是两个操作,第一个是给state加上mutexLocked,意思是上锁
				// 第二个操作是给state加上  -1<<mutexWaiterShift, 意思是等待g的数量-1
				// 但是这里具体是怎么算的,我还没完全想明白
				delta := int32(mutexLocked - 1<<mutexWaiterShift)
				// 如果不在饥饿模式,或者old>>mutexWaiterShift算出等待锁的g的数量为1,说明这是最后一个等待的g
				if !starving || old>>mutexWaiterShift == 1 {
					// 退出饥饿模式
					// 这里多说一句,官方认为饥饿模式效率差,而且容易引起死锁,因此,在锁的设计上是尽量用正常模式,因此在普通开发的过程中,要减少锁的颗粒度,尽量在关键地方锁,然后快速释放,避免长时间占用
					delta -= mutexStarving
				}
				atomic.AddInt32(&m.state, delta)
				break
			}
			awoke = true
			iter = 0
		} else {
			old = m.state
		}
	}

	if race.Enabled {
		race.Acquire(unsafe.Pointer(m))
	}
}

MUTEX 解锁

func (m *Mutex) Unlock() {
	if race.Enabled {
		_ = m.state
		race.Release(unsafe.Pointer(m))
	}

	// 原子操作,给state 加上  -mutexLocked 去掉加锁状态
	// 这时候其实已经算释放锁了
	new := atomic.AddInt32(&m.state, -mutexLocked)
	// 如果new不是0,说明state有其他情况,比如还有其他g在等待锁
	if new != 0 {
		// 调用unlockSlow
		m.unlockSlow(new)
	}
}
func (m *Mutex) unlockSlow(new int32) {
	// new+mutexLocked结果是否为0,如果为0,则表示一个未加锁的互斥锁进行了解锁,直接报错
	if (new+mutexLocked)&mutexLocked == 0 {
		fatal("sync: unlock of unlocked mutex")
	}
	// 如果不处于饥饿模式
	if new&mutexStarving == 0 {
		old := new
		for {
			// 如果 等待互斥锁的g的数量为0
			// 或者 互斥锁处于mutexLocked|mutexWoken|mutexStarvin 这个状态
			// 这里为什么再次有饥饿状态,是因为当前执行该函数的g是释放了锁,但是其他g可能这时候已经获取锁了
			// 所以这里判断其他g获取锁之后就return掉了
			if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
				return
			}
			// 提前设置新的state状态,让state中等待数量-1,进入唤醒
			// 注意~这里没有更新state状态,只是提前计算好标量
			new = (old - 1<<mutexWaiterShift) | mutexWoken
			// cas 修改互斥锁状态,这里才是把刚才的标量给算进去
			if atomic.CompareAndSwapInt32(&m.state, old, new) {
				// 成功后从sema里唤醒一个协程
				runtime_Semrelease(&m.sema, false, 1)
				return
			}
			old = m.state
		}
	} else {
		// 处于饥饿模式,那么就直接从sema的队列里唤醒下一个g,直到饥饿模式退出
		runtime_Semrelease(&m.sema, true, 1)
	}
}