likes
comments
collection
share

GO原子操作「源码分析+案例」

作者站长头像
站长
· 阅读数 4

最近在参加创作者训练营,把年前写的一些文章(原计划每周一篇)一起发了吧。本文虽然用 Go 写案例,但完全不影响大家对原子操作的理解,原子操作需要硬件提供支持,主流语言应该都支持原子操作语法糖的。

我列一些我写过的「Go 学习之路」文章,若有兴趣可以点链接看看。

写作背景

名称解释

原子操作:指不会被线程调度机制打断的操作,这种操作一旦开始,就一直运行到结束,中间不会有任何 context switch (切换到另一个线程)。来源于原子操作_百度百科。原子操作是依赖于硬件(CPU)支持,并不是靠软件就能实现的,在多核 CPU 中也是同样有效。

原子操作要比锁效率更高更快,因为它没有上下文切换,不涉及对共享资源的加锁。

汇编分析代码执行步骤

如果觉得文字抽象,举一个例子解释。假设有段代码如下

func TestAdd(t *testing.T) {
	var (
		i = 0
	)
	i++
}

代码片段中 i++ 操作,在底层这个操作会被拆解成下面几步:

1、  加载变量值:编译器从内存中加载变量 i 的当前值到寄存器中。

2、  执行自增操作: 编译器会在寄存器中对变量的值进行自增操作,通过加法指令来实现。

3、  存储结果: 编译器将自增后的结果存储回内存中的变量 i。

来一段汇编,蓝色框内比较重要。

GO原子操作「源码分析+案例」

1、  MOVL $0, "".i+4(SP):将 0 移动到相对于栈指针 SP 偏移量为 4 的地址处(i 地址)。

2、  MOVL $1, "".i+4(SP): 将 1 移动到相对于栈指针 SP 偏移量为 4 的地址处(i 地址),覆盖了前面的 0。

3、  MOVQ 8(SP), BP:相对于栈指针 SP 偏移量为 8 的地址的值加载到基址寄存器 BP 中。通常用于函数返回之前,将函数调用前的基址恢复到基址寄存器中。

4、  ADDQ $16, SP:将栈指针 SP 增加 16,释放栈空间。

5、  RET:函数返回。

我们再贴一段用原子操作的代码

func main() {
	var (
		i int32 = 0
	)
	atomic.AddInt32(&i1)
}

汇编如下 GO原子操作「源码分析+案例」

1、  LEAQ type.int32(SB), AX:将 type.int32 的地址加载到寄存器 AX 中。

2、  PCDATA 1, 1, 1, 0:一种用于指示调试信息的指令,告诉编译器关于程序计数器(PC)和数据的信息。

3、  NOP:空操作指令,没有实际的操作,用于调整代码的对齐或者填充。

4、  CALL runtime.newobject(SB):调用 runtime.newobject 函数,用于分配 int32 类型的新对象,并将对象的地址返回到寄存器 AX 中。

5、  MOVQ AX, "".&i+16(SP):将寄存器 AX 中的值(即新对象的地址)存储到 i 的地址处,偏移量为 16。

6、  MOVL $0, (AX):将立即数 0 存储到地址 AX 指向的内存中,也就是将新分配的对象的值初始化为 0。

7、  MOVQ "".&i+16(SP), CX:将地址处于栈指针 SP 偏移量为 16 的位置处的值(即变量 i 的地址)加载到寄存器 CX 中。

8、  MOVL $1, DX:将立即数 1 存储到寄存器 DX 中。

9、  LOCK:接下来的指令是一个原子操作(这个很重要)。

10、  XADDL DX, (CX):将寄存器 DX 中的值(即 1)加到寄存器 CX 指向的内存地址中的值上,并将结果存储到寄存器 CX 中。

11、  ......后面省略

可以看出原子操作是加了一把锁 LOCK 的,整个自增过程将作为一个原子操作单元执行,保证线程安全性和数据一致性。意味着其他线程无法在自增操作期间读取或修改变量 i 的。

我再贴一段加锁的代码

func main() {
	var (
		i  int32 = 0
		mu sync.Mutex
	)

	mu.Lock()
	i++
	mu.Unlock()
}

汇编如下 GO原子操作「源码分析+案例」

1、.....

2、CALL sync.(*Mutex).Lock(SB):调用Mutex的Lock方法,即对锁进行加锁操作。

3、....

4、CALL sync.(*Mutex).Unlock(SB):调用Mutex的Unlock方法,即对锁进行解锁操作

5、.....

go tool 生成汇编命令
go tool compile -S main.go 

go build main.go && go tool objdump ./main 

第一命令是编译,将源代码编译成 .o 目标文件,并输出汇编代码。

第二个命令是反汇编,即从可执行文件反编译成汇编,所以要先用 go build 命令编译出可执行文件。

我比较喜欢用第一个。

有些 GO SDK 版本执行命令会报错比如:“main.go:4:2: could not import sync (file not found)”,我换了 SDK 版本就好了,怀疑是 SDK 版本有 bug。

原子操作支持下面这些操作类型

1、  原子加法操作: 共享内存中进行加法操作,一般是实现计数器等场景。

2、  比较并交换操作(Compare-and-Swap,CAS): CAS 操作是一种乐观并发控制机制,它检查某个内存位置的值是否等于预期值,如果是,则将该位置的值原子性的更新为新值。

3、  加载和存储(Load/Store): Load 操作指从内存中加载一个值,而 Store 操作指将一个值存储回内存。

第一个问题大家思考下,为啥 CPU 不提供更复杂的原子操作类型呢?文章末尾给我的思考。

源码解析

GO 逐步完善 SDK 提供能力也更丰富。1.9 版本前原子操作仅提供指针参数函数、和 atomic.Value;1.9 版本后提供了类型+方法方式执行原子操作。

GO 1.9 前版本

GO 提供的原子操作在 sync/atomic 包 doc.go 文件,仅提供通用函数,归纳 GO 提供的原子操作函数。提供的数据类型有:int32、int64、uint32、uint64、uintptr、unsafe 包中的 Pointer。unsafe.Pointer 类型不提供原子加法操作函数。

Swap 交换值

用于原子地交换指定内存地址的值,并返回原来的值。提供了多个变体,可以用于不同的数据类型,提供下面这几种数据类型: GO原子操作「源码分析+案例」

用 int32 举例如下

func TestAtomicSwap(t *testing.T) {
	var (
int32 = 3
	)
	old := atomic.SwapInt32(&v, 6)
	
	fmt.Printf("old=%v,new=%v\n", old, v)
}

结果输出如下

=== RUN   TestAtomicSwap
old=3,new=6
--- PASS: TestAtomicSwap (0.00s)
PASS
Add 加法

用于原子地对指定的内存地址的值进行加法操作,并返回原来的值。也有不同变体,提供的类型如下: GO原子操作「源码分析+案例」

用 int32 举例如下

func TestAtomicAdd(t *testing.T) {
	var (
		v  int32 = 0
		wg sync.WaitGroup
	)

	for i := 0i < 200i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			time.Sleep(200)
			atomic.AddInt32(&v, 1)
		}()
	}

	wg.Wait()

	fmt.Printf("v=%v\n", v)
}

结果输出如下

=== RUN   TestAtomicAdd
v=200
--- PASS: TestAtomicAdd (0.00s)
PASS

那有同学要问了,如果对计数器做减法操作呢?第二个参数传负数就可以了。

第一个问题大家思考下,uint32, uint64 类型怎么做减法呢?文章末尾给答案。

func TestAtomicAdd(t *testing.T) {
	var (
int32 = 10
	)
	out := atomic.AddInt32(&v, -2)

	fmt.Printf("v=%v,out=%v\n", v, out)
}

结果输出如下

=== RUN   TestAtomicAdd
v=8,out=8
--- PASS: TestAtomicAdd (0.00s)
PASS
CompareAndSwapInt32 比较和替换

用于执行比较并交换(compare-and-swap)操作。也有不同变体,提供的数据类型如下: GO原子操作「源码分析+案例」 用 int64 举例如下

func TestAtomicCompareAndSwap(t *testing.T) {
	var (
		v  int64 = 0
		wg sync.WaitGroup
	)

	for i := 0i < 200i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			time.Sleep(200)
			if swapped := atomic.CompareAndSwapInt64(&v, 0, 3); swapped {
				fmt.Printf("CompareAndSwapInt64 success...")
			}
		}()
	}

	wg.Wait()

	fmt.Printf("v=%v\n", v)
}

结果输出如下

=== RUN   TestAtomicCompareAndSwap
CompareAndSwapInt64 success...v=3
--- PASS: TestAtomicCompareAndSwap (0.00s)
PASS
Load 和 Store 加载和存储

Load 函数从内存地址 addr 中读取一个 uint32 类型的值,并返回这个值。 GO原子操作「源码分析+案例」

Store 函数将一个 uint32 类型的值 val 存储到内存地址 addr 中。 GO原子操作「源码分析+案例」

用 int64 举例如下

func TestAtomicStoreAndLoad(t *testing.T) {
	var (
		v  int64 = 0
		wg sync.WaitGroup
		m  = sync.Mutex{}
	)

	for i := 0i < 200i++ {
		wg.Add(1)
		go func() {
			defer func() {
				wg.Done()
			}()

			if val := atomic.LoadInt64(&v); val == 1 { // 快路径
				return
			}

			m.Lock() // 满路径需要加锁
			if val := atomic.LoadInt64(&v); val == 1 {
				return
			}
			atomic.StoreInt64(&v, 1) // 存储数据
			fmt.Printf("累加v=%v\n", v)
			m.Unlock()
		}()
	}

	wg.Wait()
}

结果输出如下

=== RUN   TestAtomicStoreAndLoad
累加v=1
--- PASS: TestAtomicStoreAndLoad (0.00s)
PASS

这几行代码跟 sync.Once 实现时一样的原理,快路径通过原子操作快速判断和返回,慢路径通过加锁的方案处理临界区。

第一个问题大家思考下,你们发现没?原子操作的第一个参数传的是 “addr *int64” 指针,为什么?文章末尾给答案。

GO 1.9 后版本

1.9 之后的版本增加了类型+方法使用方式,sync/atomic 包 type.go 文件。也仅支持这些类型:int32、int64、uint32、uint64、uintptr、unsafe.Pointer。我用 int32 和 unsafe.Pointer 举例,其他类型用法差异不大,自行研究。

atomic.Int32 类型

针对 int32 类型的原子操作

func TestInt32(t *testing.T) {
	var (
		n  atomic.Int32
		wg sync.WaitGroup
	)

	n.Store(2)
	for i := 0i < 200; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			time.Sleep(200)
			n.Add(1)
		}()
	}

	wg.Wait()
	fmt.Printf("累加v=%d\n", n.Load())
}

结果输出如下

=== RUN   TestName
累加v=202
--- PASS: TestName (0.00s)
PASS
atomic.Pointer 指针类型

用于执行针对指针类型的原子操作。它允许原子地读取和写入指针类型的值

func TestPointer(t *testing.T) {
	type P struct{ x int }

	var (
		pT     atomic.Pointer[P]
		ta, tb = P{1}, P{2}
	)

	pT.Store(&ta)
	fmt.Println(pT.Load())

	pa1 := pT.Load()
	fmt.Println(pa1 == &ta)

	pa2 := pT.Swap(&tb)
	fmt.Println(pa2 == &ta)
	fmt.Println(pT.Load())

	b := pT.CompareAndSwap(&ta, &tb)
	fmt.Println(b)

	b = pT.CompareAndSwap(&tb, &ta)
	fmt.Println(b)
}

结果输出如下

=== RUN   TestPointer
&{1}
true
true
&{2}
false
true
--- PASS: TestPointer (0.00s)
PASS

atomic.Value 类型

在 sync/atomic 包 value.go 文件,允许原子地存储和加载任意类型的值,而不需要额外的锁。你也可以把它理解成一个容器,可以放任意值。

func TestValue(t *testing.T) {
	type T struct{ ab, c int }
	var (
		x = T{123}
		y = T{456}
		z = T{789}
		v atomic.Value
	)

	v.Store(x)
	fmt.Println(v) 

	old := v.Swap(y)
	fmt.Println(v)       // {{4 5 6}}
	fmt.Println(old.(T)) // {1 2 3}
	swapped := v.CompareAndSwap(x, z)
	fmt.Println(swapped, v) // false {{4 5 6}}
	swapped = v.CompareAndSwap(y, z)
	fmt.Println(swapped, v) // true {{7 8 9}}
}

结果输出如下

=== RUN   TestValue
{{1 2 3}}
{{4 5 6}}
{1 2 3}
false {{4 5 6}}
true {{7 8 9}}
--- PASS: TestValue (0.00s)
PASS

使用 atomic.Value 有几个点要注意:

1、  不能存储 nil,否则会 Panic。(panic: sync/atomic: store of nil value into Value)

2、  我们向 atomic.Value 中第一次添加值后,后续添加的类型只能是第一次添加的类型,否则会 Panic。(panic: sync/atomic: store of inconsistently typed value into Value)

3、  尽量别向 atomic.Value 中存入引用类型,可能造成数据一致性问题。

func TestValue(t *testing.T) {
	type m map[int]int

	var (
		v atomic.Value
	)
	v.Store(make(m))

	m1 := v.Load().(m)
	m1[1] = 200

	m2 := v.Load().(m)
	fmt.Printf("mlen=%d,val=%d\n", len(m2), m2[1])
}

结果输出如下

=== RUN   TestValue1
mlen=1,val=200
--- PASS: TestValue1 (0.00s)
PASS

如果要解决这个问题也很简单,GO 官方举另一个很有意思例子,写场景加同步锁就可以解决啦。

func TestValue(t *testing.T) {
	type m map[int]int

	var (
		v  atomic.Value
		mu sync.Mutex // 用于写场景
	)
	v.Store(make(m))

	// 读场景
	read := func(key int) (val int) {
		m1 := v.Load().(m)
		return m1[key]
	}

	// 写场景
	write := func(key, val int) {
		mu.Lock()
		defer mu.Unlock()

		m1 := v.Load().(m)
		m2 := make(m) // 创建一个新的 map
		for k, v := range m1 {
			m2[k] = v // 复制所有数据到新的 map 里面
		}

		m2[key] = val // 更新数据
		v.Store(m2)   // 更新后的数据存储到新的 map 中
	}
	_ = read
	_ = write
}

总结

1、  原子指针函数和类型+方法的两种方式都可以执行原子操作,我更建议用类型+方法的方式,因为更简单更清晰。

2、  原子操作比互斥锁更轻便,但使用也是有限制的,原子操作仅提供有限的数据类型,使用时要珍酌清楚。

3、  atomic.Value 原子值中存储引用类型时,使用一定要注意否则会有安全性问题哈,最好别存引用类型。

4、  atomic.Value 不要存 nil,后续添加的类型一定要是第一次添加的类型,否则会 Panic。

5、  不要对外暴露原子变量(原子变量控制在包内访问)、不要将原子值及其指针值通过参数、通道等传递。

问题解答

1、  为啥 CPU 不提供更复杂的原子操作类型呢?

名词解释那部分我提到过原子操作是不会被中断的,肯定是越快越好。如果某一个操作一直没有完成,CPU 指令一直被占用影响是很大的,可能会导致性能下降。另外我猜测也有复杂性和通用性的考虑吧。

2、  为什么原子操作的第一个参数传的是 “addr *int64” 指针?

因为原子操作函数需要操作的是变量的地址,而不是变量的值本身。原子操作提供的可操作类型都比较简单,这些类型基本都是“非引用类型“,如果传入的是值本身,修改后对原值是无效的。大家可能会问了,原子操作不是可以操作 “unsafe.Pointer”?它不是指针吗?这个问题不是很好理解。“unsafe.Pointer” 作为参数传入只有 CompareAndSwapPointer、LoadPointer、StorePointer、SwapPointer 这 4 个函数,并没有提供数学运算操作,所以并不是操作 “unsafe.Pointer” 对应的值,而是操作 “unsafe.Pointer”,所以当然需要传入指针了。

3、  对于有符号整数类型T (int32或int64),调用AddT 函数的第二个参数可以是一个负值,以执行原子减操作。但是如何对 unsigned 类型 T 的值进行原子递减操作,例如 uint32, uint64 ?简单写几行代码

方案一:定义临时变量绕过编辑器检查

func TestUint32(t *testing.T) {
	var (
		n = atomic.Uint32{}
	)

	n.Add(30)
	// 方案一:定义临时变量绕过编辑器
	d := int32(-3)
	n.Add(uint32(d))
	fmt.Printf("n=%v\n", n.Load())
}

方案二:先把差量的绝对值减 1,再把得到的整数常量转换为 uint32 类型,在这个值之上做按位异或操作,就可以得到最终值。

func TestUint32(t *testing.T) {
	var (
		n = atomic.Uint32{}
	)

	n.Add(30)
	// 方案二:取反
	var tt int64 = 5
	n.Add(^uint32(tt - 1))
	fmt.Printf("n=%v\n", n.Load())
}

参考资料

atomic package - sync/atomic - Go Packages