Rust异步编程之Future初探

Rust
329
0
0
2024-03-26

RustFuture是用来实现异步编程的。今天我们围绕其了解下Rust的异步编程是如何构建。

Rustasync就能轻松创建开销很小的可异步执行的函数,在await时其才会被调度执行。

其比较轻量级,有别于异步多线程,依托在操作系统线程之上,构建大量并发则需要大量的线程资源,对资源的消耗比较大。

比如下边用async构建异步任务:

async fn async_fn() {
    // handle async logic
}

#[tokio::main]
async fn main() {
    async_fn().await
}

文章目录

  • 状态机
  • 调度
  • 运行时
  • async
  • pin

状态机

async其实也是帮你自动实现了下边的Future trait,用结构体维护了一个状态机

trait Future {
    type Output;
    fn poll(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Self::Output>;
}

Future定义一个poll方法,可以查询异步任务状态。对于异步任务,有PendingReady两种状态,Pending时会让出控制,等待可以处理时再被唤醒继续处理,如此重复,直到Ready

我们来尝试通过实现一个DelayFuture了解这个状态流转的过程

use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::{Duration, Instant};

struct Delay {
    when: Instant,
}

impl Future for Delay {
    type Output = &'static str;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<&'static str> {
        if Instant::now() >= self.when {
            Poll::Ready("done")
        } else {
            // 还未ready,注册下一次唤醒
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}

#[tokio::main]
async fn main() {
    let when = Instant::now() + Duration::from_millis(3);
    let future = Delay { when };

    let out = future.await;
    assert_eq!(out, "done");
}

Delay每次poll时会检查,时间是否满足,满足则Ready,否则 schedule 下一次执行并返回Pending

状态机是有了,Future怎么调度呢?

调度

Rust需要运行时runtime来调度异步任务taskruntime负责调度,检查future的状态。

调度一般在Pending时会交出task的控制,并schedule下一次什么时候唤醒(wake)。

流程处理展开来说,常规Ready处理:

Pending时, future要被schedule下一次唤醒,而每次唤醒可能不会都是在同一个task上执行。这里用于唤醒的waker会在每次poll时以context传递下去,

运行时

了解了调度,我们再展开说下运行时。rust的运行时没在标准库中实现,需要依赖第三方的运行时,常用的有tokio

就比如如下的tokio宏实际是添加了一个多线程(multi thread)的运行时,会阻塞当前线程直到异步任务完成。

#[tokio::main]
async fn main() {
    println!("hello");
}

// tranform to
fn main() {
    tokio::runtime::Builder::new_multi_thread()
        .enable_all()
        .build()
        .unwrap()
        .block_on(async {
            println!("Hello world");
        })
}

当然也可以用单线程的运行时(current thread

#[tokio::main(flavor = "current_thread")]
async fn main() {
    println!("Hello world");
}
// tranform to
fn main() {
    tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .unwrap()
        .block_on(async {
            println!("Hello world");
        })
}

async

其实一般很少直接去实现Future trait, 直接使用async去自动实现Future trait就足够了。上边Delay完全可以这么实现,简洁且高效

use std::sync::Arc;
use std::thread;
use std::time::{Duration, Instant};
use tokio::sync::Notify;

async fn delay(dur: Duration) {
    let when = Instant::now() + dur;
    let notify = Arc::new(Notify::new());
    let notify_clone = notify.clone();

    thread::spawn(move || {
        let now = Instant::now();

        if now < when {
            thread::sleep(when - now);
        }

        notify_clone.notify_one();
    });

    notify.notified().await;
}

#[tokio::main]
async fn main() {
    delay(Duration::from_secs(1)).await;
}

pin

还记得future trait上参数有个Pin<&mut Self>, 为什么要Pin future的引用?

来看下边一段代码:

async fn my_async_fn() {
    // async logic here
}

#[tokio::main]
async fn main() {
    let mut future = my_async_fn();
    (&mut future).await;
    // error:
    // within `impl Future<Output = ()>`, the trait `Unpin` is not implemented for `[async fn body@src/main.rs:1:24: 3:2]`
}

当尝试执行一个异步函数的引用时,编译器会报错要求其是Unpin trait

为什么呢?

future本质是一个封装的状态机结构体,调度时会被移动,如果其包含引用,引用的地址要能保证生命周期至少在其完成前还存活,不然就会出现引用一个已失效的地址。

所以 Rust 引入了Unpin trait。这个Unpin是代表其不需要固定地址,可以安全引用。

常规的类型一般都是实现了的。对于未实现的!Unpin类型,一般可以将其Box::pin到堆上或用宏pin!到栈上来确保其地址在future移动期间是有效的。

代码如:

use tokio::pin;

async fn my_async_fn() {
    // async logic here
}

#[tokio::main]
async fn main() {
    let future = my_async_fn();
    // option 1
    pin!(future);
    (&mut future).await;

    // option 2
    // let pinned_fut = Box::pin(future);
    // pinned_fut.await;
}

好了,今天就聊到这里,下一篇我们再聊聊多个异步同时怎么处理。

Pin感兴趣可以看看官方更详细的文档:Pinning[1]

异步编程更深入了解的话也推荐看下 tokio 的这篇:Async in depth[2]

参考资料

[1] Pinning: https://rust-lang.github.io/async-book/04_pinning/01_chapter.html

[2] Async in depth: https://tokio.rs/tokio/tutorial/async