likes
comments
collection
share

当读遇上写:RWMutex读写锁解析

作者站长头像
站长
· 阅读数 6

在Go中,使用Mutex同步原语来保证共享资源的只能由一个goroutine访问,但某些场景下这种只能由一个goroutine访问的情况事实上效率是比较低的,例如,在读多写少的情况下,即使没有读操作,大量的并发读访问也会因为Mutex而导致串行访问,实际上对性能的影响其实算比较大的。

面对这种读多写少的情况,一种有效的办法就是区分读写操作

具体来说:

  • 如果某个读操作的goroutine持有了锁,此时若其他读操作的goroutine也想要获取这把锁不会被阻塞,可以并发访问共享资源,从而将串行读变成并发读,提高读操作的性能。
  • 当写操作的goroutine持有锁时,这个锁相当于是排他锁,其他读写操作的goroutine想要获取锁时会被阻塞并等待直到锁被释放。
  • 这一类并发读写问题叫作readers-writers问题。

在Go标准库中,RWMutex(读写锁)便是解决这类 readers-writers 问题的神兵利器。

一、RWMutex介绍及使用场景

读写锁RWMutex是一个读写互斥锁,即reader/writer互斥锁,RWMutex锁在某一时刻只能由任意数量的reader持有,或者是被单个writer持有。

RWMutex的零值为未加锁状态,当使用RWMutex的时候,无论是声明该类型的变量,还是将其嵌入到struct中,都无需显示地初始化它。

RWMutex提供了如下几个方法:

方法说明
Lock/Unlock写操作调用的方法。如果锁已经被reader 或者 writer 持有,则调用Lock方法会一直阻塞;Unlock 则是对应释放锁的方法
RLock/RUnlock读操作调用的方法。如果锁已经被writer 持有,则RLock方法会一直阻塞,直到能获取到锁,否则就直接返回;而 RUnlockreader 释放锁的方法
RLocker为读操作返回一个Locker接口的对象。该对象的Lock方法会调用RWMutexRLock方法;该对象的Unlock方法会调用RWMutexRUnlock方法

计数器就是一个很好使用RWMutex读写互斥锁的例子。

  • 读操作可以并行执行;
  • 写操作只允许一个线程执行;
  • 计数器的count++操作是写操作,而获取count的值为读操作;
  • 这种场景下非常适合使用读写锁,即readers-writers问题。

如何使用RWMutex保护共享资源?

// Counter 线程安全的计数器结构体
type Counter struct {
    mu    sync.RWMutex
    count uint64
}

// Incr 写锁保护写操作
func (c *Counter) Incr() {
    c.mu.Lock()
    c.count++
    c.mu.Unlock()
}

// Count 读锁保护读操作
func (c *Counter) Count() uint64 {
    c.mu.RLock()
    defer c.mu.RUnlock()
    return c.count
}

func main() {
    var counter Counter
    // 开启10个reader进行读操作
    for i := 0; i < 1000; i++ {
       go func() {
          for {
             count := counter.Count() // 计数器读操作
             fmt.Println(count)
             time.Sleep(time.Millisecond)
          }
       }()
    }

    // writer
    for {
       counter.Incr()
       time.Sleep(time.Second)
    }
}

上述代码中,使用for循环开启了十个进行读操作的goroutine,每个goroutine循环读取count的值,每读取一次,sleep 1 毫秒。同时,以main goroutine作为写操作的执行goroutine,循环执行计数器的写操作,每一秒写一次。

  • Incr方法会修改计数器的值,是一个写操作方法,使用Lock/Unlock进行保护。Incr方法每秒才调用一次,所以writer竞争锁的频次比较的低;
  • Count方法读取计数器的值,是一个读操作方法,使用RLock/RUnlock方法进行保护。十个goroutine每毫秒都在执行一次查询;

通过读写锁,可以看到能够极大的提升计数器的性能,在读取计数器的值时,能够并发进行读操作,避免了串行读。

如果使用Mutex,性能就不会像读写锁这么好。因为多个reader 并发读时,使用互斥锁导致了reader要排队读的情况,即串行读,没有RWMutex并发读的性能好。

可以通过修改一下Count方法,对比一下串行读与并发读的执行过程。

// Count 读锁保护读操作
func (c *Counter) Count() uint64 {
    c.mu.Lock()
    defer c.mu.Unlock()
    time.Sleep(time.Millisecond * 100)
    return c.count
}

// Count 读锁保护读操作
func (c *Counter) Count() uint64 {
    c.mu.RLock()
    defer c.mu.RUnlock()
    time.Sleep(time.Millisecond * 100)
    return c.count
}

如果在实际场景中,遇到可以明确区分 reader 和 writer goroutine 的场景,并且存在大量的并发读,少量的并发写,并且有性能上的需求,则可以考虑使用读写锁 RWMutex 替换 Mutex

二、RWMutex实现原理

readers-writers问题一般有三类,基于对读和写操作的优先级,读写锁的设计和实现也分成三类。

  • Read-preferring:读操作优先可以提供很高的并发性,但在竞争激烈的情况下,很有可能会导致写操作饥饿,即写操作goroutine无法获取到锁,只有所有的读操作结束后,没有读操作获取锁,写操作才能够获取到锁。
  • Write-preferring:写操作优先,如果有一个writer goroutine在等待请求锁时,其他的新来的readergoroutine请求锁时会被阻止,保证writer goroutine先获取到锁,当然,如果有一些reader已经请求了锁的话,新请求的writer也会等待已经存在的reader都释放锁之后才能获取。写操作优先主要避免了writer的饥饿问题。
  • 不指定优先级:不区分readerwriter的优先级,在某些场景下这种设计反而更加高效,避免了上述两种方法导致的读饥饿问题与写饥饿问题。

在Go标准库中,RWMutex的设计思想采用了Write-preferring方案,一个正在阻塞的Lock调用会排除新的reader请求到锁。

1、RWMutex结构

在Go中,RWMutex是基于基于 Mutex 实现的,结合条件变量(condition variables)、信号量(semaphores)等并发原语来实现。

RWMutex结构体中,包含一个 Mutex,以及四个辅助字段 writerSem、readerSem、readerCount 和 readerWait:

type RWMutex struct {
    w           Mutex  // 互斥锁,解决多个writer的竞争
    writerSem   uint32 // writer等待reader完成的信号量
    readerSem   uint32 // reader等待writer完成的信号量
    readerCount int32  // reader的数量
    readerWait  int32  // writer等待完成的reader的数量
}

const rwmutexMaxReaders = 1 << 30

上述结构体中:

  • w:解决writer之间竞争锁的互斥锁;
  • writerSem:阻塞writer的信号量;
  • readerSem:阻塞reader的信号量;
  • readerCount:记录当前reader的数量,以及标识是否有writer竞争锁;
  • readerWait:记录writer请求锁时,需要等待read完成的reader数量;
  • rwmutexMaxReaders:定义了最大的reader数量;

2、RLock/RUnlock 的实现

移出race等无关紧要,RLock方法如下:

func (rw *RWMutex) RLock() {
    // rw.readerCount是负值的时候,意味着此时有writer等待请求锁,因为writer优先级高,所以把后来的reader阻塞休眠
    if atomic.AddInt32(&rw.readerCount, 1) < 0 {
       // A writer is pending, wait for it.
       runtime_SemacquireMutex(&rw.readerSem, false, 0)
    }
}

上述RLock方法中,对reader的计数器readerCount加1,当readerCount为负数时,则会将当前请求锁的reader进行阻塞睡眠。

为什么readerCount会是负数呢?其实readerCount有双重含义:

  • 若当前RWMutex没有writer竞争或持有锁时,readerCount为正数,为正常的reader的计数;
  • 若当前RWMutex有writer 竞争锁或者持有锁时,readerCount为负数,此时readerCount不仅为reader的计数,而且负数标识为当前是否有 writer 竞争或持有锁,此时请求锁的reader会进入阻塞,等待锁的释放(runtime_SemacquireMutex(&rw.readerSem, false, 0));

RUnlock方法:

func (rw *RWMutex) RUnlock() {
    if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 {
       // 检查是否所有的reader都释放读锁,唤醒请求写锁的writer
       rw.rUnlockSlow(r)
    }
}

当调用RUnlock时,会将reader的计数器readerCount的计数值减去1,标识当前reader释放读锁。之后判断当前减去后的返回值,如果返回值为负数,说明当前有writer正在获取RWMutex锁,此时调用rw.rUnlockSlow(r)方法来检查是否所有的reader都释放读锁,若所有的reader都释放了锁,则唤醒请求写锁的writer

rUnlockSlow方法如下:

func (rw *RWMutex) rUnlockSlow(r int32) {
    // A writer is pending.
    if atomic.AddInt32(&rw.readerWait, -1) == 0 {
       // The last reader unblocks the writer.
       runtime_Semrelease(&rw.writerSem, false, 1)
    }
}

在代码第三行,判断readerWait需要等待的reader数量是否为0,若为0则通过writerSem信号量唤醒阻塞的writer。当一个或者多个reader持有锁的时候,竞争锁的writer会等待这些reader释放完,才可能持有这把锁。

rUnlockSlow方法将持有锁的reader计数减少1(等待完成的reader的数量readerWait减1),会检查既有的 reader 是不是都已经释放了锁(readerWait是否为0),如果都释放了锁,就会唤醒 writerruntime_Semrelease方法),让 writer 持有锁。

3、Lock/Unlock 的实现

Lock方法:

为了避免多个writer竞争共享资源,RWMutex内部使用Mutex来实现writer之间的互斥,保证并发安全。writer通过调用Lock方法来获取RWMutex读写锁。

func (rw *RWMutex) Lock() {
    // 首先解决其他writer竞争问题
    rw.w.Lock()
    // 反转readerCount,告诉reader有writer竞争锁
    r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders
    // 如果当前有reader持有锁,那么需要等待活跃的reader释放锁
    if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {
       runtime_SemacquireMutex(&rw.writerSem, false, 0)
    }
}

上述代码中:

  • 在第3行代码中,writer调用RWMutex中成员变量MutexLock方法获取互斥锁。
  • writer获取到互斥锁后,会第5行代码将readerCount反转为负值,将原来的正整数readerCount(>=0)修改为负数(readerCount - rwmutexMaxReaders),让readerCount保持两个含义,即保存了持有读锁的reader个数,又表示当前有writer获取到锁,之后将拿到的返回值在增加rwmutexMaxReaders得到r,用于后续的判断(相当于恢复readerCount为正数,记录当前活跃的reader,用于后续的判断,但是实际readerCount已反转为负数)。
  • 在第7行代码中,若当前已有活跃的reader是否全部都释放了锁(r != 0),以及判断目前是否还有正在等待锁的reader(atomic.AddInt32(&rw.readerWait, r) != 0),若存在,则阻塞当前writer。即如果 readerCount 不是 0,就说明当前有持有读锁的 readerRWMutex需要把这个当前 readerCount 赋值给 readerWait 字段保存下来(第 7 行), 同时这个 writer 进入阻塞等待状态(第 8 行)。
  • 每当一个 reader 释放读锁的时候(调用 RUnlock 方法时,具体可参考上述的rUnlockSlow方法),readerWait 字段就减 1,直到所有的活跃的 reader 都释放了读锁,才会唤醒这个 writer

所谓活跃的reader,就是指持有读锁还没有释放的那些reader

UnLock方法:

func (rw *RWMutex) Unlock() {
    // 将readerCount转回正数,即告知reader没有活跃的writer了
    r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders)
    
    // 唤醒阻塞的reader们
    for i := 0; i < int(r); i++ {
        runtime_Semrelease(&rw.readerSem, false, 0)
    }
    // 释放内部的互斥锁
    rw.w.Unlock()
}

当持有锁的writer需要释放锁时,调用Unlock方法。

  • 在第3行代码中,将readerCount再次反转为正数,即告知reader们已经没有writer获取锁了。因为在writer获取锁时,readerCount通过减去常量rwmutexMaxReaders反转为负数,因此再次反转为正数的方法为增加 rwmutexMaxReaders 这个常数值。
  • 在第6行中,通过循环调用runtime_Semrelease函数,使用信号量readerSem唤醒阻塞的reader
  • 唤醒reader后,释放内部的互斥锁Mutex

Lock/Unlock方法中,需要重点理解readerCount这个字段的含义以及反转的方式,其次是字段的更改和内部互斥锁的顺序。在 Lock 方法中,是先获取内部互斥锁,才会修改的其他字段;在 Unlock 方法中,是先修改的其他字段,才会释放内部互斥锁,保证字段的修改也受到互斥锁的保护。

三、RWMutex使用注意事项

RWMutex在使用上与Mutex一样暴露的API简单便利,但是在实际使用时,容易犯一些错误。

1、RWMutex不可复制

RWMutexMutex类似,是由互斥锁和通过一系列记录状态的参数构成的结构体,包括信号量readerCount等,本身互斥锁就不可复制,再加上一系列状态字段,RWMutex就更不可能赋值使用。

原因在于一但读写锁被使用,它的字段就会记录当前的一些锁状态,此时复制该锁会将其当前的锁状态也一同赋值,而赋值出来的状态是难以预估的,并且原来的锁释放了,并不会修改复制出来的读写锁,从而导致状态不统一,可能锁永远无法释放。

可以借助 vet 工具,在变量赋值、函数传参、函数返回值、遍历数据、struct 初始化等时,检查是否有读写锁隐式复制的情景。

2、重入导致死锁

读写锁在发生重入或者递归调用的情况下,容易导致死锁。

第一种情况:

因为读写锁内部是基于互斥锁Mutex实现控制writer的并发访问,而Mutex本身有重入问题,因此在调用writer 重入调用 Lock时,容易出现死锁现象。

func lockUser1(l *sync.RWMutex) {
    fmt.Println("lock user1")
    l.Lock()
    lockUser2(l)
    l.Unlock()
}

func lockUser2(l *sync.RWMutex) {
    l.Lock()
    fmt.Println("lock user2")
    l.Unlock()
}

func main() {
    lock := &sync.RWMutex{}
    lockUser1(lock)
}

执行上述代码,会得到死锁的错误输出。

第二种情况:

当有reader获取到读写锁时,writer会进行阻塞等待,如果此时reader的读操作调用writer的写操作,即调用Lock方法,那么readerwriter会形成互相依赖的死锁状态。reader会等待writer 完成后再释放锁,而writer 需要等待这个 reader执行结束释放锁之后,才能不阻塞地继续执行。这是一个读写锁常见的死锁场景。

第三种情况:

当当一个 writer 请求锁时,如果已经有一部分活跃的reader,此时writer会等待这些活跃的reader完成后才能获取到锁,但之后活跃的reader如果在依赖新的reader的话,这些新的reader就会等待writer释放锁后才能继续执行,从而就会形成一个环形依赖:writer 依赖活跃的 reader -> 活跃的 reader 依赖新来的 reader -> 新来的 reader 依赖 writer

3、释放未加锁的RWMutex

与互斥锁相同,LockUnlock的调用总是成对出现的,RLockRUnlock的调用也必须成对出现。

LockRLock多余的调用会导致锁没有被释放,可能会出现死锁,而UnlockRUnlock多余的调用会导致panic

四、总结

在Go语言中,sync.RWMutex(读写互斥锁)是一个用于并发控制的同步原语,它允许多个goroutine同时读取共享资源,但在同一时间只允许一个goroutine写入共享资源。这种锁的设计是为了在读取操作远多于写入操作的场景中提高性能。

如果实际业务场景需要解决的问题是一个 readers-writers 问题,那么就可以毫不犹豫地选择 RWMutex,不用考虑其它选择。

在使用RWMutex,需要尽量注意避免重入,重入带来的死锁非常隐蔽,而且难以诊断。


参考文章

time.geekbang.org/column/arti…