likes
comments
collection
share

Rust 异步编程之使用 join! 和 select! 同时运行多个 Future

作者站长头像
站长
· 阅读数 9

当只执行一个 Future 时, 在异步函数async fn或者异步代码块async {}内部可以直接使用.await ,当需要并发的执行多个 Future 时,直接使用 .await会阻塞并发任务,直到特定的 Future 完成(串行执行)。futures 包中提供了很多实用的可以并发执行 Future 的工具,比如 join! 宏 和 select! 宏。

注:futures::future 模块下提供了一系列可以操作 Future 的函数(比操作宏丰富的多),详见: docs.rs/futures/lat… docs.rs/futures/lat…

join! 宏

join!宏允许同时等待多个不同 Future 的完成,且可以并发地运行这些 Future。

先来看两个使用使用.await的错误版本:

struct Book;
struct Music;

async fn enjoy_book() -> Book { /* ... */ Book }
async fn enjoy_music() -> Music { /* ... */ Music}

// 错误版本1: 异步内部是顺次串行执行,而不是同时运行它们
async fn enjoy1_book_and_music() -> (Book, Music) {
    // 实际在异步函数内部是是串行执行
    let book = enjoy_book().await; // await 触发阻塞式执行
    let music = enjoy_music().await; // await 触发阻塞式执行
    (book, music)
}

// 错误版本2: 异步内部是顺次串行执行,而不是同时运行它们
async fn enjoy2_book_and_music() -> (Book, Music) {
    // 实际在异步函数内部是是串行执行
    let book_future = enjoy_book(); // async 函数是惰性的,并没有立即执行
    let music_future = enjoy_music(); // async 函数是惰性的,并没有立即执行
    (book_future.await, music_future.await)
}

上面两个例子看似都可以顺利异步运行,但是实际上是必须先看完书后,才能听音乐,即异步函数内部的任务是依次(顺次)串行执行,而不是同时并发的运行

因为在 Rust 中 Future 是惰性的,直到调用 .await 时,才会开始运行。而那两个 await 由于在代码中有先后顺序,因此它们是顺序运行的。

为了正确的并发运行两个 Future , 我们来试试 futures::join! 宏:

use futures::join;

// 使用`join!`会返回一个元组,里面的值是对应的`Future`执行结束后输出的值。
async fn enjoy_book_and_music() -> (Book, Music) {
    let book_fut = enjoy_book();
    let music_fut = enjoy_music();
    // join! 宏必须等待它管理的所有 Future 完成后才能完成
    join!(book_fut, music_fut)
}

fn main() {
    futures::executor::block_on(enjoy_book_and_music());
}

如果希望同时运行一个数组里的多个异步任务,可以使用 futures::future::join_all 方法

try_join! 宏

由于join!必须等待它管理的所有 Future 完成后才能完成,如果你希望在某一个 Future 报错后就立即停止所有 Future 的执行,可以使用 try_join!,特别是当 Future 返回 Result 时。

注:传给 try_join! 的所有 Future 都必须拥有相同的错误类型。如果错误类型不同,可以考虑使用来自 futures::future::TryFutureExt 模块的 map_errerr_info方法将错误进行转换:

use futures::{
    future::TryFutureExt,
    try_join,
};

struct Book;
struct Music;

async fn get_book() -> Result<Book, ()> { /* ... */ Ok(Book) }
async fn get_music() -> Result<Music, String> { /* ... */ Ok(Music) }
/**
 * 传给 try_join! 的所有 Future 都必须拥有相同的错误类型。
 * 如果错误类型不同,可以考虑使用来自 futures::future::TryFutureExt 模块的 map_err和 err_info方法将错误进行转换:
 */
async fn get_book_and_music() -> Result<(Book, Music), String> {
    let book_fut = get_book().map_err(|()| "Unable to get book".to_string());
    let music_fut = get_music();
    // 某一个 Future 报错后就立即停止所有 Future 的执行,可以使用 try_join!
    try_join!(book_fut, music_fut)
}

async fn get_into_book_and_music() -> (Book, Music) {
    get_book_and_music().await.unwrap()
}

fn main() {
    futures::executor::block_on(get_into_book_and_music());
}

select! 宏

join!宏只有等所有 Future 结束后,才能集中处理结果,select! 宏则代表等待多个 Future ,只要任何一个 Future 完成,都可以立即被处理:

use futures::{
    future::FutureExt, // for `.fuse()`
    pin_mut,
    select,
};

async fn task_one() { /* ... */ }
async fn task_two() { /* ... */ }

/**
 * 赛跑模式:同时并发地运行 t1 和 t2, 无论两者哪个先完成, 函数结束且不会等待另一个任务的完成
 */
async fn race_tasks() {
    // .fuse() 方法可以让 Future 实现 FusedFuture 特征,
    let t1 = task_one().fuse();
    let t2 = task_two().fuse();

    // pin_mut 宏会为 Future 实现 Unpin特征
    pin_mut!(t1, t2);

    // 同时等待多个 Future ,且任何一个 Future 结束后,都可以立即被处理,可以考虑使用 futures::select!:
    select! {
        () = t1 => println!("任务1率先完成"),
        () = t2 => println!("任务2率先完成"),
    }
}

上面的代码会同时并发地运行 t1 和 t2, 无论两者哪个先完成,都会调用对应的 println! 打印相应的输出,然后函数结束且不会等待另一个任务的完成。

注意:使用select宏所必须的满足: FusedFuture + Unpin, 通过 fuse 方法和 pin_mut 宏实现

首先,.fuse()方法可以让 Future 实现 FusedFuture trait, 而 pin_mut! 宏会为 Future 实现 Unpintrait:

注:select!宏所必须的满足: FusedStream + Unpin 两个 Trait 约束:

  • Unpin:由于 select不会过拿走所有权的方式使用 Future,而是通过可变引用的方式去使用,这样当 select 结束后,该 Future 若没有被完成,它的所有权还可以继续被其它代码使用。
  • FusedFuture:当 Future 一旦完成后,那 select就不能再对其进行轮询使用。Fuse意味着熔断,相当于 Future 一旦完成,再次调用poll会直接返回Poll::Pending

只有实现了FusedFutureselect 才能配合 loop 一起使用。假如没有实现,就算一个 Future 已经完成了,它依然会被 select 不停的轮询执行。

Stream 稍有不同,它们使用的特征是 FusedStream。 通过.fuse()(也可以手动实现)实现了该特征的 Stream,对其调用.next() 或 .try_next()方法可以获取实现了FusedFuture特征的Future:

use futures::{
    stream::{Stream, StreamExt, FusedStream},
    select,
};

async fn add_two_streams() -> u8 {
    // mut s1: impl Stream<Item = u8> + FusedStream + Unpin,
    // mut s2: impl Stream<Item = u8> + FusedStream + Unpin,

    // .fuse() 方法可以让 Stream 实现 FusedStream 特征,
    let s1 = futures::stream::once(async { 10 }).fuse();
    let s2 = futures::stream::once(async { 20 }).fuse();

    // pin_mut 宏会为 Stream 实现 Unpin 特征
    pin_mut!(s1, s2);

    let mut total = 0;

    loop {
        let item = select! {
            x = s1.next() => x,
            x = s2.next() => x,
            complete => break,
            default => panic!(), // 该分支永远不会运行,因为`Future`会先运行,然后是`complete`
        };
        if let Some(next_num) = item {
            total += next_num;
        }
    }
    println!("add_two_streams,total = {total}");
    total
}
fn main() {
    executor::block_on(add_two_streams());
}

注:select!宏还支持 default 和 complete 分支:

  • complete 分支:当所有的 Future 和 Stream 完成后才会被执行,它往往配合loop使用,loop用于循环完成所有的 Future
  • default分支:若没有任何 Future 或 Stream 处于 Ready 状态, 则该分支会被立即执行

在使用select宏过程中,推荐两个很实用的函数和类型:

  • Fuse::terminated()函数 :可以在select循环构建一个空的 Future(已实现FusedFuture),后面按需填充新 future
  • FuturesUnordered类型 :可以使某个 Future 有多个拷贝且都可以同时并发运行
use futures::{
    future::{Fuse, FusedFuture, FutureExt},
    stream::{FusedStream, FuturesUnordered, Stream, StreamExt},
    pin_mut,
    select,
};

async fn future_in_select() {
    // 创建一个空 Future,并且已经实现了 FusedFuture 
    let fut = Fuse::terminated();
    // 创建一个 FuturesUnordered 类型,可以多次拷贝
    let mut async_tasks: FuturesUnordered<Pin<Box<dyn Future<Output = i32>>>> = FuturesUnordered::new();
    async_tasks.push(Box::pin(async { 1 }));
    
    pin_mut!(fut);
    
    let mut total = 0;
    loop {
        select! {
            // select_next_some 函数可以用在 `select` 上,并且只运行从 stream 返回的 `Some(_)` 值而忽略 `None`
            num = async_tasks.select_next_some() => {
                println!("first num is {num} and total is {total}");
                total += num;
                println!("total is {total}");
                if total >= 10 { break; }
                // 判断是否已经终止
                if fut.is_terminated() {
                    // 按需填充新 future
                    fut.set(async { 1 }.fuse());
                }
            },
            num = fut => {
                println!("second num is {num} and total is {total}");
                total += num;
                println!("now total is {total}");
                async_tasks.push(Box::pin(async { 1 }));
            },
            complete => break,
            default => panic!(),
        };
    }

    println!("total finally is {total}");
}

fn main() {
    executor::block_on(future_in_select());
}

总结

futures 包中提供了很多实用的并发执行 Future 的工具,比如:

  1. join!宏:并发的运行多个不同 Future,且等待必须等待所有 Future 全部完成,才算结束,可以理解为必须全部完成的任务并发模式
  2. try_join!宏:并发的运行多个不同 Future,当有某一个 Future 报错后就立即停止所有 Future 的执行,Future 返回是Result,可以提前结束的任务并发模式
  3. select!宏:并发的运行多个不同 Future,且只要任何一个 Future 完成,都可以立即被处理,可以理解成任务赛跑模式
  4. 使用select!宏所必须的满足: FusedFuture + Unpin, 通过fuse方法和pin_mut宏实现

参考

转载自:https://juejin.cn/post/7217669171579420732
评论
请登录