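Notes on Rust Async
This post is one section of "How it is implemented?". That article is still being written and will take a while to finish, so this section is published on its own.
Let's start with a simple async example: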
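// Requires the `tokio` crate for the runtime used in `main`.
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::Duration;
struct SimpleTimeout {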
duration: Duration,
set: bool,
}
impl Future for SimpleTimeout {
type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
if self.set {
Poll::Ready(())
} else {
let duration = self.duration;
let waker = cx.waker().clone();
std::thread::spawn(move || {
std::thread::sleep(duration);
waker.wake();
});
self.set = true;
Poll::Pending
}
}
}
async fn timer_test() {
let mut vec = vec![1, 2, 3];
let timer = SimpleTimeout {
duration: Duration::from_secs(vec[0]),
set: false,
};
timer.await;
// `vec` is used again after the await point, so it has to be kept alive across it
vec.push(4);
println!("{:?}", vec);
let timer2 = SimpleTimeout {
duration: Duration::from_secs(1),
set: false,
};
let display = format!("{:?}", vec);
timer2.await;
println!("{}", display);
}
fn main() {
let rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(async {
println!("test start");
timer_test().await;
println!("test end");
});
}
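The code looks simple: we define a minimal Timeout future, sleep for one second, push an element onto the Vec, sleep for another second, and then print the Vec.
So what does this look like once the compiler has expanded it? From experience, `future.await` is essentially a call to `Future::poll()`, and the returned value decides what happens next: `Pending` is propagated up to the caller, while `Ready` lets execution continue with the code below.
That raises a natural question: once an `.await` has returned `Pending` at some point, how does the next execution manage to continue from exactly that position?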
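To make the question concrete, here is a minimal sketch (not how any real runtime is implemented) that drives a future on stable Rust by calling `poll()` in a loop. `CountDown`, `NoopWaker`, `drive`, and `demo` are hypothetical names introduced only for this illustration. The `entered` counter shows that the code before the `.await` runs once even though the future is polled several times: each later poll resumes at the pending `.await`.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical future: returns `Pending` `remaining` times, then `Ready`.
struct CountDown {
    remaining: u32,
}

impl Future for CountDown {
    type Output = &'static str;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        if self.remaining == 0 {
            return Poll::Ready("done");
        }
        self.remaining -= 1;
        // Signal that we want to be polled again.
        cx.waker().wake_by_ref();
        Poll::Pending
    }
}

/// A waker that does nothing; `drive` below simply polls in a loop.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Polls `fut` until it is ready; returns its output and the number of polls.
fn drive<F: Future>(fut: F) -> (F::Output, usize) {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut polls = 0;
    loop {
        polls += 1;
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return (out, polls);
        }
    }
}

/// Returns (output, number of polls, how often the code before `.await` ran).
fn demo() -> (usize, usize, usize) {
    let mut entered = 0;
    let (len, polls) = drive(async {
        entered += 1; // code before the await point
        let msg = CountDown { remaining: 2 }.await;
        msg.len()
    });
    (len, polls, entered)
}

fn main() {
    let (len, polls, entered) = demo();
    println!("output = {len}, polls = {polls}, entered = {entered}");
}
```

The busy loop in `drive` is a simplification; a real executor would wait until the waker is called before polling again.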
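Another article covers the async mechanism in detail, so it is not repeated here. For easy reference, the two programs below simulate it with `yield` and with a hand-written Generator respectively: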
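// Nightly only. Newer nightly toolchains renamed these features to
// `coroutines` / `coroutine_trait`, and `std::ops::Generator` to `std::ops::Coroutine`.
#![feature(generators)]
#![feature(generator_trait)]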
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::Poll;
use std::ops::Generator;
use std::time::Duration;
#[derive(Clone)]
struct SimpleWaker {
wake_fn: Arc<Mutex<Box<dyn FnMut() -> () + Send + 'static>>>,
}
impl SimpleWaker {
fn wake(&self) {
let mut wake_fn = self.wake_fn.lock().unwrap();
wake_fn();
}
fn empty() -> Self {
SimpleWaker {
wake_fn: Arc::new(Mutex::new(Box::new(|| {}))),
}
}
fn set_wake_fn(&self, wake_fn0: Box<dyn FnMut() -> () + Send + 'static>) {
let mut wake_fn = self.wake_fn.lock().unwrap();
*wake_fn = wake_fn0;
}
}
struct SimpleTimeout {
duration: Duration,
timeout: bool,
}
impl SimpleTimeout {
fn poll(mut self: Pin<&mut Self>, waker: &SimpleWaker) -> Poll<()> {
if self.timeout {
return Poll::Ready(());
}
let waker_clone = waker.clone();
let duration = self.duration;
std::thread::spawn(move || {
std::thread::sleep(duration);
waker_clone.wake();
});
self.timeout = true;
Poll::Pending
}
}
fn main() {
let waker0 = SimpleWaker::empty();
let waker = waker0.clone();
let mut generator = move || {
println!("test start");
let mut vec = vec![1, 2, 3];
let mut timer = SimpleTimeout {
duration: Duration::from_secs(vec[0]),
timeout: false,
};
loop {
match Pin::new(&mut timer).poll(&waker) {
Poll::Ready(()) => {
break;
}
Poll::Pending => {
yield ();
}
}
}
vec.push(4);
println!("{:?}", vec);
let display = format!("{:?}", vec);
let mut timer = SimpleTimeout {
duration: Duration::from_secs(1),
timeout: false,
};
loop {
match Pin::new(&mut timer).poll(&waker) {
Poll::Ready(()) => {
break;
}
Poll::Pending => {
yield ();
}
}
}
println!("{}", display);
println!("test end");
return ();
};
waker0.set_wake_fn(Box::new(move || {
Pin::new(&mut generator).resume(());
}));
waker0.wake();
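// Block the main thread forever so the timer threads can run:
// `_tx` is never dropped, so `recv()` never returns.
let (_tx, rx) = std::sync::mpsc::channel::<()>();
_ = rx.recv();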
}
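The first program is the `yield`-based simulation, which is fairly easy to follow. Each child future is polled inside a `loop`, to handle the case where the child is not ready after a single poll.
The second program is a more explicit expansion, a hand-written Generator simulation: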
// No nightly features are needed here: the state machine is written by hand.
use std::{
marker::PhantomData,
pin::Pin,
sync::{Arc, Mutex},
task::Poll,
time::{Duration, SystemTime, UNIX_EPOCH},
};
struct SimpleWakerFn(Box<dyn FnMut() -> ()>);
// `Send` is implemented manually only to keep this test simple;
// the boxed closure is not checked for thread safety.
unsafe impl Send for SimpleWakerFn {}
#[derive(Clone)]
struct SimpleWaker {
wake_fn: Arc<Mutex<SimpleWakerFn>>,
}
impl SimpleWaker {
fn wake(&self) {
let mut wake_fn = self.wake_fn.lock().unwrap();
wake_fn.0();
}
fn empty() -> Self {
SimpleWaker {
wake_fn: Arc::new(Mutex::new(SimpleWakerFn(Box::new(|| {})))),
}
}
fn set_wake_fn(&self, wake_fn0: Box<dyn FnMut() -> ()>) {
let mut wake_fn = self.wake_fn.lock().unwrap();
*wake_fn = SimpleWakerFn(wake_fn0);
}
}
enum SimpleGenState<Y, R> {
Yielded(Y),
Complete(R),
}
enum SimpleGenInternal {
Start,
State1 {
vec: Vec<i64>,
vec_mut_ref: *mut Vec<i64>,
// mark1
future: SimpleTimeout,
_marker: PhantomData<Vec<i64>>,
},
State2 {
display: String,
future: SimpleTimeout,
},
End,
}
trait SimpleGen<Arg = ()> {
type Yield;
type Return;
// `waker` plays the role of the `Context<'_>` argument of `Future::poll()`
fn resume(
self: Pin<&mut Self>,
arg: Arg,
waker: &SimpleWaker,
) -> SimpleGenState<Self::Yield, Self::Return>;
}
impl SimpleGen for SimpleGenInternal {
// The type of the value produced at each yield point (unused here, so an empty String is yielded)
type Yield = String;
type Return = ();
fn resume(
self: Pin<&mut Self>,
arg: (),
waker: &SimpleWaker,
) -> SimpleGenState<Self::Yield, Self::Return> {
let this = unsafe { self.get_unchecked_mut() };
match this {
SimpleGenInternal::Start => {
println!("test start");
let vec: Vec<i64> = vec![1, 2, 3];
let timer = SimpleTimeout {
duration: Duration::from_secs(vec[0] as u64),
set_done: false,
timeout: Arc::new(Mutex::new(false)),
};
*this = SimpleGenInternal::State1 {
vec,
vec_mut_ref: std::ptr::null_mut(),
future: timer,
_marker: PhantomData,
};
if let SimpleGenInternal::State1 {
ref mut vec,
ref mut vec_mut_ref,
..
} = *this
{
*vec_mut_ref = vec;
}
Pin::new(this).resume(arg, waker)
}
SimpleGenInternal::State1 {
ref mut vec,
ref mut vec_mut_ref,
ref mut future,
..
} => {
match Pin::new(future).poll(waker) {
Poll::Ready(()) => {
println!("test ready1 {}", timestamp());
}
Poll::Pending => {
println!("test yield1 {}", timestamp());
return SimpleGenState::Yielded("".to_string());
}
}
let vec_mut_ref = unsafe { &mut **vec_mut_ref };
vec_mut_ref.push(4);
println!("{:?}", vec_mut_ref);
let display = format!("{:?}", vec);
let timer = SimpleTimeout {
duration: Duration::from_secs(2),
set_done: false,
timeout: Arc::new(Mutex::new(false)),
};
*this = SimpleGenInternal::State2 {
display,
future: timer,
};
Pin::new(this).resume(arg, waker)
}
SimpleGenInternal::State2 {
display,
ref mut future,
} => {
match Pin::new(future).poll(waker) {
Poll::Ready(()) => {
println!("test ready2 {}", timestamp());
}
Poll::Pending => {
println!("test yield2 {}", timestamp());
return SimpleGenState::Yielded("".to_string());
}
}
println!("{}", display);
println!("test end");
*this = SimpleGenInternal::End;
SimpleGenState::Complete(())
}
SimpleGenInternal::End => {
panic!("resume after completion");
}
}
}
}
struct SimpleTimeout {
duration: Duration,
set_done: bool,
timeout: Arc<Mutex<bool>>,
}
impl SimpleTimeout {
fn poll(mut self: Pin<&mut Self>, waker: &SimpleWaker) -> Poll<()> {
if !self.set_done {
let waker = waker.clone();
let duration = self.duration;
let timeout = self.timeout.clone();
std::thread::spawn(move || {
std::thread::sleep(duration);
waker.wake();
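// Simulate multiple wake-ups: this first wake fires before the timeout flag is set,
// so the future is polled once more and still returns `Pending`.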
std::thread::sleep(duration);
*timeout.lock().unwrap() = true;
waker.wake();
});
self.set_done = true;
Poll::Pending
} else {
if *self.timeout.lock().unwrap() {
Poll::Ready(())
} else {
Poll::Pending
}
}
}
}
fn main() {
let waker0 = SimpleWaker::empty();
let waker = waker0.clone();
let generator = SimpleGenInternal::Start;
let mut pin = Box::pin(generator);
let wake_fn = Box::new(move || {
pin.as_mut().resume((), &waker);
});
waker0.set_wake_fn(wake_fn);
waker0.wake();
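// Block the main thread forever so the timer threads can run:
// `_tx` is never dropped, so `recv()` never returns.
let (_tx, rx) = std::sync::mpsc::channel::<()>();
_ = rx.recv();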
}
pub fn timestamp() -> u64 {
let start = SystemTime::now();
let since_the_epoch = start
.duration_since(UNIX_EPOCH)
.expect("Time went backwards");
since_the_epoch.as_millis() as u64
}
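The code above is for demonstration only. It forces some types to be `Send` (because we know this causes no problems here); a real implementation would have to take many more robustness concerns into account.
Now let's walk through the code above. It differs slightly from the Generator in the async article (this version is more accurate).
First, consider the call chain. Suppose the current Future calls another Future, say A calls B, and B contains several await points. Then A has to `poll()` B (or `resume()` it, in Generator terms) several times before it finally gets `Ready`. So how should A's state machine transitions be designed?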
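After some thought: split the code at each await into the context before it and the context after it, and define a state S that stores the context before the await. The `poll()` call for that await is placed at the top of the state, as the first thing the state executes. The `mark1` comment in the code marks the Future saved by the current state.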
Note that in real Rust the Future produced by an `async` function or block is an anonymous type, generated by the compiler at compile time.
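If the poll result is `Pending`, the state stays unchanged. Because the context before this await is already stored in the current state, we simply return `Yielded` to indicate that `resume()` has to be called again. When B wakes us a second time, execution starts from this same state and polls the same await again, until `Ready` arrives and the code after the await continues.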
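A quick comparison makes this clear: the `loop` from the `yield` version has become repeated calls into the same state. When `Ready` finally arrives, the equivalent of the loop's `break` happens and the state advances.
The async article did not consider the case where B needs to be polled several times, which is why the state machine was redesigned here.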
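The same design can also be sketched against the real `Future` trait on stable Rust. This is a simplified illustration under stated assumptions, not the compiler's actual output: `CountDown` (standing in for B), `Parent` (standing in for A), `NoopWaker`, and `run` are hypothetical names. Returning `Poll::Pending` without changing the state is what replaces the `loop` around `yield`; the `loop` inside `poll` here only lets a state that has just become ready fall through to the next one.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical child "B": returns `Pending` `remaining` times, then `Ready`.
struct CountDown {
    remaining: u32,
}

impl Future for CountDown {
    type Output = ();
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.remaining == 0 {
            return Poll::Ready(());
        }
        self.remaining -= 1;
        cx.waker().wake_by_ref();
        Poll::Pending
    }
}

/// Hypothetical parent "A": one state per await point. Each state stores the
/// context that precedes its await plus the child future being awaited.
enum Parent {
    AwaitFirst { vec: Vec<u64>, child: CountDown },
    AwaitSecond { display: String, child: CountDown },
    Done,
}

impl Future for Parent {
    type Output = String;
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<String> {
        let this = self.get_mut(); // allowed because `Parent` is `Unpin`
        loop {
            match &mut *this {
                Parent::AwaitFirst { vec, child } => {
                    if Pin::new(child).poll(cx).is_pending() {
                        return Poll::Pending; // stay in this state
                    }
                    // Code after the first await.
                    vec.push(4);
                    let display = format!("{:?}", vec);
                    let child = CountDown { remaining: 1 };
                    *this = Parent::AwaitSecond { display, child };
                }
                Parent::AwaitSecond { display, child } => {
                    if Pin::new(child).poll(cx).is_pending() {
                        return Poll::Pending; // stay in this state
                    }
                    let out = std::mem::take(display);
                    *this = Parent::Done;
                    return Poll::Ready(out);
                }
                Parent::Done => panic!("polled after completion"),
            }
        }
    }
}

/// A waker that does nothing; `run` below simply polls in a loop.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Drives a `Parent` to completion; returns its output and the number of polls.
fn run() -> (String, usize) {
    let child = CountDown { remaining: 2 };
    let mut parent = Parent::AwaitFirst { vec: vec![1, 2, 3], child };
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut polls = 0;
    loop {
        polls += 1;
        if let Poll::Ready(out) = Pin::new(&mut parent).poll(&mut cx) {
            return (out, polls);
        }
    }
}

fn main() {
    let (out, polls) = run();
    println!("{out} after {polls} polls");
}
```

Unlike the raw-pointer version above, this sketch keeps no reference into its own state, so it can stay `Unpin` and needs no `unsafe`.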
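Finally, both the Generator version and the `yield` version can simulate async/await. In the actual Rust compiler, `async` code is lowered to generators (called coroutines in newer compiler versions), which are in turn compiled into state machines.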
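One way to observe that an `async fn` produces a state-machine value holding whatever is live across an `.await` is to measure the size of the returned future. The sketch below is only an illustration; `tick`, `keeps_buffer`, `drops_buffer`, and `sizes` are hypothetical names, and the exact sizes depend on the compiler version.

```rust
use std::mem::size_of_val;

/// Hypothetical await point that completes immediately.
async fn tick() {}

/// `buf` is used after the await, so it has to be stored inside the future.
async fn keeps_buffer() -> u8 {
    let buf = [1u8; 1024];
    tick().await;
    buf[0]
}

/// `buf` goes out of scope before the await, so only `first` has to be stored.
async fn drops_buffer() -> u8 {
    let first = {
        let buf = [1u8; 1024];
        buf[0]
    };
    tick().await;
    first
}

/// Returns the sizes in bytes of the two (never polled) futures.
fn sizes() -> (usize, usize) {
    let a = keeps_buffer();
    let b = drops_buffer();
    (size_of_val(&a), size_of_val(&b))
}

fn main() {
    let (keeps, drops) = sizes();
    println!("keeps_buffer: {keeps} bytes, drops_buffer: {drops} bytes");
}
```

Calling an `async fn` runs none of its body; it only builds the state-machine value, which is why the sizes can be inspected without an executor.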
Reposted from: https://juejin.cn/post/7270830614202712127