Java互斥锁 与 Golang互斥锁 简单对比
下面会对 Java 和 Golang 互斥锁的源码做简单的解读
Golang 互斥锁,指的是sync.Mutex
包的Lock()
与Unlock()
的实现
互斥锁代码例子。基于 go-1.20.3
func main() {
var mutex sync.Mutex
counter := 0
// 启动多个goroutine进行并发操作
for i := 0; i < 5; i++ {
go func() {
// 加锁
mutex.Lock()
defer mutex.Unlock()
counter++ // 访问共享资源
fmt.Println("Counter:", counter) // 打印结果
}()
}
fmt.Scanln() // 等待所有goroutine执行完成
}
结果:
Counter: 1
Counter: 2
Counter: 3
Counter: 4
Counter: 5
1. Mutex.Lock()
的实现
func (m *Mutex) Lock() {
if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
// 是否开启数据竞争检测,作用是帮助你发现并解决并发程序中的潜在问题
if race.Enabled { // 默认为false,开启命令:go build -race your_program.go
race.Acquire(unsafe.Pointer(m))
}
return
}
m.lockSlow()
}
1.1. 首先会进行atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked)
判断,使用原子操作尝试将m.state
从未锁定(0)修改为锁定状态 1(mutexLocked)。先来看看m.state
是什么,m
是Mutex
结构体,state
是 32 位二进制 bit 位,每一位都用 0 和 1 标记一个状态。
type Mutex struct {
state int32
sema uint32 // 此信号量用于阻塞和唤醒等待锁的goroutine。
}
1.2. 接下来再看看 32 个 bit 位,每一个的作用是什么
1.3. 所以atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked)
要想成功,必须state
的 32 位全为 0,意味着这个锁在第一次进来才会成功,第一次mutexWaiterShift
才能是0
1.4. 如果获取不到则会调用m.lockSlow()
2. lockSlow()
的实现,lockSlow()
方法有点长,直接分段解释,代码会分成4段
-
- 定义方法变量
- cas + 自旋,尝试自选获取锁
- 更改标志位
- 协程阻塞
2.1. 第一段,定义方法变量
func (m *Mutex) lockSlow() {
var waitStartTime int64
starving := false
awoke := false
iter := 0
old := m.state
...
}
waitStartTime
:当前goroutine
如果抢不到锁被挂起(进入了等待队列),记录被挂起的开始时间,用来记录抢这把锁要用多少时间(先记个开始时间)。starving
:是否处于饥饿模式。下面会说这个值什么时候会改成true
。awoke
:是否有被唤醒goroutine
。意思是当前有没有goroutine
正在抢这把锁,如果有就会让这个goroutine
获取这把锁。iter
:记录自旋的次数,自旋 4 轮后就不会再进入自旋。old
:记录一开始m.state
的值,就是锁的原始状态
2.2. 第二段,cas + 自旋,尝试自选获取锁
func (m *Mutex) lockSlow() {
...
for {
if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 && atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
awoke = true // 把 mutexWoken 设为1,表示当前 goroutine 处于被唤醒
}
runtime_doSpin() // 执行自旋
iter++ // 增加迭代次数
old = m.state // 更新局部变量锁的状态
continue
}
...
}
2.2.1. 先看if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter)
mutexLocked
是常量1,二进制0001,mutexStarving
是常量4,二进制0100,mutexLocked|mutexStarving
:0101old&(0101)
:只保留 old 的 mutexLocked 位和 mutexStarving 位。old&(0101) == mutexLocked
,意味着old的mutexStarving位 == 0
&&old的mutexLocked位 == 1
,处于正常模式 && 锁已经被获得。runtime_canSpin
:是否可以继续自旋,为 true 的条件是下列全满足
-
- 自旋少于4次
- 不是单核CPU
- 当前 p 队列没有等待执行的 G。
2.2.2. 接着if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 && atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken)
!awoke
:当前 goroutine 处于被唤醒,awoke 初始化为 false,第一次会命中。old&mutexWoken == 0
:把 old 的 mutexWoken 位拿出来是否等于0,现在没有其他 goroutine 被唤醒。old>>mutexWaiterShift != 0
:mutexWaiterShift 等于常量3,old 右移3位相当于取等待的 goroutine 的数量,最终判断等待队列中不能有等待的 goroutine。atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken)
:如果上面的成立,就会用把old
更新为old+mutexWoken
。表示当前 goroutine 被唤醒。- 最后把局部变量
awoke = true
。
2.3. 第三段,更改标志位
func (m *Mutex) lockSlow() {
...
for {
...
new := old
// 取出 mutexStarving 的标识位 == 正常模式
if old&mutexStarving == 0 {
new |= mutexLocked // 把最右侧 bit 位改 1,给new变量加锁
}
// 取出 mutexLocked 和 mutexStarving 标识位,如果有一位不等于0, 意味着锁已经被获取,或者处于饥饿模式
if old&(mutexLocked|mutexStarving) != 0 {
new += 1 << mutexWaiterShift // 增加等待者数量
}
// 如果局部变量 starving(当前协程) 处于饥饿模式,并且锁已经被获取
if starving && old&mutexLocked != 0 {
new |= mutexStarving // 给new变量切换到饥饿模式
}
// 如果当前goroutine处于被唤醒,意味着命中了自旋时候的当前goroutine是否被唤醒的if,注意只会有一个goroutine处于被唤醒状态,不是当前goroutine就是其他goroutine。
if awoke {
if new&mutexWoken == 0 { // 检查被唤醒标志位是否为0,为0就是没有goroutine被唤醒,则报错
throw("sync: inconsistent mutex state")
}
new &^= mutexWoken // &^=是位清除操作,把 mutexWoken 置为0,为什么要置0,因为当前goroutine准备去阻塞了,当前goroutine不再是被唤醒的goroutine了
}
...
}
}
2.4. 第四段,协程阻塞
2.4.1. 正常模式 && 成功获取锁
func (m *Mutex) lockSlow() {
...
for {
...
// 把上面设置好的标志位new更新到old(不代表加锁成功,只代表 m.state 在当前 goroutine 执行lockSlow() 期间没有其他 goroutine 率先更改成功)
if atomic.CompareAndSwapInt32(&m.state, old, new) {
// old的锁为0 && old处于正常模式,则获取锁,并退出循环。意味着上一次的锁状态已经无锁状态(锁被释放了) + 正常模式
if old&(mutexLocked|mutexStarving) == 0 {
break
}
...
} else {
old = m.state
}
}
}
2.4.2. 饥饿模式 || 获取锁失败
func (m *Mutex) lockSlow() {
...
for {
...
if atomic.CompareAndSwapInt32(&m.state, old, new) {
...
queueLifo := waitStartTime != 0
if waitStartTime == 0 {
waitStartTime = runtime_nanotime() // 记录阻塞的开始时间
}
runtime_SemacquireMutex(&m.sema, queueLifo, 1) // waitStartTime有值添加到队列头部,waitStartTime为0添加到等待队列的尾部
// 之前是饥饿模式 || 等待时间>1ms
starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
old = m.state
// old如果是饥饿模式,当前goroutine必是抢到锁的goroutine,因为饥饿模式是顺序唤醒的
if old&mutexStarving != 0 {
if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
throw("sync: inconsistent mutex state")
}
// mutexLocked置1
// 1<<mutexWaiterShift,等待队列的数量要-1
delta := int32(mutexLocked - 1<<mutexWaiterShift)
// 如果starving为false(意味着当前 goroutine 的等待时间少于1ms),或者等待队列中只有一个协程,则退出饥饿模式
if !starving || old>>mutexWaiterShift == 1 {
delta -= mutexStarving
}
atomic.AddInt32(&m.state, delta) // 更新锁的状态
break
}
// 如果是正常模式,所以还需要进入下一个for循环,和自旋的 goroutine 进行竞争
awoke = true // 进入下一轮for前,标识当前 goroutine 是被唤醒的
iter = 0 // 自旋次数置 0
}
}
}
3. 最后是 Unlock() 的实现
func (m *Mutex) Unlock() {
// 是否开启数据竞争检测
if race.Enabled {
_ = m.state
race.Release(unsafe.Pointer(m))
}
// 先尝试把 m.state 加 -1,意味着把m.state的mutexLockedzhi置0,new是改完后的m.state
new := atomic.AddInt32(&m.state, -mutexLocked)
if new != 0 { // 改完后的m.state的标志位不是全为0,进入unlockSlow,意味着-1后还需要处理一些事情,这是事情其实就是需不需要在这里去唤醒一个goroutine去抢锁。
m.unlockSlow(new)
}
}
3.1. unlockSlow() 的实现
func (m *Mutex) unlockSlow(new int32) {
// 把m.state+1再取出mutexLocked位,如果等于0,意味着之前-1的时候mutexLocked位是0,就是没有被锁的时候调用解锁,直接报错
if (new+mutexLocked)&mutexLocked == 0 {
fatal("sync: unlock of unlocked mutex")
}
if new&mutexStarving == 0 { // 正常模式
old := new
for {
// 等待队列 == 0 || 三个标志位有一个不是0,都return
if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
return
}
// 能来到这里,意味着等待队列有goroutine,但是3个特殊标志位都是0,这时候就需要唤醒等待队列的一个goroutine去抢锁了
// 1<<mutexWaiterShift = 二进制100
// 意味着等待队列数量-1 + 是否有唤醒goroutine标志位置1
new = (old - 1<<mutexWaiterShift) | mutexWoken
if atomic.CompareAndSwapInt32(&m.state, old, new) {
runtime_Semrelease(&m.sema, false, 1) // 唤醒等待队列首个goroutine
return
}
old = m.state // cas失败
}
} else {
runtime_Semrelease(&m.sema, true, 1) // 饥饿模式,直接唤醒
}
}
再来看看 Java 的互斥锁
互斥锁代码示例。基于 jdk-17.0.6
public class MutexLock {
static int counter = 0;
static ReentrantLock mutexLock = new ReentrantLock(); // 非公平锁
public static void main(String[] args) throws InterruptedException {
IntStream
.range(0, 5)
.forEach(i ->
CompletableFuture.runAsync(() -> {
// 线程执行
mutexLock.lock();
try {
System.out.println(++counter);
} finally {
mutexLock.unlock();
}
}));
Thread.sleep(1000); // 等待所有线程执行完毕
}
}
输出:
1
2
3
4
5
1. ReentrantLock.lock() 的实现
public void lock() {
sync.lock();
}
@ReservedStackAccess
final void lock() {
if (!initialTryLock())
acquire(1);
}
1.1. 调用的是sync.lock()
,首先会进入initialTryLock()
方法。
2. initialTryLock()
的实现
final boolean initialTryLock() {
Thread current = Thread.currentThread();
if (compareAndSetState(0, 1)) { // cas 设置状态位
setExclusiveOwnerThread(current);
return true;
} else if (getExclusiveOwnerThread() == current) { // 如果设置状态位失败,判断锁是否已经被当前线程所拥有
int c = getState() + 1; // 重入次数+1
if (c < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(c);
return true;
} else
return false; // 获取锁失败
}
protected final boolean compareAndSetState(int expect, int update) {
return U.compareAndSetInt(this, STATE, expect, update);
}
2.1. 可以看到initialTryLock()
只是简单的做了 cas 操作去尝试获取锁,获取失败则返回 false
3. initialTryLock()
如果失败则调用acquire(1)
public final void acquire(int arg) {
if (!tryAcquire(arg))
acquire(null, arg, false, false, false, 0L);
}
protected final boolean tryAcquire(int acquires) {
if (getState() == 0 && compareAndSetState(0, acquires)) { // getState == 无锁 && cas(0,1)
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
3.1. acquire(1)
再次尝试做一次 cas,如果 cas 再失败则调用重载方法acquire(null, arg, false, false, false, 0L)
/**
* 主要的获取资源方法,被所有的公开获取资源方法调用。
*
* @param node 除非是一个重新获取Condition的,否则为null
* @param arg 获取资源的参数
* @param shared 如果是共享模式则为true,否则为独占模式
* @param interruptible 如果中断则退出并返回负值
* @param timed 如果为true则使用有时限的等待
* @param time 如果timed为true,那么这个就是超时的System.nanoTime值
* @return 如果成功获取资源则返回正值,如果超时则返回0,如果中断则返回负值
*/
final int acquire(Node node, int arg, boolean shared, boolean interruptible, boolean timed, long time) {
...
}
先看acquire
方法参数的含义,所以acquire(null, arg, false, false, false, 0L)
表示的是无超时、不中断、独占模式。
4. 接着看acquire()
方法的实现
4.1. 先解释局部变量
final int acquire(...) {
Thread current = Thread.currentThread();
byte spins = 0, postSpins = 0; // spins需要自旋的次数,postSpins用来辅助spins初始次数的
boolean interrupted = false, first = false; // interrupted是否中断,first当前node是否是下一个被唤醒的节点
Node pred = null; // 当加入队列时,node的前节点
...
}
4.2. 进入for循环
final int acquire(...) {
...
for (;;) {
if (!first && (pred = (node == null) ? null : node.prev) != null && !(first = (head == pred))) {
if (pred.status < 0) {
cleanQueue(); // predecessor cancelled
continue;
} else if (pred.prev == null) {
Thread.onSpinWait(); // ensure serialization
continue;
}
}
...
}
}
4.2.1. 先看 for 里的第 1 个 if,这个表达可以拆成!first
、(pred = (node == null) ? null : node.prev) != null
、!(first = (head == pred))
三个部分
第一部分:first
表示node.pred==head
,first 初始化是 false。所以第一轮这个条件是成立。
第二部分:(pred = (node == null) ? null : node.prev) != null
-
-
- node 如果是 null,pred 是 null。
- node 如果不是 null,pred 是 node.prev。
- 一开始形参 node 是 null,所以第一轮这个条件不成立。
-
第三部分:!(first = (head == pred)))
-
-
- pred 不能是 head 节点。第一轮
pred=null
,head 是目前占用着锁的线程,所以第一轮这个条件成立
- pred 不能是 head 节点。第一轮
-
最终这个 if 要找的是 node.pred,并且 pred 不是头节点。
4.2.2. 如果 if 命中,再看if (pred.status < 0)
,pred.status < 0
表示 pred 节点获取锁的动作已经被取消,pred 不需要获取锁了,则调用cleanQueue()
从队列中删除已取消的节点,并在必要时唤醒可能的下一个有效请求者。
4.2.3. 如果pred.status < 0
不命中,再看else if (pred.prev == null)
,pred.prev == null
表示 pred 没有前节点。
-
- 说明 pred 是 head 节点,但是最外层 if 有
!(head == pred)
,pred 不能是 head 节点,所以在这期间有别的线程更改了 head 节点,此时很有可能是 pred 成为了 head,所以调用Thread.onSpinWait()
自旋等待,然后重新进入 for 循环这个方法会调用内联汇编的pause
。
- 说明 pred 是 head 节点,但是最外层 if 有
4.3. 第 2 个if
final int acquire(...) {
...
for (;;) {
...
if (first || pred == null) {
boolean acquired;
try {
if (shared)
acquired = (tryAcquireShared(arg) >= 0);
else
acquired = tryAcquire(arg);
} catch (Throwable ex) {
cancelAcquire(node, interrupted, false);
throw ex;
}
if (acquired) {
if (first) {
node.prev = null;
head = node;
pred.next = null;
node.waiter = null;
if (shared)
signalNextIfShared(node);
if (interrupted)
current.interrupt();
}
return 1;
}
}
...
}
}
4.3.1. first || pred == null
-
first
表示node.pred
是不是head
。pred == null
表示node==null
或者node.pred==null
,因为赋值语句是pred = (node == null) ? null : node.prev
-
-
- 在第一次进
acquire()
的时候,node=null
,所以说在第一次进入方法的时候,会尝试先抢占锁。 - 在
node
创建并执行下一轮for
的时候,node.pred
还没有被赋值,pred
也为null
,所以说一个线程进入acquire()
方法会先执行 2 次cas
操作,才会进入等待队列,加上执行acquire()
前也会进行 2 次cas
。所以说一个线程获取锁会先尝试做 4 次cas
,如果都失败才会进入等待队列,这与golang
的 4 次cas
是一样的。
- 在第一次进
-
如果成功进入if
,根据独占/共享模式尝试抢锁,假设是独占模式,最终调用tryAcquire(arg)
,tryAcquire(arg)
在上面代码已经给出,就是一个cas
尝试改锁标志位。
if (acquired)
如果抢成功,则把当前node
变更为head
。return 1,表示成功。
4.4. 最后一个if
final int acquire(...) {
...
for (;;) {
...
if (node == null) { // allocate; retry before enqueue
if (shared)
node = new SharedNode();
else
node = new ExclusiveNode();
} else if (pred == null) { // try to enqueue
node.waiter = current;
Node t = tail;
node.setPrevRelaxed(t); // avoid unnecessary fence
if (t == null)
tryInitializeHead();
else if (!casTail(t, node))
node.setPrevRelaxed(null); // back out
else
t.next = node;
} else if (first && spins != 0) {
--spins; // reduce unfairness on rewaits
Thread.onSpinWait();
} else if (node.status == 0) {
node.status = WAITING; // enable signal and recheck
} else {
long nanos;
spins = postSpins = (byte)((postSpins << 1) | 1);
if (!timed)
LockSupport.park(this);
else if ((nanos = time - System.nanoTime()) > 0L)
LockSupport.parkNanos(this, nanos);
else
break;
node.clearStatus();
if ((interrupted |= Thread.interrupted()) && interruptible)
break;
}
...
}
}
4.4.1. 先拆分第一个if
if (node == null) {
if (shared)
node = new SharedNode();
else
node = new ExclusiveNode();
}
if (node == null)
表示首次进入acquire()
,创建一个新的node
。
4.4.2. 第二个else if
else if (pred == null) {
node.waiter = current;
Node t = tail;
node.setPrevRelaxed(t);
if (t == null)
tryInitializeHead();
else if (!casTail(t, node))
node.setPrevRelaxed(null);
else
t.next = node;
}
else if (pred == null)
,表示node
还没加进队列,在下一次for
循环中,会设置pred
的值。
if (t == null)
-
- 如果尾节点是
null
,tryInitializeHead()
初始化节点。
- 如果尾节点是
else if (!casTail(t, node))
-
- 如果尾结点不是
null
,则用cas
把node
添加到尾部,如果添加尾部失败node
的前节点撤销 回null
,更新成功t.next = node
尾结点的next
就是node
- 如果尾结点不是
4.4.3. 第三个else if
else if (first && spins != 0) {
--spins; // reduce unfairness on rewaits
Thread.onSpinWait();
}
first==true
表示node.pred==head
即node
是队列第二个(第一个是正在使用锁的node
),并且自旋次数不等0,继续自旋。
4.4.4. 第四个else if
else if (node.status == 0) {
node.status = WAITING; // enable signal and recheck
}
node.status
是初始化状态,新建node
并且设置pred
后,就会进入这个if
,将node.status
设置为WAITING == 1
,表示等待中。
4.4.5. 第五个else
else {
long nanos;
spins = postSpins = (byte)((postSpins << 1) | 1);
if (!timed)
LockSupport.park(this);
else if ((nanos = time - System.nanoTime()) > 0L)
LockSupport.parkNanos(this, nanos);
else
break;
node.clearStatus();
if ((interrupted |= Thread.interrupted()) && interruptible)
break;
}
return cancelAcquire(node, interrupted, interruptible);
postSpins
第一次是0,postSpins << 1
还是0,(postSpins << 1) | 1
,在0位补1,第一次就是1,所以第一次spins = postSpins = 1
。- 第二次
1 << 1
就会是二进制10
,然后0位再补1,最后是二进制11
,十进制3
,spins=3。 - 第三次
11 << 1
就会是二进制110
,然后0位再补1,最后是二进制111
,十进制7
,spins=7。 - 这样设计的目的是,唤醒的线程会跟刚进来的线程一起做
cas
抢锁,唤醒的线程很有可能会失败,因为cpu
时间片在执行的线程手上,所以这样设计的目的是唤醒的线程如果一直cas
失败,就一直提高cas
的次数,直至成功,为了队列的线程能够去抢到锁。 - 根据是否设置了
time
去执行阻塞,阻塞完就会执行node.clearStatus()
把node.status
重置为0。 - 最后,如果出错或者线程触发中断就会调用
cancelAcquire()
,取消当前node
。
================================================
结束语:文章仅为个人的浅见,难免会有错误或遗漏,欢迎大家指出。还有我们常用的读写锁,是在互斥锁上面做了哪些优化,希望大家读源码也有自己的领悟。
================================================
总结:我们可以看到每种语言在设计代码的时候都会有各自的取舍。
我们简单对比一下两种语言在设计互斥锁上的一些差异
- Java 互斥锁区分了公平锁与非公平锁,它更推荐我们显式的去创建某一种锁。而 Golang 互斥锁则推荐我们简单的使用锁,其内部会自动升级为饥饿模式来实现公平锁。
- Java 互斥锁在设计上实现了可重入,而 Golang 互斥锁则是不可重入。
- Java 的非公平锁其实也是部分公平的,因为内部实际还是维护了一个FIFO的队列。
================================================
预告:下一节将会讲述的是分布式锁,这里先留几个问题。
- 为什么在单机锁惯用的 cas + 自旋模式,在分布式锁中却少有耳闻。
- 如何在共识算法的加持下实现分布式锁的强一致性。
转载自:https://juejin.cn/post/7240427838344904759