【Rust 进阶教程】 05 通道
0x00 开篇
既然存在多线程,那就有线程间互相传递数据的需求。而本篇文章介绍的通道(Channel)就是把值从一个线程发送到另一个线程的单向管道。它是一个线程安全的队列。
0x01 什么是通道
如果你熟悉 Linux
操作系统,那么一定使用过 管道
。Rust 的通道与 Linux
的管道非常类似。如果把一个非 Copy
类型的值从发送线程转移到了接收线程,那么它的所有权也将转移。通过通道(Channel),Rust可以是现在线程之间通信,这也是一种相对简单的线程间的通信方式,因为它并不需要借助 锁
或者 共享内存
。下图是线程间通信的简图。
0x02 mpsc 和 mpmc
我们通过 std::sync::mpsc
中的 channel
函数创建一个通道。源码如下:
// std::sync::mpsc
pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
let (tx, rx) = mpmc::channel();
(Sender { inner: tx }, Receiver { inner: rx })
}
#[must_use]
#[stable(feature = "rust1", since = "1.0.0")]
pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
let (tx, rx) = mpmc::channel();
(Sender { inner: tx }, Receiver { inner: rx })
}
// std::sync::mpmc
pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
let (s, r) = counter::new(list::Channel::new());
let s = Sender { flavor: SenderFlavor::List(s) };
let r = Receiver { flavor: ReceiverFlavor::List(r) };
(s, r)
}
enum SenderFlavor<T> {
/// Bounded channel based on a preallocated array.
Array(counter::Sender<array::Channel<T>>),
/// Unbounded channel implemented as a linked list.
List(counter::Sender<list::Channel<T>>),
/// Zero-capacity channel.
Zero(counter::Sender<zero::Channel<T>>),
}
在 Rust 标准库中提供了两种不同类型的通道:mpsc
和 mpmc
。
mpsc
代表多个生产者单个消费者(Multi-producer, Single-consumer)通道。该通道一般是由FIFO的队列来实现。允许有多个线程可以同时将消息发送到通道,但只有一个线程可以从中接收消息。其中生产者和消费者都可以非阻塞地发送和接收消息。如果通道缓冲区已满,则发送操作会阻塞,直到有空间可用。反之如果通道为空,则接收操作会阻塞,直到有通道内有消息产生。
mpmc
代表多个生产者多个消费者(Multi-producer Multi-consumer)通道。允许多个线程可以同时将消息发送到通道,并且多个线程可以同时从中接收消息。允许生产者和消费者同时进行非阻塞的发送和接收操作。如果通道缓冲区已满,则发送操作会阻塞,直到有空间可用。类似地,如果通道为空,则接收操作会阻塞,直到有消息可用。因此,mpsc
适用于单个消费者的场景,而 mpmc
适用于多个消费者的场景。
0x03 创建通道
我们本文介绍的是mpsc
类型的通道,通道保存的数据是有类型的,所以 channel
函数会有一个泛型。
use std::sync::mpsc::channel;
fn main() {
let (sender,receiver) = channel::<String>();
}
上面的代码表示,创建一个传输 String
类型的通道,channel
函数返回一个元组,分别是消息发送者和接收者。channel
底层的实现是一个链表(Linked List)。
0x04 发送与接收
发送值
先上代码吧:
fn main() {
let (sender, receiver) = channel::<String>();
let handle1 = thread::spawn(move || {
sleep(Duration::from_millis(1000));
let hello = "hello".to_string();
// 发送者的所有权转移至线程内,发送 hello 字符串
sender.send(hello).unwrap();
});
handle1.join().unwrap();
}
首先我们创建一个线程,在线程中延迟1s后通过通道发送 hello
字符串。这时候 hello
字符串被转移到通道的缓冲区内。hello
被发送后,其所有权也相应的被转移。
接收者
代码如下:
fn main {
// ... 省略发送者代码
let handle2 = thread::spawn(move || {
// 接收者接收 hello 字符串
let receive_hello = receiver.recv().unwrap();
println!("receive_hello: {}", receive_hello);
});
handle2.join().unwrap();
// ...
}
// 运行结果
// receive_hello: hello
接收者,通过 receiver
中的 recv
方法来接收通道中的值。接收者是可以迭代的,上面的接收代码可以写成下面这样:
receiver.into_iter().for_each(|item| {
println!("receive_hello: {}", item);
});
另外,接收者的 recv
方法是阻塞的,如果等不到发送者发送消息,那么它会一直阻塞线程。接收者会在通道为空并且发送者 Sender 被清除时正常退出。在上面的代码中,线程 handle1
的发送者发送字符串结束后,闭包也结束,线程退出。线程 handle2
接收者接收完成数据后,通道缓冲区为空,符合正常退出的情况,则不会一直阻塞线程。
0x05 多个发送者
channel
函数返回的元组是 (Sender<T>, Receiver<T>)
,其中 Sender<T>
实现了 Clone
类型,但是 Receiver<T>
没有实现 Clone
类型。所以。要获得一个拥有多个发送者的通道,只需要创建一个常规的通道,然后克隆多个 Sender,需要多少就克隆多少,最后再将 Sender 转移至不同的线程。只要其中有一个 Sender
没有被销毁,那么Receiver
就将会一直阻塞。示例代码如下:
fn main() {
let (sender, receiver) = channel::<String>();
let sender1 = sender.clone();
// let sender2 = sender.clone();
let handle1 = thread::spawn(move || {
sleep(Duration::from_millis(1000));
let hello = "hello".to_string();
sender1.send(hello).unwrap();
});
let handle2 = thread::spawn(move || {
sleep(Duration::from_millis(1000));
let rust = "rust".to_string();
sender.send(rust).unwrap();
});
let handle3 = thread::spawn(move || {
// 迭代
receiver.into_iter().for_each(|item| {
println!("receive_hello: {}", item);
});
});
handle1.join().unwrap();
handle2.join().unwrap();
handle3.join().unwrap();
}
// 运行结果
// receive_hello: hello
// receive_hello: rust
如果我们克隆一个 sender2
,将线程 handle2
中的 sender
替换为 sender2
,那 handle3
中的 receiver
将会一直阻塞。
0x06 小结
本篇文章主要介绍了如何在多个线程中传递数据,我们借助了 Rust 中的通道(channel)。其实上面介绍的通道还会存在一个缺点,如果发送值的速度超过接收处理的速度,会造成通道内的数据堆积过多。解决这个问题,我们可以使用 sync_channel
同步通道,使用方法与常规通道一样。由于篇幅有限,后面有机会再介绍吧。另外,实现多发送者,多消费者模型需要借助 Mutex
,后面章节会介绍。
转载自:https://juejin.cn/post/7231383831852548151