Rust异步编程: futures 库探究 (futures-rs)
在学习 Rust 异步编程过程中,不管是主动还是被动,都会接触到不少使用 futures 库的场景,要深入学习 Rust 异步编程,需要从总体上的探究下 futures 的全貌,以便更好的掌握 Rust 异步编程。
futures-rs
是 rust 官方提供的一个类库(即:futures 项目库),它是 Rust 异步编程的基础,提供异步运行时(Runtime)。包括关键 trait 的定义如 Stream
,以及宏如join!
, select!
以及各种 future 组合子用来控制异步流程。
注:futures-rs 中定义的 future 类型是标准库中 future 的原始实现。Rust 将核心的 Future trait 移入标准库中并改为
std::future::Future
以实现 async/await 语法。在某种意义上讲,std::future::Future
可以看作是futures::future::Future
的最小子集。
futures-rs
库提供了许多功能,掌握它们对异步编程很有帮助:
- 提供了许多并发工具:
join!
、select!
- 提供了
TryFuture
、FusedFuture
、Stream
等trait
- 提供了许多
Future
、TryFuture
、Stream
的扩展方法 - 提供了异步 IO 的支持
- 提供了
Sink
- 提供了线程池、异步执行器
官方文档:docs.rs/futures/lat…
future-rs
包含一系列的 crate,比如:
crate 名称 | 描述 |
---|---|
futures | 仅仅重新导出了下面 crate ,并提供对外调用的 API |
futures-core | 包含了核心的 trait 和类型 |
futures-task | 包含了用于处理任务(task)的工具 |
futures-channel | 包含了用于异步通信的各种 channel |
futures-executor | 基于 future-rs 库的异步任务执行器 |
futures-io | 包含了 IO 相关抽象的 trait |
futures-sink | 包含了 Sink trait |
futures-macro | 包含了 join!、try_join!、select! 等宏实现 |
futures-util | 包含了一些常用的工具并扩展了一些 trait |
代码结构
- ├── ci // 包含一个 sh 文件
- ├── examples // 包含两个demo
- ├── futures-test // 用于测试 futures-rs 的通用工具类库(非测试用例)
- ├── futures // 提供重新导出了下面 crate 对外调用的 API,但没有导出全部
- ├── futures-channel // 异步通信 channel实现,包含 onshot 和 mpsc
- ├── futures-core // futures 库的核心 trait 和 type
- ├── futures-executor // futures 库的异步任务执行器
- ├── futures-io // 异步 IO 相关抽象的 trait,比如 AsyncRead、AsyncWrite、AsyncSeek、AsyncBufRead 等
- ├── futures-macro // 提供join!、try_join!、select!、pin_mut!等宏实现
- ├── futures-sink // Sink trait
- ├── futures-task // 处理 task 的工具,如 FutureObj/LocalFutureObj struct、 Spawn/LocalSpawn ArcWake trait、
- ├── futures-util // 通用工具类库、扩展 trait(如AsyncReadExt、SinkExt、SpawnExt)
使用 futures 库
使用futures
可以引入 futures-rs 下的全部功能,也可以只引入需要的功能,可以无需标准库即可工作,引入只需在 cargo.toml 添加依赖,比如:
[dependencies]
// 方案1 :引入futures库的全部 crate
futures = "0.3"
// 方案2 ::只引入线程池 crate
futures = { version = "0.3", features = ["thread-pool"] }
// 方案3 :要在`[no_std]`环境中使用 `futures`,请使用:
futures = { version = "0.3", default-features = false }
示例代码:
// main.rs
use futures::channel::mpsc; // 消息通道
use futures::executor; // future 执行器
use futures::executor::ThreadPool; // 线程池
use futures::StreamExt; // Stream 流扩展(比如一些流操作组合)
fn main() {
let pool = ThreadPool::new().expect("Failed to build pool");
let (tx, rx) = mpsc::unbounded::<i32>();
// 使用 async 块创建一个 future,返回一个 Future 的实现
// 此时尚未提供 executor 给这个 future,因此它不会运行
let fut_values = async {
// 创建另外一个 async 块,同样会生成 Future 的实现,
// 它在父async块里面,因此会在父 async 块执行后,提供 executor 给子 async 块执行
// executor 连接是由 Future::poll的第二个参数 std::task::Context 完成,
// 它代表了我们的 executor ,子 async 块生成的 Future 实现它能被 polled (使用父 async 块的 executor)
let fut_tx_result = async move {
(0..100).for_each(|v| {
tx.unbounded_send(v).expect("Failed to send");
})
};
// 使用 thread pool 的 spawn 方法传输生成的 future
pool.spawn_ok(fut_tx_result);
// 操作组合因子
let fut_values = rx
.map(|v| v * 2)
.collect();
//使用 async 块提供的 executue 去等待 fut_values 完成
fut_values.await
};
// 真正的去调用上面的 fut_values future,执行它的 poll 方法和 Future 里的子 future 的 poll 方法,最终驱动 fut_values被驱动完成,返回结果
let values: Vec<i32> = executor::block_on(fut_values);
println!("Values={:?}", values);
}
futures Crate
futures crate 仅仅是重新导出了futures-rs 下的其他 crate ,并提供对外调用的 API,但没有导出全部,而是提供了常用的一部分模块,trait,函数和宏的导出。
注:futures-rs 对外提供的 api 种类繁多,我们实际用到的可能只是一小部分,stjepang将其中的一部分抽取出来,形成了一个轻量级的 futures 版本,详见 github.com/stjepang/fu…
比如提供重新导出(Re-exports) 异步相关的模块、trait 、宏等:
// trait 导出
pub use futures_core::future::Future;
pub use futures_core::future::TryFuture;
pub use futures_util::future::FutureExt;
pub use futures_util::future::TryFutureExt;
pub use futures_core::stream::Stream;
pub use futures_core::stream::TryStream;
pub use futures_util::stream::StreamExt;
pub use futures_util::stream::TryStreamExt;
pub use futures_sink::Sink;
pub use futures_util::sink::SinkExt;
pub use futures_io::AsyncBufRead;
pub use futures_io::AsyncRead;
pub use futures_io::AsyncSeek;
pub use futures_io::AsyncWrite;
pub use futures_util::AsyncBufReadExt;
pub use futures_util::AsyncReadExt;
pub use futures_util::AsyncSeekExt;
pub use futures_util::AsyncWriteExt;
// 模块导出
pub use futures_util::{future, sink, stream, task};
// 宏导出
pub use futures_util::{join, try_join, select, select_biased, pin_mut};
futures/src/lib.rs 源码如下:
//! Abstractions for asynchronous programming.
//! This crate provides a number of core abstractions for writing asynchronous code:
// futures crate 提供一些列异步编程相关的核心抽象,作用仅仅是重新导出了futures-rs 下的其他 crate,并提供对外调用的 API
#[doc(no_inline)]
pub use futures_core::future::{Future, TryFuture}; // 导出Future和TryFuture
#[doc(no_inline)]
pub use futures_util::future::{FutureExt, TryFutureExt}; // 导出FutureExt和FutureExt
#[doc(no_inline)]
pub use futures_core::stream::{Stream, TryStream}; // 导出Stream和TryStream
#[doc(no_inline)]
pub use futures_util::stream::{StreamExt, TryStreamExt};// 导出StreamExt和TryStreamExt
#[doc(no_inline)]
pub use futures_sink::Sink; // 导出Sink
#[doc(no_inline)]
pub use futures_util::sink::SinkExt; // 导出SinkExt
#[cfg(feature = "std")]
#[doc(no_inline)]
// 导出 AsyncBufRead, AsyncRead, AsyncSeek, AsyncWrite
pub use futures_io::{AsyncBufRead, AsyncRead, AsyncSeek, AsyncWrite};
#[cfg(feature = "std")]
#[doc(no_inline)]
// 导出 AsyncBufReadExt, AsyncReadExt, AsyncSeekExt, AsyncWriteExt
pub use futures_util::{AsyncBufReadExt, AsyncReadExt, AsyncSeekExt, AsyncWriteExt};
// Macro reexports
pub use futures_core::ready; // 导出ready宏
pub use futures_util::pin_mut; // 导出pin_mut宏
#[cfg(feature = "std")]
#[cfg(feature = "async-await")]
pub use futures_util::select; // 导出select宏
#[cfg(feature = "async-await")]
// 导出一些列宏,比如:join, pending, poll, select_biased, try_join
pub use futures_util::{join, pending, poll, select_biased, try_join}; // Async-await
// Module reexports
#[doc(inline)]
// 导出一些列模块,比如:future, sink, stream, task
pub use futures_util::{future, sink, stream, task};
#[cfg(feature = "std")]
#[cfg(feature = "async-await")]
pub use futures_util::stream_select; // 导出stream_select宏
#[cfg(feature = "alloc")]
#[doc(inline)]
pub use futures_channel as channel; // 导出futures_channel
#[cfg(feature = "alloc")]
#[doc(inline)]
pub use futures_util::lock; // 导出lock 工具
#[cfg(feature = "std")]
#[doc(inline)]
pub use futures_util::io; // 导出io工具
#[cfg(feature = "executor")]
#[cfg_attr(docsrs, doc(cfg(feature = "executor")))]
pub mod executor {
//! Built-in executors and related tools.
//! This module is only available when the `executor` feature of this library is activated.
// 导出执行器下的一些列工具,比如block_on, block_on_stream等
pub use futures_executor::{
block_on, block_on_stream, enter, BlockingStream, Enter, EnterError, LocalPool,
LocalSpawner,
};
#[cfg(feature = "thread-pool")]
#[cfg_attr(docsrs, doc(cfg(feature = "thread-pool")))]
// 导出执行器下的线程池:ThreadPool和ThreadPoolBuilder
pub use futures_executor::{ThreadPool, ThreadPoolBuilder};
}
#[cfg(feature = "compat")]
#[cfg_attr(docsrs, doc(cfg(feature = "compat")))]
pub mod compat {
//! Interop between `futures` 0.1 and 0.3.
//! This module is only available when the `compat` feature of this library is activated.
// 导出 futures_util 库下 feature 兼容相关一些工具
pub use futures_util::compat::{
Compat, Compat01As03, Compat01As03Sink, CompatSink, Executor01As03, Executor01CompatExt,
Executor01Future, Future01CompatExt, Sink01CompatExt, Stream01CompatExt,
};
#[cfg(feature = "io-compat")]
#[cfg_attr(docsrs, doc(cfg(feature = "io-compat")))]
// 导出 futures_util 库下 IO 相关的一些的兼容工具
pub use futures_util::compat::{AsyncRead01CompatExt, AsyncWrite01CompatExt};
}
/// 导出 futures crate一些预导入常用项,需要手动导入:use futures::prelude::*;
pub mod prelude {
//! A "prelude" for crates using the `futures` crate.
pub use crate::future::{self, Future, TryFuture};
pub use crate::sink::{self, Sink};
pub use crate::stream::{self, Stream, TryStream};
#[doc(no_inline)]
#[allow(unreachable_pub)]
pub use crate::future::{FutureExt as _, TryFutureExt as _};
#[doc(no_inline)]
pub use crate::sink::SinkExt as _;
#[doc(no_inline)]
#[allow(unreachable_pub)]
pub use crate::stream::{StreamExt as _, TryStreamExt as _};
#[cfg(feature = "std")]
pub use crate::io::{AsyncBufRead, AsyncRead, AsyncSeek, AsyncWrite};
#[cfg(feature = "std")]
#[doc(no_inline)]
#[allow(unreachable_pub)]
pub use crate::io::{
AsyncBufReadExt as _, AsyncReadExt as _, AsyncSeekExt as _, AsyncWriteExt as _,
};
}
官方文档:
- futures-rs: github.com/rust-lang/f…
- futures crates:crates.io/crates/futu…
- futures doc:docs.rs/futures/lat…
- futures-future:docs.rs/futures/lat…
- Future trait:docs.rs/futures/lat…
- FutureExt tait:docs.rs/futures/lat…
- TryFutureExt tait:docs.rs/futures/lat…
转载自:https://juejin.cn/post/7220597357141753911