(十)Go并发编程详解:锁、WaitGroup、Channel
前言
传统的编程语言如C++、Java、Python等,它们的并发逻辑通常基于操作系统提供的线程。并发执行单元(线程)之间的通信依赖于操作系统提供的原语,如共享内存、信号、管道、消息队列和套接字等。其中,共享内存是应用最广泛的一种方式。
然而,使用共享内存的并发模型往往复杂且容易出错,尤其在大型或复杂的业务场景中。
Go语言在设计之初就把解决传统并发模型中的问题作为���标,并借鉴了CSP(Communicating Sequential Processes,通信顺序进程)并发模型的思想。
CSP模型旨在简化并发程序的编写,使得编写并发程序与编写顺序程序一样简单。
在CSP模型中,通过生产者 -> 输出数据 -> 输入/输出原语 -> 输出数据的方式进行通信和同步。
为了实现CSP模型,Go语言引入了Channel。Goroutine可以通过channel读写数据,通过channel将多个goroutine连接起来形成通信的桥梁。
尽管Go语言中CSP模型是主流的并发模型,但它也支持共享内存并发模型。在sync包中提供了诸如互斥锁、读写锁、条件变量和原子操作等机制。
本章节是Go并发编程理论篇的第二篇,我会带领大家学习Go各种并发原语,并以面试题形式进行输出,相信大家看完本章节后会有所收获!
关于Go并发,很多小伙伴私信我说网上资料很多,也有很多很全面,详细的资料,但是都晦涩,看不下去。于是我整理成了笔记的形式,再通过树状的笔记输出形成本文。我认为每个并发原语都可以概括为几个大的版块:基本概念、应用场景、基本用法、易错场景、基本实现原理。如果你能把这几个问题都说清楚,道明白了,那么相信你无论是应对面试还是实际使用,都能够做到心中自有意,下笔如有神!
互斥锁 (Mutex)
基本概念
互斥锁(Mutex)是一种用于在并发环境中安全访问共享资源的机制。当一个协程获取到锁时,它将拥有临界区的访问权,而其他请求该锁的协程将会阻塞,直到该锁被释放。
应用场景
并发访问共享资源的情形非常普遍,例如:
- 秒杀系统
- 多个goroutine并发修改某个变量
- 同时更新用户信息
如果没有互斥锁的控制,将会导致商品超卖、变量数值不正确、用户信息更新错误等问题。这时候就需要使用互斥锁来控制并发访问。
基本用法
Mutex实现了Locker接口,提供了两个方法:Lock
和Unlock
。
Lock
方法用于对临界区上锁,获得该锁的协程拥有临界资源的访问权,其他请求临界区的协程会阻塞等待该锁的释放。Unlock
方法用于解锁,释放锁使其他协程可以访问临界区。
package main
import (
"fmt"
"sync"
"time"
)
func main() {
var mu sync.Mutex
var count int
increment := func() {
mu.Lock()
defer mu.Unlock()
count++
fmt.Println("Count:", count)
}
for i := 0; i < 5; i++ {
go increment()
}
time.Sleep(time.Second)
}
易错场景
- 不可重入的互斥锁:Go的Mutex是不可重入锁。由于Mutex锁没有记录锁的持有者信息,无法得知谁拥有锁。如果一个获取了锁的协程再次请求锁,将会被阻塞,形成死锁。
func example() {
var mu sync.Mutex
mu.Lock()
defer mu.Unlock()
// Do something...
mu.Lock() // 死锁
}
- Lock和Unlock不配对:未正确配对的Lock和Unlock调用会导致死锁。如果对已经锁定的锁再次调用Lock,将会阻塞;对未锁定的Mutex调用Unlock将会panic。
func example() {
var mu sync.Mutex
mu.Lock()
// 未调用mu.Unlock()
mu.Unlock() // 正确
}
- 复制已使用的锁:复制已使用的锁会导致意外的行为。
func example() {
var mu sync.Mutex
copyMu := mu
copyMu.Lock() // 错误
}
基本实现
Mutex结构体有两个字段:state
和sema
。
type Mutex struct {
state int32
sema uint32
}
State字段的含义
Mutex有四种状态:
mutexLocked
:Mutex上锁标志mutexWoken
:Mutex唤醒标志mutexStarving
:Mutex正常/饥饿模式标志waiterCount
:等待者数量
Mutex的正常模式和饥饿模式
- 正常模式:等待者队列遵循先入先出原则,被唤醒的goroutine不会直接获得锁,而是与新请求锁的goroutine竞争锁。新请求锁的goroutine由于正在CPU上执行,获得锁的几率更大,从而减少上下文切换的性能损失。然而,这可能导致被唤醒的goroutine长时间无法获得锁。
- 饥饿模式:当等待时间超过阈值1毫秒时,进入饥饿模式。被唤醒的goroutine被放入等待队列的队首,当前goroutine在调用Unlock释放锁时,会直接将锁交给等待队列的队首,新请求锁的goroutine不会参与竞争,而是排到等待队列的队尾。当等待队列没有goroutine或等待时间小于1毫秒时,Mutex将从饥饿模式切换回正常模式。
代码示例
以下代码展示了如何使用Mutex在并发环境中安全地访问共享资源:
package main
import (
"fmt"
"sync"
"time"
)
func main() {
var mu sync.Mutex
var count int
increment := func() {
mu.Lock()
defer mu.Unlock()
count++
fmt.Println("Count:", count)
}
for i := 0; i < 5; i++ {
go increment()
}
time.Sleep(time.Second)
}
在上述代码中,多个goroutine同时调用increment
函数,通过Mutex来确保对共享变量count
的访问是安全的。
读写锁 (RWMutex)
基本概念
在并发编程中,为了保证多个协程安全地访问共享资源,我们通常使用Mutex互斥锁。然而,在读多写少的场景下,Mutex会导致性能问题,因为所有操作(包括读操作)都必须串行进行。为了解决这一问题,可以区分读操作和写操作。RWMutex是一种读写锁,同一时间只能被一个写操作持有,或者被多个读操作持有。
基本用法
RWMutex提供了五个方法:Lock
、Unlock
、RLock
、RUnlock
和RLocker
。
Lock
方法用于在写操作时获取写锁,会阻塞等待当前未释放的写锁。当处于写锁状态时,新的读操作将会阻塞等待。Unlock
方法用于释放写锁。RLock
方法用于在读操作时获取读锁,会阻塞等待当前写锁的释放。如果锁处于读锁状态,当前协程也能获取读锁。RUnlock
方法用于释放读锁。RLocker
方法用于获取一个Locker接口的对象,调用其Lock
方法时会调用RLock
方法,调用Unlock
方法时会调用RUnlock
方法。
package main
import (
"fmt"
"sync"
"time"
)
func main() {
var rw sync.RWMutex
var count int
write := func() {
rw.Lock()
defer rw.Unlock()
count++
fmt.Println("Write:", count)
}
read := func() {
rw.RLock()
defer rw.RUnlock()
fmt.Println("Read:", count)
}
// Start multiple readers
for i := 0; i < 5; i++ {
go read()
}
// Start a single writer
go write()
time.Sleep(time.Second)
}
实现原理
RWMutex主要通过readerCount
字段来维护读锁的数量。写操作时,会将readerCount
减去2的30次方变成一个负数,从而阻塞新的读锁请求。当写锁被释放时,将readerCount
加上2的30次方,恢复成一个整数并唤醒等待中的读锁操作。
易错场景
RWMutex的易错场景和Mutex类似,包括:
- 不可重入锁:Go的RWMutex是不可重入锁。如果一个获取了锁的协程再次请求同一个锁,将会被阻塞,形成死锁。
func example() {
var rw sync.RWMutex
rw.Lock()
defer rw.Unlock()
// Do something...
rw.Lock() // 死锁
}
- Lock和Unlock不配对:未正确配对的Lock和Unlock调用会导致死锁。如果对已经锁定的锁再次调用Lock,将会阻塞;对未锁定的RWMutex调用Unlock将会panic。
func example() {
var rw sync.RWMutex
rw.Lock()
// 未调用rw.Unlock()
rw.Unlock() // 正确
}
- 复制已使用的锁:复制已使用的锁会导致意外行为。
func example() {
var rw sync.RWMutex
copyRw := rw
copyRw.Lock() // 错误
}
- 隐蔽的死锁情景:写锁操作等待旧的读锁的释放,旧的读锁等待新的读锁的释放,新的读锁等待写锁的释放,形成死锁。
package main
import (
"fmt"
"sync"
"time"
)
func main() {
var rw sync.RWMutex
var count int
write := func() {
rw.Lock()
defer rw.Unlock()
count++
fmt.Println("Write:", count)
}
read := func() {
rw.RLock()
defer rw.RUnlock()
fmt.Println("Read:", count)
}
// 启动多个读操作
for i := 0; i < 5; i++ {
go read()
}
// 启动写操作
go write()
time.Sleep(time.Second)
}
在上述代码中,多个goroutine同时调用read
函数,通过RWMutex来确保对共享变量count
的读取是安全的。同时,write
函数用于更新共享变量count
,确保在写操作时独占访问权。
死锁
什么是死锁
死锁指的是一组进程由于相互持有和等待资源,导致无法继续执行的状态。在这种情况下,所有相关的进程都会无限期阻塞,无法向前推进。具体来说,死锁发生在一个进程持有某些资源并等待其他进程释放其占有的资源,同时这些其他进程也在等待第一个进程释放资源,形成相互等待的状态。
死锁的必要条件
死锁的发生需要满足以下四个必要条件:
- 互斥条件:
- 资源同一时间只能被一个进程所拥有。
- 请求和保持条件:
- 一个进程已经拥有某些资源,但在等待其他资源时不释放已持有的资源。
- 不可剥夺条件:
- 进程持有的资源在未使用完毕前,不能被强行剥夺,只能由进程自己释放。
- 循环等待条件:
- 存在一个进程集合中的每个进程都在等待另一个进程所持有的资源,形成一个循环等待链。
如何解决死锁问题
为了解决死锁问题,可以采取以下两种策略:
- 检测和恢复:
- 系统可以定期检测死锁的存在,并采取措施恢复。例如,通过回滚进程的一部分操作或强制剥夺资源。
- 破坏死锁的必要条件:
- 可以通过设计系统来破坏死锁的四个必要条件之一,例如:
- 破坏互斥条件:尽量使用共享资源来减少互斥性。
- 破坏请求和保持条件:在进程开始时一次性请求所有资源,或者在请求新的资源之前释放已持有的资源。
- 破坏不可剥夺条件:设计成可以强制剥夺资源,例如通过优先级调度。
- 破坏循环等待条件:对资源进行排序,并要求进程按序请求资源,避免形成循环等待。
- 可以通过设计系统来破坏死锁的四个必要条件之一,例如:
示例代码
以下是一个Go语言中的死锁示例,展示了两个goroutine由于相互等待对方持有的资源而导致的死锁:
package main
import (
"fmt"
"sync"
)
func main() {
var mutexA, mutexB sync.Mutex
go func() {
mutexA.Lock()
fmt.Println("Goroutine 1: Locked mutexA")
// Simulate some work
mutexB.Lock()
fmt.Println("Goroutine 1: Locked mutexB")
mutexB.Unlock()
mutexA.Unlock()
}()
go func() {
mutexB.Lock()
fmt.Println("Goroutine 2: Locked mutexB")
// Simulate some work
mutexA.Lock()
fmt.Println("Goroutine 2: Locked mutexA")
mutexA.Unlock()
mutexB.Unlock()
}()
// Wait for goroutines to finish (they won't due to deadlock)
select {}
}
在上述代码中,两个goroutine分别持有mutexA
和mutexB
,并且尝试获取对方的锁,导致死锁发生。每个goroutine无限期等待对方释放资源,形成相互等待的循环。
通过了解死锁的概念、必要条件以及解决策略,我们可以更好地设计并发程序,避免陷入死锁状态。
WaitGroup
基本概念
WaitGroup
是 Go 语言的 sync
包下提供的一种并发原语,用来解决并发编排的问题。它主要用于等待一组 goroutine 完成。假设一个大任务需要等待三个小任务完成才能继续执行,如果采用轮询的方法,可能会导致两个问题:一是小任务已经完成但大任务需要很久才能被轮询到,二是轮询会造成 CPU 资源的浪费。因此,WaitGroup
通过阻塞等待并唤醒大任务的 goroutine 来解决这个问题。
基本用法
WaitGroup
提供了三个方法:Add
、Done
和 Wait
。
Add(delta int)
:将计数器增加 delta 值。Done()
:将计数器的值减一,相当于Add(-1)
。Wait()
:阻塞等待,直到计数器的值变为 0,然后唤醒调用者。
实现原理
WaitGroup
维护了两个计数器,一个是 v
计数器,另一个是 w
计数器。
- 调用
Add
方法时,v
计数器的值会增加相应的 delta 值。 - 调用
Done
方法时,v
计数器的值会减一。 - 调用
Wait
方法时,w
计数器的值会加一。当v
计数器的值为 0 时,会唤醒所有的 waiter。
易错场景
使用 WaitGroup
需要注意以下易错场景:
- 计数器的值为负数会引发 panic。
v
计数器增加的值大于减少的值,会造成一直阻塞。
示例代码
以下是一个使用 WaitGroup
的示例代码:
package main
import (
"fmt"
"sync"
"time"
)
func worker(id int, wg *sync.WaitGroup) {
defer wg.Done() // Done() 方法用于减少计数器
fmt.Printf("Worker %d starting\n", id)
time.Sleep(time.Second)
fmt.Printf("Worker %d done\n", id)
}
func main() {
var wg sync.WaitGroup
for i := 1; i <= 3; i++ {
wg.Add(1) // Add() 方法增加计数器
go worker(i, &wg)
}
wg.Wait() // Wait() 方法阻塞等待所有计数器为 0
fmt.Println("All workers done")
}
在上述代码中,main
函数创建了一个 WaitGroup
并启动了三个 goroutine,每个 goroutine 执行 worker
函数。在 worker
函数中,调用 wg.Done()
方法表示当前工作已经完成。main
函数中的 wg.Wait()
方法阻塞等待,直到所有的 goroutine 都完成工作并调用了 Done
方法。
小结
WaitGroup
是 Go 语言中非常有用的并发原语,用于等待一组 goroutine 完成。通过合理使用 Add
、Done
和 Wait
方法,可以避免轮询等待带来的性能问题,并提高并发编排的效率。在使用 WaitGroup
时,需要注意计数器的增减操作,避免引发 panic 或长时间阻塞。
Channel
基本概念
Go 语言提倡通过通信来实现共享内存,而不是通过共享内存来通信。Go 的 CSP(Communicating Sequential Processes)并发模型正是通过 Goroutine 和 Channel 来实现的。Channel 是 Go 语言中用于 goroutine 之间通信的主要工具。
应用场景
Channel 有以下几类应用场景:
- 数据交互:通过 Channel 可以模拟并发的 Buffer 或者 Queue,实现生产者-消费者模式。
- 数据传递:通过 Channel 将数据传递给其他的 goroutine 进行处理。
- 信号通知:Channel 可以用于传递一些信号,例如 close、data ready 等。
- 并发编排:通过 Channel 的阻塞等待机制,可以让一组 goroutine 按照一定的顺序并发或串行执行。
- 实现锁功能:通过 Channel 的阻塞等待机制,可以实现互斥锁的功能。
基本用法
Channel 有三种类型:
- 只能接收的 Channel:
<-chan T
- 只能发送的 Channel:
chan<- T
- 既能发送又能接收的 Channel:
chan T
Channel 通过 make
函数进行初始化,未初始化的 Channel 的零值是 nil
,对 nil
的 Channel 进行接收或发送操作会导致阻塞。
Channel 可以分为有缓冲和无缓冲两种。无缓冲的 Channel 是同步的,有缓冲的 Channel 是异步的。发送操作只有在 Channel 满时才会阻塞,接收操作只有在 Channel 为空时才会阻塞。
发送操作是 chan<-
,接收操作是 <-chan
。接收数据时可以返回两个值,第一个是元素,第二个是一个布尔值,若为 false
则说明 Channel 已经被关闭并且 Channel 中没有缓存的数据。
Go 的内建函数 close
、cap
、len
都可以操作 Channel 类型,发送和接收都可以作为 select
语句的 case,Channel 也可以应用于 for range
语句。
实现原理
发送
在发送数据给 Channel 时,发送语句会转化为 chansend
函数:
- 如果 Channel 是
nil
,调用者会被阻塞。 - 如果 Channel 已经关闭,发送操作会导致 panic。
- 如果
recvq
字段有 receiver,则将数据交给它,而不需要放入 buffer 中。 - 如果没有 receiver,则将数据放入 buffer 中。
- 如果 buffer 满了,则发送者 goroutine 会加入到
sendq
中阻塞休眠,直到被唤醒。
接收
在接收数据时,接收语句会转化为 chanrecv
函数:
- 如果 Channel 是
nil
,调用者会被阻塞。 - 如果 Channel 已经被关闭,并且队列中无缓存元素,则返回
false
和一个对应元素的零值。 - 如果
sendq
中有 sender 并且 buffer 中有数据,则优先从 buffer 中取出,否则从sendq
中弹出一个 sender,把它的数据复制给 receiver。 - 如果没有 sender,则从 buffer 中正常取一个元素;如果没有元素,则 receiver 会加入到
recvq
中阻塞等待,直到接收到数据或者 Channel 被关闭。
关闭
- 如果 Channel 是
nil
,关闭nil
的 Channel 会导致 panic。 - 如果关闭已经关闭的 Channel 也会导致 panic。
- 否则将
recvq
和sendq
全部清除并唤醒。
示例代码
以下是一个使用 Channel 的示例代码:
package main
import (
"fmt"
"time"
)
// 生产者:生成数据并发送到 channel
func producer(ch chan<- int, count int) {
for i := 0; i < count; i++ {
ch <- i
fmt.Println("Produced:", i)
time.Sleep(time.Millisecond * 500)
}
close(ch) // 关闭 channel,表示生产结束
}
// 消费者:从 channel 接收数据并处理
func consumer(ch <-chan int) {
for data := range ch {
fmt.Println("Consumed:", data)
time.Sleep(time.Millisecond * 1000)
}
}
func main() {
ch := make(chan int, 5) // 创建一个带缓冲的 channel
go producer(ch, 10) // 启动生产者
consumer(ch) // 启动消费者
}
在上述代码中,main
函数创建了一个带缓冲的 Channel,并启动了一个生产者 goroutine 和一个消费者 goroutine。生产者不断生成数据并发送到 Channel 中,消费者从 Channel 中接收数据并进行处理。生产者完成后关闭 Channel,消费者则在接收到所有数据后结束。
至此我们已经掌握了Go基本的并发编程,但是距离我们能熟练运用还有差距,在接下来的篇章中我将继续为大家带来go并发编程实战,记得关注我哦!!!
转载自:https://juejin.cn/post/7391703878355517459