
Rust Async Notes

By: 站长 (site admin)

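This article is one section of "How it is implemented?". That article is still being polished and has been in progress for a long time, so this part has been pulled out on its own.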

Let's start with a simple async example:

use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::Duration;

struct SimpleTimeout {
    duration: Duration,
    set: bool,
}

impl Future for SimpleTimeout {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        if self.set {
            // Simplification: assumes we are only polled again after wake() fires.
            Poll::Ready(())
        } else {
            let duration = self.duration;
            let waker = cx.waker().clone();
            // Simulate a timer with a thread that wakes the task after `duration`.
            std::thread::spawn(move || {
                std::thread::sleep(duration);
                waker.wake();
            });
            self.set = true;
            Poll::Pending
        }
    }
}

async fn timer_test() {
    let mut vec = vec![1, 2, 3];
    let timer = SimpleTimeout {
        duration: Duration::from_secs(vec[0]),
        set: false,
    };
    timer.await;
    // `vec` lives across the await point, so it must be kept in the future's state
    vec.push(4);
    println!("{:?}", vec);
    let timer2 = SimpleTimeout {
        duration: Duration::from_secs(1),
        set: false,
    };
    let display = format!("{:?}", vec);
    timer2.await;
    println!("{}", display);
}

fn main() {
    // Requires the `tokio` crate with the `rt-multi-thread` feature.
    let rt = tokio::runtime::Runtime::new().unwrap();
    rt.block_on(async {
        println!("test start");
        timer_test().await;
        println!("test end");
    });
}

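The code looks simple. We define a minimal Timeout, wait one second, push an element onto the Vec and print it, then wait another second and print the formatted Vec.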

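So what does this expand to after compilation? From experience, `future.await` is essentially a call to `Future::poll()`, and the returned value decides what happens next. On `Pending`, it is propagated up to the caller. On `Ready`, execution continues with the code after the await.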
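To make this concrete, here is a minimal hand-written sketch, using only the standard library, of roughly what a single `inner.await` inside an outer future amounts to on each poll. The outer future polls the child. It propagates `Pending` upward and, on `Ready`, runs the code that follows. `CountDown`, `Outer`, `noop_waker` and `block_on_count` are hypothetical names for this illustration only. The compiler's real expansion differs in detail.

```rust
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};

// Hypothetical child future: returns Pending `remaining` times, then Ready(42).
struct CountDown {
    remaining: u32,
}

impl Future for CountDown {
    type Output = u32;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<u32> {
        if self.remaining == 0 {
            Poll::Ready(42)
        } else {
            self.remaining -= 1;
            // Ask the executor to poll us again.
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}

// Hypothetical outer future, standing in for `async { inner.await + 1 }`.
struct Outer {
    inner: CountDown,
}

impl Future for Outer {
    type Output = u32;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<u32> {
        match Pin::new(&mut self.inner).poll(cx) {
            // Child not done: propagate Pending to our own caller.
            Poll::Pending => Poll::Pending,
            // Child done: continue with the code after the `.await`.
            Poll::Ready(v) => Poll::Ready(v + 1),
        }
    }
}

// A waker that does nothing; enough for a busy-polling demo executor.
fn noop_waker() -> Waker {
    fn clone(_: *const ()) -> RawWaker {
        RawWaker::new(std::ptr::null(), &VTABLE)
    }
    fn noop(_: *const ()) {}
    static VTABLE: RawWakerVTable = RawWakerVTable::new(clone, noop, noop, noop);
    // Safety: every vtable function ignores the (null) data pointer.
    unsafe { Waker::from_raw(RawWaker::new(std::ptr::null(), &VTABLE)) }
}

// Poll `fut` until it is Ready, returning its output and the number of polls.
fn block_on_count<F: Future + Unpin>(mut fut: F) -> (F::Output, u32) {
    let waker = noop_waker();
    let mut cx = Context::from_waker(&waker);
    let mut polls = 0;
    loop {
        polls += 1;
        if let Poll::Ready(v) = Pin::new(&mut fut).poll(&mut cx) {
            return (v, polls);
        }
    }
}

fn main() {
    let (v, polls) = block_on_count(Outer { inner: CountDown { remaining: 2 } });
    println!("output = {}, polls = {}", v, polls); // output = 43, polls = 3
}
```

Each `Pending` from the child makes `Outer` return `Pending` too, so the outer future is polled three times here. `Outer` never records where it stopped. With a single await it does not need to, and that gap is what the next question is about.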

This raises a natural question. Once an `.await` has returned `Pending` at some point, how does the next execution resume from that same point?

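The async machinery is covered in detail in another article, so I won't repeat it here. Instead, for easy reference, we simulate it directly, first with `yield` and then with a hand-written Generator: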

// Nightly-only. Newer nightlies renamed these features to `coroutines` and
// `coroutine_trait`, and the `Generator` trait to `Coroutine`.
#![feature(generators)]
#![feature(generator_trait)]

use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::Poll;
use std::ops::Generator;
use std::time::Duration;

#[derive(Clone)]
struct SimpleWaker {
    wake_fn: Arc<Mutex<Box<dyn FnMut() + Send + 'static>>>,
}

impl SimpleWaker {
    fn wake(&self) {
        let mut wake_fn = self.wake_fn.lock().unwrap();
        wake_fn();
    }

    fn empty() -> Self {
        SimpleWaker {
            wake_fn: Arc::new(Mutex::new(Box::new(|| {}))),
        }
    }

    fn set_wake_fn(&self, wake_fn0: Box<dyn FnMut() + Send + 'static>) {
        let mut wake_fn = self.wake_fn.lock().unwrap();
        *wake_fn = wake_fn0;
    }
}

struct SimpleTimeout {
    duration: Duration,
    timeout: bool,
}

impl SimpleTimeout {
    fn poll(mut self: Pin<&mut Self>, waker: &SimpleWaker) -> Poll<()> {
        if self.timeout {
            return Poll::Ready(());
        }
        // Start a thread that wakes the generator once `duration` has elapsed.
        let waker_clone = waker.clone();
        let duration = self.duration;
        std::thread::spawn(move || {
            std::thread::sleep(duration);
            waker_clone.wake();
        });
        self.timeout = true;
        Poll::Pending
    }
}

fn main() {
    let waker0 = SimpleWaker::empty();
    let waker = waker0.clone();
    let mut generator = move || {
        println!("test start");
        let mut vec = vec![1, 2, 3];
        let mut timer = SimpleTimeout {
            duration: Duration::from_secs(vec[0]),
            timeout: false,
        };
        // `timer.await`: poll until Ready, yielding on every Pending.
        loop {
            match Pin::new(&mut timer).poll(&waker) {
                Poll::Ready(()) => {
                    break;
                }
                Poll::Pending => {
                    yield ();
                }
            }
        }
        vec.push(4);
        println!("{:?}", vec);
        let display = format!("{:?}", vec);
        let mut timer = SimpleTimeout {
            duration: Duration::from_secs(1),
            timeout: false,
        };
        // `timer2.await`
        loop {
            match Pin::new(&mut timer).poll(&waker) {
                Poll::Ready(()) => {
                    break;
                }
                Poll::Pending => {
                    yield ();
                }
            }
        }
        println!("{}", display);
        println!("test end");
        return ();
    };
    // Each wake resumes the generator from its last `yield`.
    waker0.set_wake_fn(Box::new(move || {
        Pin::new(&mut generator).resume(());
    }));
    // Kick off the first resume; the timer threads trigger the later ones.
    waker0.wake();
    // `_tx` stays alive, so this blocks forever and keeps the timer threads running.
    let (_tx, rx) = std::sync::mpsc::channel::<()>();
    _ = rx.recv();
}

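The `yield` simulation comes first and is fairly easy to follow. Each child `Future::poll` is called inside a `loop`, which handles the case where the child future cannot complete in a single poll.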

Next comes a more concrete, fully expanded simulation with a hand-written Generator:

// The generator below is written by hand, so no nightly `#![feature]` is required.
use std::{
    marker::PhantomData,
    pin::Pin,
    sync::{Arc, Mutex},
    task::Poll,
    time::{Duration, SystemTime, UNIX_EPOCH},
};

struct SimpleWakerFn(Box<dyn FnMut()>);

// Implement `Send` manually for this simple test: the boxed closure is not
// declared `Send` (it captures the state machine, which holds a raw pointer).
unsafe impl Send for SimpleWakerFn {}

#[derive(Clone)]
struct SimpleWaker {
    wake_fn: Arc<Mutex<SimpleWakerFn>>,
}

impl SimpleWaker {
    fn wake(&self) {
        let mut wake_fn = self.wake_fn.lock().unwrap();
        (wake_fn.0)();
    }

    fn empty() -> Self {
        SimpleWaker {
            wake_fn: Arc::new(Mutex::new(SimpleWakerFn(Box::new(|| {})))),
        }
    }

    fn set_wake_fn(&self, wake_fn0: Box<dyn FnMut()>) {
        let mut wake_fn = self.wake_fn.lock().unwrap();
        *wake_fn = SimpleWakerFn(wake_fn0);
    }
}

enum SimpleGenState<Y, R> {
    Yielded(Y),
    Complete(R),
}

enum SimpleGenInternal {
    Start,
    State1 {
        vec: Vec<i64>,
        // self-reference: points at `vec` in this same state
        vec_mut_ref: *mut Vec<i64>,
        // mark1
        future: SimpleTimeout,
        _marker: PhantomData<Vec<i64>>,
    },
    State2 {
        display: String,
        future: SimpleTimeout,
    },
    End,
}

trait SimpleGen<Arg = ()> {
    type Yield;
    type Return;
    // `waker` plays the role of the `Context<'_>` in `Future::poll()`
    fn resume(
        self: Pin<&mut Self>,
        arg: Arg,
        waker: &SimpleWaker,
    ) -> SimpleGenState<Self::Yield, Self::Return>;
}

impl SimpleGen for SimpleGenInternal {
    // corresponds to the type of the latest yielded value
    type Yield = String;
    type Return = ();

    fn resume(
        self: Pin<&mut Self>,
        arg: (),
        waker: &SimpleWaker,
    ) -> SimpleGenState<Self::Yield, Self::Return> {
        // Safety: the state is never moved out of `this`, only overwritten in place.
        let this = unsafe { self.get_unchecked_mut() };
        match *this {
            SimpleGenInternal::Start => {
                println!("test start");
                let vec: Vec<i64> = vec![1, 2, 3];
                let timer = SimpleTimeout {
                    duration: Duration::from_secs(vec[0] as u64),
                    set_done: false,
                    timeout: Arc::new(Mutex::new(false)),
                };
                *this = SimpleGenInternal::State1 {
                    vec,
                    vec_mut_ref: std::ptr::null_mut(),
                    future: timer,
                    _marker: PhantomData,
                };
                // Point the self-reference at `vec` once the state is in its final place.
                if let SimpleGenInternal::State1 {
                    ref mut vec,
                    ref mut vec_mut_ref,
                    ..
                } = *this
                {
                    *vec_mut_ref = vec;
                }
                // Fall through: the first poll of the await runs at the head of State1.
                Pin::new(this).resume(arg, waker)
            }
            SimpleGenInternal::State1 {
                ref mut vec_mut_ref,
                ref mut future,
                ..
            } => {
                match Pin::new(future).poll(waker) {
                    Poll::Ready(()) => {
                        println!("test ready1 {}", timestamp());
                    }
                    Poll::Pending => {
                        // Stay in State1: the next resume polls the same future again.
                        println!("test yield1 {}", timestamp());
                        return SimpleGenState::Yielded("".to_string());
                    }
                }
                // The state lives in a pinned Box and never moves, so the pointer still
                // points at `vec`; all access to `vec` in this state goes through it.
                let vec_mut_ref = unsafe { &mut **vec_mut_ref };
                vec_mut_ref.push(4);
                println!("{:?}", vec_mut_ref);
                let display = format!("{:?}", vec_mut_ref);
                let timer = SimpleTimeout {
                    duration: Duration::from_secs(2),
                    set_done: false,
                    timeout: Arc::new(Mutex::new(false)),
                };
                *this = SimpleGenInternal::State2 {
                    display,
                    future: timer,
                };
                Pin::new(this).resume(arg, waker)
            }
            SimpleGenInternal::State2 {
                ref display,
                ref mut future,
            } => {
                match Pin::new(future).poll(waker) {
                    Poll::Ready(()) => {
                        println!("test ready2 {}", timestamp());
                    }
                    Poll::Pending => {
                        println!("test yield2 {}", timestamp());
                        return SimpleGenState::Yielded("".to_string());
                    }
                }
                println!("{}", display);
                println!("test end");
                *this = SimpleGenInternal::End;
                SimpleGenState::Complete(())
            }
            SimpleGenInternal::End => {
                panic!("resume after completion");
            }
        }
    }
}

struct SimpleTimeout {
    duration: Duration,
    // whether the timer thread has been started
    set_done: bool,
    // set to true by the timer thread once the timeout has really elapsed
    timeout: Arc<Mutex<bool>>,
}

impl SimpleTimeout {
    fn poll(mut self: Pin<&mut Self>, waker: &SimpleWaker) -> Poll<()> {
        if !self.set_done {
            let waker = waker.clone();
            let duration = self.duration;
            let timeout = self.timeout.clone();
            std::thread::spawn(move || {
                std::thread::sleep(duration);
                // Simulate multiple wakes: this first one arrives before the timeout is set.
                waker.wake();
                std::thread::sleep(duration);
                *timeout.lock().unwrap() = true;
                waker.wake();
            });
            self.set_done = true;
            Poll::Pending
        } else if *self.timeout.lock().unwrap() {
            Poll::Ready(())
        } else {
            Poll::Pending
        }
    }
}

fn main() {
    let waker0 = SimpleWaker::empty();
    let waker = waker0.clone();
    let generator = SimpleGenInternal::Start;
    // Pin the state machine on the heap so the self-reference in State1 stays valid.
    let mut pin = Box::pin(generator);
    let wake_fn = Box::new(move || {
        pin.as_mut().resume((), &waker);
    });
    waker0.set_wake_fn(wake_fn);
    // Kick off the first resume; the timer threads trigger the later ones.
    waker0.wake();
    // `_tx` stays alive, so this blocks forever and keeps the timer threads running.
    let (_tx, rx) = std::sync::mpsc::channel::<()>();
    _ = rx.recv();
}

// Milliseconds since the Unix epoch.
pub fn timestamp() -> u64 {
    let since_the_epoch = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("Time went backwards");
    since_the_epoch.as_millis() as u64
}

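For the sake of the demo, the code above forcibly marks the wake closure as `Send`, since we know it is safe here. A real implementation has to deal with many more robustness concerns.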

Now let's walk through the code above. It differs slightly from the Generator in the async article, and this version is more accurate.

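First, consider the call chain. Suppose the current future calls another future, say A calls B, and B has multiple await points. Then A must poll() B several times (resume() in the Generator) before B finally reaches Ready. So how should A's state-machine transitions be designed?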
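A small experiment can check the claim that A has to poll B repeatedly. In this sketch, which assumes only the standard library, `b` awaits two hypothetical `YieldOnce` futures that each return `Pending` exactly once, and `a` simply awaits `b`. A hypothetical busy-poll executor with a no-op waker counts the polls. It shows that `a` needs one poll per `Pending` from `b`, plus a final one.

```rust
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};

// Hypothetical leaf future: Pending on the first poll, Ready on the second.
struct YieldOnce(bool);

impl Future for YieldOnce {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.0 {
            Poll::Ready(())
        } else {
            self.0 = true;
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}

// B: two await points.
async fn b() -> u32 {
    YieldOnce(false).await;
    YieldOnce(false).await;
    7
}

// A: awaits B once.
async fn a() -> u32 {
    b().await + 1
}

// A waker that does nothing; the executor below polls in a busy loop anyway.
fn noop_waker() -> Waker {
    fn clone(_: *const ()) -> RawWaker {
        RawWaker::new(std::ptr::null(), &VTABLE)
    }
    fn noop(_: *const ()) {}
    static VTABLE: RawWakerVTable = RawWakerVTable::new(clone, noop, noop, noop);
    // Safety: every vtable function ignores the (null) data pointer.
    unsafe { Waker::from_raw(RawWaker::new(std::ptr::null(), &VTABLE)) }
}

// Poll `fut` until Ready, returning its output and how many polls it took.
fn block_on_count<F: Future>(fut: F) -> (F::Output, u32) {
    let mut fut = Box::pin(fut);
    let waker = noop_waker();
    let mut cx = Context::from_waker(&waker);
    let mut polls = 0;
    loop {
        polls += 1;
        if let Poll::Ready(v) = fut.as_mut().poll(&mut cx) {
            return (v, polls);
        }
    }
}

fn main() {
    let (v, polls) = block_on_count(a());
    println!("a() = {}, polled {} times", v, polls); // a() = 8, polled 3 times
}
```

Each poll of `a` re-polls `b`. On its second poll, `b` resumes at the await where it last stopped instead of starting over. That is the behavior the state design described next has to reproduce by hand.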

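A little thought suggests splitting the code at each await into a "before" context and an "after" context. We then define a state S that stores the context from before the await. The poll() call for this await is placed at the very head of the state, as the first thing it executes. The mark1 comment in the code marks the future stored by the current state.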

Note that in real Rust, the future of an async fn has an anonymous type that the compiler generates during compilation.

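If poll returns Pending, the state stays unchanged. Because the context from before this await is already stored in the current state, we return Yielded to signal that resume() must be called again. When B wakes us again, execution starts from the same state and polls the same await again. This repeats until Ready occurs, and then the "after" context continues.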

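Comparing the two versions shows the change. The loop around each yield has become repeated calls on the same state. When Ready fires, the loop breaks and the state advances.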
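The same idea can be sketched on stable Rust as a hand-written `Future` instead of the `SimpleGen` trait. This illustration rests on simplifying assumptions and is not what rustc actually emits. `TimerTest`, `CountDown` and `block_on_count` are hypothetical names. `vec` is stored by value, so no self-referential pointer is needed. Instead of recursing into `resume` as the code above does, a `loop` falls through to the next state. Each state holds the context saved before its await plus the awaited future. `Pending` returns without touching the state, and `Ready` writes the next state.

```rust
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};

// Hypothetical child future: Pending `self.0` times, then Ready.
struct CountDown(u32);

impl Future for CountDown {
    type Output = ();
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.0 == 0 {
            return Poll::Ready(());
        }
        self.0 -= 1;
        cx.waker().wake_by_ref();
        Poll::Pending
    }
}

// Hypothetical hand expansion of a timer_test-like async fn.
enum TimerTest {
    Start,
    // context saved before the first await, plus the awaited future
    State1 { vec: Vec<u64>, future: CountDown },
    // context saved before the second await, plus the awaited future
    State2 { display: String, future: CountDown },
    End,
}

impl Future for TimerTest {
    type Output = String;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<String> {
        // No self-references here, so TimerTest is Unpin and a plain `&mut` is fine.
        let this = &mut *self;
        loop {
            match this {
                TimerTest::Start => {
                    let vec: Vec<u64> = vec![1, 2, 3];
                    let future = CountDown(vec[0] as u32);
                    *this = TimerTest::State1 { vec, future };
                }
                TimerTest::State1 { vec, future } => {
                    // The await's poll runs first; Pending leaves the state as is.
                    if Pin::new(future).poll(cx).is_pending() {
                        return Poll::Pending;
                    }
                    vec.push(4);
                    let display = format!("{:?}", vec);
                    *this = TimerTest::State2 { display, future: CountDown(1) };
                }
                TimerTest::State2 { display, future } => {
                    if Pin::new(future).poll(cx).is_pending() {
                        return Poll::Pending;
                    }
                    let out = std::mem::take(display);
                    *this = TimerTest::End;
                    return Poll::Ready(out);
                }
                TimerTest::End => panic!("polled after completion"),
            }
        }
    }
}

// A waker that does nothing; the executor below polls in a busy loop anyway.
fn noop_waker() -> Waker {
    fn clone(_: *const ()) -> RawWaker {
        RawWaker::new(std::ptr::null(), &VTABLE)
    }
    fn noop(_: *const ()) {}
    static VTABLE: RawWakerVTable = RawWakerVTable::new(clone, noop, noop, noop);
    // Safety: every vtable function ignores the (null) data pointer.
    unsafe { Waker::from_raw(RawWaker::new(std::ptr::null(), &VTABLE)) }
}

// Poll `fut` until Ready, returning its output and how many polls it took.
fn block_on_count<F: Future>(fut: F) -> (F::Output, u32) {
    let mut fut = Box::pin(fut);
    let waker = noop_waker();
    let mut cx = Context::from_waker(&waker);
    let mut polls = 0;
    loop {
        polls += 1;
        if let Poll::Ready(v) = fut.as_mut().poll(&mut cx) {
            return (v, polls);
        }
    }
}

fn main() {
    let (out, polls) = block_on_count(TimerTest::Start);
    println!("{} after {} polls", out, polls); // [1, 2, 3, 4] after 3 polls
}
```

Compared with the `yield` version, the `loop { match poll { .. } }` around each await has disappeared. A `Pending` simply returns, and the next call to `poll` on the same state plays the role of the next loop iteration.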

The async article did not consider the case where B needs to be polled multiple times, so the state machine has been redesigned here.

Finally, both Generator and yield can be used to simulate async/await. Rust actually implements it as a Generator, that is, a state machine.
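The demos above force `Send` onto the wake closure, and the first timer relies on never being polled early. The sketch below illustrates what a sturdier version might look like under our own assumptions; it is not the author's code or any library's API. It builds a real `std::task::Waker` from an `Arc` through the standard `Wake` trait, so no `unsafe impl Send` is needed. It also keeps the timer's completion in a shared `AtomicBool`, so a spurious poll cannot report Ready early. `ChannelWaker`, `ThreadTimer` and `block_on` are hypothetical names. A production timer would also re-register the latest waker on every poll.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{channel, Sender};
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};
use std::thread;
use std::time::Duration;

// Hypothetical waker: each wake() sends a message that unblocks the executor.
struct ChannelWaker(Mutex<Sender<()>>);

impl Wake for ChannelWaker {
    fn wake(self: Arc<Self>) {
        // Ignore the error if the executor has already gone away.
        let _ = self.0.lock().unwrap().send(());
    }
}

// Hypothetical timer: Ready only once the shared flag is set, so extra polls are harmless.
struct ThreadTimer {
    duration: Duration,
    started: bool,
    done: Arc<AtomicBool>,
}

impl ThreadTimer {
    fn new(duration: Duration) -> Self {
        ThreadTimer { duration, started: false, done: Arc::new(AtomicBool::new(false)) }
    }
}

impl Future for ThreadTimer {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.done.load(Ordering::Acquire) {
            return Poll::Ready(());
        }
        if !self.started {
            self.started = true;
            let done = Arc::clone(&self.done);
            let waker = cx.waker().clone();
            let duration = self.duration;
            thread::spawn(move || {
                thread::sleep(duration);
                done.store(true, Ordering::Release);
                waker.wake();
            });
        }
        Poll::Pending
    }
}

// Run `fut` on the current thread, sleeping on the channel between polls.
fn block_on<F: Future>(fut: F) -> (F::Output, u32) {
    let mut fut = Box::pin(fut);
    let (tx, rx) = channel();
    let waker = Waker::from(Arc::new(ChannelWaker(Mutex::new(tx))));
    let mut cx = Context::from_waker(&waker);
    let mut polls = 0;
    loop {
        polls += 1;
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return (out, polls);
        }
        // Block until some waker fires; `waker` holds a Sender, so this cannot disconnect.
        rx.recv().unwrap();
    }
}

async fn demo() -> Vec<u64> {
    let mut vec = vec![1, 2, 3];
    ThreadTimer::new(Duration::from_millis(vec[0] * 10)).await;
    vec.push(4);
    vec
}

fn main() {
    let (vec, polls) = block_on(demo());
    println!("{:?} after {} polls", vec, polls);
}
```

The executor sleeps in `rx.recv()` instead of spinning, and it is woken only through the `Waker` handed to the future. This is the same contract that `cx.waker()` provides in the first example.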

Reposted from: https://juejin.cn/post/7270830614202712127