likes
comments
collection
share

浅谈Go语言的并发控制

作者站长头像
站长
· 阅读数 22

前言

本文原创,著作权归WGrape所有,未经授权,严禁转载

一、说明

  • 为方便阅读,“线程”、“协程”、“子程序”虽然是有区别的,但在本文中不做区分,存在混用的情况
  • 文章虽然以Go语言为主题,但很多原理和思想对于其他语言都是通用的,可举一反三
  • 内容尽量浅尝辄止,不过于广泛或细节而偏离主题的同时,也让大家有所收获

二、内容大纲

  • 了解并发控制
  • Go语言常用的并发控制
  • channel存在的问题
  • 强大的Context

三、了解并发控制

1、理解

一种协调多个子程序间并发执行的技术,注意“多个”和“并发”这两个关键条件缺一不可

2、目的

实现线程安全,也称为并发安全

3、分类

并发控制主要分为两类,一种是对数据的控制,一种是对行为的控制

  • 数据的控制 :数据的共享或者独占
  • 行为的控制 :行为的开始或者停止

4、总结

  • 并发控制包括锁、非锁两种技术
  • 控制的对象主要包括数据、行为两种
  • 锁通过对数据的直接控制,可以实现对行为的间接控制(如Redis的Setnx分布式锁)
  • 非锁技术通过对行为的直接控制,可以实现对数据的间接控制(如JAVA的synchronized同步语句块)

浅谈Go语言的并发控制

四、Go语言常用的并发控制

Go主要提供了如下几种并发控制方法

  • 锁技术 :锁
  • 非锁技术 :同步原语、Channel、Context

1、锁

Go只提供了Mutex和RWMutex两种类型的锁,之所以只提供这两种锁,是因为在应用层所有奇奇怪怪的锁都是基于这两个锁之上的,这两个是最基本的锁。就像在硬件层,总线锁是最基本的锁一样

(1) Mutex 互斥锁

<1> 理解

Mutex的本意是互斥量,如果一个量的值只能为0或1,那么这个量就是互斥量

互斥量的值0或1,可以用来表示上锁和解锁两种状态,这种基于互斥量的锁也就叫做互斥锁,其最重要的特征就是互斥性(独占性),即线程1占有的时候,其他线程必须等待,如图所示 浅谈Go语言的并发控制

<2> 使用

假设有一个账户,转账线程先获得了锁,但是在过了一段时间后,100元才入账成功。在转账线程没有解锁的这段时间内,读取账户的线程在这段时间内无法获得锁

type Account struct {
	money int
	lock  sync.Mutex
}

func main() {

	// 我的账户
	var myAccount Account

	wg := sync.WaitGroup{}
	wg.Add(2)
	go func() {

		fmt.Print("write thread: try to get lock ...\n")
		myAccount.lock.Lock()
		fmt.Print("write thread: get lock ! \n")

		go func() {
			fmt.Print("read thread: try to get lock ...\n")
			myAccount.lock.Lock()
			fmt.Printf("read thread: get lock ! money = %d \n", myAccount.money)
			wg.Done()
		}()

		time.Sleep(3 * time.Second) // 模拟业务逻辑耗时
		myAccount.money = 100
		fmt.Print("write thread: write success ! \n")
		myAccount.lock.Unlock()
		wg.Done()
	}()
	wg.Wait()
}

浅谈Go语言的并发控制

<3> 适用场景

适用于读写频率没有显著区别,且任意时刻都必须保证只有一个线程可以写入或访问的情况

(2) RWMutex 读写锁

<1> 互斥锁的缺点

互斥锁有一个比较明显的缺点,如果被线程锁住时间较长,且线程在最后一刻才把数据写入,那么在被线程锁住的这打段时间内,会耽误其他线程的读取操作(比如展示用户当前粉丝数、金币数等),如图所示 浅谈Go语言的并发控制

<2> 读写锁

为了解决互斥锁占用资源、效率低的问题,就出现了一种新的锁,即读写锁

  • 读写锁具有两个状态 :写状态 和 读状态
  • 在写状态下,所有线程(读或写)都需要等待获取锁
  • 在读状态下,读线程可以并发获得锁,不需要等待,但是写线程必须等待所有读线程结束后才能获得锁

浅谈Go语言的并发控制

<3> 使用

在互斥锁代码的基础上做如下修改,转账线程开始只用读锁,在最后转账的时候,才用写锁。这样在转账线程未真正转账前,不会影响到读取账户的线程

type Account struct {
	money int
	lock  sync.RWMutex
}

func main() {

	// 我的账户
	var myAccount Account

	wg := sync.WaitGroup{}
	wg.Add(2)
	go func() {

		fmt.Print("write thread: try to get lock ...\n")
		myAccount.lock.RLock()
		fmt.Print("write thread: get lock ! \n")

		go func() {
			fmt.Print("read thread: try to get lock ...\n")
			myAccount.lock.RLock()
			fmt.Printf("read thread: get lock ! money = %d \n", myAccount.money)
			wg.Done()
			myAccount.lock.RUnlock()
		}()

		time.Sleep(3 * time.Second) // 模拟业务逻辑耗时
		myAccount.lock.RUnlock()
		myAccount.lock.Lock()
		myAccount.money = 100
		fmt.Print("write thread: write success ! \n")
		myAccount.lock.Unlock()
		wg.Done()
	}()
	wg.Wait()
}

浅谈Go语言的并发控制

<4> 适用场景

适用于多读少写的场景

(3) 扩展

<1> Mutex饥饿模式

Mutex内置了公平机制,即增加了饥饿模式,用来解决线程饥饿的问题

<2> 实现其他类型锁

有了Go提供的基本锁,可以在其基础上实现自旋锁、可重入锁

2、同步原语

原语一词译作primitive,是原始、远古的意思,表示已经到了最远始,不可再追溯的时间尽头,引申到计算机中就是不可再分割的、最小的操作语句

同步原语是一组用于子程序间同步语句,且这组语句在执行过程中不可再分割,即具有原子性

(1) atomic包

Go提供的基于原子读、原子写而实现的一系列原子操作,由于没有使用锁,所以是一种无锁(lock free)技术

<1> 读和写都是非原子的

在并发条件下,读操作、写操作存在不是原子操作的情况,如下图所示

  • 非原子读 :线程1读取的时候,被线程2的写入而干扰
  • 非原子写 :写=读+写,线程1读出数据后加一,新值还未写入,就被线程2先写入

浅谈Go语言的并发控制

<2> 原子读和原子写的实现

Go的原子读(Load)和原子写(Store),都是借助硬件层通过汇编实现的

  • 原子读 :依赖硬件层的 总线锁 或 缓存锁 实现
  • 原子写 :依赖硬件层的CPU指令 CMPXCHG

对于这部分,只需要知道Go底层的Load、Store操作等是通过硬件支持的即可,本文不再描述。下面主要介绍下CMPXCHG指令的思想 :CAS

<3> 理解CAS

理解CAS(Compare And Swap)需要从其本意上去思考,其包含**“比较和交换”**两个操作,先比较,如果一致则交换。由于每次只是检测是否一致,是否有冲突,所以这是一种基于冲突检测的机制。注意CAS是一种思想,既可以在硬件层实现(如CMPXCHG指令),也可以在应用上层实现(如乐观锁) 浅谈Go语言的并发控制

在比较中,如果发现不一样,一般的补偿机制是自旋,即不断重试以下操作

  1. 读取新值
  2. 再次比较是否一致,如果一致则写入
  3. 不一致则再次重试整个操作

<4> CAS的常见应用

目前基于CAS实现的技术非常多,常见的有如下几种

  • 乐观锁
  • MVCC

<5> CAS的优缺点

优点 :性能明显高于锁 缺点 :CPU开销大,ABA问题

<6> 一系列原子操作

既然有了最基本的原子操作 :原子读和原子写。因此很容易就可以在这两个操作的基础上,再实现一系列原子操作(当然还可以扩展出更多 ......),这样的一系列原子操作,都会收录在Go语言的atomic包中

  • Increase() :原子性的自增
  • Decrease() :原子性的自减
  • Add(x) :原子性的对某个数字增加X

<7> 使用

先看第一种不使用原子操作的情况,对 money 并发自增1000次,可以看到最后的结果并不是1000

func main() {
	money := 0
	wg := sync.WaitGroup{}

	wg.Add(1000)
	for i := 1; i <= 1000; i++ {
		go func() {
			money++
			wg.Done()
		}()
	}
	wg.Wait()
	fmt.Printf("money = %d \n", money)
}

浅谈Go语言的并发控制 再看使用原子操作的情况,结果是1000,实现了并发场景下的原子性

func main() {
	var money int32
	wg := sync.WaitGroup{}

	wg.Add(1000)
	for i := 1; i <= 1000; i++ {
		go func() {
			atomic.AddInt32(&money, 1)
			wg.Done()
		}()
	}
	wg.Wait()
	fmt.Printf("money = %d \n", money)
}

浅谈Go语言的并发控制

<8> 为什么不用锁

上面的代码使用锁,也可以达到相同的效果,但由于锁需要用户控制,容易产生死锁或bug,而原子操作由底层实现,安全可靠。所以一般情况下,只要是可以用原子操作代替锁的场景,都建议使用原子操作

<9> 适用场景

适用于需要对整型数字做并发安全控制的场景

(2) 信号量

信号量是一种由Dijkstra(迪杰斯特拉)发明的子程序间同步技术,包含一个信号量和两个PV原语,P原语用来尝试获取信号(-1),V原语用来新增信号(+1) ​

Go语言的实现主要由一个数据结构和三个具有原子性的方法组成,如下所示

type Semaphore struct {
	value int
}

func (s *Semaphore) add() {
	atomic.AddUint64(&s.value, 1)
}

func (s *Semaphore) done() {
    atomic.AddUint64(&s.value, -1)
}

func (s *Semaphore) wait() {
	for{
        if atomic.LoadUint64(&s.value) == 0{
			return
		}
	}
}

<1> 理解

信号量可以理解为门口的信号灯,每新来一辆车就排队(add),门里面的车未完全进入时,下一辆车必须等待(wait),直到门里面的车完全出去后(done),下一辆车才能进入 浅谈Go语言的并发控制

注意信号量的机制不是队列,图中内容只是为了更形象的理解

<2> 使用

在Go语言中,信号量由WaitGroup对象实现,主要包括add、done、wait三个操作。如果主协程开启了两个子协程,主协程需要等待两个协程工作完全结束后才能停止,实现方法如下

func main(){
	wg := sync.WaitGroup{}
	fmt.Print("main routine waiting ...\n\n")
	wg.Add(2)
	go func() {
		fmt.Print("goroutine1 working ...\n\n")
		time.Sleep(1 * time.Second) // 模拟业务逻辑耗时
		fmt.Print("goroutine1 done\n\n")
		wg.Done()
	}()

	go func() {
		fmt.Print("goroutine2 working ...\n\n")
		time.Sleep(2 * time.Second) // 模拟业务逻辑耗时
		fmt.Print("goroutine2 done\n\n")
		wg.Done()
	}()

	wg.Wait()
	fmt.Print("main routine stop ...\n\n")
}

浅谈Go语言的并发控制

<3> 适用场景

适用于一个协程等待另一个或一组协程结束的场景

(3) once

once也是Go提供的一种同步原语,具有一个重要特征 :即使并发,也只会执行一次

<1> 理解

once由一个Once对象和一个Do方法组成,内部基于锁和原子操作

  • 锁 :用来保证只有一个协程执行
  • 原子操作 :保证 done 字段读取的原子性
// Once 对象
type Once struct {
	done uint32 // 标记, 记录是否已执行, 0未执行, 1已执行
	m    Mutex // 锁, 用来保证只有一个协程执行
}

func (o *Once) Do(f func()) {
	if atomic.LoadUint32(&o.done) == 0 {
		o.doSlow(f)
	}
}

func (o *Once) doSlow(f func()) {
	o.m.Lock()
	defer o.m.Unlock()
	if o.done == 0 {
		defer atomic.StoreUint32(&o.done, 1)
		f()
	}
}

<2> 使用

开启10个协程,每个协程都执行某个方法,要求这个方法只能被执行1次

func main() {
	var once sync.Once
    
	doOnce := func() {
		fmt.Print("\n\ndo only once ...\n\n\n")
	}
    
	for i := 0; i < 10; i++ {
		go func() {
			once.Do(doOnce)
		}()
	}
    
	time.Sleep(2 * time.Second)
}

浅谈Go语言的并发控制

<3> 适用场景

适用于在并发条件下,某个操作必须只做一次的场景

(4) cond

cond是Go提供的一种比较少用的同步原语,如果需要在某个条件下才唤醒一个协程或全部的协程的时候,可以考虑下cond。而且它能够让出处理器的使用权,提高 CPU 的利用率,远比我们使用 sleep() 这种方式优雅和高效

<1> 理解

使用步骤如下

  1. 先使用NewCond()创建一个cond变量
  2. 在不同的协程中,使用cond先lock(),然后wait()等待条件,最后unlock()
  3. 在需要唤醒不同协程的通知协程中,使用signal()唤醒一个协程,或使用lock()broadcast()unlock() 的方式唤醒所有协程
  4. 注意唤醒顺序是按照入链表顺序,即先进先出

浅谈Go语言的并发控制

<2> 使用

func main() {
    var condition = false
	var wg sync.WaitGroup
	wg.Add(3)
	mutexLock := sync.Mutex{}
	conditionPrimitive := sync.NewCond(&mutexLock)

	// 子协程拿到锁后, 发现条件为假, 会阻塞等待, 进入 notifyList 待唤醒列表
	go func() {
		conditionPrimitive.L.Lock() // 上锁
		for condition == false {
			fmt.Println("routine1 wait ...")
			conditionPrimitive.Wait() // 阻塞等待
		}
		fmt.Println("routine1 done: condition =", condition)
		conditionPrimitive.L.Unlock() // 解锁
		wg.Done()
	}()

	// 子协程拿到锁后, 发现条件为假, 会阻塞等待, 进入 notifyList 待唤醒列表
	go func() {
		conditionPrimitive.L.Lock() // 上锁
		for condition == false {
			fmt.Println("routine2 wait ...")
			conditionPrimitive.Wait() // 阻塞等待
		}
		fmt.Println("routine2 done: condition =", condition)
		conditionPrimitive.L.Unlock() // 解锁
		wg.Done()
	}()

	// 子协程拿到锁后, 发现条件为假, 会阻塞等待, 进入 notifyList 待唤醒列表
	go func() {
		conditionPrimitive.L.Lock() // 上锁
		for condition == false {
			fmt.Println("routine3 wait ...")
			conditionPrimitive.Wait() // 阻塞等待
		}
		fmt.Println("routine3 done: condition =", condition)
		conditionPrimitive.L.Unlock() // 解锁
		wg.Done()
	}()

	time.Sleep(2 * time.Second)

	// 主协程唤醒先第一个协程, 条件置为真
	fmt.Print("\nwakeup: \n")
	condition = true
	conditionPrimitive.Signal()

	// 主协程拿到锁后, 条件置为真, 并广播给剩下的所有子协程
	conditionPrimitive.L.Lock() // 上锁
	conditionPrimitive.Broadcast() // 广播给待通知链表中的所有协程
	fmt.Print("broadcast: \n\n")
	conditionPrimitive.L.Unlock() // 解锁

	wg.Wait()
}

浅谈Go语言的并发控制

<3> 适用场景

需要一个或一组协程等待,并在一定条件下才会被唤醒的情况

3、channel

Channel是Go提供的用于协程间通信的数据结构

(1) 使用

func work(done chan bool) {
	for {
		select {
		case <-done:
			fmt.Printf("routine done\n\n")
			return
		default:
			fmt.Print("routine doing ...\n")
			time.Sleep(300 * time.Millisecond) // 模拟业务逻辑耗时
		}
	}
}

func main() {
	done := make(chan bool)
	fmt.Print("main routine doing ...\n\n")
	go work(done)
	time.Sleep(2 * time.Second) // 模拟业务逻辑耗时
	
    done <- true // 通知子协程停止工作
    
	fmt.Printf("main done\n\n")
}

浅谈Go语言的并发控制

(2) 适用场景

适用于通知协程的场景

五、channel存在的问题

1、理想情况

在理想的场景下,协程间通信的情况不会很复杂,比如协程间的通信只是单向的、参与通信的协程数量极少、不存在级联操作等。这样用一个channel就可以解决通信的问题,如果不够再增加也不会很复杂

2、现实情况

现实中的场景会要复杂的多,如下所示的消息队列的消费服务,包含了主协程、后台协程、接收协程、调度协程、消费协程共5种协程。 浅谈Go语言的并发控制 后台协程用于监听信号,如退出信号、热更新信号、重启信号等,所以就必须为上述架构再增加一套协程间通信的设计,设计如下图所示 浅谈Go语言的并发控制 每两个协程之间都需要两个反向的channel,而且这些协程之间的级联操作很长,就会导致整个服务内部各个协程间的通信过于复杂

3、总结

channel适用于场景简单的少量协程间的通信,一旦发现需要增加很多channel才能解决问题的时候,就要考虑修改架构设计,或者使用一种专门解决大量channel间通信问题的技术 :Context

六、强大的Context

(1) 介绍

Context是Go内置的一种接口类型,在此基础上又定义了三种结构体类型,分别是cancelCtx、timerCtx和valueCtx 浅谈Go语言的并发控制 Go提供了WithCancel()WithDeadline()WithTimeout()WithValue() 4个用于创建上述3种ctx对象的方法,其中WithValue()会在当前ctx上写入一个KV键值对,用于实现协程间数据的传递,其他的三种方法都是基于父类ctx新建一个子ctx,用于实现协程退出后,与之相关的子协程的自动结束

(2) 理解

1、先通过 Background() 创建一个根ctx 2、有了这个根节点之后,再继续在其基础上通过With*()创建子ctx节点。 3、每一个ctx节点的结束,子ctx节点都会结束 4、如果需要传递数据,使用WithValue()即可 浅谈Go语言的并发控制 其中ctx之间的派生关系如下图所示,ctx之间可以传递数据,也可以方便的实现自动级联操作 浅谈Go语言的并发控制

(3) 使用

// 后台协程
func background(ctx context.Context) {
	for {
		select {
		case <-ctx.Done():
			fmt.Print("background done\n")
			return
		default:
		}
		time.Sleep(500 * time.Millisecond)
	}
}

// 接受协程
func receive(ctx context.Context) {

	for {
		ctx = context.WithValue(ctx, "message", rand.Int31n(100))
		dispatch(ctx)
		select {
		case <-ctx.Done():
			fmt.Print("receive done\n")
			return
		default:
		}
		time.Sleep(500 * time.Millisecond)
	}
}

// 调度协程
func dispatch(ctx context.Context) {
	go consume(ctx)
	select {
	case <-ctx.Done():
		fmt.Print("dispatch done\n")
		return
	default:
	}
}

// 消费协程
func consume(ctx context.Context) {
	select {
	case <-ctx.Done():
		fmt.Print("consume done\n")
		return
	default:
		fmt.Printf("consuming ... message = %d\n", ctx.Value("message"))
	}
}

func main() {

	rootCtx := context.Background()
	ctx, cancel := context.WithCancel(rootCtx)

	go receive(ctx)
	go background(ctx)
	time.Sleep(1 * time.Second)
	cancel()
	time.Sleep(1 * time.Second)
}

浅谈Go语言的并发控制

(4) 适用场景

适用于协程间复杂通信的场景,如级联操作

七、总结

无论在Go还是其他语言中,一旦遇到并发控制的问题,都可以使用下面的步骤分析并解决

1、要控制数据,还是控制行为 ?

2、应该选择直接控制还是间接控制 ?

3、尽量使用其他技术来代替锁,不要直接使用锁 !

4、尽量保持线程间关系的简单清晰,不要设计复杂 !