likes
comments
collection
share

Rust权威指南之无畏并发

作者站长头像
站长
· 阅读数 13

一. 简述

安全并且高效地处理并发编程是Rust的另一个主要目标。并发编程和并行编程这两种概念随着计算机设备的多核优化而变得越来越重要。并发编程允许程序中的不同部分相互独立地运行;并行编程则允许程序中不同部分同时执行。

二. 线程的创建

在大部分现在操作系统中,执行程序的代码会运行在进程中,操作系统会同时管理多个进程。类似地,程序内部也可以拥有多个同时运行的独立部分,用来运行这些独立部分的就叫做线程。

由于多线程可以同时运行,所有将城中计算操作拆分至多个线程可以提高性能。但是这也增加了程序的复杂度,因为不同线程的执行顺序是无法确定的。这可能会导致一系列的问题:

  • 当多个线程以不一致的顺序访问数据或资源时产生的竞争状态
  • 当两个线程同时尝试获取对方持有的资源时产生的死锁,它会导致这两个线程无法继续运行;
  • 只会出现在特定情况下且难以稳定重现和修复的Bug

下面让我们看一下标准库中线程API的使用

2.1. 使用spawn创建新线程

我们可以调用thread::spawn函数来创建线程,它接受一个闭包作为参数,该闭包会包含我们想要在新线程中运行的代码。

use std::thread;
use std::time::Duration;

fn main() {
    thread::spawn(|| {
        for i in 1..10 {
            println!("hi number{} from the spawned thread!", i);
            thread::sleep(Duration::from_millis(1))
        }
    });
    for i in 1..5 {
        println!("hi number{} from the main thread!", i);
        thread::sleep(Duration::from_millis(1))
    }
}

这里需要注意当主线程运行结束之后,创建出来的新线程也会相应的停止,而不管它的打印任务是否完成。输出如下:

hi number1 from the main thread!
hi number1 from the spawned thread!
hi number2 from the main thread!
hi number2 from the spawned thread!
hi number3 from the main thread!
hi number3 from the spawned thread!
hi number4 from the main thread!
hi number4 from the spawned thread!
hi number5 from the spawned thread!

2.2. 使用Join句柄等待所有线程结束

在上面的例子中我们也可以看到一个问题,新线程的数据还没有执行完毕就因为main函数执行结束而结束。这并不是我们希望的,下面我们看一下使用join函数

use std::thread;
use std::time::Duration;

fn main() {
    let handle = thread::spawn(|| {
        for i in 1..10 {
            println!("hi number{} from the spawned thread!", i);
            thread::sleep(Duration::from_millis(1))
        }
    });
    for i in 1..5 {
        println!("hi number{} from the main thread!", i);
        thread::sleep(Duration::from_millis(1))
    }
    handle.join().unwrap()
}

但是我们也是需要注意,join函数使用的位置不当也会有问题的,如下:

let handle = thread::spawn(|| {
    for i in 1..10 {
        println!("hi number{} from the spawned thread!", i);
        thread::sleep(Duration::from_millis(1))
    }
});
handle.join().unwrap();
for i in 1..5 {
    println!("hi number{} from the main thread!", i);
    thread::sleep(Duration::from_millis(1))
}

此时输出结果如下

hi number1 from the spawned thread!
hi number2 from the spawned thread!
hi number3 from the spawned thread!
hi number4 from the spawned thread!
hi number5 from the spawned thread!
hi number6 from the spawned thread!
hi number7 from the spawned thread!
hi number8 from the spawned thread!
hi number9 from the spawned thread!
hi number1 from the main thread!
hi number2 from the main thread!
hi number3 from the main thread!
hi number4 from the main thread!

2.3. 在线程中使用move闭包

move闭包常常被用来thread::spawn函数配合使用,它允许你在某个线程中使用来自另一个线程的数据。

当我们为了使用主线程中数据,新线程的闭包必须捕获它所需要的值。例子:

use std::thread;

fn main() {
    let v = vec![1, 2, 3];
    // 使用move获取main函数中的变量
    let handle = thread::spawn(move || {
        println!("Here's a vector: {:?}", v);
    });
    // println!("{:?}", v); @1
    handle.join().unwrap();
}

新线程捕获v变量之后获取v的所有权之后。可以尝试去掉@1处的注释,运行提示我们value borrowed here after move错误。

三. 线程间消息传递

使用消息传递机制来保证并安全正在变得越来越流行。在这种机制中,线程或actor之间通过给彼此发送包含数据的消息来进行通信。

Go编程语言在处理并发编程的口号:不要通过共享内存来通信,而是通过通信来共享内存。

Rust在标准库中实现了一个名为通道的编程概念,它可以被用于实现基于消息传递的并发机制。编程中的通道由发送者和接收者两部分组成。某一处代码可以通过调用发送者的方法来传递数据,而另一处代码则可以通过检查接收者来获取数据。当我们丢弃了发送者或接收者的任何一端时,我们就相应的通道被关闭了。下面我们看例子:

use std::sync::mpsc;
use std::thread;

fn main() {
    // tx: Sender<?> 发送端
    // rx: Receiver<?> 接收端
    let (tx, rx) = mpsc::channel();
    // 新线程发送数据
    thread::spawn(move || {
        let val = String::from("hello");
        // val 发送到通道之内会发生所有权转移
        tx.send(val).unwrap();
        // println!("val => {}", val); ERROR value borrowed here after move
    });
    // main函数接受
    let received = rx.recv().unwrap();
    println!("收到消息: {}", received);
}

这里的mpsc::channel()返回的是一个包含发送端和接收端的元组。通道的接受端有两个可用于获取信息的方法:recvtry_recv。我们使用的recv会阻塞主线程的执行直到有值被传入通道。一旦有值被传入通道,recv就会将它包裹在Result<T, E>中返回。而如果通道的发送端全部关闭了,recv则会返回一个错误来表明当前通道再也没有可接受的值。try_recv方法不会阻塞线程,它会立即返回Result<T, E>:当通道中存在消息时,返回包含该消息的Ok变体;否则便返回Err变体。当某个线程需要一边等待消息一边完成其他工作时,try_recv方法会非常有用。我们可以使用循环对消息进行处理。

3.1. 发送多个值

下面我们演示一下新线程中将多个值发送到通道,在主线程中将rx视作迭代器,接收通道中的值并打印。

use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn main() {
    let (tx, rx) = mpsc::channel();
    // 开启新线程
    thread::spawn(move || {
        let vals = vec![
            String::from("hello"),
            String::from("world"),
            String::from("the"),
            String::from("thread"),
            String::from("haha"),
        ];
        // 遍历往通道中发送数据
        vals.into_iter().for_each(|item| {
            tx.send(item).unwrap();
            // 暂停一秒钟
            thread::sleep(Duration::from_secs(1));
        });
    });
    // 遍历接受数据
    for item in rx {
        println!("收到消息: {}", item);
    }
}

此时执行代码,我们可以看到主线程中的for循环中执行暂停或延迟指令,这也就表明主线程确实是在等待接收新线程中传递过来的值。输出结果如下:

收到消息: hello
收到消息: world
收到消息: the
收到消息: thread
收到消息: haha

3.2. 多生产者

下面我们通过克隆通道的发送端来创建多个能够发送值到同一个接收端的线程。实现多个生产者和单个消费者的模式。

use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn main() {
    let (tx, rx) = mpsc::channel();
    let tx1 = mpsc::Sender::clone(&tx);
    // 新线程1 通过move 获取tx1的所有权
    thread::spawn(move || {
        let vals = vec![
            String::from("tx1 => hi"),
            String::from("tx1 =>from"),
            String::from("tx1 => the"),
            String::from("tx1 => thread"),
        ];
        for val in vals {
            tx1.send(val).unwrap();
            thread::sleep(Duration::from_secs(1));
        }
    });
    // 新线程2 通过move获取tx的所有权
    thread::spawn(move || {
        let vals = vec![
            String::from("tx => hello"),
            String::from("tx => world"),
            String::from("tx => haha"),
        ];
        vals.into_iter().for_each(|item| {
            tx.send(item).unwrap();
            thread::sleep(Duration::from_secs(1));
        });
    });
    // 消费数据
    for item in rx {
        println!("收到消息: {}", item);
    }
}

四. 共享状态的并发

消息传递确实是一种不错并发通信机制,但它并不是唯一的解决方法,下面我们看一下如何通过共享内存来通信。

从某种程度来说,任何编程语言中的通道都有类似于单一所有权的概念,因为你不应该在值传递给通道之后在使用它。而基于共享内存的并发通信机制则更类似于多重所有权的概念:多个线程可以同时访问相同的内存地址。但是由于要管理多个所有者,所有这会增加额外的复杂性。Rust的类型系统和所有权规能够帮助我们正确的管理这些所有权。

4.1. 互斥体

互斥是共享内存领域比较常见的并发原语。一个互斥体在任意时刻只允许一个线程访问数据。为了访问互斥体中的数据,线程必须首先发出信号来获取互斥体的锁。锁是互斥体的一部分,这种数据结果被用来记录当前谁拥有数据的唯一访问权。通过锁机制,互斥体守护了它所持有的数据。

在使用互斥体的时候需要注意下面两点:

  • 必须在使用数据前尝试获取锁;
  • 必须在使用完互斥体守护的数据后释放锁,这样其他线程才继续完成获取锁

下面我们实现一个多个线程累加计算结果的例子:

use std::sync::{Arc, Mutex};
use std::thread;

fn main() {
    let counter = Arc::new(Mutex::new(0)); // @1
    let mut handlers = vec![];
    for _ in 0..10 {
        let counter = Arc::clone(&counter); // @2
        let handle = thread::spawn(move || {
            let mut num = counter.lock().unwrap(); // @3
            *num += 1;
        });
        handlers.push(handle);
    }
    for handle in handlers {
        handle.join().unwrap();
    }
    println!("计算结果:{}", *counter.lock().unwrap()); // @4 计算结果:10
}

下面我们对例子中关键的代码解释下:

  • 针对@1的代码,首先Mutex<T>是一个互斥体的智能指针,当对实例操作的时候需要先调用lock函数获取锁,获取锁之后将返回指向内部数据的可变引用;接着因为我们在多个线程中使用需要使用move捕获变量造成所有权转移,所有我们需要一个智能指针方便后续使用的时候进行引用克隆,此时我们第一个念头肯定想的是上一节学习的Rc<T>,但是Rc<T>并不是线程安全的,Rust提供了一个安全的计数引用的智能指针Arc<T>。总结来说就是使用Arc<T>包裹Mutex<T>来实现多线程共享所有权;
  • 对于@2的代码是这样的,对于引用进行安全复制(引用计数加一),将其所有权转移到指定的线程中;
  • 接着@3的代码,是在互斥体的使用的时候需要先lock函数获取锁,之后可以拿到内部数据的可变引用,此时我们就可以执行加一操作了;
  • 最后@4的操作,获取最后执行的结果;

4.2. 注意点

在上面的例子中,counter虽然是不可变,但我们仍然可以获取其内部值的可变性。这意味着,Mutex<T>Cell系列类型有着相似的功能,它同样提供了内部可变性。

另外还有一点需要注意,Rc<T>会产生循环引用的风险。两个Rc<T>值在互相指向对方时会造成内存泄漏。与之类似,使用Mutex<T>也会有产生死锁的风险。

五. 并发扩展

使用Sync traitSend trait对并发进行扩展。

只有实现了Send trait的类型才可以安全地在线程间转移所有权,除了Rc<T>等极少数的类型,几乎所有的Rust类型都实现了Send trait。任何完全由Send类型组成的复合类型都会自动标记为Send,几乎所有的原生类型都满足Send约束。

只有实现了Sync trait的类型才可以安全地被多个线程引用。换句话说,对于任何类型T,如果&T满足约束Send,那么T就是满足Sync的。这意味T的引用能够安全地传递至另外的线程中。与Send类似,所有原生类型都满足Sync约束,而完全满足Sync的类型组合的复合类型也都会被自动识别为满足Sync的类型。

最后需要注意手动实现SendSync是不安全的,另外SendSync甚至没有任何可供实现的方法。它们仅仅被用来强化与并发相关的不可变性。