【Rust学习之旅】多线程并发编程(十六)
序
这一期我们来讲讲 rust 的多线程吧,前面我们讲到了,安全和高效的处理并发是 Rust 语言的主要目标之一。
Rust 由于语言设计理念、安全、性能的多方面考虑,选择了多线程与 async/await
相结合,
对于 前端来说 async/await
绝对不陌生,当然这里有些不一样,JavaScript 是单线程的动态语言。使用的是事件循环机制。那我们看看 rust 的async/await
有什么不一样的地方。
并发和并行
什么是并发和并行?我们来看下面一个简单的例子就明白了
你吃饭吃到一半,电话来了,你一直到吃完了以后才去接,这就说明你不支持并发也不支持并行
。
你吃饭吃到一半,电话来了,你停了下来接了电话,接完后继续吃饭,这说明你支持并发
。
你吃饭吃到一半,电话来了,你一边打电话一边吃饭,这说明你支持并行
。
并发
的关键是你有处理多个任务的能力,不一定要同时
。
并行
的关键是你有同时处理多个任务的能力
。
我们经常在买手机,或者买电脑什么什么 CPU 8 核、12 核什么的。
单核心并发
在早期只有一个 CPU 核心时,我们的任务是怎么处理的呢?
在 OS 级别,多线程负责管理我们的任务队列,你可以简单认为一个线程管理着一个任务队列,然后线程之间还能根据空闲度进行任务调度。我们的程序只会跟 OS 线程打交道,并不关心 CPU 到底有多少个核心,真正关心的只是 OS,当线程把任务交给 CPU 核心去执行时,如果只有一个 CPU 核心,那么它就只能同时处理一个任务。
和排队一样,假如某个任务执行时间过长,就会导致用户界面的假死(相信使用 Windows 的同学或多或少都碰到过假死的问题)那么就需要 CPU 的任务调度了(真实 CPU 的调度很复杂,我们这里做了简化),有一个调度器会按照某些条件从队列中选择任务进行执行,并且当一个任务执行时间过长时,会强行切换该任务到后台中(或者放入任务队列,真实情况很复杂!),去执行新的任务。
不断这样的快速任务切换,对用户而言就实现了表面上的多任务同时处理,但是实际上最终也只有一个 CPU 核心在不停的工作。
因此并发的关键在于:快速轮换处理不同的任务,给用户带来所有任务同时在运行的假象。
多核心并行
当 CPU 核心增多到 N
时,那么同一时间就能有 N
个任务被处理,那么我们的并行度就是 N
,相应的处理效率也变成了单核心的 N
倍(实际情况并没有这么高)。
多核心并发
当核心增多到 N
时,操作系统同时在进行的任务肯定远不止 N
个,这些任务将被放入 M
个线程队列中,接着交给 N
个 CPU 核心去执行,最后实现了 M:N
的处理模型,在这种情况下,并发与并行是同时在发生的,所有用户任务从表面来看都在并发的运行,但实际上,同一时刻只有 N
个任务能被同时并行的处理。
很明显我们的 JavaScript 就是第一种单核心并发。所以性能问题也是 JavaScript 的一个缺点,当然这是在浏览器这种环境下,这样的设计也更合理。
使用线程
多线程编程的风险 由于多线程的代码是同时运行的,因此我们无法保证线程间的执行顺序,这会导致一些问题:
- 竞态条件(race conditions),多个线程以非一致性的顺序同时访问数据资源
- 死锁(deadlocks),两个线程都想使用某个资源,但是又都在等待对方释放资源后才能使用,结果最终都无法继续执行
- 一些因为多线程导致的很隐晦的 BUG,难以复现和解决
创建线程
使用 thread::spawn
可以创建线程:
use std::thread;
use std::time::Duration;
fn main() {
thread::spawn(|| {
for i in 1..10 {
println!("hi number {} from the spawned thread!", i);
thread::sleep(Duration::from_millis(1));
}
});
for i in 1..5 {
println!("hi number {} from the main thread!", i);
thread::sleep(Duration::from_millis(1));
}
}
有几点值得注意:
- 线程内部的代码使用闭包来执行
main
线程一旦结束,程序就立刻结束,因此需要保持它的存活,直到其它子线程完成自己的任务thread::sleep
会让当前线程休眠指定的时间,随后其它线程会被调度运行(上一节并发与并行中有简单介绍过),因此就算你的电脑只有一个 CPU 核心,该程序也会表现的如同多 CPU 核心一般,这就是并发!
来看看输出:
hi number 1 from the main thread!
hi number 1 from the spawned thread!
hi number 2 from the main thread!
hi number 2 from the spawned thread!
hi number 3 from the main thread!
hi number 3 from the spawned thread!
hi number 4 from the spawned thread!
hi number 4 from the main thread!
hi number 5 from the spawned thread!
如果多运行几次,你会发现好像每次输出会不太一样,因为:虽说线程往往是轮流执行的,但是这一点无法被保证!线程调度的方式往往取决于你使用的操作系统。总之,千万不要依赖线程的执行顺序。
等待子线程的结束
上面的代码你不但可能无法让子线程从 1 顺序打印到 10,而且可能打印的数字会变少,因为主线程会提前结束,导致子线程也随之结束,更过分的是,如果当前系统繁忙,甚至该子线程还没被创建,主线程就已经结束了!
因此我们需要一个方法,让主线程安全、可靠地等所有子线程完成任务后,再 kill self:
use std::thread;
use std::time::Duration;
fn main() {
let handle = thread::spawn(|| {
for i in 1..5 {
println!("hi number {} from the spawned thread!", i);
thread::sleep(Duration::from_millis(1));
}
});
handle.join().unwrap();
for i in 1..5 {
println!("hi number {} from the main thread!", i);
thread::sleep(Duration::from_millis(1));
}
}
通过调用 handle.join
,可以让当前线程阻塞,直到它等待的子线程的结束,在上面代码中,由于 main
线程会被阻塞,因此它直到子线程结束后才会输出自己的 1..5
:
hi number 1 from the spawned thread!
hi number 2 from the spawned thread!
hi number 3 from the spawned thread!
hi number 4 from the spawned thread!
hi number 1 from the main thread!
hi number 2 from the main thread!
hi number 3 from the main thread!
hi number 4 from the main thread!
以上输出清晰的展示了线程阻塞的作用,如果你将 handle.join
放置在 main
线程中的 for
循环后面,那就是另外一个结果:两个线程交替输出。
在线程闭包中使用 move
在前面闭包
章节中,有讲过 move
关键字在闭包中的使用可以让该闭包拿走环境中某个值的所有权,同样地,你可以使用 move
来将所有权从一个线程转移到另外一个线程。
use std::thread;
fn main() {
let v = vec![1, 2, 3];
let handle = thread::spawn(move || {
println!("Here's a vector: {:?}", v);
});
handle.join().unwrap();
// 下面代码会报错borrow of moved value: `v`
// println!("{:?}",v);
}
Rust 的所有权机制保证了数据使用上的安全:v
的所有权被转移给新的线程后,main
线程将无法继续使用:最后一行代码将报错。
线程是如何结束的
之前我们提到 main
线程是程序的主线程,一旦结束,则程序随之结束,同时各个子线程也将被强行终止。那么有一个问题,如果父线程不是 main
线程,那么父线程的结束会导致什么?自生自灭还是被干掉?
在系统编程中,操作系统提供了直接杀死线程的接口,简单粗暴,但是 Rust 并没有提供这样的接口,原因在于,粗暴地终止一个线程可能会导致资源没有释放、状态混乱等不可预期的结果,一向以安全自称的 Rust,自然不会砸自己的饭碗。
那么 Rust 中线程是如何结束的呢?答案很简单:线程的代码执行完,线程就会自动结束。但是如果线程中的代码不会执行完呢?那么情况可以分为两种进行讨论:
- 线程的任务是一个循环 IO 读取,任务流程类似:IO 阻塞,等待读取新的数据 -> 读到数据,处理完成 -> 继续阻塞等待 ··· -> 收到 socket 关闭的信号 -> 结束线程,在此过程中,绝大部分时间线程都处于阻塞的状态,因此虽然看上去是循环,CPU 占用其实很小,也是网络服务中最最常见的模型
- 线程的任务是一个循环,里面没有任何阻塞,包括休眠这种操作也没有,此时 CPU 很不幸的会被跑满,而且你如果没有设置终止条件,该线程将持续跑满一个 CPU 核心,并且不会被终止,直到
main
线程的结束
第一情况很常见,我们来模拟看看第二种情况:
use std::thread;
use std::time::Duration;
fn main() {
// 创建一个线程A
let new_thread = thread::spawn(move || {
// 再创建一个线程B
thread::spawn(move || {
loop {
println!("I am a new thread.");
}
})
});
// 等待新创建的线程执行完成
new_thread.join().unwrap();
println!("Child thread is finish!");
// 睡眠一段时间,看子线程创建的子线程是否还在运行
thread::sleep(Duration::from_millis(100));
}
以上代码中,main
线程创建了一个新的线程 A
,同时该新线程又创建了一个新的线程 B
,可以看到 A
线程在创建完 B
线程后就立即结束了,而 B
线程则在不停地循环输出。
从之前的线程结束规则,我们可以猜测程序将这样执行:A
线程结束后,由它创建的 B
线程仍在疯狂输出,直到 main
线程在 100 毫秒后结束。如果你把该时间增加到几十秒,就可以看到你的 CPU 核心 100% 的盛况了-,-
多线程的性能
创建线程的性能
据不精确估算,创建一个线程大概需要 0.24 毫秒,随着线程的变多,这个值会变得更大,因此线程的创建耗时并不是不可忽略的,只有当真的需要处理一个值得用线程去处理的任务时,才使用线程,一些鸡毛蒜皮的任务,就无需创建线程了。
创建多少线程合适
因为 CPU 的核心数限制,当任务是 CPU 密集型时,就算线程数超过了 CPU 核心数,也并不能帮你获得更好的性能,因为每个线程的任务都可以轻松让 CPU 的某个核心跑满,既然如此,让线程数等于 CPU 核心数是最好的。
但是当你的任务大部分时间都处于阻塞状态时,就可以考虑增多线程数量,这样当某个线程处于阻塞状态时,会被切走,进而运行其它的线程,典型就是网络 IO 操作,我们可以为每一个进来的用户连接创建一个线程去处理,该连接绝大部分时间都是处于 IO 读取阻塞状态,因此有限的 CPU 核心完全可以处理成百上千的用户连接线程,但是事实上,对于这种网络 IO 情况,一般都不再使用多线程的方式了,毕竟操作系统的线程数是有限的,意味着并发数也很容易达到上限,而且过多的线程也会导致线程上下文切换的代价过大,使用 async/await
的 M:N
并发模型,就没有这个烦恼。
多线程的开销
多线程的开销往往是在锁、数据竞争、缓存失效上,这些限制了现代化软件系统随着 CPU 核心的增多性能也线性增加的野心。
线程屏障(Barrier)
在 Rust 中,可以使用 Barrier
让多个线程都执行到某个点后,才继续一起往后执行:
use std::sync::{Arc, Barrier};
use std::thread;
fn main() {
let mut handles = Vec::with_capacity(6);
let barrier = Arc::new(Barrier::new(6));
for _ in 0..6 {
let b = barrier.clone();
handles.push(thread::spawn(move|| {
println!("before wait");
b.wait(); // 注意这个地方
println!("after wait");
}));
}
for handle in handles {
handle.join().unwrap();
}
}
上面代码,我们在线程打印出 before wait
后增加了一个屏障,目的就是等所有的线程都打印出before wait后,各个线程再继续执行:
before wait
before wait
before wait
before wait
before wait
before wait
after wait
after wait
after wait
after wait
after wait
after wait
线程局部变量
对于多线程编程,线程局部变量在一些场景下非常有用,而 Rust 通过标准库和三方库对此进行了支持。
标准库 thread_local
使用 thread_local
宏可以初始化线程局部变量,然后在线程内部使用该变量的 with
方法获取变量值:
use std::cell::RefCell;
use std::thread;
thread_local!(static FOO: RefCell<u32> = RefCell::new(1));
FOO.with(|f| {
assert_eq!(*f.borrow(), 1);
*f.borrow_mut() = 2;
});
// 每个线程开始时都会拿到线程局部变量的FOO的初始值
let t = thread::spawn(move|| {
FOO.with(|f| {
assert_eq!(*f.borrow(), 1);
*f.borrow_mut() = 3;
});
});
// 等待线程完成
t.join().unwrap();
// 尽管子线程中修改为了3,我们在这里依然拥有main线程中的局部值:2
FOO.with(|f| {
assert_eq!(*f.borrow(), 2);
});
上面代码中,FOO
即是我们创建的线程局部变量,每个新的线程访问它时,都会使用它的初始值作为开始,各个线程中的 FOO
值彼此互不干扰。注意 FOO
使用 static
声明为生命周期为 'static
的静态变量。
可以注意到,线程中对 FOO
的使用是通过借用的方式,但是若我们需要每个线程独自获取它的拷贝,最后进行汇总,就有些强人所难了。
你还可以在结构体中使用线程局部变量:
use std::cell::RefCell;
struct Foo;
impl Foo {
thread_local! {
static FOO: RefCell<usize> = RefCell::new(0);
}
}
fn main() {
Foo::FOO.with(|x| println!("{:?}", x));
}
或者通过引用的方式使用它:
use std::cell::RefCell;
use std::thread::LocalKey;
thread_local! {
static FOO: RefCell<usize> = RefCell::new(0);
}
struct Bar {
foo: &'static LocalKey<RefCell<usize>>,
}
impl Bar {
fn constructor() -> Self {
Self {
foo: &FOO,
}
}
}
三方库 thread-local
除了标准库外,一位大神还开发了 thread-local 库,它允许每个线程持有值的独立拷贝:
use thread_local::ThreadLocal;
use std::sync::Arc;
use std::cell::Cell;
use std::thread;
let tls = Arc::new(ThreadLocal::new());
// 创建多个线程
for _ in 0..5 {
let tls2 = tls.clone();
thread::spawn(move || {
// 将计数器加1
let cell = tls2.get_or(|| Cell::new(0));
cell.set(cell.get() + 1);
}).join().unwrap();
}
// 一旦所有子线程结束,收集它们的线程局部变量中的计数器值,然后进行求和
let tls = Arc::try_unwrap(tls).unwrap();
let total = tls.into_iter().fold(0, |x, y| x + y.get());
// 和为5
assert_eq!(total, 5);
该库不仅仅使用了值的拷贝,而且还能自动把多个拷贝汇总到一个迭代器中,最后进行求和,非常好用。
用条件控制线程的挂起和执行
条件变量(Condition Variables)经常和 Mutex
一起使用,可以让线程挂起,直到某个条件发生后再继续执行:
use std::thread;
use std::sync::{Arc, Mutex, Condvar};
fn main() {
let pair = Arc::new((Mutex::new(false), Condvar::new()));
let pair2 = pair.clone();
thread::spawn(move|| {
let (lock, cvar) = &*pair2;
let mut started = lock.lock().unwrap();
println!("changing started");
*started = true;
cvar.notify_one();
});
let (lock, cvar) = &*pair;
let mut started = lock.lock().unwrap();
while !*started {
started = cvar.wait(started).unwrap();
}
println!("started changed");
}
上述代码流程如下:
main
线程首先进入while
循环,调用wait
方法挂起等待子线程的通知,并释放了锁started
- 子线程获取到锁,并将其修改为
true
,然后调用条件变量的notify_one
方法来通知主线程继续执行
只被调用一次的函数
有时,我们会需要某个函数在多线程环境下只被调用一次,例如初始化全局变量,无论是哪个线程先调用函数来初始化,都会保证全局变量只会被初始化一次,随后的其它线程调用就会忽略该函数:
use std::thread;
use std::sync::Once;
static mut VAL: usize = 0;
static INIT: Once = Once::new();
fn main() {
let handle1 = thread::spawn(move || {
INIT.call_once(|| {
unsafe {
VAL = 1;
}
});
});
let handle2 = thread::spawn(move || {
INIT.call_once(|| {
unsafe {
VAL = 2;
}
});
});
handle1.join().unwrap();
handle2.join().unwrap();
println!("{}", unsafe { VAL });
}
代码运行的结果取决于哪个线程先调用 INIT.call_once
(虽然代码具有先后顺序,但是线程的初始化顺序并无法被保证!因为线程初始化是异步的,且耗时较久),若 handle1
先,则输出 1
,否则输出 2
。
call_once 方法
执行初始化过程一次,并且只执行一次。
如果当前有另一个初始化过程正在运行,线程将阻止该方法被调用。
当这个函数返回时,保证一些初始化已经运行并完成,它还保证由执行的闭包所执行的任何内存写入都能被其他线程在这时可靠地观察到。
结语
Rust 的线程模型是 1:1
模型,因为 Rust 要保持尽量小的运行时。
我们可以使用 thread::spawn
来创建线程,创建出的多个线程之间并不存在执行顺序关系,因此代码逻辑千万不要依赖于线程间的执行顺序。
main
线程若是结束,则所有子线程都将被终止,如果希望等待子线程结束后,再结束 main
线程,你需要使用创建线程时返回的句柄的 join
方法。
在线程中无法直接借用外部环境中的变量值,因为新线程的启动时间点和结束时间点是不确定的,所以 Rust 无法保证该线程中借用的变量在使用过程中依然是合法的。你可以使用 move
关键字将变量的所有权转移给新的线程,来解决此问题。
父线程结束后,子线程仍在持续运行,直到子线程的代码运行完成或者 main
线程的结束。
这一章章节只讲了多线程开发的一些简单概念,后面还会有讲他们怎么通信的,这些对比起nodejs
其他也有相似之处。
越到后面越来越复杂,最近的文章都很多参考 Rust语言圣经(Rust Course)
. 这的确是值得囍品读理解的不二之选。
对于前端来说,确实越来越难。关于我反复入门 rust 这件事
🤔😄
转载自:https://juejin.cn/post/7253371343751151676