likes
comments
collection
share

Rust 异步编程之 Stream 流处理

作者站长头像
站长
· 阅读数 7

Stream trait 类似于 Future trait, Future 对应的是一个 item 的状态的变化,但 Stream 与标准库中的 Iterator trait 类似,在结束之前前可以生成多个值。或者我们可以简单的理解为,Stream 是由一系列的 Future 组成,我们可以从 Stream 读取各个 Future 的结果,直到 Stream 结束。

Rust 异步编程之 Stream 流处理

Stream 的定义

Future 是异步开发中最基础的概念了,如果说 Future 代表了一次性的异步值,那么 Stream 则代表了一系列的异步值。Future 是1,Stream是0,1或者N。 Stream 签名如下:

pub trait Stream {
    type Item;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
}

Stream 对应了同步原语中的 Iterator 的概念,回忆一下,是不是连签名都是如此的相像呢!

pub trait Iterator {
    type Item;

    fn next(&mut self) -> Option<Self::Item>;
}

Stream 用来抽象源源不断的数据源,当然也可以断(当 pollNone 的时候)

比如关于 Stream 的一个常见例子是消息通道( futures 包中的)的消费者 Receiver。每次有消息从 Send 端发送后,它都可以接收到一个 Some(val) 值, 一旦 Send 端关闭(drop),且消息通道中没有消息后,它会接收到一个 None 值。

use futures::channel::mpsc;
use futures::{executor::block_on, SinkExt, StreamExt};

async fn send_recv() {
    const BUFFER_SIZE: usize = 10;
    let (mut tx, mut rx) = mpsc::channel::<i32>(BUFFER_SIZE);

    println!("tx: Send 1, 2");
    tx.send(1).await.unwrap();
    tx.send(2).await.unwrap();
    drop(tx);

    // `StreamExt::next` 类似于 `Iterator::next`, 但是前者返回的不是值,
    // 而是一个 `Future<Output = Option<T>>`,因此还需要使用`.await`来获取具体的值
    assert_eq!(Some(1), rx.next().await);
    assert_eq!(Some(2), rx.next().await);
    assert_eq!(None, rx.next().await);
}

fn main() {
    block_on(send_recv());   
}

Iterator 和 Stream 的区别:

  • Iterator 可以不断调用next()方法,获得新的值,直到 Iterator 返回None。Iterator 是阻塞式返回数据的,每次调用 next(),必然 独占 CPU 直到得到一个结果,而异步的 Stream 是非阻塞的,在等待的过程中会空出 CPU 做其他事情。
  • Streampoll_next()方法,它跟 Future 的poll()方法很像,和 Iterator 版本的 next() 的作用类似。然而poll_next()调用起来不方便,需要自己处理 Poll 状态,这不是很友好,所以 Rust 提供了 StreamExt,作为 Stream 的扩展,提供了next()方法,返回一个实现了 Future trait 的Next结构体,这样就可以直接通过stream.next().await来迭代一个值了。

注:StreamExt 是 StreamExtension 的简写。在 Rust 中,通常的做法是只在一个文件中放入最小定义(比如 Stream),且在另一个扩展的相关文件中放入额外的 api(比如 StreamExt)。

注:Stream trait 还没有像 future 一样在 Rust 的核心库(std::core)中,它在 future_utils crate 中,而 StreamExtensions 也不在标准库中。这意味着,由于不同的库提供不同的导入,你可能会得到冲突的导入。例如,tokio 提供不同的 StreamExt 与 futures_utils。如果可以的话,尽量使用 futures_utils,因为它是 async/await 最常用的 crate

StreamExt 的 next() 方法以及 Next 结构的实现:

pub trait StreamExt: Stream {
    fn next(&mut self) -> Next<'_, Self> where Self: Unpin {
        assert_future::<Option<Self::Item>, _>(Next::new(self))
    }
}

// next 返回了 Next 结构
pub struct Next<'a, St: ?Sized> {
    stream: &'a mut St,
}

// 如果 Stream Unpin 那么 Next 也是 Unpin
impl<St: ?Sized + Unpin> Unpin for Next<'_, St> {}

impl<'a, St: ?Sized + Stream + Unpin> Next<'a, St> {
    pub(super) fn new(stream: &'a mut St) -> Self {
        Self { stream }
    }
}

// Next 实现了 Future,每次 poll() 实际上就是从 stream 中 poll_next()
impl<St: ?Sized + Stream + Unpin> Future for Next<'_, St> {
    type Output = Option<St::Item>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        self.stream.poll_next_unpin(cx)
    }
}

Stream 的创建

futures 库提供了一些有用的方法来创建一些基本 Stream 流,例如:

  • empty():生成一个空的 Stream
  • once():生成一个只包含单个值的 Stream
  • pending():生成一个不包含任何值,只返回 Poll::Pending 的 Stream
  • repeat():生成一个一直返回相同值的 Stream
  • repeat_with():通过闭包函数无穷尽地返回数据的 Stream
  • poll_fn():通过一个返回 Poll 的闭包来产生 Stream
  • unfold():通过初始值和返回 Future 的闭包来产生 Stream
use futures::prelude::*;

#[tokio::main]
async fn main() {
    let mut st = stream::iter(1..10)
        .filter(|x| future::ready(x % 2 == 0))
        .map(|x| x * x);

    // 迭代
    while let Some(x) = st.next().await {
        println!("Got item: {}", x);
    }
}

上面代码使用 stream::iter 生成了一个 Stream,并对其进行 filter / map 的操作。最后,遍历整个 stream,把获得的数据打印出来。

当你不关心 async/await 的东西,而只对流感兴趣时,Stream::iter 对于测试很有用。另一个有趣的是 repeat_with,在这里你可以传递一个闭包,来按需惰性生成流,比如:

use futures::stream::{self, StreamExt};

// From the zeroth to the third power of two:
async fn stream_repeat_with(){
    let mut curr = 1;
    let mut pow2 = futures::stream::repeat_with(|| { let tmp = curr; curr *= 2; tmp });

    assert_eq!(Some(1), pow2.next().await);
    assert_eq!(Some(2), pow2.next().await);
    assert_eq!(Some(4), pow2.next().await);
    assert_eq!(Some(8), pow2.next().await);
}

Stream 的实现

创建自己的 Stream 流涉及两个步骤:

  1. 首先创建一个结构体struct来保存流的状态
  2. 然后为该 结构体struct 实现 Stream

让我们创建一个名为 Counter 的流,它从 1 到 5 计数:

#![feature(async_stream)]

// 首先,结构体:
/// 从一数到五的流
struct Counter {
    count: usize,
}

// 我们希望计数从一开始,所以让我们添加一个 new() 方法来提供帮助。
// 这不是严格必要的,但很方便。
// 请注意,我们将 `count` 从零开始,我们将在下面的 `poll_next () ` 的实现中看到其原因。
impl Counter {
    fn new() -> Counter {
        Counter { count: 0 }
    }
}

// 然后,我们为 `Counter` 实现 `Stream`:
impl Stream for Counter {
    // 我们将使用 usize 进行计数
    type Item = usize;

    // poll_next() 是唯一需要的方法
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // 增加我们的数量。这就是为什么我们从零开始。
        self.count += 1;

        // 检查我们是否已经完成计数。
        if self.count < 6 {
            Poll::Ready(Some(self.count))
        } else {
            Poll::Ready(None)
        }
    }
}

Stream Traits

Rust 中提供的 trait 有多个,比如 StreamTryStreamFusedStream

  • Stream 与它对应的 Iterator 非常相似,只是当它返回 None 表示流耗尽时,此时不应该继续轮询流。如果这样做,就会进入未定义行为的范围,并且可能会出现一些乱七八糟的结果。
  • TryStream 是一个针对返回 Result<value, error> 流定制的特殊 trait。TryStream 提出了可以轻松匹配和转换内部结果的函数。你可以将它们视为产生 Result 项的流的 API,并且这个 API 更加方便。
  • FusedStream 和流是一样的,不过它可以让用户知道流在返回 None之后是否真的耗尽,或者是否可以再次轮询它。例如,假设你想创建一个由循环缓冲区支持的流。在第一次迭代之后,FusedStream 将返回 None,但是在此之后重新轮询 FusedStream 是安全的,以便重新恢复该缓冲区新一轮的迭代。

迭代和并发

跟迭代器 Iterator 类似,Stream也可以迭代。 例如使用mapfilterfoldfor_eachskip等方法,以及它们的遇到错误提前返回的版本: try_maptry_filtertry_foldtry_for_each等等。

跟迭代器 Iterator 又有所不同的是for循环无法迭代Stream,但是命令式风格的循环while letloop,并不断显式地调用next 和 try_next 方法,比如可以使用下面两种循环读取的方式。

// 迭代方式 1
while let Some(value) = s.next().await {}

// 迭代方式 2
loop {
  match s.next().await {
    Some(value) => {}
    None => break;
  }
}

一个对 stream 流迭代计算(sum)的例子:

use futures_util::{pin_mut, Stream, stream, StreamExt};

async fn sum(stream: impl Stream<Item=usize>) -> usize {
    // 不要忘记在迭代流之前固定(pin)它
    pin_mut!(stream);
    let mut sum: usize = 0;
    // 迭代 stream
    while let Some(item) = stream.next().await {
        sum = sum + item;
    }
    sum
}

如果你选择一次只处理一个值,可能会造成无法并发,这就失去了异步编程的意义。如果要让一个 Stream 并发处理多个值,可以使用for_each_concurrenttry_for_each_concurrent方法:

use std::{pin::Pin, io};
use futures_util::{Stream, TryStreamExt};

async fn jump_around(stream: Pin<&mut dyn Stream<Item = Result<i32, io::Error>>>) -> Result<(), io::Error> {
    // 引入 `try_for_each_concurrent`
    stream.try_for_each_concurrent(100, |num| async move {
        jump_n_times(num).await?;
        report_n_jumps(num).await?;
        Ok(())
    }).await?;

    Ok(())
}

async fn jump_n_times(num: i32)-> Result<(), io::Error> {
    println!("jump_n_times :{}", num+1);
    Ok(())
}
async fn report_n_jumps(num: i32)-> Result<(), io::Error>{
    println!("report_n_jumps : {}", num);
    Ok(()) 
}

总结

Stream 和 Future 类似,但是 Future 对应的是一个 item 的状态的变化,而 Stream 则是类似于 iterator,在结束之前能够得到多个值。或者我们可以简单的理解为,Stream 是由一系列的 Future 组成,我们可以从 Stream 读取各个 Future 的结果,直到 Stream 结束,是异步迭代器。

Stream 的 poll_next 函数有三种可能的返回值,分别如下:

  • Poll::Pending:说明下一个值还没有就绪,仍然需要等待
  • Poll::Ready(Some(val)):表示已经就绪,成功返回一个值,可以通过调用poll_next再获取下一个值
  • Poll::Ready(None): 表示 Stream 已经结束,不应该调用poll_next

参考

转载自:https://juejin.cn/post/7217487697677156407
评论
请登录