likes
comments
collection
share

【Rust 进阶教程】 05 通道

作者站长头像
站长
· 阅读数 16

0x00 开篇

既然存在多线程,那就有线程间互相传递数据的需求。而本篇文章介绍的通道(Channel)就是把值从一个线程发送到另一个线程的单向管道。它是一个线程安全的队列

0x01 什么是通道

如果你熟悉 Linux 操作系统,那么一定使用过 管道。Rust 的通道与 Linux 的管道非常类似。如果把一个非 Copy 类型的值从发送线程转移到了接收线程,那么它的所有权也将转移。通过通道(Channel),Rust可以是现在线程之间通信,这也是一种相对简单的线程间的通信方式,因为它并不需要借助  或者 共享内存。下图是线程间通信的简图。

【Rust 进阶教程】 05 通道

0x02 mpsc 和 mpmc

我们通过 std::sync::mpsc 中的 channel 函数创建一个通道。源码如下:

// std::sync::mpsc
pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
    let (tx, rx) = mpmc::channel();
    (Sender { inner: tx }, Receiver { inner: rx })
}

#[must_use]
#[stable(feature = "rust1", since = "1.0.0")]
pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
    let (tx, rx) = mpmc::channel();
    (Sender { inner: tx }, Receiver { inner: rx })
}

// std::sync::mpmc
pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
    let (s, r) = counter::new(list::Channel::new());
    let s = Sender { flavor: SenderFlavor::List(s) };
    let r = Receiver { flavor: ReceiverFlavor::List(r) };
    (s, r)
}

enum SenderFlavor<T> {
    /// Bounded channel based on a preallocated array.
    Array(counter::Sender<array::Channel<T>>),

    /// Unbounded channel implemented as a linked list.
    List(counter::Sender<list::Channel<T>>),

    /// Zero-capacity channel.
    Zero(counter::Sender<zero::Channel<T>>),
}

在 Rust 标准库中提供了两种不同类型的通道:mpsc 和 mpmc

mpsc 代表多个生产者单个消费者Multi-producer, Single-consumer)通道。该通道一般是由FIFO的队列来实现。允许有多个线程可以同时将消息发送到通道,但只有一个线程可以从中接收消息。其中生产者和消费者都可以非阻塞地发送和接收消息。如果通道缓冲区已满,则发送操作会阻塞,直到有空间可用。反之如果通道为空,则接收操作会阻塞,直到有通道内有消息产生。

mpmc 代表多个生产者多个消费者Multi-producer Multi-consumer)通道。允许多个线程可以同时将消息发送到通道,并且多个线程可以同时从中接收消息。允许生产者和消费者同时进行非阻塞的发送和接收操作。如果通道缓冲区已满,则发送操作会阻塞,直到有空间可用。类似地,如果通道为空,则接收操作会阻塞,直到有消息可用。因此,mpsc 适用于单个消费者的场景,而 mpmc 适用于多个消费者的场景。

0x03 创建通道

我们本文介绍的是mpsc类型的通道,通道保存的数据是有类型的,所以 channel 函数会有一个泛型。

use std::sync::mpsc::channel;

fn main() {
    let (sender,receiver) = channel::<String>();
}

上面的代码表示,创建一个传输 String 类型的通道,channel 函数返回一个元组,分别是消息发送者和接收者。channel 底层的实现是一个链表(Linked List)。

0x04 发送与接收

发送值

先上代码吧:

fn main() {
    let (sender, receiver) = channel::<String>();

    let handle1 = thread::spawn(move || {
        sleep(Duration::from_millis(1000));
   		let hello = "hello".to_string();
        //  发送者的所有权转移至线程内,发送 hello 字符串
        sender.send(hello).unwrap();
    });

    handle1.join().unwrap();
}

首先我们创建一个线程,在线程中延迟1s后通过通道发送 hello 字符串。这时候 hello 字符串被转移到通道的缓冲区内。hello 被发送后,其所有权也相应的被转移。

接收者

代码如下:

fn main {
	// ... 省略发送者代码
	let handle2 = thread::spawn(move || {
        // 接收者接收 hello 字符串
        let receive_hello = receiver.recv().unwrap();
        println!("receive_hello: {}", receive_hello);
    });

    handle2.join().unwrap();
    // ...
}
// 运行结果
// receive_hello: hello

接收者,通过 receiver 中的 recv 方法来接收通道中的值。接收者是可以迭代的,上面的接收代码可以写成下面这样:

receiver.into_iter().for_each(|item| {
            println!("receive_hello: {}", item);
        });

另外,接收者的 recv 方法是阻塞的,如果等不到发送者发送消息,那么它会一直阻塞线程。接收者会在通道为空并且发送者 Sender 被清除时正常退出。在上面的代码中,线程 handle1 的发送者发送字符串结束后,闭包也结束,线程退出。线程 handle2 接收者接收完成数据后,通道缓冲区为空,符合正常退出的情况,则不会一直阻塞线程。

【Rust 进阶教程】 05 通道0x05 多个发送者

channel 函数返回的元组是 (Sender<T>, Receiver<T>),其中 Sender<T> 实现了 Clone 类型,但是 Receiver<T> 没有实现 Clone 类型。所以。要获得一个拥有多个发送者的通道,只需要创建一个常规的通道,然后克隆多个 Sender,需要多少就克隆多少,最后再将 Sender 转移至不同的线程。只要其中有一个 Sender 没有被销毁,那么Receiver 就将会一直阻塞。示例代码如下:

fn main() {
	let (sender, receiver) = channel::<String>();

    let sender1 = sender.clone();
    // let sender2 = sender.clone();

    let handle1 = thread::spawn(move || {
        sleep(Duration::from_millis(1000));
        let hello = "hello".to_string();
        sender1.send(hello).unwrap();
    });

    let handle2 = thread::spawn(move || {
        sleep(Duration::from_millis(1000));
        let rust = "rust".to_string();
        sender.send(rust).unwrap();
    });

    let handle3 = thread::spawn(move || {
        // 迭代
        receiver.into_iter().for_each(|item| {
            println!("receive_hello: {}", item);
        });
    });

    handle1.join().unwrap();
    handle2.join().unwrap();
    handle3.join().unwrap();
}
// 运行结果
// receive_hello: hello
// receive_hello: rust

如果我们克隆一个 sender2,将线程 handle2 中的 sender 替换为 sender2 ,那 handle3 中的 receiver 将会一直阻塞。

0x06 小结

本篇文章主要介绍了如何在多个线程中传递数据,我们借助了 Rust 中的通道(channel)。其实上面介绍的通道还会存在一个缺点,如果发送值的速度超过接收处理的速度,会造成通道内的数据堆积过多。解决这个问题,我们可以使用 sync_channel 同步通道,使用方法与常规通道一样。由于篇幅有限,后面有机会再介绍吧。另外,实现多发送者,多消费者模型需要借助 Mutex,后面章节会介绍。