likes
comments
collection
share

Go 同步原语 sync.Mutex 源码详解

作者站长头像
站长
· 阅读数 12

一、概述

互斥锁(英语:Mutual exclusion,缩写 Mutex)是一种用于多线程编程中,防止两条线程同时对同一公共资源(比如全局变量)进行读写的机制。该目的通过将代码切片成一个一个的临界区域(critical section)达成。临界区域指的是一块对公共资源进行访问的代码,并非一种机制或是算法。一个程序、进程、线程可以拥有多个临界区域,但是并不一定会应用互斥锁。

golang的sync.mutex就是一种互斥锁的实现。

Go 号称是为了高并发而生的,在高并发场景下,势必会涉及到对公共资源的竞争。互斥锁是在并发程序中对共享资源进行访问控制的主要手段。对此 Go 语言提供了简单易用的 Mutex。Mutex 和 Goroutine 合作紧密,概念容易混淆,一定注意要区分各自的概念。

二、原理

mutex 的源码主要是在 src/sync/mutex.go文件里。在源码里可以看到如下的解释:

// Mutex fairness.
//
// Mutex can be in 2 modes of operations: normal and starvation.
// In normal mode waiters are queued in FIFO order, but a woken up waiter
// does not own the mutex and competes with new arriving goroutines over
// the ownership. New arriving goroutines have an advantage -- they are
// already running on CPU and there can be lots of them, so a woken up
// waiter has good chances of losing. In such case it is queued at front
// of the wait queue. If a waiter fails to acquire the mutex for more than 1ms,
// it switches mutex to the starvation mode.
//
// In starvation mode ownership of the mutex is directly handed off from
// the unlocking goroutine to the waiter at the front of the queue.
// New arriving goroutines don't try to acquire the mutex even if it appears
// to be unlocked, and don't try to spin. Instead they queue themselves at
// the tail of the wait queue.
//
// If a waiter receives ownership of the mutex and sees that either
// (1) it is the last waiter in the queue, or (2) it waited for less than 1 ms,
// it switches mutex back to normal operation mode.
//
// Normal mode has considerably better performance as a goroutine can acquire
// a mutex several times in a row even if there are blocked waiters.
// Starvation mode is important to prevent pathological cases of tail latency.

这段解释是整个mutex的核心(划重点,面试必考),整个mutex的大体实现思路就是这样。翻译一下,大致如下

// 互斥锁的公平性。
//
// 互斥锁有两种运行模式:正常模式和饥饿模式。
// 在正常模式下,请求锁的goroutine会按 FIFO(先入先出)的顺序排队,依次被唤醒,但被唤醒的goroutine并不能直接获取锁,而是要与新请求锁的goroutines去争夺锁的所有权。
// 但是这其实是不公平的,因为新请求锁的goroutines有一个优势:他们正在cpu上运行且数量可能比较多,所以新唤醒的goroutine在这种情况下很难获取锁。在这种情况下,如果这个goroutine获取失败,会直接插入到队列的头部。
// 如果一个等待的goroutine超过1ms时仍未获取到锁,会将这把锁转换为饥饿模式。
//
// 在饥饿模式下,互斥锁的所有权会直接从解锁的goroutine转移到队首的goroutine。
// 并且新到达的goroutines不会尝试获取锁,会直接插入到队列的尾部。
//
// 如果一个等待的goroutine获得了锁的所有权,并且满足以下两个条件之一:
// (1) 它是队列中的最后一个goroutine
// (2) 它等待的时间少于 1 毫秒(hard code在代码里)
// 它会将互斥锁切回正常模式。
//
// 普通模式具有更好的性能,因为即使有很多阻塞的等待锁的goroutine,一个goroutine也可以尝试请求多次锁。
// 而饥饿模式则可以避免尾部延迟这种bad case。 

从这段解释中可以得出:

  1. go中的mutex是一把公平的锁。
  2. 互斥锁有两种运行模式:正常模式和饥饿模式。

并详细解释了两种模式的运行原理。

后面会讲到这两种模式。

三、数据结构

在源码中,有这样一段解释:

// A Mutex is a mutual exclusion lock.
// The zero value for a Mutex is an unlocked mutex.
//
// A Mutex must not be copied after first use.
//
// In the terminology of the Go memory model,
// the n'th call to Unlock “synchronizes before” the m'th call to Lock
// for any n < m.
// A successful call to TryLock is equivalent to a call to Lock.
// A failed call to TryLock does not establish any “synchronizes before”
// relation at all.

// Mutex是一种互斥锁。
// Mutex的零值是一个未锁定的互斥锁。
//
// Mutex在首次使用后不得复制。
//
// 在Go存储器模型的术语中,
// 第n次解锁调用“同步于”第m次锁定调用
// 对于任何n<m。
// 对TryLock的成功调用相当于对Lock的调用。
// 对TryLock的失败调用未建立任何“之前同步”
// 关系。

type Mutex struct {
	state int32
	sema  uint32
}

sync.mutex的数据结构非常简单,只有两个字段,

  1. state字段就是这把mutex的状态,二进制低3位对应锁的状态,将state右移3位代表mutex的数量。
  2. sema(信号量)用来唤醒goroutine。

这里有两个注释非常重要:

  1. 零值保证,sync.mutex符合golang里的0值语义,即sync.mutex的0值就是一把开启状态的互斥锁。
  2. mutex 数据结构禁止拷贝,拷贝mutex的结果是未定义的。

四、关键常量

const (
	mutexLocked = 1 << iota // mutex is locked
	mutexWoken
	mutexStarving
	mutexWaiterShift = iota

	// Mutex fairness.
	//
	// Mutex can be in 2 modes of operations: normal and starvation.
	// In normal mode waiters are queued in FIFO order, but a woken up waiter
	// does not own the mutex and competes with new arriving goroutines over
	// the ownership. New arriving goroutines have an advantage -- they are
	// already running on CPU and there can be lots of them, so a woken up
	// waiter has good chances of losing. In such case it is queued at front
	// of the wait queue. If a waiter fails to acquire the mutex for more than 1ms,
	// it switches mutex to the starvation mode.
	//
	// In starvation mode ownership of the mutex is directly handed off from
	// the unlocking goroutine to the waiter at the front of the queue.
	// New arriving goroutines don't try to acquire the mutex even if it appears
	// to be unlocked, and don't try to spin. Instead they queue themselves at
	// the tail of the wait queue.
	//
	// If a waiter receives ownership of the mutex and sees that either
	// (1) it is the last waiter in the queue, or (2) it waited for less than 1 ms,
	// it switches mutex back to normal operation mode.
	//
	// Normal mode has considerably better performance as a goroutine can acquire
	// a mutex several times in a row even if there are blocked waiters.
	// Starvation mode is important to prevent pathological cases of tail latency.
	starvationThresholdNs = 1e6
)

可以看到源码中用到了iota去初始化各个常量(ps:这里吐槽一下,iota这玩意出现在语言的设计里挺离谱的,尤其是在const里各种iota搭配起来用,对读代码的人,确实是一项挑战)

注解如下:

常量值注解
mutexLocked = 1 << iotamutex是一个加锁状态,对应0b001
mutexWoken是否有goroutine被唤醒or新来的goroutine,在尝试获取锁, 对应0b010
mutexStarving当前锁处于饥饿状态, 对应0b10
mutexWaiterShift = iotawaiter的数量移位,通过mutex.state>> mutexWaiterShift可以得到waiter的数量。
starvationThresholdNs = 1e6这里就是上文中提到的转换成饥饿模式的时间限制,在源码里写死为1e6 ns,也就是1ms)

五、接口

// A Locker represents an object that can be locked and unlocked.
type Locker interface {
	Lock()
	Unlock()
}

sync.Mutex就实现了上述的这个接口(sync.Locker),值得一提的是后续会讲到的sync.RWMutex也实现了这个接口。

同时sync.Mutex又有自己的包内私有(小写开头)的函数,在下文会详细讲到。

func (m *Mutex) lockSlow() {
    ...
}

func (m *Mutex) unlockSlow(new int32) {
    ...
}

而mutex还有一些runtime的调用,这些函数的具体实现在runtime里,分别是如下几个函数。

//确定当前goroutine是否可以自旋(因为自旋是非常消耗cpu的,所以对自旋操作也有一定的次数限制)
func runtime_canSpin(i int) bool

//执行自旋操作
func runtime_doSpin()

//获取当前毫秒时间戳
func runtime_nanotime() int64

//信号量的实现,对应信号量的P原语操作,s的值代表信号量,包含一个fifo等待队列,如果lifo为true,则放入队首。skipframes只有在开启tracing时有效。
func runtime_SemacquireMutex(s *uint32, lifo bool, skipframes int)

//信号量的实现,对应信号量的V原语操作,如果handoff为true,则会将锁直接转移给队首goroutine
func runtime_Semrelease(s *uint32, handoff bool, skipframes int)

上述自旋锁、信号量、以及后文中会提到的原子操作,目前只需要知道函数的作用即可。

六、方法

func (*Mutex) Lock

// Lock locks m.
// If the lock is already in use, the calling goroutine
// blocks until the mutex is available.
func (m *Mutex) Lock() {
	// Fast path: grab unlocked mutex.
	if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
		if race.Enabled {
			race.Acquire(unsafe.Pointer(m))
		}
		return
	}
	// Slow path (outlined so that the fast path can be inlined)
	m.lockSlow()
}

Lock 方法锁住 m,如果 m 已经加锁,则阻塞直到 m 解锁。

func (*Mutex) Unlock

// Unlock unlocks m.
// It is a run-time error if m is not locked on entry to Unlock.
//
// A locked Mutex is not associated with a particular goroutine.
// It is allowed for one goroutine to lock a Mutex and then
// arrange for another goroutine to unlock it.
func (m *Mutex) Unlock() {
	if race.Enabled {
		_ = m.state
		race.Release(unsafe.Pointer(m))
	}

	// Fast path: drop lock bit.
	new := atomic.AddInt32(&m.state, -mutexLocked)
	if new != 0 {
		// Outlined slow path to allow inlining the fast path.
		// To hide unlockSlow during tracing we skip one extra frame when tracing GoUnblock.
		m.unlockSlow(new)
	}
}

Unlock 方法解锁 m,如果 m 未加锁会导致运行时错误。

注意

  • 在一个 goroutine 获得 Mutex 后,其他 goroutine 只能等到这个 goroutine 释放该 Mutex
  • 使用 Lock() 加锁后,不能再继续对其加锁,直到利用 Unlock() 解锁后才能再加锁
  • 在 Lock() 之前使用 Unlock() 会导致 panic 异常
  • 已经锁定的 Mutex 并不与特定的 goroutine 相关联,这样可以利用一个 goroutine 对其加锁,再利用其他 goroutine 对其解锁
  • 在同一个 goroutine 中的 Mutex 解锁之前再次进行加锁,会导致死锁
  • 适用于读写不确定,并且只有一个读或者写的场景

七、计数器小案例

package main
import (
	"fmt"
	"sync"
)
var wg sync.WaitGroup
// 定义计数器
type SafeCounter struct {
	mu sync.Mutex
	v  map[string]int64
}
// 计数器生成
func (s *SafeCounter) Inc(key string) {
	defer wg.Done()
	// 加锁
	s.mu.Lock()
	s.v[key]++  // 对计数器数值 增值+1
	// 解锁
	s.mu.Unlock()
}
// 获取计数器
func (s *SafeCounter) Value(key string) int64 {
	defer wg.Done()
	// 加锁
	s.mu.Lock()
	d := s.v[key]
	// 解锁
	s.mu.Unlock()
	return d
}
func main() {
   // 初始化计数器
	c := SafeCounter{v: make(map[string]int64)}
	// 开启1000个数生成计数器
	for i := 0; i < 1000; i++ {
		wg.Add(i)
		go c.Inc("aaa")
	}
	// 打印计数器
	fmt.Println(c.Value("aaa"))
}

八、参考

九、 结束语

本篇文章介绍说明了:

什么是 sync.Mutex

sync.Mutex 的原理

sync.Mutex 接口定义

sync.Mutex 方法实现

sync.Mutex 案例

希望本篇文章对你有所帮助,谢谢。