likes
comments
collection
share

Rust 并发编程- Atomic 并发原语

作者站长头像
站长
· 阅读数 5

原子类型和原子操作

原子(atom)指的是一系列不可被 CPU 上下文交换的机器指令,这些指令组合在一起就形成了原子操作。在多核 CPU 下,当某个 CPU 核心开始运行原子操作时,会先暂停其它 CPU 内核对内存的操作,以保证原子操作不会被其它 CPU 内核所干扰。

原子操作( atomic operation )是指不可分割且不可中断的一个或一系列操作,在并发编程中需要由 CPU 层面做出一些保证,让一系列操作成为原子操作。 一个原子操作从开始到结束可以是一个操作步骤,也可以包含多个操作步骤,这些步骤的顺序不可以被打乱,执行过程也不会被其他机制打断。

注:由于原子操作是通过指令提供的支持,因此它的性能相比消息传递会好很多。相比较于锁而言,原子类型不需要开发者处理加锁释放锁等问题,同时支持修改读取等操作,并具备较高的并发性能,几乎所有的语言都支持原子类型。

原子类型是用来帮助开发者更轻松的实现原子操作的数据类型,原子类型是无锁类型,但是无锁不代表无需等待,因为原子类型内部使用了CAS循环,当大量的冲突发生时,该等待还是得等待!但是总归比锁要好。

注:CAS 全称是 Compare and swap, 它通过一条指令读取指定的内存地址,然后判断其中的值是否等于给定的前置值,如果相等,则将其修改为新的值

Atomic 原子操作作为一个并发原语,是实现所有并发原语的基石,几乎所有的语言都支持原子类型和原子操作,比如 Java 的java.util.concurrent.atomic提供了很多原子类型,Go 语言的sync/atomic包提供了对原子操作的支持,Rust 也不例外。

注:原子操作是 CPU 的概念,而编程语言中也有类似的概念,叫做并发原语。并发原语是内核提供给外核调用的函数,这种函数在执行过程中不允许中断。

Rust 中的 Atomic 并发原语

Rust 中的原子类型位于std::sync::atomic module中。

这个 module 的文档中对原子类型有如下描述: Rust 中的原子类型在线程之间提供原始的共享内存通信,并且是其他并发类型的构建基础。

std::sync::atomic module 目前共提供了以下12种原子类型:

AtomicBool
AtomicI8
AtomicI16
AtomicI32
AtomicI64
AtomicIsize
AtomicPtr
AtomicU8
AtomicU16
AtomicU32
AtomicU64
AtomicUsize

原子类型与普通的类型基本上没有太多的区别,例如AtomicBoolbool,只是一个可以在多线程中使用,另一个则更适用于单线程下使用。

AtomicI32为例,它的定义是一个结构体,有以下原子操作相关的方法:

pub fn fetch_add(&self, val: i32, order: Ordering) -> i32 - 对原子类型进行加(或减)运算
pub fn compare_and_swap(&self, current: i32, new: i32, order: Ordering) -> i32 - CAS(rust 1.50废弃, 由compare_exchange替代)
pub fn compare_exchange(&self, current: i32, new: i32, success: Ordering, failure: Ordering) -> Result<i32, i32> - CAS
pub fn load(&self, order: Ordering) -> i32 - 从原子类型内部读取值
pub fn store(&self, val: i32, order: Ordering) - 向原子类型内部写入值
pub fn swap(&self, val: i32, order: Ordering) -> i32 - 交换

可以看到每个方法都有一个 Ordering 类型的参数,Ordering 是一个枚举,表示该操作的内存屏障的强度,用于控制原子操作使用的内存顺序

注:内存顺序是指 CPU 在访问内存时的顺序,该顺序可能受以下因素的影响:

  • 代码中的先后顺序
  • 编译器优化导致在编译阶段发生改变(内存重排序 reordering)
  • 运行阶段因 CPU 的缓存机制导致顺序被打乱
pub enum Ordering {
    Relaxed,
    Release,
    Acquire,
    AcqRel,
    SeqCst,
}

Rust 中 Ordering 这个枚举的枚举值分别代表什么:

  • Relaxed, 这是最宽松的规则,它对编译器和 CPU 不做任何限制,可以乱序
  • Release 释放,设定内存屏障(Memory barrier),保证它之前的操作永远在它之前,但是它后面的操作可能被重排到它前面(用于写入)
  • Acquire 获取,设定内存屏障,保证在它之后的访问永远在它之后,但是它之前的操作却有可能被重排到它后面,往往和Release在不同线程中联合使用(用于读取)
  • AcqRel, 是 Acquire 和 Release 的结合,同时拥有它们俩提供的保证。对于load,它使用的是 Acquire 命令,对于store,它使用的是 Release 命令,希望该操作之前和之后的读取或写入操作不会被重新排序。AcqRel一般用在fetch_add
  • SeqCst 顺序一致性, SeqCst就像是AcqRel的加强版,它不管原子操作是属于读取还是写入的操作,只要某个线程有用到SeqCst的原子操作,线程中该SeqCst操作前的数据操作绝对不会被重新排在该SeqCst操作之后,且该SeqCst操作后的数据操作也绝对不会被重新排在SeqCst操作前;它还保证所有线程看到的所有的 SeqCst 操作的顺序是一致的(虽然性能低,但是最保险)

通过这个Ordering枚举类型的参数,开发者可以自己定制底层的 Memory Ordering。

注:什么是 Memory Ordering, 摘录维基百科中的定义:

Memory Ordering (内存排序) 是指 CPU 访问主存时的顺序。可以是编译器在编译时产生,也可以是 CPU 在运行时产生。反映了内存操作重排序,乱序执行,从而充分利用不同内存的总线带宽。现代处理器大都是乱序执行。因此需要内存屏障以确保多线程的同步。

关于对Memory Ordering 的理解,有两个线程都要操作 AtomicI32 类型,假设 AtomicI32 类型数据初始值是0,一个线程执行读操作,另一个线程执行写操作要将数据写为10。假设写操作执行完成后,读线程再执行读操作就一定能读到数据10吗? 答案是不确定的,由于不同编译器的实现和CPU的优化策略,可能会出现虽然写线程执行完写操作了,但最新的数据还存在CPU的寄存器中,还没有同步到内存中。为了确保寄存器到内存中的数据同步,就需要Memory Ordering了。 Release 可以理解为将寄存器的值同步到内存,Acquire 是忽略当前寄存器中存的值,而直接去内存中读取最新的值。 例如当我们调用原子类型的 store 方法时提供的 Ordering 是 release,在调用原子类型的load 方法时提供的 Ordering 是 Acquire 就可以保证执行读操作的线程一定会读到寄存器里最新的值。

多线程中使用 Atomic

因为原子类型都实现了Sync trait,所以原子类型的变量在线程之间共享是安全的,但因为它们本身没有提供共享机制,因此比较常见的用法是将其放在原子引用计数智能指针Arc。 下面是官方文档中一个简单的自旋锁的例子:

use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;

fn main() {
    // 使用原子类型创建一个锁,通过引用计数获得共享所有权
    let spinlock = Arc::new(AtomicUsize::new(1));
    // 引用计数 +1
    let spinlock_clone = spinlock.clone();
    
    let thread = thread::spawn(move || {
        // SeqCst排序:写操作(存储)使用release 语义:写屏障之前的读写操作不能重排在写屏障之后
        spinlock_clone.store(0, Ordering::SeqCst);
    });

    // 使用 while循环,来等待某个临界区可用的一种锁
    // SeqCst排序:读操作(读取)使用 acquire 语义 读屏障之后的读写操作不能重排到读写屏障之前
    // 上面的线程中的写(存储)指令,下面的指令要求之后的读写操作不能在此之前
    while spinlock.load(Ordering::SeqCst) != 0 {}

    if let Err(panic) = thread.join() {
        println!("Thread had an error: {:?}", panic);
    }
}

注:自旋锁是指当一个线程尝试去获取某一把锁的时候,如果这个锁此时已经被其他线程获取,那么此线程就无法获取到这把锁,该线程将会等待,间隔一段时间后会再次尝试获取。 自旋锁实际上是通过 CPU 空转 (spin) 忙等待 (budy wait),例如上面代码中的 while 循环,来等待某个临界区可用的一种锁。

使用自旋锁可以的减少线程的阻塞,适用于对锁的竞争不激烈,且占用锁时间非常短的场景。 但是如果锁的竞争激烈,或者持有锁的线程需要长时间占用锁,受保护的临界区过大,线程自旋的就消耗大于线程阻塞挂起操作的消耗,自旋操作会一直占用CPU做无用功,就会造成CPU浪费,其他需要CPU的线程反而不能获得CPU,系统性能会急剧下降。

上面例子是自旋锁功能的实现,并且使用的内存排序是Ordering::SeqCst,下面我们尝试实现一个自选锁:

use std::sync::{
    atomic::{AtomicBool, Ordering},
    Arc,
};
use std::thread;
use std::time::Duration;

struct SpinLock {
    lock: AtomicBool,
}

impl SpinLock {
    pub fn new() -> Self {
        Self {
            lock: AtomicBool::new(false),
        }
    }

    pub fn lock(&self) {
        while self
            .lock
            .compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed)
            .is_err()
        // 尝试加锁, 如果加锁失败则一直自旋
        {
            // CAS的消耗比较大, 当加锁失败时, 通过简单load读取锁的状态, 只要读取到锁被释放时才会再去尝试CAS加锁
            while self.lock.load(Ordering::Relaxed) {}
        }
    }

    pub fn unlock(&self) {
        // 解锁
        self.lock.store(false, Ordering::Release);
    }
}

fn main() {
    let spinlock = Arc::new(SpinLock::new());
    let spinlock1 = spinlock.clone();

    let thread = thread::spawn(move || {
        // 子线程加锁1,内部调用了compare_exchange 方法,修改状态
        spinlock1.lock();
        thread::sleep(Duration::from_millis(100));
        println!("do something1!");
        // 子线程解锁1
        spinlock1.unlock();
    });
    
    thread.join().unwrap();

    // 主线程加锁
    spinlock.lock();
    println!("do something2!");
    // 主线程解锁
    spinlock.unlock();
}

上面我们实现的自旋锁,本质就是一个原子类型AtomicBool,它的初始值为false

当执行lock方法进行加锁操作时,利用了原子操作CAS的特性,如果compare_exchange失败,则尝试加锁的线程会卡在这个while循环中自旋。 这里有一个性能上的小优化,因为执行CAS消耗代价比较大,所以在CAS失败时,再不断通过简单load读取锁的状态, 只有读取到锁被释放时才会再去尝试CAS加锁。这样效率更好一些。

当执行unlock方法时,直接将AtomicBool设置storefalse,采用的 Memory Ordering 是Release,会将寄存器中的值与内存中的值同步,内存中就为false。此时,如果有线程卡在lock方法while循环处自旋,CAS操作compare_exchange采用的 Memory Ordering 是Acquire,将会忽略其自己当前寄存器中的值,从内存中读取到新的值为falseCAS将执行成功,也就是加锁成功。

Atomic 能替代锁吗

那么原子类型既然这么全能,它可以替代锁吗?答案是不行:

  • 对于复杂的场景下,锁的使用简单粗暴,不容易有坑;
  • std::sync::atomic包中仅提供了数值类型的原子操作:AtomicBoolAtomicIsizeAtomicUsize等,而锁可以应用于各种类型;
  • 在有些情况下,必须使用锁来配合,比如MutexRwLockCondvar等;

Atomic 的应用场景

事实上,Atomic虽然对于用户不太常用,但是对于高性能库的开发者、标准库开发者都非常常用,它是并发原语的基石,除此之外,还有一些场景适用:

  • 无锁(lock free)数据结构
  • 全局变量,例如全局自增 ID, 在后续章节会介绍
  • 跨线程计数器,例如可以用于统计指标

以上列出的只是Atomic适用的部分场景,具体场景需要大家未来根据自己的需求进行权衡选择。

总结

原子(atom)就是类比生物学中不可再分的原子,原子操作(atomic operation)就是“不可再被中断的一个或一系列操作”。原子类型是用来帮助开发者更轻松的实现原子操作的数据类型。并发原语是内核提供给外核调用的函数,这种函数在执行过程中不允许中断。

Atomic原子类型是无锁类型,内部使用了CAS循环,不需要开发者处理加锁释放锁的问题,同时支持修改读取等原子操作,这些操作是通过指令提供的支持,因此它的性能相比消息传递会好很多。原子操作需要配合使用Ordering内存排序,通过这个Ordering枚举类型的参数,开发者可以自己定制底层的 Memory Ordering。因为Atomic原子类型性能比锁高不少,所以在 Rust 中有广泛的使用场景,比如作为作为全局变量,作为跨线程变量等,但是无法完全取代锁,因为锁足够简单。

原子操作可以归纳为以下5类操作:

  • fetch_add - 对原子类型进行加(或减)运算
  • compare_and_swapcompare_exchange - 比较,如果相等则进行交换
  • load - 从原子类型内部读取值
  • store - 向原子类型内部写入值
  • swap - 交换

参考

转载自:https://juejin.cn/post/7211095650130149434
评论
请登录