likes
comments
collection
share

Rust异步编程: futures 库探究 (futures-rs)

作者站长头像
站长
· 阅读数 3

在学习 Rust 异步编程过程中,不管是主动还是被动,都会接触到不少使用 futures 库的场景,要深入学习 Rust 异步编程,需要从总体上的探究下 futures 的全貌,以便更好的掌握 Rust 异步编程。

futures-rs 是 rust 官方提供的一个类库(即:futures 项目库),它是 Rust 异步编程的基础,提供异步运行时(Runtime)。包括关键 trait 的定义如 Stream,以及宏如join!select! 以及各种 future 组合子用来控制异步流程。

注:futures-rs 中定义的 future 类型是标准库中 future 的原始实现。Rust 将核心的 Future trait 移入标准库中并改为std::future::Future以实现 async/await 语法。在某种意义上讲,std::future::Future 可以看作是futures::future::Future的最小子集。

futures-rs 库提供了许多功能,掌握它们对异步编程很有帮助:

  • 提供了许多并发工具:join!select!
  • 提供了TryFutureFusedFutureStream等 trait
  • 提供了许多FutureTryFutureStream的扩展方法
  • 提供了异步 IO 的支持
  • 提供了 Sink
  • 提供了线程池、异步执行器

官方文档:docs.rs/futures/lat…

future-rs包含一系列的 crate,比如:

crate 名称描述
futures仅仅重新导出了下面 crate ,并提供对外调用的 API
futures-core包含了核心的 trait 和类型
futures-task包含了用于处理任务(task)的工具
futures-channel包含了用于异步通信的各种 channel
futures-executor基于 future-rs 库的异步任务执行器
futures-io包含了 IO 相关抽象的 trait
futures-sink包含了 Sink trait
futures-macro包含了 join!、try_join!、select! 等宏实现
futures-util包含了一些常用的工具并扩展了一些 trait

代码结构

官方代码:github.com/rust-lang/f…

  • ├── ci // 包含一个 sh 文件
  • ├── examples // 包含两个demo
  • ├── futures-test // 用于测试 futures-rs 的通用工具类库(非测试用例)
  • ├── futures // 提供重新导出了下面 crate 对外调用的 API,但没有导出全部
  • ├── futures-channel // 异步通信 channel实现,包含 onshot 和 mpsc
  • ├── futures-core // futures 库的核心 trait 和 type
  • ├── futures-executor // futures 库的异步任务执行器
  • ├── futures-io // 异步 IO 相关抽象的 trait,比如 AsyncRead、AsyncWrite、AsyncSeek、AsyncBufRead 等
  • ├── futures-macro // 提供join!、try_join!、select!、pin_mut!等宏实现
  • ├── futures-sink // Sink trait
  • ├── futures-task // 处理 task 的工具,如 FutureObj/LocalFutureObj struct、 Spawn/LocalSpawn ArcWake trait、
  • ├── futures-util // 通用工具类库、扩展 trait(如AsyncReadExt、SinkExt、SpawnExt)

使用 futures 库

使用futures可以引入 futures-rs 下的全部功能,也可以只引入需要的功能,可以无需标准库即可工作,引入只需在 cargo.toml 添加依赖,比如:

[dependencies]
// 方案1 :引入futures库的全部 crate
futures = "0.3"
// 方案2 ::只引入线程池 crate
futures = { version = "0.3", features = ["thread-pool"] }
// 方案3 :要在`[no_std]`环境中使用 `futures`,请使用:
futures = { version = "0.3", default-features = false }

示例代码:

// main.rs 
use futures::channel::mpsc;   // 消息通道
use futures::executor;  // future 执行器
use futures::executor::ThreadPool; // 线程池
use futures::StreamExt; // Stream 流扩展(比如一些流操作组合)

fn main() {
    let pool = ThreadPool::new().expect("Failed to build pool");
    let (tx, rx) = mpsc::unbounded::<i32>();

    // 使用 async 块创建一个 future,返回一个 Future 的实现
    // 此时尚未提供 executor 给这个 future,因此它不会运行
    let fut_values = async {
        // 创建另外一个 async 块,同样会生成 Future 的实现,
        // 它在父async块里面,因此会在父 async 块执行后,提供 executor 给子 async 块执行
        // executor 连接是由 Future::poll的第二个参数 std::task::Context 完成,
        // 它代表了我们的 executor ,子 async 块生成的 Future 实现它能被 polled (使用父 async 块的 executor)
        let fut_tx_result = async move {
            (0..100).for_each(|v| {
                tx.unbounded_send(v).expect("Failed to send");
            })
        };
        
        // 使用 thread pool 的 spawn 方法传输生成的 future
        pool.spawn_ok(fut_tx_result);

        // 操作组合因子
        let fut_values = rx
            .map(|v| v * 2)
            .collect();

        //使用 async 块提供的 executue 去等待 fut_values 完成
        fut_values.await
    };
    // 真正的去调用上面的 fut_values future,执行它的 poll 方法和 Future 里的子 future 的 poll 方法,最终驱动 fut_values被驱动完成,返回结果
    let values: Vec<i32> = executor::block_on(fut_values);

    println!("Values={:?}", values);
}

futures Crate

futures crate 仅仅是重新导出了futures-rs 下的其他 crate ,并提供对外调用的 API,但没有导出全部,而是提供了常用的一部分模块,trait,函数和宏的导出。

注:futures-rs 对外提供的 api 种类繁多,我们实际用到的可能只是一小部分,stjepang将其中的一部分抽取出来,形成了一个轻量级的 futures 版本,详见 github.com/stjepang/fu…

比如提供重新导出(Re-exports) 异步相关的模块、trait 、宏等:

// trait 导出
pub use futures_core::future::Future;
pub use futures_core::future::TryFuture;
pub use futures_util::future::FutureExt;
pub use futures_util::future::TryFutureExt;
pub use futures_core::stream::Stream;
pub use futures_core::stream::TryStream;
pub use futures_util::stream::StreamExt;
pub use futures_util::stream::TryStreamExt;
pub use futures_sink::Sink;
pub use futures_util::sink::SinkExt;
pub use futures_io::AsyncBufRead;
pub use futures_io::AsyncRead;
pub use futures_io::AsyncSeek;
pub use futures_io::AsyncWrite;
pub use futures_util::AsyncBufReadExt;
pub use futures_util::AsyncReadExt;
pub use futures_util::AsyncSeekExt;
pub use futures_util::AsyncWriteExt;

// 模块导出
pub use futures_util::{future, sink, stream, task};

// 宏导出
pub use futures_util::{join, try_join, select, select_biased, pin_mut};

futures/src/lib.rs 源码如下:

//! Abstractions for asynchronous programming.
//! This crate provides a number of core abstractions for writing asynchronous code:
// futures crate 提供一些列异步编程相关的核心抽象,作用仅仅是重新导出了futures-rs 下的其他 crate,并提供对外调用的 API

#[doc(no_inline)]
pub use futures_core::future::{Future, TryFuture}; // 导出Future和TryFuture
#[doc(no_inline)]
pub use futures_util::future::{FutureExt, TryFutureExt}; // 导出FutureExt和FutureExt

#[doc(no_inline)]
pub use futures_core::stream::{Stream, TryStream}; // 导出Stream和TryStream
#[doc(no_inline)]
pub use futures_util::stream::{StreamExt, TryStreamExt};// 导出StreamExt和TryStreamExt

#[doc(no_inline)]
pub use futures_sink::Sink; // 导出Sink
#[doc(no_inline)]
pub use futures_util::sink::SinkExt; // 导出SinkExt

#[cfg(feature = "std")]
#[doc(no_inline)]
// 导出 AsyncBufRead, AsyncRead, AsyncSeek, AsyncWrite
pub use futures_io::{AsyncBufRead, AsyncRead, AsyncSeek, AsyncWrite};
#[cfg(feature = "std")]
#[doc(no_inline)]
// 导出 AsyncBufReadExt, AsyncReadExt, AsyncSeekExt, AsyncWriteExt
pub use futures_util::{AsyncBufReadExt, AsyncReadExt, AsyncSeekExt, AsyncWriteExt};

// Macro reexports
pub use futures_core::ready; // 导出ready宏
pub use futures_util::pin_mut; // 导出pin_mut宏
#[cfg(feature = "std")]
#[cfg(feature = "async-await")]
pub use futures_util::select; // 导出select宏
#[cfg(feature = "async-await")]
// 导出一些列宏,比如:join, pending, poll, select_biased, try_join
pub use futures_util::{join, pending, poll, select_biased, try_join}; // Async-await

// Module reexports
#[doc(inline)]
// 导出一些列模块,比如:future, sink, stream, task
pub use futures_util::{future, sink, stream, task};

#[cfg(feature = "std")]
#[cfg(feature = "async-await")]
pub use futures_util::stream_select; // 导出stream_select宏

#[cfg(feature = "alloc")]
#[doc(inline)]
pub use futures_channel as channel;  // 导出futures_channel
#[cfg(feature = "alloc")]
#[doc(inline)]
pub use futures_util::lock; // 导出lock 工具

#[cfg(feature = "std")]
#[doc(inline)]
pub use futures_util::io; // 导出io工具

#[cfg(feature = "executor")]
#[cfg_attr(docsrs, doc(cfg(feature = "executor")))]
pub mod executor {
    //! Built-in executors and related tools.
    //! This module is only available when the `executor` feature of this library is activated.
    // 导出执行器下的一些列工具,比如block_on, block_on_stream等
    pub use futures_executor::{
        block_on, block_on_stream, enter, BlockingStream, Enter, EnterError, LocalPool,
        LocalSpawner,
    };

    #[cfg(feature = "thread-pool")]
    #[cfg_attr(docsrs, doc(cfg(feature = "thread-pool")))]
    // 导出执行器下的线程池:ThreadPool和ThreadPoolBuilder
    pub use futures_executor::{ThreadPool, ThreadPoolBuilder};
}

#[cfg(feature = "compat")]
#[cfg_attr(docsrs, doc(cfg(feature = "compat")))]
pub mod compat {
    //! Interop between `futures` 0.1 and 0.3.
    //! This module is only available when the `compat` feature of this library is activated.
    // 导出 futures_util 库下 feature 兼容相关一些工具
    pub use futures_util::compat::{
        Compat, Compat01As03, Compat01As03Sink, CompatSink, Executor01As03, Executor01CompatExt,
        Executor01Future, Future01CompatExt, Sink01CompatExt, Stream01CompatExt,
    };

    #[cfg(feature = "io-compat")]
    #[cfg_attr(docsrs, doc(cfg(feature = "io-compat")))]
     // 导出 futures_util 库下 IO 相关的一些的兼容工具
    pub use futures_util::compat::{AsyncRead01CompatExt, AsyncWrite01CompatExt};
}

/// 导出 futures crate一些预导入常用项,需要手动导入:use futures::prelude::*;
pub mod prelude {
    //! A "prelude" for crates using the `futures` crate.
    
    pub use crate::future::{self, Future, TryFuture};
    pub use crate::sink::{self, Sink};
    pub use crate::stream::{self, Stream, TryStream};

    #[doc(no_inline)]
    #[allow(unreachable_pub)]
    pub use crate::future::{FutureExt as _, TryFutureExt as _};
    #[doc(no_inline)]
    pub use crate::sink::SinkExt as _;
    #[doc(no_inline)]
    #[allow(unreachable_pub)]
    pub use crate::stream::{StreamExt as _, TryStreamExt as _};

    #[cfg(feature = "std")]
    pub use crate::io::{AsyncBufRead, AsyncRead, AsyncSeek, AsyncWrite};

    #[cfg(feature = "std")]
    #[doc(no_inline)]
    #[allow(unreachable_pub)]
    pub use crate::io::{
        AsyncBufReadExt as _, AsyncReadExt as _, AsyncSeekExt as _, AsyncWriteExt as _,
    };
}

官方文档: