查看原文
其他

[译]深入理解RUST异步-async

秋风不度镇南关 码农真经 2023-12-25

async, await特性的加入使得Rust中的异步编程变得像nodejs, go一样变得简单易用,极大提高了Rust异步编程的幸福感。

理解异步编程一直是初学者的难点,本篇文章参考async-book, tokio官网的例子,通过实现一个简单的Future, Executor来增加异步编程的理解

Rust中,Future的定义如下

use std::pin::Pin;
use std::task::{Context, Poll};

pub trait Future {
type Output;

fn poll(self: Pin<&mut Self>, cx: &mut Context)
-> Poll<Self::Output>;
}

关联类型Output是当Future完成时,产生的返回值。

与其他语言不同,Rust的Future并不是在后台运行的一种计算,而是计算本身,计算的拥有者负责拉取 Future, 以使计算得以向前进行。这可以通过调用一个方法 Future::poll

下面我们来实现一个简单的Future, 该Future将

  1. 一直等待,直到未来的某个时刻

  2. 在控制台打印一些消息

  3. 返回一个字符串

代码如下

use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::{Duration, Instant};

struct Delay {
when: Instant,
}

impl Future for Delay {
type Output = &'static str;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>)
-> Poll<&'static str>
{
if Instant::now() >= self.when {
println!("Hello world");
Poll::Ready("done")
} else {
// 暂时忽略该行
cx.waker().wake_by_ref();
Poll::Pending
}
}
}

#[tokio::main]
async fn main() {
let when = Instant::now() + Duration::from_millis(10);
let future = Delay { when };

let out = future.await;
assert_eq!(out, "done");
}
async fn 是 一种 Future

在主函数中,我们实例化了一个future, 并在其上调用await方法,在async函数内部,我们可以调用任何实现了Future trait的await方法。反过来,调用async函数将返回一个实现了Future的匿名类型。在 async fn main() 这个例子中,生成的匿名类型大致如下:

use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::{Duration, Instant};

enum MainFuture {
// Initialized, never polled
State0,
// Waiting on `Delay`, i.e. the `future.await` line.
State1(Delay),
// The future has completed.
Terminated,
}

impl Future for MainFuture {
type Output = ();

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>)
-> Poll<()>
{
use MainFuture::*;

loop {
match *self {
State0 => {
let when = Instant::now() +
Duration::from_millis(10);
let future = Delay { when };
*self = State1(future);
}
State1(ref mut my_future) => {
match Pin::new(my_future).poll(cx) {
Poll::Ready(out) => {
assert_eq!(out, "done");
*self = Terminated;
return Poll::Ready(());
}
Poll::Pending => {
return Poll::Pending;
}
}
}
Terminated => {
panic!("future polled after completion")
}
}
}
}
}

Rust Future实际上是状态机,上面代码中MainFuture是一个枚举,Future的起始状态是State0, 当poll调用时,future将尽可能的改变、推进自己的内部状态。如果future已经完成,Poll::Ready将包含运行结果返回给调用者。

如果future未完成,通常是由于它正在等待的资源未准备好,Poll::Pending此时会返回。调用者收到Poll::Pending,表示future将在之后某个时间完成,调用者需要再次调用poll。

同时我们也看到futures内部也包含了其他futures. 在外部future调用poll将导致内部future的poll被调用。

Executors

Rust异步函数返回futures. 要想futures内部的状态被推进/变化,必须调用Futures的poll方法。由于Futures又由其他futures组成,那么最外层的futures的poll方法又是由谁来调用呢?

回忆我们之前的例子,为了运行异步函数,异步函数必须被放入 tokio::spwan代码块内部或者是在被#[tokio::main]标注的main方法中,这样其实等同于把最外部的future提交到Tokio executor中. Executor负责调用最外部future的poll方法,以驱动异步计算完成。

Mini Tokio

为了更好的理解以上说明,我们将实现一个简化版的Tokio, 代码如下

use std::collections::VecDeque;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::{Duration, Instant};
use futures::task;

fn main() {
let mut mini_tokio = MiniTokio::new();

mini_tokio.spawn(async {
let when = Instant::now() + Duration::from_millis(10);
let future = Delay { when };

let out = future.await;
assert_eq!(out, "done");
});

mini_tokio.run();
}

struct MiniTokio {
tasks: VecDeque<Task>,
}

type Task = Pin<Box<dyn Future<Output = ()> + Send>>;

impl MiniTokio {
fn new() -> MiniTokio {
MiniTokio {
tasks: VecDeque::new(),
}
}

/// Spawn a future onto the mini-tokio instance.
fn spawn<F>(&mut self, future: F)
where
F: Future<Output = ()> + Send + 'static,
{
self.tasks.push_back(Box::pin(future));
}

fn run(&mut self) {
let waker = task::noop_waker();
let mut cx = Context::from_waker(&waker);

while let Some(mut task) = self.tasks.pop_front() {
if task.as_mut().poll(&mut cx).is_pending() {
self.tasks.push_back(task);
}
}
}
}

这段代码运行了一个async代码块。一个Delay实例被创建了出来,并调用了await方法。但是我们当前的实现有个主要缺点,我们的executor永远不会休眠。这个executor不断的循环调用spwan生成的future的poll方法。大多数时间,这些future还没有完成计算,状态没有更新,从而只能再次返回Future::Pending,这些过程只会空消耗CPU,executore因而会很低效。

理想情况下,我们希望mini-tokio只拉取那些内部状态已经发生变化的futures. 比如当某些资源变得可用时,等待该资源的任务就可以响应操作请求。如果一个任务要从TCP Socket中读取数据,那么我们只需要收到数据时,再拉取该任务。在我们的例子中,任务会一直阻塞,直到Instant到期。理想情况,mini-tokio只需要在instant到期时,再拉取任务。

为了实现这个机制,当资源被拉取,且任务未就绪,该资源会发送一个通知当它的状态变为就绪时。

Wakers

Wakers正是以上说明中缺失的那部分, 它充当了这样的一种机制: 当资源变得可用,可以通知等待他们的任务,可以做出进一步的操作了。

让我们再看下Future::Poll的定义

fn poll(self: Pin<&mut Self>, cx: &mut Context)
-> Poll<Self::Output>;

poll函数中的Context参数有一个wake()方法。该方法返回一个绑定到当前task的waker实例。Waker有一个wake()方法。调用这个方法将会给executor发送一个当前关联的task可以被重新调度和执行的消息。

更新 Delay

我们更新Delay, 以使用waker

use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::{Duration, Instant};
use std::thread;

struct Delay {
when: Instant,
}

impl Future for Delay {
type Output = &'static str;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>)
-> Poll<&'static str>
{
if Instant::now() >= self.when {
println!("Hello world");
Poll::Ready("done")
} else {
// Get a handle to the waker for the current task
let waker = cx.waker().clone();
let when = self.when;

// Spawn a timer thread.
thread::spawn(move || {
let now = Instant::now();

if now < when {
thread::sleep(when - now);
}

waker.wake();
});

Poll::Pending
}
}
}

现在,只要请求的时间到了,任务将会被通知,executor将确定任务可以重新调度了。下一步我们将更新mini-tokio,让它监听唤醒通知。

现在我们的Delay还有一些问题,我们将会在后面把这些问题解决。

当future返回Poll::Pending,它必须确保waker会在某个地方被通知。忘记这样做将导致任务被永远挂起。当future返回Poll::Pending,忘记唤醒任务是常见bug的根源。

回顾我们之前的第一个迭代版本的Delay实现,以下是future的实现

impl Future for Delay {
type Output = &'static str;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>)
-> Poll<&'static str>
{
if Instant::now() >= self.when {
println!("Hello world");
Poll::Ready("done")
} else {
// Ignore this line for now.
cx.waker().wake_by_ref();
Poll::Pending
}
}
}

返回Poll::Pending之前,我们先调用cx.waker().wake_by_ref(),这是为了满足future的契约。如果返回了Poll::Pending,我们就必须负责通知waker。由于我们还没有实现计时器线程,暂时先在当前地方(Poll::Pending上一行)通知waker。这么做将会使future被立即重新调度,再次执行,执行完后,future中的计算可能还是没有完成(状态还是没变化)。

应该认识到,以上方式,我们发送了多余的通知给waker, 因为有些时候task未准备好,我们也发了通知。

更新Mini Tokio

下一步,我们将更新Mini Tokio,以使它可以接受waker的通知。我们的executor将只执行那些已经唤醒的task。为了实现这个目的,Mini Tokio需要有自己的waker。当waker被调用时,与之关联的task会被入队以待执行。当Mini Tokio poll future时,它将这个waker传递给future。

更新后的Mini Tokio将使用一个channel来保存待调度的tasks。Channels允许tasks排队等待任何线程执行他们。Waker必须是Send和Sync,由于标准库中的channel不是Sync, 所以我们选择crossbeam crate中的channel。

Send, Sync 在rust并发中是个标记trait,可以做线程间传递的类型成为Send。Rust中大多数类型都是Send,但有些并不是,比如Rc。那些可被并发访问的不可变引用,则是Sync。有些类型可以Send,但不可以Sync,其中一个例子是Cell,即使将它声明为不可变引用,但它扔可以被修改(由于其内部可变性),因而不能被安全的并发访问。

添加以下依赖到Cargo.toml中,以使用channels

crossbeam = "0.8"

然后,更新MiniTokio结构体

use crossbeam::channel;
use std::sync::Arc;

struct MiniTokio {
scheduled: channel::Receiver<Arc<Task>>,
sender: channel::Sender<Arc<Task>>,
}

struct Task {
// This will be filled in soon.
}

Wakers是Sync且可以克隆。当wake被调用时,task必须被调度以便后续执行。为了实现这个目的,我们有了channel。当waker.wake()调用时,task被放入channel的发送队列。我们的task结构将实现wake逻辑。为了实现它,Task需要同时包含future和channel的发送队列。

use std::sync::{Arc, Mutex};

struct Task {
// The `Mutex` is to make `Task` implement `Sync`. Only
// one thread accesses `future` at any given time. The
// `Mutex` is not required for correctness. Real Tokio
// does not use a mutex here, but real Tokio has
// more lines of code than can fit in a single tutorial
// page.
future: Mutex<Pin<Box<dyn Future<Output = ()> + Send>>>,
executor: channel::Sender<Arc<Task>>,
}

impl Task {
fn schedule(self: &Arc<Self>) {
self.executor.send(self.clone());
}
}

为了调度task,Arc被克隆并通过channel发送。现在我们需要做schedule函数中使用std::task::Waker来打桩。标准库提个了一个底层的API来做这个事情,该API使用了manual vtable construction。这种方式给实现者提供了最大的灵活性,但却需要写很多繁琐的unsafe代码。我们不需要直接使用RawWakerVTable,而是使用futures crate提供的ArcWake工具包。这让我们可以实现一个很多简单的trait,以暴露我们的结构体Task为一个waker.

添加以下依赖到Cargo.toml中,以获取futures crate

futures = "0.3"

然后实现 futures::task::ArcWake.

use futures::task::{self, ArcWake};
use std::sync::Arc;
impl ArcWake for Task {
fn wake_by_ref(arc_self: &Arc<Self>) {
arc_self.schedule();
}
}

当以上的计时器线程调用waker.wake(),task被推到channel中。下一步,我们在MiniTokio::run函数中接受与执行task.

impl MiniTokio {
fn run(&self) {
while let Ok(task) = self.scheduled.recv() {
task.poll();
}
}

/// Initialize a new mini-tokio instance.
fn new() -> MiniTokio {
let (sender, scheduled) = channel::unbounded();

MiniTokio { scheduled, sender }
}

/// Spawn a future onto the mini-tokio instance.
///
/// The given future is wrapped with the `Task` harness and pushed into the
/// `scheduled` queue. The future will be executed when `run` is called.
fn spawn<F>(&self, future: F)
where
F: Future<Output = ()> + Send + 'static,
{
Task::spawn(future, &self.sender);
}
}

impl Task {
fn poll(self: Arc<Self>) {
// Create a waker from the `Task` instance. This
// uses the `ArcWake` impl from above.
let waker = task::waker(self.clone());
let mut cx = Context::from_waker(&waker);

// No other thread ever tries to lock the future
let mut future = self.future.try_lock().unwrap();

// Poll the future
let _ = future.as_mut().poll(&mut cx);
}

// Spawns a new taks with the given future.
//
// Initializes a new Task harness containing the given future and pushes it
// onto `sender`. The receiver half of the channel will get the task and
// execute it.
fn spawn<F>(future: F, sender: &channel::Sender<Arc<Task>>)
where
F: Future<Output = ()> + Send + 'static,
{
let task = Arc::new(Task {
future: Mutex::new(Box::pin(future)),
executor: sender.clone(),
});

let _ = sender.send(task);
}

}

上面代码中,做了很多事情。首先,实现了MiniTokio::run()。这个函数在一个无限循环中不断的从channel中获取调度就绪的task。因为当task被唤醒时,他们将会被放入channel的队列中,当这些任务被执行时,他们中的内部状态确实可以推进了。

此外,MiniTokio::new() , MiniTokio::spawn()这2个函数做了个修改,使用channel取代VecDeque。当新的task被创建,它内部将持有channel的发送队列,这个队列将让他可以在运行时中调度自己。

Task::poll()函数使用futures crate中的 ArcWake工具创建了waker。该waker将用于创建task::Context,task::Context将被传递给poll。

总结

我们一步步地实现了一个异步Rust的例子。Rust的async/await特性是基于trait构建的。这允许第三方crate,像Tokio,提供实现细节。

  • 异步Rust是懒惰的,需要一个调用者来poll它们

  • Waker会传递给futures,以便让调用waker的task和对应的future 链接起来

  • 当资源未就绪而无法进一步操作时,Poll::Pending会返回,并且task的waker也会记录下来

  • 当资源已就绪,task的waker会被通知

  • 当executor接受到通知,它会调度对应的task以执行

  • 当task被再次poll,这次如果资源已就绪那么task就可以继续向前执行了

再闲扯一些

回顾下我们实现过的Delay future,当时提过还有一些地方需要修改。Rust的异步模型运行future在执行时,在多个tasks间移动,考虑以下例子。

use futures::future::poll_fn;
use std::future::Future;
use std::pin::Pin;

#[tokio::main]
async fn main() {
let when = Instant::now() + Duration::from_millis(10);
let mut delay = Some(Delay { when });

poll_fn(move |cx| {
let mut delay = delay.take().unwrap();
let res = Pin::new(&mut delay).poll(cx);
assert!(res.is_pending());
tokio::spawn(async move {
delay.await;
});

Poll::Ready(())
}).await;
}

poll_fn函数用闭包创建了个future。上面的代码创建了一个Delay实例,调用它的poll方法,然后将Delay实例传递到新的task中,然后调用await。在这个例子中,Delay::poll被不同的waker调用多次。当这些情况发生时,你必须确保调用最近调用的poll中的waker的wake方法。

当实现一个future时,须谨记,每次调用poll,提供不同的waker实例是非常重要的。poll函数必须使用新的waker更新之前保存的waker。

我们早前的Delay实现,每次poll时,都创建新的线程。这个没啥问题,但如果poll调用太多则很低效(比如,你通过select! 使用future和其他future,无论他们中哪个事件触发,2个future的poll都会被调用)。一种替代方案是,记下已经创建的线程,仅仅在未创建线程时创建线程。然而,当你这么做时,必须确保线程的waker在poll调用之后被更新,因为如果不这样,你更新的waker就不是最近的那一个。

为了修复之前的版本,我们可以修改如下

use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Waker};
use std::thread;
use std::time::{Duration, Instant};

struct Delay {
when: Instant,
// This Some when we have spawned a thread, and None otherwise.
waker: Option<Arc<Mutex<Waker>>>,
}

impl Future for Delay {
type Output = ();

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
// First, if this is the first time the future is called, spawn the
// timer thread. If the timer thread is already running, ensure the
// stored `Waker` matches the current task's waker.
if let Some(waker) = &self.waker {
let mut waker = waker.lock().unwrap();

// Check if the stored waker matches the current task's waker.
// This is necessary as the `Delay` future instance may move to
// a different task between calls to `poll`. If this happens, the
// waker contained by the given `Context` will differ and we
// must update our stored waker to reflect this change.
if !waker.will_wake(cx.waker()) {
*waker = cx.waker().clone();
}
} else {
let when = self.when;
let waker = Arc::new(Mutex::new(cx.waker().clone()));
self.waker = Some(waker.clone());

// This is the first time `poll` is called, spawn the timer thread.
thread::spawn(move || {
let now = Instant::now();

if now < when {
thread::sleep(when - now);
}

// The duration has elapsed. Notify the caller by invoking
// the waker.
let waker = waker.lock().unwrap();
waker.wake_by_ref();
});
}

// Once the waker is stored and the timer thread is started, it is
// time to check if the delay has completed. This is done by
// checking the current instant. If the duration has elapsed, then
// the future has completed and `Poll::Ready` is returned.
if Instant::now() >= self.when {
Poll::Ready(())
} else {
// The duration has not elapsed, the future has not completed so
// return `Poll::Pending`.
//
// The `Future` trait contract requires that when `Pending` is
// returned, the future ensures that the given waker is signalled
// once the future should be polled again. In our case, by
// returning `Pending` here, we are promising that we will
// invoke the given waker included in the `Context` argument
// once the requested duration has elapsed. We ensure this by
// spawning the timer thread above.
//
// If we forget to invoke the waker, the task will hang
// indefinitely.
Poll::Pending
}
}
}

这里有点复杂,但总体思路是,每当调用poll,future会检查当前的waker是否和之前记录的waker是一样的。如果2个waker一样,那不需要做任何事情,否则记录的waker必须更新。

Notify 工具箱

我们演示了Delay future如何被我们使用wakers实现的。Wakers是异步rust实现的基础。通常,我们不需要深入到这个级别来理解异步rust。比如在Delay的例子中,我们可以使用 tokio::sync::Notify来完全地实现async/await。该工具提供了基本的task通知机制。它处理了wakers的细节,包括确保给定的waker匹配当前的task.

使用Notify,我们实现的异步delay函数可以是这样

use tokio::sync::Notify;
use std::sync::Arc;
use std::time::{Duration, Instant};
use std::thread;

async fn delay(dur: Duration) {
let when = Instant::now() + dur;
let notify = Arc::new(Notify::new());
let notify2 = notify.clone();

thread::spawn(move || {
let now = Instant::now();

if now < when {
thread::sleep(when - now);
}

notify2.notify_one();
});


notify.notified().await;
}
参考

https://rust-lang.github.io/async-book/02_execution/01_chapter.html

https://tokio.rs/tokio/tutorial/async

往期推荐

2021,RUST的趋势很明显了

Rust并发编程之多线程

Rust-主导未来40年的编程语言

Rust实战之使用Nom 解析 Http Response 消息

编程语言PK之 Go vs Rust,谁更好?【英文字幕】

继续滑动看下一个

您可能也对以下帖子感兴趣

文章有问题?点此查看未经处理的缓存