likes
comments
collection
share

Rust原子和锁:第一章、Rust并发基础

作者站长头像
站长
· 阅读数 18

Rust原子和锁:第一章、Rust并发基础 早在多核处理器普及之前,操作系统就允许一台计算机同时运行多个程序。这是通过在进程之间快速切换来实现的,允许每个进程一个接一个地重复运行。如今,几乎我们所有的电脑,甚至我们的手机和手表都有多核处理器,这是真正地并行执行多个进程。

操作系统尽可能地将进程彼此隔离,允许程序在完全不知道其他进程在做什么的情况下做自己的事情。例如,如果不事先询问操作系统内核,一个进程通常不能访问另一个进程的内存,或以任何方式与其通信。

然而,作为同一进程的一部分,程序可以产生额外的执行线程。同一进程内的线程并不是相互隔离的。线程之间共享内存,并可以通过共享内存相互交互。

本章将解释Rust中线程是如何生成的,以及围绕线程的所有基本概念,比如如何在多个线程之间安全共享数据。本章解释的概念是本书其余部分的基础。

1.1、Rust中的线程****

每个程序都从主线程开始。主线程将执行main函数,如果需要的话,可以用来生成更多的线程。

在Rust中,使用标准库中的std::thread::spawn函数来生成新线程。它只有一个参数:新线程将要执行的函数。一旦这个函数返回,线程就会停止。

让我们来看一个例子:

use std::thread;

 

fn main() {

    thread::spawn(f);

    thread::spawn(f);

    println!("Hello from the main thread.");

}

 

fn f() {

    println!("Hello from another thread!");

    let id = thread::current().id();

    println!("This is my thread id: {id:?}");

}

我们生成两个线程,它们都将f作为它们的主函数执行。这两个线程都将打印一条消息并显示它们的线程id,而主线程也将打印它自己的消息。

线程ID

Rust标准库为每个线程分配一个唯一的标识符。标识符可以通过Thread::id()访问,类型为ThreadId。ThreadId仅支持复制和比较相等操作。不能保证id是连续分配的,只能保证每个线程的id是不同的。

如果您多次运行上面的示例程序,您可能会注意到每次运行的输出是不同的。这是我在机器上运行时得到的输出:

Hello from the main thread.

Hello from another thread!

This is my thread id:

令人惊讶的是,部分输出似乎缺失了。

这里发生的情况是,新线程在完成之前,主线程已经执行完成了。

从main返回将退出整个程序,即使其他线程仍在运行。

在这个特殊的例子中,一个新生成的线程只有足够的时间来处理第二条消息的一半,然后程序就被主线程关闭了。

如果我们想要确保线程在从main返回之前已经完成,我们可以通过join来等待它们。为此,我们必须使用spawn函数的返回值JoinHandle:

fn main() {

    let t1 = thread::spawn(f);

    let t2 = thread::spawn(f);

    println!("Hello from the main thread.");

    t1.join().unwrap();

    t2.join().unwrap();

}

.join()方法等待线程完成执行,并返回std::thread::Result。如果线程因为panic而未能成功完成其功能,则该文件将包含panic消息。我们可以尝试处理这种情况,或者在join一个已经panic的线程时调用.unwrap()。

运行这个版本的程序将不再导致截断的输出:

Hello from the main thread.

Hello from another thread!

This is my thread id: ThreadId(3)

Hello from another thread!

This is my thread id: ThreadId(2)

唯一在运行之间仍然会改变的是消息打印的顺序:

Hello from the main thread.

Hello from another thread!

Hello from another thread!

This is my thread id: ThreadId(2)

This is my thread id: ThreadId(3)

输出锁

println宏使用std::io::Stdout::lock()来确保它的输出不会被中断。println!()表达式将等待任何并发运行的表达式完成后再写入任何输出。否则,我们可以得到更多的交错输出,比如:

Hello fromHello from another thread!

 another This is my threthreadHello fromthread id: ThreadId!

( the main thread.

2)This is my thread

id: ThreadId(3)

与将函数名传递给std::thread::spawn不同,更常见的是传递一个闭包。这允许我们将值值move到新线程中:

let numbers = vec![1, 2, 3];

thread::spawn(move || {

    for n in &numbers {

        println!("{n}");

    }

}).join().unwrap();

其中,numbers的所有权被move到新生成的线程中。如果没有使用move关键字,闭包将通过引用捕获numbers。这将导致编译器错误,因为新线程可能比该变量生命周期更长。

由于线程可能一直运行到程序执行的最后,spawn函数的参数类型有一个“静态生命周期”。换句话说,它只接受可能永远存在的函数。通过引用捕获局部变量的闭包可能不会永远保存,因为当局部变量不存在时,该引用将失效。

从线程中获取值是通过从闭包返回值来完成。这个返回值可以从join方法返回的Result中获得:

let numbers = Vec::from_iter(0..=1000);

let t = thread::spawn(move || {

    let len = numbers.len();

    let sum = numbers.iter().sum::();

    sum / len ①

});

let average = t.join().unwrap(); ②

println!("average: {average}");

如上,线程的闭包①返回的值通过join方法②发送回主线程。

如果numbers为空,线程在尝试除0(①)时会产生panic,而join则会返回panic消息,导致主线程也因为unwrap(②)而产生panic。

线程构建器

std::thread::spawn函数实际上只是std::thread::Builder::new().spawn().unwrap()的简写。

std::thread::Builder允许在生成新线程之前做一些设置。您可以使用它来配置新线程的堆栈大小,并为新线程命名。线程的名称可以通过std::thread::current().name()获得,将在panic消息中使用,并将在大多数平台上的监视和调试工具中可见。

此外,spawn函数返回std::io::Result,允许你处理生成新线程失败的情况。如果操作系统内存不足,或者对程序应用了资源限制,就可能发生这种情况。如果不能生成一个新线程,std::thread::spawn函数就会panic。

1 . 2、 线程作用域****

如果我们确定一个线程的生命周期不会超过某个作用域,那么该线程可以安全地借用那些不会永远存在的东西,比如局部变量,只要它们的生命周期超过这个作用域。

Rust标准库提供std::thread::scope函数来生成这样的作用域线程。它允许我们生成不能超过传递给该函数的闭包范围的线程,从而可以安全地借用局部变量。

它的工作原理最好用一个例子来说明:

let numbers = vec![1, 2, 3];

thread::scope(|s| { ①

    s.spawn(|| { ②

        println!("length: {}", numbers.len());

    });

    s.spawn(|| { ②

        for n in &numbers {

            println!("{n}");

        }

    });

}); ③

①我们使用闭包调用std::thread::scope函数。我们的闭包是直接执行的,并获得一个表示作用域的参数s。

②我们使用s来衍生线程。闭包可以借用局部变量,比如numbers。

③当作用域结束时,所有尚未join的线程都将自动join。

此模式保证作用域中生成的任何线程都不能超过作用域的生命周期。正因为如此,这个有作用域的衍生方法对其参数类型没有“静态绑定”,允许我们引用任何超出作用域的东西,比如numbers。

在上面的例子中,两个新线程同时访问numbers。这很好,因为它们(或主线程)都不会修改它。如果我们修改第一个线程来修改numbers,如下所示,编译器将不允许我们生成另一个同样使用numbers的线程:

let mut numbers = vec![1, 2, 3];

thread::scope(|s| {

    s.spawn(|| {

        numbers.push(1);

    });

    s.spawn(|| {

        numbers.push(2); // Error!

    });

});

确切的错误信息取决于Rust编译器的版本,因为它经常被改进以产生更好的诊断,但尝试编译上面的代码将导致类似这样的结果:

error[E0499]: cannot borrow numbers as mutable more than once at a time

 --> example.rs:7:13

  |

4 |     s.spawn(|| {

  |             -- first mutable borrow occurs here

5 |         numbers.push(1);

  |         ------- first borrow occurs due to use of numbers in closure

  |

7 |     s.spawn(|| {

  |             ^^ second mutable borrow occurs here

8 |         numbers.push(2);

  |         ------- second borrow occurs due to use of numbers in closure

泄漏末日****

在Rust 1.0之前,标准库有一个名为std::thread::scoped的函数,它会直接生成一个线程,就像std::thread::spawn一样。它允许非静态捕获,因为它返回一个JoinGuard,而不是JoinHandler,它在删除线程时加入线程。任何借来的数据只需要比这个JoinGuard活得更久。这似乎是安全的,只要JoinGuard在某个时刻被丢弃。

就在Rust 1.0发布之前,人们慢慢发现不可能保证某些东西会被删除。有很多方法,比如创建一个引用计数节点的循环,可以在不丢弃的情况下忘记或泄露某些内容。

最终,在一些人所说的“泄漏末日”中,得出的结论是,(安全)接口的设计不能依赖于对象在生命周期结束时总是会被丢弃的假设。泄露一个对象可能会导致泄露更多的对象(例如,泄露一个Vec也会泄露它的元素),但它可能不会导致未定义的行为。由于这个结论,std::thread::scoped不再被认为是安全的,并从标准库中删除。此外,std::mem::forget从不安全函数升级为安全函数,以强调遗忘(或泄漏)始终是可能的。

直到很久以后,在Rust 1.63中,一个新的std::thread::scope函数加入了一个新的设计,它的正确性不依赖于Drop。

1 . 3、 共享所有权和引用计数****

到目前为止,我们已经了解了如何使用move闭包将值的所有权转移到线程(“Rust中的线程”),以及如何从存活时间较长的父线程(“作用域线程”)借用数据。当两个线程之间共享数据时,其中任何一个线程都不能保证比另一个线程活得更久,它们都不能是该数据的所有者。它们之间共享的任何数据都需要和生命周期最长的线程一样长。

 

1.3.1、 静态变量 static****

有几种方法可以创建不属于单个线程的内容。最简单的是一个static值,它由整个程序“拥有”,而不是一个单独的线程。在下面的例子中,两个线程都可以访问X,但是它们都不拥有它:

static X: [i32; 3] = [1, 2, 3];

thread::spawn(|| dbg!(&X));

thread::spawn(|| dbg!(&X));

静态项有一个常量初始化项,永远不会被删除,甚至在程序的主函数开始之前就已经存在。每个线程都可以借用它,因为它保证始终存在。

1 . 3 . 2、 内存泄漏 Leaking ****

另一种分享所有权的方法是分配泄露。使用Box::leak,可以释放Box的所有权,并承诺永远不会丢弃它。从那时起,Box将永远存在,没有所有者,允许任何线程在程序运行期间借用它。

let x: &'static [i32; 3] = Box::leak(Box::new([1, 2, 3]));

thread::spawn(move || dbg!(x));

thread::spawn(move || dbg!(x));

move闭包可能使它看起来像是我们将所有权转移到线程中,但仔细观察x类型就会发现,我们只是给线程提供了一个对数据的引用。

请注意,“静态生命周期”并不意味着该值从程序开始就存在,而只意味着它存在到程序结束。过去根本无关紧要。

泄漏Box的缺点是泄漏内存。我们分配一些东西,但从不放弃和释放它。如果这种情况只发生有限的次数,那还好。但如果我们继续这样做,程序会慢慢耗尽内存。

1.3.3、 引用计数 Ref erence Counting)****

为了确保共享数据被丢弃和重新分配,我们不能完全放弃它的所有权。相反,我们可以共享所有权。通过跟踪所有者的数量,我们可以确保只有在没有所有者时才会删除值。

Rust标准库通过std::rc::Rc类型(“引用计数”的缩写)提供了此功能。它与Box非常相似,只是克隆它不会分配任何新内容,而是增加存储在所包含值旁边的计数器。原Rc和克隆Rc都将引用相同的分配:他们共享所有权。

use std::rc::Rc;

let a = Rc::new([1, 2, 3]);

let b = a.clone();

assert_eq!(a.as_ptr(), b.as_ptr()); // Same allocation!

删除一个Rc会减少计数器。只有最后一个Rc(它将看到计数器降为零)才会删除和释放所包含的数据。

然而,如果我们试图将Rc发送到另一个线程,我们将遇到以下编译器错误:

error[E0277]: Rc cannot be sent between threads safely

    |

8   |     thread::spawn(move || dbg!(b));

    |                   ^^^^^^^^^^^^^^^

事实证明,Rc不是线程安全的(在“线程安全:发送和同步”中详细介绍)。如果多个线程对相同的分配有一个Rc,它们可能会尝试同时修改引用计数器,这可能会产生不可预测的结果。

相反,我们可以使用std::sync::Arc,它代表“原子引用计数”。它与Rc相同,除了它保证对引用计数器的修改是不可分割的原子操作之外,这使得在多线程中使用它是安全的。(详见第二章。)

use std::sync::Arc;

 

let a = Arc::new([1, 2, 3]); ①

let b = a.clone(); ②

 

thread::spawn(move || dbg!(a)); ③

thread::spawn(move || dbg!(b)); ③

①我们将一个数组和一个从1开始的引用计数器放在一个新的分配中。

②克隆Arc会使引用计数增加到2,并为相同的分配提供第二个Arc。

③两个线程都有自己的Arc,通过它可以访问共享数组。两者都在丢弃Arc时递减参考计数器。最后一个丢弃Arc的线程将看到计数器降为零,并将是丢弃和释放数组的线程。

命名克隆

给每一个Arc副本取一个不同的名字会很快使代码变得非常混乱和难以理解。虽然Arc的每个克隆都是一个单独的对象,但每个克隆都表示相同的共享值,这不能通过不同的命名来很好地反映出来。

Rust允许(并鼓励)通过定义具有相同名称的新变量来隐藏变量。如果在相同的作用域中执行此操作,原始变量就不能再命名了。但是通过打开一个新的作用域,像let a = a.clone()这样的语句;可用于在该作用域内重用相同的名称,而使原始变量在作用域外可用。

通过在新的作用域中包装闭包(使用{}),我们可以在将变量移动到闭包之前克隆变量,而不必重命名它们。

let a = Arc::new([1, 2, 3]);

let b = a.clone();

thread::spawn(move || {

    dbg!(b);

});

dbg!(a)

Arc的克隆体也在同一范围内。每个线程都有自己的克隆,但名称不同。

let a = Arc::new([1, 2, 3]);

thread::spawn({

    let a = a.clone();

    move || {

        dbg!(a);

}

});

dbg!(a);

Arc的克隆体生活在另一个范围。我们可以在每个线程中使用相同的名称。

因为所有权是共享的,所以引用计数指针(Rc和Arc)与共享引用(&T)具有相同的限制。它们不允许对其包含的值进行可变访问,因为该值可能同时被其他代码借用。

例如,如果我们试图对Arc<[i32]>中的整数片进行排序,编译器会阻止我们这样做,告诉我们不允许改变数据:

error[E0596]: cannot borrow data in an Arc as mutable

  |

6 |     a.sort();

  |     ^^^^^^^^

1 . 4、 借用和数据竞争****

在Rust中,值可以通过两种方式借用:

不可变的借

使用&借用会给出一个不可变的引用。这样的引用可以被复制。对该引用的数据的访问在该引用的所有副本之间共享。顾名思义,编译器通常不允许您通过这样的引用更改某些内容,因为这可能会影响当前借用相同数据的其他代码。

可变的 借用

用&mut借用一些东西会给出一个可变引用。可变借用保证了它是该数据的唯一活动借用。这确保了改变数据不会改变其他代码当前正在查看的任何内容。

这两个概念结合在一起完全防止了数据竞争:一个线程正在改变数据,而另一个线程正在并发地访问数据。数据竞争通常是未定义的行为,这意味着编译器不需要考虑这些情况。它会简单地假设它们不会发生。

 

为了阐明这是什么意思,让我们看一个例子,编译器可以使用借用规则做出有用的假设:

fn f(a: &i32, b: &mut i32) {

    let before = *a;

    *b += 1;

    let after = *a;

    if before != after {

        x(); // never happens

    }

}

此处,我们获得一个对整数的不可变引用,并在b所引用的整数加1之前和之后存储该整数的值。编译器可以自由地假设关于借位和数据竞争的基本规则得到维护,这意味着b不可能引用与a相同的整数。事实上,整个程序中没有任何东西可以可变地借用a所指向的整数,只要a正在借用它。因此,编译器可以很容易地得出结论,*a不会改变,if语句的条件永远不会为真,并可以完全从程序中删除对x的调用作为优化。

不可能编写一个Rust程序来打破编译器的假设,除非使用一个不安全的块来禁用一些编译器的安全检查。

未定义的行为

像C、c++和Rust这样的语言有一组需要遵守的规则,以避免所谓的未定义行为。例如,Rust的规则之一是,对任何对象的可变引用都不能超过一个。

在Rust中,只有在使用不安全的代码时才有可能破坏这些规则。“不安全”并不意味着代码是不正确的或永远不能安全使用,而是编译器没有为您验证代码是安全的。如果代码违反了这些规则,它就被称为不健全的。

编译器可以假定这些规则从未被破坏,而不需要检查。一旦被破坏,就会导致所谓的未定义行为,我们需要不惜一切代价避免这种行为。如果我们允许编译器做出一个实际上不正确的假设,它很容易就会对代码的不同部分得出更多错误的结论,从而影响整个程序。

作为一个具体的例子,让我们看一个在片上使用get_unchecked方法的小片段:

let a = [123, 456, 789];

let b = unsafe { a.get_unchecked(index) };

get_unchecked方法给我们一个给定索引的片元素,就像[index]一样,但是允许编译器假设索引总是在边界内,而不需要进行任何检查。

这意味着在这个代码片段中,由于a的长度为3,编译器可能会假设index小于3。我们要确保它的假设成立。

如果我们打破这个假设,例如,如果我们在index = 3的情况下运行,任何事情都可能发生。这可能会导致从内存中读取a后面字节中存储的任何内容。这可能会导致程序崩溃。它可能最终执行程序中完全不相关的部分。它会造成各种各样的破坏。

也许令人惊讶的是,未定义的行为甚至可以“回到过去”,导致之前的代码出现问题。为了理解这是如何发生的,想象我们在前面的代码片段之前有一个match语句,如下所示:

match index {

   0 => x(),

   1 => y(),

   _ => z(index),

}

let a = [123, 456, 789];

let b = unsafe { a.get_unchecked(index) };

由于不安全的代码,编译器被允许假设index只为0、1或2。逻辑上可以得出这样的结论:match语句的最后一条只匹配2,因此z只被称为z(2)。这个结论不仅可以用来优化匹配,还可以用来优化z本身。这包括丢弃代码中未使用的部分。

如果我们使用索引3执行这个语句,程序可能会尝试执行已经优化掉的部分,从而导致完全不可预测的行为,远在我们到达最后一行的不安全块之前。就像这样,未定义的行为可以在整个程序中以非常意想不到的方式向后和向前传播。

在调用任何不安全的函数时,请仔细阅读其文档,并确保完全理解其安全需求:作为调用者,您需要维护的假设,以避免未定义的行为。

1.5、 内部可变性****

上一节中介绍的借用规则很简单,但可能相当有限——特别是在涉及多个线程时。遵循这些规则使得线程之间的通信非常有限,几乎不可能,因为多个线程访问的数据不可能被改变。

幸运的是,这里有一个逃生口:内部可变性。具有内部可变性的数据类型略微改变了借用规则。在某些条件下,这些类型允许通过“不可变”引用进行突变。

 

在“引用计数”中,我们已经看到了一个涉及内部不可变的微妙例子。Rc和Arc都改变了一个参考计数器,即使可能有多个克隆都使用相同的参考计数器。

一旦涉及到内部可变类型,将引用称为“不可变的”或“可变的”就会变得混乱和不准确,因为有些东西可以通过这两种方式发生变化。更准确的术语是“共享”和“独占”:共享引用(&T)可以复制并与他人共享,而独占引用(&mut T)保证它是唯一的独占借用T。对于大多数类型,共享引用不允许突变,但也有例外。因为在这本书中我们将主要使用这些例外,所以我们将在本书的其余部分使用更准确的术语。

让我们来看看一些具有内部可变性的类型,以及它们如何允许通过共享引用进行变异而不会导致未定义的行为。

1.5.1、 Cell****

std::cell:: cell 只是简单地包装了一个T,但是允许通过共享引用进行突变。为了避免未定义的行为,它只允许您将值复制出来(如果T是copy),或将其替换为另一个整体值。此外,它只能在单个线程中使用。

让我们看一个类似于前一节的例子,但这次使用Cell而不是i32:

use std::cell::Cell;

fn f(a: &Cell, b: &Cell) {

    let before = a.get();

    b.set(b.get() + 1);

    let after = a.get();

    if before != after {

        x(); // might happen

    }

}

与上次不同的是,现在if条件可以为真。因为Cell具有内部可变性,只要我们有一个对它的共享引用,编译器就不能再假设它的值不会改变。a和b可能指向相同的值,因此通过b的突变也可能影响a。但是,它仍然可以假设没有其他线程并发地访问单元格。

Cell上的限制并不总是容易处理。由于它不能直接让我们借用它所保存的值,我们需要将一个值移出(在其位置上留下一些东西),修改它,然后将它放回去,以改变它的内容:

fn f(v: &Cell<Vec>) {

    let mut v2 = v.take(); // Replaces the contents of the Cell with an empty Vec

    v2.push(1);

    v.set(v2); // Put the modified Vec back

}

1.5.2、 RefCell****

与常规的Cell不同,std:: Cell::RefCell允许您以很小的运行时成本借用其内容。RefCell不仅包含一个T,还包含一个计数器,用于跟踪任何未偿还的借位。如果您试图在它已经被可变地借用时借用它(反之亦然),它会恐慌,这避免了未定义的行为。和Cell一样,RefCell只能在单个线程中使用。

通过调用borrow或borrow_mut来借用RefCell的内容:

use std::cell::RefCell;

 

fn f(v: &RefCell<Vec>) {

    v.borrow_mut().push(1); // We can modify the Vec directly.

}

虽然Cell和RefCell非常有用,但当我们需要处理多线程时,它们变得相当无用。因此,让我们继续讨论与并发性相关的类型。

1.5.3、 MutexRwLock****

RwLock或读写锁是RefCell的并发版本。RwLock保存一个T并跟踪任何未偿还的借位。然而,与RefCell不同的是,它不会对冲突的借位感到恐慌。相反,它会阻塞当前线程——使其进入睡眠状态——同时等待冲突的借位消失。在其他线程处理完数据之后,我们只需要耐心地等待轮到我们处理数据。

借用RwLock的内容称为锁定。通过锁定它,我们可以暂时阻止并发冲突的借用,允许我们在不引起数据竞争的情况下借用它。

互斥锁非常类似,但概念上稍微简单一些。它不像RwLock那样跟踪共享借用和独占借用的数量,它只允许独占借用。

 

我们将在“锁定:互斥锁和rwlock”中更详细地介绍这些类型。

1.5.4、 Atomics****

原子类型表示Cell的并发版本,是第2章和第3章的主要主题。像Cell一样,它们通过让我们整体地复制值来避免未定义的行为,而不让我们直接借用内容。

但是,与Cell不同的是,它们不能有任意大小。因此,对于任何T都没有通用的Atomic类型,而只有特定的原子类型,如AtomicU32和AtomicPtr。哪些是可用的取决于平台,因为它们需要处理器的支持以避免数据竞争。(我们将在第7章深入讨论。)

由于它们的大小非常有限,原子通常不直接包含需要在线程之间共享的信息。相反,它们通常被用作在线程之间共享其他(通常是更大的)内容的工具。当原子操作被用来描述其他数据时,事情会变得异常复杂。

1.5.5、 UnsafeCell****

UnsafeCell是内部可变性的基本构件。

UnsafeCell包装了一个T,但没有附带任何避免未定义行为的条件或限制。相反,它的get()方法只是给出一个原始指针,指向它所换行的值,这只能在不安全的块中有意义地使用。它让用户以一种不会导致任何未定义行为的方式使用它。

最常见的情况是,不直接使用UnsafeCell,而是包装在另一种类型中,通过有限的接口(如Cell或Mutex)提供安全性。所有具有内部可变性的类型(包括上面讨论的所有类型)都构建在UnsafeCell之上。

1 . 6、 线程安全性 SendSync****

在本章中,我们已经看到了一些不线程安全的类型,这些类型只能在单个线程上使用,例如Rc、Cell等。由于需要这种限制来避免未定义的行为,所以编译器需要为你理解和检查它,这样你就可以使用这些类型,而不必使用不安全的块。

该语言使用两个特殊的特征来跟踪哪些类型可以安全地跨线程使用:

发送

如果一个类型可以被发送到另一个线程,那么它就是Send。换句话说,如果该类型值的所有权可以转移到另一个线程。例如,Arc为发送,Rc为不发送。

同步

如果一个类型可以与另一个线程共享,则该类型为Sync。换句话说,类型T是同步的当且仅当对该类型&T的共享引用是发送时。例如,i32是Sync,但Cell不是。(但是,的Cell是发送。)

所有基本类型,如i32、bool和str都是Send和Sync。

这两个特征都是自动特征,这意味着它们是基于类型的字段自动实现的。一个具有全部为Send和Sync字段的结构体本身也是Send和Sync。

选择不使用这两种方法的方法是向类型中添加一个不实现trait的字段。为此,特殊的std::marker::PhantomData类型通常会派上用场。该类型被编译器视为T,但它在运行时实际上并不存在。这是零大小类型,不占地方。

让我们来看看下面的结构体:

use std::marker::PhantomData;

struct X {

    handle: i32,

    _not_sync: PhantomData<Cell<()>>,

}

在这个例子中,如果句柄是它唯一的字段,X将同时是发送和同步。但是,我们添加了一个零大小的PhantomData<Cell<()>>字段,它被视为Cell<()>。由于Cell<()>不是Sync, x也不是。但是,它仍然是Send,因为它的所有字段都实现了Send。

原始指针(T和mut T)既不是发送指针也不是同步指针,因为编译器不知道它们代表什么。

选择任何一种特质的方法与选择其他特质的方法是一样的;使用impl块来实现你的类型的trait:

struct X {

    p: *mut i32,

}

unsafe impl Send for X {}

unsafe impl Sync for X {}

注意实现这些特征需要不安全关键字,因为编译器不能为你检查它是否正确。这是你对编译器做出的承诺,它必须信任它。

 

如果你试图移动一些东西到另一个线程,而不是发送,编译器会礼貌地阻止你这样做。下面是一个小例子来证明这一点:

fn main() {

    let a = Rc::new(123);

    thread::spawn(move || { // Error!

        dbg!(a);

    });

}

在这里,我们尝试发送一个Rc到一个新线程,但是Rc,不像Arc,没有实现发送。

如果我们尝试编译上面的例子,我们会遇到一个类似这样的错误:

error[E0277]: Rc<i32> cannot be sent between threads safely

   --> src/main.rs:3:5

    |

3   |     thread::spawn(move || {

    |     ^^^^^^^^^^^^^ Rc<i32> cannot be sent between threads safely

    |

    = help: within [closure], the trait Send is not implemented for Rc<i32>

note: required because it's used within this closure

   --> src/main.rs:3:19

    |

3   |     thread::spawn(move || {

    |                   ^^^^^^^

note: required by a bound in spawn

thread::spawn函数要求其参数为Send,闭包只有在所有捕获都为Send时才为Send。如果我们试图捕获不是发送的东西,我们的错误就会被捕获,从而保护我们不受未定义行为的影响。

1 . 7、 互斥锁和rwlock****

在线程之间共享(可变)数据最常用的工具是互斥,它是“互斥”的缩写。互斥锁的作用是通过暂时阻止同时试图访问某些数据的其他线程来确保线程对某些数据具有独占访问权。

从概念上讲,互斥锁只有两种状态:锁定和未锁定。当线程锁定一个未锁定的互斥锁时,该互斥锁被标记为锁定,线程可以立即继续。当一个线程试图锁定一个已经锁定的互斥锁时,该操作将被阻塞。线程在等待互斥锁被解锁时进入睡眠状态。解锁只能在锁定的互斥锁上进行,并且应该由锁定互斥锁的线程完成。如果其他线程正在等待锁定互斥锁,解锁将导致其中一个线程被唤醒,因此它可以尝试再次锁定互斥锁并继续其进程。

使用互斥锁保护数据只是所有线程之间的协议,即它们只在锁定互斥锁时才访问数据。这样,就不会有两个线程同时访问该数据,从而导致数据竞争。

1.7.1、 Rust的Mutex****

Rust标准库通过std::sync::Mutex提供了这个功能。它是T类型的泛型,T类型是互斥锁保护的数据类型。通过使这个T成为互斥锁的一部分,数据只能通过互斥锁访问,从而允许一个安全的接口,可以保证所有线程都遵守协议。

为了确保锁定的互斥锁只能由锁定它的线程解锁,它没有unlock( )方法。相反,它的lock()方法返回一种称为MutexGuard的特殊类型。这个守卫表示我们已经锁定互斥锁的保证。它的行为就像一个通过DerefMut特性的独占引用,使我们能够独占访问互斥锁保护的数据。解锁互斥锁是通过释放守卫来完成的。当我们删除守卫时,我们就放弃了访问数据的能力,守卫的drop实现将解锁互斥锁。

让我们看一个例子来看看互斥量在实践中的应用:

use std::sync::Mutex;

 

fn main() {

    let n = Mutex::new(0);

    thread::scope(|s| {

        for _ in 0..10 {

            s.spawn(|| {

                let mut guard = n.lock().unwrap();

                for _ in 0..100 {

                    *guard += 1;

                }

            });

        }

    });

    assert_eq!(n.into_inner().unwrap(), 1000);

}

在这里,我们有一个互斥量,一个保护整数的互斥量,我们生成了10个线程,每个线程将这个整数增加100次。每个线程将首先锁定互斥量以获得一个MutexGuard,然后使用该保护来访问整数并修改它。当变量超出作用域时,该保护将隐式地立即删除。

线程完成后,我们可以通过into_inner( )安全地从整数中移除保护。into_inner方法获得了互斥量的所有权,这保证了其他任何东西都不能再引用这个互斥量,从而使锁定变得不必要。

即使增量是以1为单位发生的,但观察这个整数的线程只能看到100的倍数,因为它只有在互斥锁被解锁时才能看到这个整数。实际上,多亏了互斥,这100个增量加在一起现在是一个不可分割的原子操作。

为了清楚地看到互斥锁的效果,我们可以让每个线程在解锁互斥锁之前等待一秒钟:

use std::time::Duration;

 

fn main() {

    let n = Mutex::new(0);

    thread::scope(|s| {

        for _ in 0..10 {

            s.spawn(|| {

                let mut guard = n.lock().unwrap();

                for _ in 0..100 {

                    *guard += 1;

                }

                thread::sleep(Duration::from_secs(1)); // New!

            });

        }

    });

    assert_eq!(n.into_inner().unwrap(), 1000);

}

当您现在运行该程序时,您将看到它需要大约10秒钟才能完成。每个线程只等待一秒钟,但是互斥锁确保每次只有一个线程可以等待。

如果我们在睡觉前一秒解除守卫,并因此解锁互斥锁,我们将看到它平行发生:

fn main() {

    let n = Mutex::new(0);

    thread::scope(|s| {

        for _ in 0..10 {

            s.spawn(|| {

                let mut guard = n.lock().unwrap();

                for _ in 0..100 {

                    *guard += 1;

                }

                drop(guard); // New: drop the guard before sleeping!

                thread::sleep(Duration::from_secs(1));

            });

        }

    });

    assert_eq!(n.into_inner().unwrap(), 1000);

}

通过这个更改,这个程序只需要大约一秒钟,因为现在10个线程可以同时执行它们的一秒钟睡眠。这说明了尽可能缩短互斥锁的锁定时间的重要性。将互斥锁锁得比必要的时间更长会完全抵消并行的任何好处,有效地迫使所有事情都以串行方式发生。

1 . 7 . 2、锁中毒****

上面例子中的unwrap( )调用与锁中毒有关。

Rust中的互斥锁在线程持有锁时出现panic被标记为中毒。当这种情况发生时,互斥锁将不再被锁定,但是调用它的锁方法将导致一个Err,表明它已经中毒。

 

这是一种防止将受互斥锁保护的数据置于不一致状态的机制。在我们上面的例子中,如果一个线程在整数增加少于100次后panic,互锁将被解锁,整数将处于一个意外的状态,它不再是100的倍数,这可能会破坏其他线程所做的假设。在这种情况下,自动将互斥锁标记为有毒会迫使用户处理这种可能性。

在一个中毒的互斥锁上调用lock( )仍然会锁定该互斥锁。lock( )返回的Err包含MutexGuard,允许我们在必要时纠正不一致的状态。

虽然锁中毒似乎是一种强大的机制,但在实践中,从潜在的不一致状态中恢复并不经常。大多数代码要么无视中毒,要么在锁中毒时使用unwrap( )来引发panic,从而有效地将恐慌传播给互斥锁的所有用户。

MutexGuard的生命周期

虽然隐式地删除一个守卫可以很方便地解锁互斥锁,但有时也会导致一些微妙的意外。如果我们用let语句为守卫分配一个名称(就像上面的例子一样),可以相对直接地看到它何时会被删除,因为局部变量是在定义它们的作用域的末尾被删除的。不过,不显式地删除保护可能导致互斥锁的锁定时间超过必要的时间,如上面的示例所示。

使用守卫而不给它指定名字也是可能的,而且有时非常方便。由于MutexGuard的行为类似于对受保护数据的独占引用,因此我们可以直接使用它,而不需要首先为保护分配名称。例如,如果你有一个互斥量<Vec>,你可以在一条语句中锁定互斥量,将一个项目推入Vec,然后再次解锁互斥量:

list.lock().unwrap().push(1);

在更大的表达式中产生的任何临时变量,比如lock( )返回的guard,都将在语句的末尾被删除。虽然这似乎是显而易见和合理的,但它会导致一个常见的陷阱,通常涉及匹配、if let或While let语句。这里有一个例子就遇到了这个陷阱:

if let Some(item) = list.lock().unwrap().pop() {

    process_item(item);

}

如果我们的意图是锁定列表,弹出一个项目,解锁列表,然后在列表解锁后处理项目,那么我们在这里犯了一个微妙但重要的错误。直到整个if let语句结束,临时保护才会被删除,这意味着我们在处理项目时不必要地持有锁。

 

也许令人惊讶的是,对于类似的if语句,这不会发生,比如下面的例子:

if list.lock().unwrap().pop() == Some(1) {

    do_something();

}

在这里,临时守卫确实在if语句体执行之前被删除。原因是常规if语句的条件总是一个普通布尔值,它不能借用任何东西。没有理由将临时对象的生命周期从条件延长到语句结束。然而,对于if let语句,情况可能不是这样。例如,如果我们使用front( )而不是pop( ),则item将从列表中借用,因此有必要保留警卫。由于借用检查器实际上只是一个检查,并不影响数据删除的时间或顺序,因此使用pop( )时也会发生同样的情况,尽管这是不必要的。

我们可以通过将pop操作移动到单独的let语句来避免这种情况。然后在语句的末尾,在if let之前,删除守卫:

let item = list.lock().unwrap().pop();

if let Some(item) = item {

    process_item(item);

}

1 . 7 . 3、 - 写锁****

互斥锁只与独占访问有关。MutexGuard将为我们提供对受保护数据的独占引用(&mut),即使我们只想查看数据,共享引用(&T)就足够了。

读写锁是互斥锁的一种稍微复杂一点的版本,它能够理解独占访问和共享访问之间的区别,并且两者都可以提供。它有三种状态:未解锁、由单个写入器锁定(用于独占访问)和由任意数量的读取器锁定(用于共享访问)。它通常用于经常由多个线程读取,但只在一段时间内更新一次的数据。

Rust标准库通过std::sync::RwLock类型提供这种锁。它的工作原理类似于标准互斥锁,除了它的接口主要分为两部分。它没有一个lock()方法,而是有一个read()和write()方法,用于锁定读取器或写入器。它有两种保护类型,一种用于读取器,另一种用于写入器:RwLockReadGuard和RwLockWriteGuard。前者只实现了Deref,使其行为类似于对受保护数据的共享引用,而后者也实现了DerefMut,使其行为类似于独占引用。

它实际上是RefCell的多线程版本,动态跟踪引用的数量,以确保支持借用规则。

 

Mutex和RwLock都需要发送T,因为它们可以用来向另一个线程发送T。RwLock还需要T实现Sync,因为它允许多个线程持有对受保护数据的共享引用(&T)。(严格地说,您可以为一个不满足这些要求的T创建一个锁,但是您不能在线程之间共享它,因为锁本身不会实现Sync。)

Rust标准库只提供了一种通用的RwLock类型,但它的实现取决于操作系统。读写锁实现之间有许多微妙的变化。大多数实现在有写入器等待时将阻塞新的读取器,即使锁已经被读锁定。这样做是为了防止写入器饥饿,即许多读取器共同阻止锁被解锁,不允许任何写入器更新数据。

其他语言中的 Mute x

Rust的标准互斥锁和RwLock类型与其他语言(如C或C++)中的互斥锁和RwLock类型略有不同。

最大的区别是Rust的Mutex包含了它所保护的数据。例如,在C++中,std::mutex不包含它所保护的数据,它甚至不知道它在保护什么。这意味着用户有责任记住哪些数据受到保护,哪些互斥锁受到保护,并确保在每次访问“受保护的”数据时锁定正确的互斥锁。在阅读其他语言中涉及互斥对象的代码时,或者在与不熟悉Rust的程序员交流时,记住这一点很有用。Rust程序员可能会说“互斥锁中的数据”,或者说“把它包装在一个互斥锁中”,这可能会让那些只熟悉其他语言中的互斥锁的人感到困惑。

如果你真的需要一个不包含任何内容的独立互斥,例如保护一些外部硬件,你可以使用mutex <()>。但即使在这样的情况下,最好还是定义一个(可能是零大小的)类型来与该硬件接口,并将其包装在一个互斥锁中。这样,在您可以与硬件交互之前,您仍然被迫锁定互斥量。

1.8、等待 阻塞 和条件变量****

当数据被多个线程改变时,在许多情况下,它们需要等待一些事件,等待关于数据的某些条件变为真实。例如,如果我们有一个互斥锁保护一个Vec,我们可能想要等到它包含任何东西。

虽然互斥锁允许线程等待,直到它被解锁,但它不提供等待任何其他条件的功能。如果我们只有一个互斥锁,我们必须一直锁定互斥锁来反复检查Vec中是否还有任何东西。

1 . 8 . 1、线程 阻塞****

等待来自另一个线程的通知的一种方法称为线程停放。线程可以自行停放,使其进入睡眠状态,从而停止消耗任何CPU周期。然后,另一个线程可以解除停置的线程,将其从午睡中唤醒。

线程停放可以通过std:: Thread::park()函数实现。要取消停放,可以在表示要取消停放的线程的Thread对象上调用unpark()方法。这样的对象可以从spawn返回的连接句柄中获得,也可以通过std::thread::current()由线程本身获得。

让我们深入研究一个使用互斥锁在两个线程之间共享队列的示例。在下面的示例中,新生成的线程将使用队列中的项,而主线程将每秒钟向队列中插入一个新项。线程驻留用于在队列为空时使消耗线程等待。

use std::collections::VecDeque;

fn main() {

    let queue = Mutex::new(VecDeque::new());

    thread::scope(|s| {

        // Consuming thread

        let t = s.spawn(|| loop {

            let item = queue.lock().unwrap().pop_front();

            if let Some(item) = item {

                dbg!(item);

            } else {

                thread::park();

            }

        });

 

        // Producing thread

        for i in 0.. {

            queue.lock().unwrap().push_back(i);

            t.thread().unpark();

            thread::sleep(Duration::from_secs(1));

        }

    });

}

消费线程运行一个无限循环,在这个循环中,它从队列中弹出条目,并使用dbg宏显示它们。当队列为空时,它将使用park()函数停止并进入睡眠状态。如果它被解除停放,则park()调用返回,循环继续,再次从队列中弹出项目,直到队列为空。等等......

生成线程通过将数字推入队列,每秒钟生成一个新数字。每次添加一个项时,它都会在引用消费线程的Thread对象上使用unpark()方法来取消它。这样,消费线程就会被唤醒来处理新元素。

这里要做的一个重要观察是,如果我们去掉停车,这个程序理论上仍然是正确的,尽管效率很低。这很重要,因为park()并不保证它只会因为匹配的unpark()而返回。虽然有点罕见,但它可能会有虚假的唤醒。我们的例子很好地处理了这个问题,因为消费线程将锁定队列,看到它是空的,然后直接解锁并再次停放它自己。

线程暂停的一个重要属性是,在线程暂停之前调用unpark()不会丢失。取消停放的请求仍然被记录,下一次线程尝试停放自己时,它将清除该请求并直接继续,而不实际进入睡眠状态。为了了解为什么这对于正确的操作是至关重要的,让我们通过两个线程执行的步骤的可能顺序:

1、消费线程(我们称它为C)锁定队列。

2、C试图从队列中弹出一个项目,但它是空的,导致None。

3、C解锁队列。

4、生成线程(我们称之为P)锁定队列。

5、P将一个新项推入队列。

6、P再次解锁队列。

7、P调用unpark()通知C有新项。

8、C调用park()进入睡眠状态,等待更多的项目。

虽然在第3步释放队列和第8步停止队列之间很可能只有很短的时间,但第4步到第7步可能发生在线程自己停止之前。如果unpark()在线程未停放时不执行任何操作,则通知将丢失。即使队列中有项,消费线程仍将等待。由于将unpark请求保存为将来对park()的调用,我们不必担心这个问题。

然而,unpark请求不会堆积起来。调用unpark()两次,然后再调用park()两次,仍然会导致线程进入睡眠状态。第一个park()清除请求并直接返回,但第二个像往常一样进入睡眠状态。

这意味着,在上面的示例中,重要的是,我们只在看到队列为空时才暂停线程,而不是在每个已处理的项之后都暂停线程。虽然在本例中,由于睡眠时间太长(1秒),这种情况极不可能发生,但多个unpark()调用可能只唤醒一个park()调用。

不幸的是,这确实意味着如果unpark()在park()返回之后被调用,但在队列被锁定和清空之前,unpark()调用是不必要的,但仍然会导致下一个park()调用立即返回。这会导致(空)队列被锁定和解锁的时间延长。虽然这不会影响程序的正确性,但会影响程序的效率和性能。

这种机制在简单的情况下工作得很好,比如我们的例子,但当事情变得更复杂时,它很快就失效了。例如,如果我们有多个消费者线程从同一个队列中获取项目,那么生产者线程将无法知道哪个消费者实际上正在等待并且应该被唤醒。生产者必须确切地知道消费者在等待什么时间,以及它在等待什么条件。

1 . 8 . 2、 条件变量****

条件变量是一种更常用的选项,用于等待受互斥锁保护的数据发生变化。它们有两个基本操作:等待和通知。线程可以等待一个条件变量,在此之后,当另一个线程通知相同的条件变量时,它们可以被唤醒。多个线程可以等待同一个条件变量,通知可以发送给一个等待线程,也可以发送给所有等待线程。

这意味着我们可以为我们感兴趣的特定事件或条件创建一个条件变量,例如队列非空,并等待该条件。任何导致该事件或条件发生的线程都会通知条件变量,而不需要知道哪些或有多少线程对该通知感兴趣。

为了避免在解锁互斥锁和等待条件变量之间的短暂时间内丢失通知,条件变量提供了一种自动解锁互斥锁并开始等待的方法。这意味着不可能出现通知丢失的情况。

Rust标准库提供了一个条件变量std::sync::Condvar。它的wait方法接受一个MutexGuard来证明我们已经锁定了互斥锁。它首先解锁互斥锁并进入睡眠状态。之后,当被唤醒时,它重新锁定互斥量并返回一个新的MutexGuard(这证明互斥量再次被锁定)。

它有两个通知函数:notify_one只唤醒一个等待线程(如果有的话),notify_all唤醒所有线程。

让我们修改一下线程停放的例子,改为使用Condvar:

use std::sync::Condvar;

let queue = Mutex::new(VecDeque::new());

let not_empty = Condvar::new();

 

thread::scope(|s| {

    s.spawn(|| {

        loop {

            let mut q = queue.lock().unwrap();

            let item = loop {

                if let Some(item) = q.pop_front() {

                    break item;

                } else {

                    q = not_empty.wait(q).unwrap();

                }

            };

            drop(q);

            dbg!(item);

        }

    });

 

    for i in 0.. {

        queue.lock().unwrap().push_back(i);

        not_empty.notify_one();

        thread::sleep(Duration::from_secs(1));

    }

});

我们必须做出一些改变:

l 我们现在不仅有一个包含队列的Mutex,还有一个Condvar来传达“非空”条件。

l 我们不再需要知道唤醒哪个线程,所以我们不再存储spawn的返回值。相反,我们使用notify_one方法通过条件变量通知使用者。

l 解锁、等待和重新锁定都是通过wait方法完成的。我们必须稍微重组控制流,以便能够将守卫传递给等待方法,同时在处理一个项目之前仍然丢弃它。

 

现在我们可以生成任意数量的消费线程,甚至以后可以生成更多线程,而不需要更改任何内容。条件变量负责将通知传递给感兴趣的线程。

如果我们有一个更复杂的系统,其中线程对不同的条件感兴趣,我们可以为每个条件定义一个Condvar。例如,我们可以定义一个表示队列非空,定义另一个表示队列为空。然后,每个线程将等待与它们正在做的事情相关的任何条件。

通常,Condvar只与单个互斥锁一起使用。如果两个线程试图使用两个不同的互斥对象并发地等待一个条件变量,可能会导致恐慌。

Condvar的一个缺点是它只能在与互斥锁一起使用时才能工作,但对于大多数用例来说,这是完全没问题的,因为这正是已经用于保护数据的东西。

thread::park()和Condvar::wait()都有一个带有时间限制的变体:thread::park_timeout()和Condvar::wait_timeout()。它们将Duration作为额外参数,这是它应该放弃等待通知并无条件唤醒的时间。

1 . 9、 总结****

l 多个线程可以在同一个程序中并发运行,并且可以在任何时候生成。

l 主线程结束时,整个程序结束。

l 数据竞争是未定义的行为,Rust的类型系统完全阻止了这种行为(在安全代码中)。

l Send类型的数据可以发送到其他线程,Sync类型的数据可以在线程之间共享。

l 常规线程可能和程序运行的时间一样长,因此只能借用静态数据,比如静态数据和泄漏的分配数据。

l 引用计数(Arc)可用于共享所有权,以确保只要至少有一个线程在使用数据,数据就会存在。

l 限定作用域的线程对于限制线程的生命周期以允许它借用非静态数据(如局部变量)非常有用。

l &T是一个共享的引用。&mut T是独家参考。常规类型不允许通过共享引用进行突变。

l 由于UnsafeCell,一些类型具有内部可变性,它允许通过共享引用进行变异。

l Cell和RefCell是单线程内部可变的标准类型。Atomics、Mutex和RwLock是它们的多线程等价物。

 

l Cell和atomics只允许作为一个整体替换值,而RefCell、Mutex和RwLock允许您通过动态执行访问规则直接改变值。

l 线程暂停是等待某些条件的一种方便方法。

l 当一个条件是关于互斥锁保护的数据时,使用Condvar比线程停放更方便,也更有效。