Rust 异步编程之使用 join! 和 select! 同时运行多个 Future
当只执行一个 Future 时, 在异步函数async fn
或者异步代码块async {}
内部可以直接使用.await
,当需要并发的执行多个 Future 时,直接使用 .await
会阻塞并发任务,直到特定的 Future 完成(串行执行)。futures
包中提供了很多实用的可以并发执行 Future 的工具,比如 join!
宏 和 select!
宏。
注:futures::future 模块下提供了一系列可以操作 Future 的函数(比操作宏丰富的多),详见: docs.rs/futures/lat… docs.rs/futures/lat…
join! 宏
join!
宏允许同时等待多个不同 Future 的完成,且可以并发地运行这些 Future。
先来看两个使用使用.await
的错误版本:
struct Book;
struct Music;
async fn enjoy_book() -> Book { /* ... */ Book }
async fn enjoy_music() -> Music { /* ... */ Music}
// 错误版本1: 异步内部是顺次串行执行,而不是同时运行它们
async fn enjoy1_book_and_music() -> (Book, Music) {
// 实际在异步函数内部是是串行执行
let book = enjoy_book().await; // await 触发阻塞式执行
let music = enjoy_music().await; // await 触发阻塞式执行
(book, music)
}
// 错误版本2: 异步内部是顺次串行执行,而不是同时运行它们
async fn enjoy2_book_and_music() -> (Book, Music) {
// 实际在异步函数内部是是串行执行
let book_future = enjoy_book(); // async 函数是惰性的,并没有立即执行
let music_future = enjoy_music(); // async 函数是惰性的,并没有立即执行
(book_future.await, music_future.await)
}
上面两个例子看似都可以顺利异步运行,但是实际上是必须先看完书后,才能听音乐,即异步函数内部的任务是依次(顺次)串行执行,而不是同时并发的运行。
因为在 Rust 中 Future 是惰性的,直到调用 .await
时,才会开始运行。而那两个 await
由于在代码中有先后顺序,因此它们是顺序运行的。
为了正确的并发运行两个 Future , 我们来试试 futures::join!
宏:
use futures::join;
// 使用`join!`会返回一个元组,里面的值是对应的`Future`执行结束后输出的值。
async fn enjoy_book_and_music() -> (Book, Music) {
let book_fut = enjoy_book();
let music_fut = enjoy_music();
// join! 宏必须等待它管理的所有 Future 完成后才能完成
join!(book_fut, music_fut)
}
fn main() {
futures::executor::block_on(enjoy_book_and_music());
}
如果希望同时运行一个数组里的多个异步任务,可以使用
futures::future::join_all
方法
try_join! 宏
由于join!
必须等待它管理的所有 Future 完成后才能完成,如果你希望在某一个 Future 报错后就立即停止所有 Future 的执行,可以使用 try_join!
,特别是当 Future 返回 Result
时。
注:传给 try_join!
的所有 Future 都必须拥有相同的错误类型。如果错误类型不同,可以考虑使用来自 futures::future::TryFutureExt
模块的 map_err
和err_info
方法将错误进行转换:
use futures::{
future::TryFutureExt,
try_join,
};
struct Book;
struct Music;
async fn get_book() -> Result<Book, ()> { /* ... */ Ok(Book) }
async fn get_music() -> Result<Music, String> { /* ... */ Ok(Music) }
/**
* 传给 try_join! 的所有 Future 都必须拥有相同的错误类型。
* 如果错误类型不同,可以考虑使用来自 futures::future::TryFutureExt 模块的 map_err和 err_info方法将错误进行转换:
*/
async fn get_book_and_music() -> Result<(Book, Music), String> {
let book_fut = get_book().map_err(|()| "Unable to get book".to_string());
let music_fut = get_music();
// 某一个 Future 报错后就立即停止所有 Future 的执行,可以使用 try_join!
try_join!(book_fut, music_fut)
}
async fn get_into_book_and_music() -> (Book, Music) {
get_book_and_music().await.unwrap()
}
fn main() {
futures::executor::block_on(get_into_book_and_music());
}
select! 宏
join!
宏只有等所有 Future 结束后,才能集中处理结果,select!
宏则代表等待多个 Future ,只要任何一个 Future 完成,都可以立即被处理:
use futures::{
future::FutureExt, // for `.fuse()`
pin_mut,
select,
};
async fn task_one() { /* ... */ }
async fn task_two() { /* ... */ }
/**
* 赛跑模式:同时并发地运行 t1 和 t2, 无论两者哪个先完成, 函数结束且不会等待另一个任务的完成
*/
async fn race_tasks() {
// .fuse() 方法可以让 Future 实现 FusedFuture 特征,
let t1 = task_one().fuse();
let t2 = task_two().fuse();
// pin_mut 宏会为 Future 实现 Unpin特征
pin_mut!(t1, t2);
// 同时等待多个 Future ,且任何一个 Future 结束后,都可以立即被处理,可以考虑使用 futures::select!:
select! {
() = t1 => println!("任务1率先完成"),
() = t2 => println!("任务2率先完成"),
}
}
上面的代码会同时并发地运行 t1
和 t2
, 无论两者哪个先完成,都会调用对应的 println!
打印相应的输出,然后函数结束且不会等待另一个任务的完成。
注意:使用select
宏所必须的满足: FusedFuture + Unpin, 通过 fuse 方法和 pin_mut 宏实现
首先,.fuse()
方法可以让 Future
实现 FusedFuture
trait, 而 pin_mut!
宏会为 Future
实现 Unpin
trait:
注:
select!
宏所必须的满足:FusedStream
+Unpin
两个 Trait 约束:
Unpin
:由于select
不会过拿走所有权的方式使用 Future,而是通过可变引用的方式去使用,这样当select
结束后,该 Future 若没有被完成,它的所有权还可以继续被其它代码使用。FusedFuture
:当 Future 一旦完成后,那select
就不能再对其进行轮询使用。Fuse
意味着熔断,相当于 Future 一旦完成,再次调用poll
会直接返回Poll::Pending
。
只有实现了FusedFuture
,select
才能配合 loop
一起使用。假如没有实现,就算一个 Future
已经完成了,它依然会被 select
不停的轮询执行。
Stream
稍有不同,它们使用的特征是 FusedStream
。 通过.fuse()
(也可以手动实现)实现了该特征的 Stream
,对其调用.next()
或 .try_next()
方法可以获取实现了FusedFuture
特征的Future
:
use futures::{
stream::{Stream, StreamExt, FusedStream},
select,
};
async fn add_two_streams() -> u8 {
// mut s1: impl Stream<Item = u8> + FusedStream + Unpin,
// mut s2: impl Stream<Item = u8> + FusedStream + Unpin,
// .fuse() 方法可以让 Stream 实现 FusedStream 特征,
let s1 = futures::stream::once(async { 10 }).fuse();
let s2 = futures::stream::once(async { 20 }).fuse();
// pin_mut 宏会为 Stream 实现 Unpin 特征
pin_mut!(s1, s2);
let mut total = 0;
loop {
let item = select! {
x = s1.next() => x,
x = s2.next() => x,
complete => break,
default => panic!(), // 该分支永远不会运行,因为`Future`会先运行,然后是`complete`
};
if let Some(next_num) = item {
total += next_num;
}
}
println!("add_two_streams,total = {total}");
total
}
fn main() {
executor::block_on(add_two_streams());
}
注:
select!
宏还支持default
和complete
分支:
complete
分支:当所有的Future
和Stream
完成后才会被执行,它往往配合loop
使用,loop
用于循环完成所有的Future
default
分支:若没有任何Future
或Stream
处于Ready
状态, 则该分支会被立即执行
在使用select
宏过程中,推荐两个很实用的函数和类型:
Fuse::terminated()
函数 :可以在select
循环构建一个空的 Future(已实现FusedFuture
),后面按需填充新 futureFuturesUnordered
类型 :可以使某个Future
有多个拷贝且都可以同时并发运行
use futures::{
future::{Fuse, FusedFuture, FutureExt},
stream::{FusedStream, FuturesUnordered, Stream, StreamExt},
pin_mut,
select,
};
async fn future_in_select() {
// 创建一个空 Future,并且已经实现了 FusedFuture
let fut = Fuse::terminated();
// 创建一个 FuturesUnordered 类型,可以多次拷贝
let mut async_tasks: FuturesUnordered<Pin<Box<dyn Future<Output = i32>>>> = FuturesUnordered::new();
async_tasks.push(Box::pin(async { 1 }));
pin_mut!(fut);
let mut total = 0;
loop {
select! {
// select_next_some 函数可以用在 `select` 上,并且只运行从 stream 返回的 `Some(_)` 值而忽略 `None`
num = async_tasks.select_next_some() => {
println!("first num is {num} and total is {total}");
total += num;
println!("total is {total}");
if total >= 10 { break; }
// 判断是否已经终止
if fut.is_terminated() {
// 按需填充新 future
fut.set(async { 1 }.fuse());
}
},
num = fut => {
println!("second num is {num} and total is {total}");
total += num;
println!("now total is {total}");
async_tasks.push(Box::pin(async { 1 }));
},
complete => break,
default => panic!(),
};
}
println!("total finally is {total}");
}
fn main() {
executor::block_on(future_in_select());
}
总结
futures
包中提供了很多实用的并发执行 Future 的工具,比如:
join!
宏:并发的运行多个不同 Future,且等待必须等待所有 Future 全部完成,才算结束,可以理解为必须全部完成的任务并发模式try_join!
宏:并发的运行多个不同 Future,当有某一个 Future 报错后就立即停止所有 Future 的执行,Future 返回是Result
,可以提前结束的任务并发模式select!
宏:并发的运行多个不同 Future,且只要任何一个 Future 完成,都可以立即被处理,可以理解成任务赛跑模式- 使用
select!
宏所必须的满足:FusedFuture
+Unpin
, 通过fuse
方法和pin_mut
宏实现
参考
转载自:https://juejin.cn/post/7217669171579420732