【译】Rust中Arc与Mutex
原文标题:Arc and Mutex in Rust 原文链接:itsallaboutthebit.com/arc-mutex/ 公众号: Rust 碎碎念 翻译 by: Praying
使用 Rust 进行并发编程,你迟早都会遇到Arc
和Mutex
类型。尽管在许多语言中都有与Mutex
类似的概念,但是在 Rust 之前,你可能没有听过Arc
。更重要的是,如果你不把它们与 Rust 的所有权模型放在一起思考,可能很难完全理解这些概念。这篇文章记录了我对 Rust 中的 Arc
和 Mutex
的理解。
通常情况下,当你在并发环境中共享数据时,要么使用共享内存,要么以消息形式传递数据。你可能经常听到传递消息(例如使用 Channel)是处理并发的首选方式,但我认为由于 Rust 中的所有权模型,二者在安全性或正确性上的区别并不像在其他语言中那么大。更具体地说:安全的 Rust 不允许有数据竞争。这也是为什么当我在 Rust 中选择消息传递或共享内存时,主要考虑的是便利性而非安全性。
如果你选择以共享内存的方式进行数据共享,你会发现,没有Arc
和Mutex
几乎寸步难行。Arc
是一种智能指针,它能够让你在多线程之间安全地共享某个值。Mutex
是另一种类型包装器(wrapper),它使得某个值可以在多线程之间被安全地修改。为了完全理解这些概念,让我们先来了解 Rust 的所有权模型。
Rust 中的所有权(Ownership in Rust)
如果你尝试对 Rust 中的所有模型进行提炼总结,可能会得到下面几条:
-
一个值只能有一个所有者
-
可以对一个值拥有多个不可变引用
-
对一个值只能有一个可变引用
让我们来看看这几条规则是如何工作的。给定一个User
结构体,它包含一个字段name
,类型为String
。我们创建一个线程,并使用user
打印一条消息。
use std::thread::spawn;
#[derive(Debug)]
struct User {
name: String
}
fn main() {
let user = User { name: "drogus".to_string() };
spawn(move || {
println!("Hello from the first thread {}", user.name);
}).join().unwrap();
}
目前为止一切顺利,程序按照预期编译并输出信息。现在我们添加第二个线程(对应代码中的 t2),也去访问user
实例。
1 fn main() {
2 let user = User { name: "drogus".to_string() };
3
4 let t1 = spawn(move || {
5 println!("Hello from the first thread {}", user.name);
6 });
7
8 let t2 = spawn(move || {
9 println!("Hello from the second thread {}", user.name);
10 });
11
12 t1.join().unwrap();
13 t2.join().unwrap();
14 }
编译这段代码会得到下面的错误信息。
error[E0382]: use of moved value: `user.name`
--> src/main.rs:15:20
|
11 | let t1 = spawn(move || {
| ------- value moved into closure here
12 | println!("Hello from the first thread {}", user.name);
| --------- variable moved due to use in closure
...
15 | let t2 = spawn(move || {
| ^^^^^^^ value used here after move
16 | println!("Hello from the second thread {}", user.name);
| --------- use occurs due to use in closure
|
= note: move occurs because `user.name` has type `String`, which does not implement the `Copy` trait
错误信息显示“use of moved value: user.name
”。编译器也贴心地给我们指出问题出现的代码位置。在第 11 行(对应于上面代码第 4 行)代码中,我们把值移动到第一个线程中,接着在第 15 行(对应于上面代码第 8 行)代码,我们有尝试对第二个线程做同样的事情。如果你看过所有权规则,应该不难理解——一个值只能有一个所有者。在上面的代码中,如果我们想要使用user.name
,就需要把它移动(move) 到第一个线程中,也因此我们就不能把它移动(move) 到另一个线程中。否则,就违背了所有权规则。但是,我们没有对数据进行修改(mutate),这意味着我们可以对数据有多个(不可变)引用。让我们来试试。
fn main() {
let user = User { name: "drogus".to_string() };
let t1 = spawn(|| {
println!("Hello from the first thread {}", &user.name);
});
let t2 = spawn(|| {
println!("Hello from the second thread {}", &user.name);
});
t1.join().unwrap();
t2.join().unwrap();
}
在这段代码上,我移除了闭包中的move
关键字,并让线程不可变地借用(borrow) user,或者换句话说,我使用了一个由&
表示的共享引用。编译这段代码会得到下面的信息。
error[E0373]: closure may outlive the current function, but it borrows `user.name`, which is owned by the current function
--> src/main.rs:15:20
|
15 | let t2 = spawn(|| {
| ^^ may outlive borrowed value `user.name`
16 | println!("Hello from the first thread {}", &user.name);
| --------- `user.name` is borrowed here
|
note: function requires argument type to outlive `'static`
--> src/main.rs:15:14
|
15 | let t2 = spawn(|| {
| ______________^
16 | | println!("Hello from the second thread {}", &user.name);
17 | | });
| |______^
help: to force the closure to take ownership of `user.name` (and any other referenced variables), use the `move` keyword
|
15 | let t2 = spawn(move || {
| ++++
现在,编译器报出的信息说,闭包可能比当前函数(main)的生存周期更长。换言之,也就是说 Rust 编译器无法保证线程中的闭包能够在 main 函数结束之前结束。user
被线程借用,但是它被 main 函数所拥有。这种场景下,如果 main 函数结束,user
就会离开作用域然后被销毁(Drop)。因此,如果按照上面的方式在线程间共享user
,那么就可能会出现这种情况——线程尝试去读取已经被释放的内存。这是一种未定义行为并且我们也不希望出现这种情况。
编译器的提示信息还告诉我们,把变量user
移动到线程中进而避免借用可能是有帮助的。不过这种情况我们在一开始就尝试了,并不太好。现在,有两种方式可以解决这个问题,其中之一便是Arc
。不过,让我们先来了解另一种方式:scope thread。
Scoped threads
Scoped threads 这一特性既可以通过crossbeam这个库获取,也可以通过 Rust 中的实验性 nightly 功能获取。本文使用了 crossbeam 提供的实现,但两种方式在使用上区别不大。在Cargo.toml
中的依赖项中加入crossbeam = "0.8"
,下面的代码就可以解决之前的问题。
use crossbeam::scope;
#[derive(Debug)]
struct User {
name: String,
}
fn main() {
let user = User {
name: "drogus".to_string(),
};
scope(|s| {
s.spawn(|_| {
println!("Hello from the first thread {}", &user.name);
});
s.spawn(|_| {
println!("Hello from the second thread {}", &user.name);
});
})
.unwrap();
}
Scoped threads 的工作原理是,在某作用域内创建的所有线程都能被保证先于scope
闭包结束。也就是说
- 在作用域闭包(scoped closure)离开作用域之前,线程被 join 并等待完成。得益于此,编译器知道没有任何借用会比所有者生存周期更长。
一个有趣的事情在这里值得注意,作为一个人类读者,我们会把这两个程序都解释为有效。在 Rust 拒绝的版本中,我们在main()
函数结束之前就 join 了两个线程,所以将user
与线程共享实际上是安全的。不幸的是,这是你在用 Rust 编写时可能遇到的情况。编写一个能接受所有有效程序的编译器是不可能的,因此我们只能退而求其次:编写一个能拒绝所有无效程序的编译器,但代价是过于严格。Scoped threads 是专门为了让我们以编译器可以接受的方式编写这种代码而加入的特性。
尽管 Scoped threads 有用,但并非所有的场合都适用,例如在编写异步(async)代码时,下面让我们来看看Arc
。
Arc
Arc
是一种能够使得数据在线程间安全共享的智能指针,其名字是Atomic Reference Counter
三个单词的首字母缩写。它的工作方式从本质上来讲,是对将要共享的数据进行包装,并表现为此数据的一个指针。Arc
会追踪这个指针的所有拷贝,当最后一份拷贝离开作用域时,它就会安全释放内存。通过Arc
解决之前问题的方案如下。
1 use std::thread::spawn;
2 use std::sync::Arc;
3
4 #[derive(Debug)]
5 struct User {
6 name: String
7 }
8
9 fn main() {
10 let user_original = Arc::new(User { name: "drogus".to_string() });
11
12 let user = user_original.clone();
13 let t1 = spawn(move || {
14 println!("Hello from the first thread {}", user.name);
15 });
16
17 let user = user_original.clone();
18 let t2 = spawn(move || {
19 println!("Hello from the first thread {}", user.name);
20 });
21
22 t1.join().unwrap();
23 t2.join().unwrap();
24}
让我们一步一步来看。首先,在第 10 行,我们创建了一个user
并用一个Arc
对它进行包装。现在,这个值存储在内存中并且Arc
表现为它的一个指针。无论什么时候,我们对Arc
进行克隆,都只是克隆了一份引用,而不是user
本身。在第 12 行和第 17 行,我们分别对Arc
进行了克隆,并借着把克隆的指针移动到了新的线程中。如我们所见,Arc
允许我们在不必考虑生命周期的情况下共享数据。在这个例子中,我们有三个指针指向user
。一个是通过创建Arc
得到,一个在开始第一个线程之前通过克隆得到并被移动到第一个线程中,还有一个是在开始第二个线程之前通过克隆得到并被移动到第二个线程中。只要这三个指针中的任何一个还在存活,Rust 就不会销毁user
和释放内存。但是,当两个新线程和主函数结束时,所有的Arc
指针都离开作用域,user
也会随着最后一个指针离开作用域而被释放。
Send and Sync
让我们再深入一点儿。如果你看过Arc
的文档,你应该会看到它实现了 Send 和 Sync,但前提是被Arc
包装的类型也实现了Send
和Sync
。为了理解这是什么意思以及它为何以这种方式实现,让我们从Send
和Sync
的定义开始看看。
《The Rustonomicon》对Send
和Sync
的定义如下:
- 如果一个类型能够被安全地发送到另一个线程,那么这个类型是
Send
; - 如果一个类型能够安全地在线程间共享,那么这个类型是
Sync
。(当且仅当&T 是Send
时,T 是Sync
)
可以在 《The Rustonomicon》自由阅读的有关这些 trait 的描述,我也会尝试在这里分享我的理解。Send
和Sync
是作为一类标记 trait 存在,它们不提供任何需要被实现的方法,也不需要你去实现任何东西。它们的作用在于告知编译器某个类型在线程间可以被发送和共享的特性。让我们先从看起来相对直观的Send
开始。它意味着你不能把一个!Send
(读作:非Send
)的类型发送到另一个线程。例如,你不能把它发送到 channel,也不能移动到一个线程里。以下面的代码为例,它就无法编译。
#![feature(negative_impls)]
#[derive(Debug)]
struct Foo {}
impl !Send for Foo {}
fn main() {
let foo = Foo {};
spawn(move || {
dbg!(foo);
});
}
Send
和Sync
是自动派生的,这意味着如果一个类型的所有字段都满足Send
,那么这个类型也就自动实现了Send
。这段代码使用了一个实验特性negative_impls
,其作用在于让我们显式地告诉编译器,我想要把这个类型标记为!Send
。尝试编译此代码将产生下面的错误:
`Foo` cannot be sent between threads safely
如果你创建一个 channel 然后把foo
发送至另一个线程,也会发生同样的事情。如果现在用上Arc
会怎么样呢?你或许已经猜到,这没什么用,还是会以相同的方式报错(对于!Sync
也是一样的,Arc
需要同时满足这两个)。
#![feature(negative_impls)]
#[derive(Debug)]
struct Foo {}
impl !Send for Foo {}
fn main() {
let foo = Arc::new(Foo {});
spawn(move || {
dbg!(foo);
});
}
为什么会这样呢?Arc
不是能够包装我们的类型并赋予它更多的能力么?确实如此,但Arc
无法让我们的类型变成线程安全的。我会在文章末尾给出一个更深入的例子来解释原因,但是现在,我们先继续学习如何使用这些类型把。
使用 Mutex 修改数据
现在,我们开始讨论Mutex
。Mutex
在许多语言中都被视作(互斥)信号量(Semaphores)。你创建了一个 mutex 对象,通过每次只允许一个线程访问的方式,用它保护一段特定的代码段。在 Rust 中,Mutex
更像是一个包装器(wrapper)。它让你在锁定 mutex 之后才能访问内部的值。Mutex
通常与 Arc
结合使用能够让线程间共享变量更加简单。让我们看看下面的示例:
1 use std::time::Duration;
2 use std::{thread, thread::sleep};
3 use std::sync::{Arc, Mutex};
4
5 struct User {
6 name: String
7 }
8
9 fn main() {
10 let user_original = Arc::new(Mutex::new(User { name: String::from("drogus") }));
11
12 let user = user_original.clone();
13 let t1 = thread::spawn(move || {
14 let mut locked_user = user.lock().unwrap();
15 locked_user.name = String::from("piotr");
16 // after locked_user goes out of scope, mutex will be unlocked again,
17 // but you can also explicitly unlock it with:
18 // drop(locked_user);
19 });
20
21 let user = user_original.clone();
22 let t2 = thread::spawn(move || {
23 sleep(Duration::from_millis(10));
24
25 // it will print: Hello piotr
26 println!("Hello {}", user.lock().unwrap().name);
27 });
28
29 t1.join().unwrap();
30 t2.join().unwrap();
31}
让我们看看这段代码。在 main()函数的第一行,我们创建了一个User
结构体的实例并用Mutex
和Arc
对其进行包装。通过Arc
,我们可以轻松克隆指针并在线程间共享Mutex
。在第 14 行,你可以看到 mutex 被锁定并且在那之后底层的值可以被这个线程独占地使用。接着,我们在下一行代码对它的值进行修改。当离开作用域或我们手动调用drop(locked_user)
时,mutex 被解除锁定。
在第二个线程中,我们等待 10ms 并打印出name
。打印出的name
应该是在第一个线程中被更新后的内容。这一次 lock 操作在同一行语句完成,所以在这条语句结束后,mutex 就会被解除锁定。
值得注意的是,我们在调用lock()
后紧接着调用的unwrap()
方法。标准库中的Mutex
有一个被污染(poisoned)的概念。如果一个线程在 mutex 被锁定时 panic 了,我们无法确认Mutex
中的值是否仍然有效,因此默认采取的行为是返回一个 error 而不是 guard。所以,Mutex
要么返回一个装有值的Ok()
变量或者一个 error。可以从文档获取更多的信息。一般而言,并不推荐在生产代码中使用unwrap()
方法,但对于Mutex
而言却是一个有效的方式——如果一个 Mutex
已经被污染了,我们可能判断出程序状态是无效的,并使程序崩溃。
另一个关于Mutex
有趣的事情是,只要 Mutex
中的类型是Send
,Mutex
就会是Sync
。这是因为Mutex
确保只有一线程可以访问到内部的值,因此在线程间共享Mutex
是安全的。
Mutex: add Sync to a Send type
可能你还记得文章开头说过,Arc
需要底层的类型是Send
+Sync
,才能让Arc
也是Send
+Sync
。Mutex
只需要底层的类型是Send
,就能够让 Mutex
是Send
+Sync
。换句话说,Mutex
将会使一个!Sync
的类型Sync
,因此,你可以在线程间共享和修改它。
Mutex without Arc?
你可能会问,Mutex 是否可以不与Arc
一起而单独使用。我建议你在继续阅读之前,自己先思考一下:对于一个Send
类型的 Mutex 满足Send
+Sync
,这意味着什么?
如果你回到文章的第一部分,你可以看到它对 Arc
类型意味着什么,对于 Mutex
也意味着类似的事情。如果我们可以用类似 scope threads 的方式,完全可以脱离Arc
单独使用Mutex
:
use crossbeam::scope;
use std::{sync::Mutex, thread::sleep, time::Duration};
#[derive(Debug)]
struct User {
name: String,
}
fn main() {
let user = Mutex::new(User {
name: "drogus".to_string(),
});
scope(|s| {
s.spawn(|_| {
user.lock().unwrap().name = String::from("piotr");
});
s.spawn(|_| {
sleep(Duration::from_millis(10));
// should print: Hello piotr
println!("Hello {}", user.lock().unwrap().name);
});
})
.unwrap();
}
在这段程序中,我们实现了相同的目标。我们正在两个独立线程中访问Mutex
所包装的值,但是通过引用共享Mutex
而没有使用Arc
。但这并非总是可行的,比如在异步代码中,因此Mutex
总是搭配Arc
使用。
总结
我希望这篇文章能够帮助你理解 Rust 中的Arc
和Mutex
类型是什么?概括来讲,当你想在线程之间共享数据时,你通常会使用 Arc,而使用普通的引用是无法做到这一点的。当你需要修改在线程间共享的数据时,你还需要使用Mutex
。当你想要修改线程间共享的数据,并且无法通过使用引用来共享一个Mutex
时,你应该使用Arc<Mutex<...>>
。
Bonus: 为什么 Arc 需要类型是 Sync
现在,让我们回到这样一个问题:为什么Arc
需要底层的类型同时满足Send
和Sync
才能让自己是Send
和Sync
?这部分内容可以自由选择是否阅读,毕竟可能你的代码也不一定会用到Arc
和Mutex
。不过,它对于你理解标记 trait 可能有所帮助。
让我们以 Cell
为例。Cell
包装某个类型并使其具有内部可变性,或换句话说,它让我们可以在一个不可变的结构体上修改其内部的某个值。Cell
是Send
,但它也是!Sync
。
使用 Cell
的例子如下:
use std::cell::Cell;
struct User {
age: Cell<usize>
}
fn main() {
let user = User { age: Cell::new(30) };
user.age.set(36);
// will print: Age: 36
println!("Age: {}", user.age.get());
}
Cell
在某些情况下是有用的,但它不是线程安全的,也就是说它是!Sync
。如果你想在多个线程间共享一个由Cell
包装的值,这可能会让两个线程修改同一处内存,例如:
// this example will not compile, `Cell` is `!Sync` and thus
// `Arc` will be `!Sync` and `!Send`
use std::cell::Cell;
struct User {
age: Cell<usize>
}
fn main() {
let user_original = Arc::new(User { age: Cell::new(30) });
let user = user_original.clone();
std::thread::spawn(move || {
user.age.set(2);
});
let user = user_original.clone();
std::thread::spawn(move || {
user.age.set(3);
});
}
如果这段代码运行,它可能会导致未定义行为。这就是为什么Arc
无法与!Send
或!Sync
的类型一起工作。同时,Cell
是 Send
,这意味着它可以在线程间被发送。为什么?发送,或换个词-移动,不会让一个值被多个线程访问,它必须只能被一个线程拥有。一旦你把它移动到另一个线程,之前的线程就不再拥有这个值。基于这一点,我们总是在局部(local)修改一个 Cell
。
Bonus: 为什么 Arc 需要类型
到这里,你可能也会想,为什么Arc
不会为一个!Send
类型提供Send
特性。在 Rust 中,有一种类型是!Send
,它就是Rc
。Rc 是 Arc 的胞妹,但它不是原子的,Rc 仅用作引用计数器。它的作用与 Arc 基本相同,但只能在单线程中使用。它既不能在线程之间共享,也不能在线程之间移动。让我们来看看为什么
// this code won't compile, Rc is !Send and !Sync
use std::rc::Rc;
fn main() {
let foo = Rc::new(1);
let foo_clone = foo.clone();
std::thread::spawn(move || {
dbg!(foo_clone);
});
let foo_clone = foo.clone();
std::thread::spawn(move || {
dbg!(foo_clone);
});
}
这段代码不会被编译,因为Rc
是!Sync
+!Send
。它内部的计数器不是原子的,因此在线程间共享它会导致不准确的引用计数。现在假设Arc
能够让!Send
类型变成Send
:
use std::rc::Rc;
use std::sync::Arc;
#[derive(Debug)]
struct User {
name: Rc<String>,
}
unsafe impl Send for User {}
unsafe impl Sync for User {}
fn main() {
let foo = Arc::new(User {
name: Rc::new(String::from("drogus")),
});
let foo_clone = foo.clone();
std::thread::spawn(move || {
let name = foo_clone.name.clone();
});
let foo_clone = foo.clone();
std::thread::spawn(move || {
let name = foo_clone.name.clone();
});
}
这段代码可以编译,但它是错误的,请不要在实际代码中这样写!在这里,我定义了一个User
结构体,其内部有个Rc
类型的成员。因为Send
和Sync
是自动派生的,并且 Rc 是!Send
+!Sync
,所以 User 结构体也应该是!Send
+!Sync
。但是我们可以显式地告诉编译器把它标记为其他,在这段代码中使用不安全的 impl 语法标记为了Send
+Sync
。
现在你可以清楚地看到,如果Arc
允许!Send
类型在线程间发送,会出什么问题了。在这个例子中,Arc
的克隆被移动到不同的线程,然后没有任何东西阻止我们克隆 Rc 类型。并且因为 Rc 不是线程安全的,它可能会导致不准确的引用计数进而导致内存中的对象被过早释放或永不释放。
我知道这篇文章很长,所以要为看到这里的人点赞,谢谢!
转载自:https://juejin.cn/post/7104936990788812807