likes
comments
collection
share

Go语言读写锁 RWMutex 源码详解

作者站长头像
站长
· 阅读数 18

前言

前面两篇文章 初见 Go MutexGo Mutex 源码详解 ,我们学习了 Go语言 中的 Mutex,它是一把互斥锁,每次只允许一个 goroutine 进入临界区,这种可以保证临界区资源的状态正确性。但是有的情况下,并不是所有 goroutine 都会修改临界区状态,可能只是读取临界区的数据,如果此时还是需要每个 goroutine 拿到锁依次进入的话,效率就有些低下了。例如房间里面有一幅画,有人想修改,有人只是想看一下,完全可以放要看的一部分人进去,等他们看完再让修改的人进去修改,这样既提高了效率,也保证了临界区资源的安全。看和修改,对应的就是读和写,本篇文章我们就一起来学习下 Go语言 中的读写锁 sync.RWMutex

说明:本文中的示例,均是基于Go1.17 64位机器

RWMutex 总览

RWMutex 是一个读/写互斥锁,在某一时刻只能由任意数量reader 持有 或者 一个 writer 持有。也就是说,要么放行任意数量的 reader,多个 reader 可以并行读;要么放行一个 writer,多个 writer 需要串行写

RWMutex 对外暴露的方法有五个:

  • RLock():读操作获取锁,如果锁已经被 writer 占用,会一直阻塞直到 writer 释放锁;否则直接获得锁;
  • RUnlock():读操作完毕之后释放锁;
  • Lock():写操作获取锁,如果锁已经被 reader 或者 writer 占用,会一直阻塞直到获取到锁;否则直接获得锁;
  • Unlock():写操作完毕之后释放锁;
  • RLocker():返回读操作的 Locker 对象,该对象的 Lock() 方法对应 RWMutexRLock()Unlock() 方法对应 RWMutexRUnlock() 方法。

一旦涉及到多个 reader 和 writer ,就需要考虑优先级问题,是 reader 优先还是 writer 优先:

  • reader优先:只要有 reader 要进行读操作,writer 就一直等待,直到没有 reader 到来。这种方式做到了读操作的并发,但是如果 reader 持续到来,会导致 writer 饥饿,一直不能进行写操作;

  • writer优先:只要有 writer 要进行写操作,reader 就一直等待,直到没有 writer 到来。这种方式提高了写操作的优先级,但是如果 writer 持续到来,会导致 reader 饥饿,一直不能进行读操作;

  • 没有优先级:按照先来先到的顺序,没有谁比谁更优先,这种相对来说会更公平。

我们先来看下 RWMutex 的运行机制,就可以知道它的优先级是什么了。

可以想象 RWMutex 有两个队伍,一个是包含 所有reader 和你获得准入权writer队列A,一个是还没有获得准入权 writer 的 队列B

  1. 队列 A 最多只允许有 一个writer,如果有其他 writer,需要在 队列B 等待;
  2. 当一个 writer 到了 队列A 后,只允许它 之前的reader 执行读操作,新来的 reader 需要在 队列A 后面排队;
  3. 当前面的 reader 执行完读操作之后,writer 执行写操作;
  4. writer 执行完写操作后,让 后面的reader 执行读操作,再唤醒队列B 的一个 writer 到 队列A 后面排队。

初始时刻 队列A 中 writer W1 前面有三个 reader,后面有两个 reader,队列B中有两个 writer

Go语言读写锁 RWMutex 源码详解

并发读 多个 reader 可以同时获取到读锁,进入临界区进行读操作;writer W1队列A 中等待,同时又来了两个 reader,直接在 队列A 后面排队

Go语言读写锁 RWMutex 源码详解

写操作 W1 前面所有的 reader 完成后,W1 获得锁,进入临界区操作

Go语言读写锁 RWMutex 源码详解

获得准入权 W1 完成写操作退出,先让后面排队的 reader 进行读操作,然后从 队列B 中唤醒 W2队列A 排队。W2队列B队列A 的过程中,R8 先到了 队列A,因此 R8 可以执行读操作。R9R10R11W2 之后到的,所以在后面排队;新来的 W4 直接在队列B 排队。

Go语言读写锁 RWMutex 源码详解

从上面的示例可以看出,RWMutex 可以看作是没有优先级,按照先来先到的顺序去执行,只不过是 多个reader 可以 并行 去执行罢了。

深入源码

数据结构

type RWMutex struct {
	w           Mutex  // 控制 writer 在 队列B 排队
	writerSem   uint32 // 写信号量,用于等待前面的 reader 完成读操作
	readerSem   uint32 // 读信号量,用于等待前面的 writer 完成写操作
	readerCount int32  // reader 的总数量,同时也指示是否有 writer 在队列A 中等待
	readerWait  int32  // 队列A 中 writer 前面 reader 的数量
}

// 允许最大的 reader 数量
const rwmutexMaxReaders = 1 << 30

上述中的几个变量,比较特殊的是 readerCount ,不仅表示当前 所有reader 的数量,同时表示是否有 writer 在队列A中等待。当 readerCount 变为 负数 时,就代表有 writer 在队列A 中等待了。

  • 当有 writer 进入 队列A 后,会将 readerCount 变为负数,即 readerCount = readerCount - rwmutexMaxReaders,同时利用 readerWait 变量记录它前面有多少个 reader;
  • 如果有新来的 reader,发现 readerCount 是负数,就会直接去后面排队;
  • writer 前面的 reader 在释放锁时,会将 readerCountreaderWait都减一,当 readerWait==0 时,表示 writer 前面的所有 reader 都执行完了,可以让 writer 执行写操作了;
  • writer 执行写操作完毕后,会将 readerCount 再变回正数,readerCount = readerCount + rwmutexMaxReaders

举例:假设当前有两个 reader,readerCount = 2;允许最大的reader 数量为 10

  • 当 writer 进入队列A 时,readerCount = readerCount - rwmutexMaxReaders = -8,readerWait = readerCount = 2
  • 如果再来 3 个reader,readerCount = readerCount + 3 = -5
  • 获得读锁的两个reader 执行完后,readerCount = readerCount - 2 = -7,readerWait = readerWait-2 =0,writer 获得锁
  • writer 执行完后,readerCount = readerCount + rwmutexMaxReaders = 3,当前有 3个 reader

RLock()

reader 执行读操作之前,需要调用 RLock() 获取锁

func (rw *RWMutex) RLock() {
	
  // reader 加锁,将 readerCount 加一,表示多了个 reader
  if atomic.AddInt32(&rw.readerCount, 1) < 0 {
		
    // 如果 readerCount<0,说明有 writer 在自己前面等待,排队等待读信号量
		runtime_SemacquireMutex(&rw.readerSem, false, 0)
	}
}

RUnlock()

reader 执行完读操作后,调用 RUnlock() 释放锁

func (rw *RWMutex) RUnlock() {

  // reader 释放锁,将 readerCount 减一,表示少了个 reader
	if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 {
		
    // 如果readerCount<0,说明有 writer 在自己后面等待,看是否要让 writer 运行
		rw.rUnlockSlow(r)
	}
}
func (rw *RWMutex) rUnlockSlow(r int32) {

  // 将 readerWait 减一,表示前面的 reader 少了一个
	if atomic.AddInt32(&rw.readerWait, -1) == 0 {
    
		// 如果 readerWait 变为了0,那么自己就是最后一个完成的 reader
    // 释放写信号量,让 writer 运行
		runtime_Semrelease(&rw.writerSem, false, 1)
	}
}

Lock()

writer 执行写操作之前,调用 Lock() 获取锁

func (rw *RWMutex) Lock() {

   // 利用互斥锁,如果前面有 writer,那么就需要等待互斥锁,即在队列B 中排队等待;如果没有,可以直接进入 队列A 排队
   rw.w.Lock()

  // atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) 将 readerCount 变成了负数
  // 再加 rwmutexMaxReaders,相当于 r = readerCount,r 就是 writer 前面的 reader 数量
   r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders
   
   // 如果 r!= 0 ,表示自己前面有 reader,那么令 readerWait = r,要等前面的 reader 运行完
   if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {
      runtime_SemacquireMutex(&rw.writerSem, false, 0)
   }

}

Lock()RUnlock() 是会并发进行的:

  1. 如果 Lock() 将 readerCount 变为负数后,假设 r=3,表示加入的那一刻前面有三个 reader,还没有赋值 readerWait CPU 就被强占了,readerWait = 0;
  2. 假设此时三个 reader 的 RUnlock() 会进入到 rUnlockSlow() 逻辑,每个 reader 都将 readerWait 减一, readerWait 会变成负数,此时不符合唤醒 writer 的条件;
  3. 三个 reader 运行完之后,此时 readerWait = -3, Lock() 运行到 atomic.AddInt32(&rw.readerWait, r) = -3+3 =0,也不会休眠,直接获取到锁,因为前面的 reader 都运行完了。

这就是为什么 rUnlockSlow() 要判断 atomic.AddInt32(&rw.readerWait, -1) == 0 以及 Lock() 要判断 atomic.AddInt32(&rw.readerWait, r) != 0 的原因。

Unlock()

writer 执行写操作之后,调用 Lock() 释放锁

func (rw *RWMutex) Unlock() {

  // 将 readerCount 变为正数,表示当前没有 writer 在队列A 等待了
	r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders)

  // 将自己后面等待的 reader 唤醒,可以进行读操作了
	for i := 0; i < int(r); i++ {
		runtime_Semrelease(&rw.readerSem, false, 0)
	}
	
  // 释放互斥锁,如果队列B有writer,相当于唤醒一个来队列A 排队
	rw.w.Unlock()

}

writer 对 readerCount 一加一减,不会改变整体状态,只是用正负来表示是否有 writer 在等待。当然,如果在 writer 将 readerCount变为负数后,来了很多 reader,将 readerCount 变为了正数,此时reader 在 writer 没有释放锁的时候就获取到锁了,是有问题的。但是 rwmutexMaxReaders 非常大,可以不考虑这个问题。

常见问题

  1. 不可复制

    和 Mutex 一样,RWMutex 也是不可复制。不能复制的原因和互斥锁一样。一旦读写锁被使用,它的字段就会记录它当前的一些状态。这个时候你去复制这把锁,就会把它的状态也给复制过来。但是,原来的锁在释放的时候,并不会修改你复制出来的这个读写锁,这就会导致复制出来的读写锁的状态不对,可能永远无法释放锁。

  2. 不可重入

    不可重入的原因是,获得锁之后,还没释放锁,又申请锁,这样有可能造成死锁。比如 reader A 获取到了读锁,writer B 等待 reader A 释放锁,reader 还没释放锁又申请了一把锁,但是这把锁申请不成功,他需要等待 writer B。这就形成了一个循环等待的死锁。

  3. 加锁和释放锁一定要成对出现,不能忘记释放锁,也不能解锁一个未加锁的锁。

实战一下

Go 中的 map 是不支持 并发写的,我们可以利用 读写锁 RWMutex 来实现并发安全的 map。在读多写少的情况下,使用 RWMutex 要比 Mutex 性能高。

package main

import (
	"fmt"
	"math/rand"
	"sync"
	"time"
)

type ConcurrentMap struct {
	m     sync.RWMutex
	items map[string]interface{}
}

func (c *ConcurrentMap) Add(key string, value interface{}) {
	c.m.Lock()
	defer c.m.Unlock()
	c.items[key] = value
}

func (c *ConcurrentMap) Remove(key string) {
	c.m.Lock()
	defer c.m.Unlock()
	delete(c.items, key)
}
func (c *ConcurrentMap) Get(key string) interface{} {
	c.m.RLock()
	defer c.m.RUnlock()
	return c.items[key]
}

func NewConcurrentMap() ConcurrentMap {
	return ConcurrentMap{
		items: make(map[string]interface{}),
	}
}

func main() {
	m := NewConcurrentMap()

	var wait sync.WaitGroup

	wait.Add(10000)
	for i := 0; i < 10000; i++ {

		key := fmt.Sprintf("%d", rand.Intn(10))
		value := fmt.Sprintf("%d", rand.Intn(100))
		if i%100 == 0 {
			go func() {
				defer wait.Done()
				m.Add(key, value)
			}()
		} else {
			go func() {
				defer wait.Done()
				fmt.Println(m.Get(key))
				time.Sleep(time.Millisecond * 10)
			}()
		}

	}

	wait.Wait()
}

总结

本文以图文并茂的方式介绍了RWMutex的运行机制,对源码进行逐行分析,学习了 RWMutex 底层是如何实现的,同时列举了一些 RWMutex 的常见错误。如果本篇文章对有所帮助,点个关注 + 转发哦 ^_^

更多

个人博客: lifelmy.github.io/

微信公众号:漫漫Coding路

转载自:https://juejin.cn/post/7149782692148543495
评论
请登录