Rust
的Future
是用来实现异步编程的。今天我们围绕其了解下Rust
的异步编程是如何构建。
Rust
用async
就能轻松创建开销很小的可异步执行的函数,在await
时其才会被调度执行。
其比较轻量级,有别于异步多线程,依托在操作系统线程之上,构建大量并发则需要大量的线程资源,对资源的消耗比较大。
比如下边用async
构建异步任务:
async fn async_fn() {
// handle async logic
}
#[tokio::main]
async fn main() {
async_fn().await
}
文章目录
- 状态机
- 调度
- 运行时
- async
- pin
状态机
async
其实也是帮你自动实现了下边的Future trait
,用结构体维护了一个状态机
trait Future {
type Output;
fn poll(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Self::Output>;
}
Future
定义一个poll
方法,可以查询异步任务状态。对于异步任务,有Pending
和Ready
两种状态,Pending
时会让出控制,等待可以处理时再被唤醒继续处理,如此重复,直到Ready
。
我们来尝试通过实现一个Delay
的Future
了解这个状态流转的过程
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::{Duration, Instant};
struct Delay {
when: Instant,
}
impl Future for Delay {
type Output = &'static str;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<&'static str> {
if Instant::now() >= self.when {
Poll::Ready("done")
} else {
// 还未ready,注册下一次唤醒
cx.waker().wake_by_ref();
Poll::Pending
}
}
}
#[tokio::main]
async fn main() {
let when = Instant::now() + Duration::from_millis(3);
let future = Delay { when };
let out = future.await;
assert_eq!(out, "done");
}
Delay
每次poll
时会检查,时间是否满足,满足则Ready
,否则 schedule 下一次执行并返回Pending
状态机是有了,Future
怎么调度呢?
调度
Rust
需要运行时runtime
来调度异步任务task
,runtime
负责调度,检查future
的状态。
调度一般在Pending
时会交出task
的控制,并schedule
下一次什么时候唤醒(wake
)。
流程处理展开来说,常规Ready
处理:
而Pending
时, future
要被schedule
下一次唤醒,而每次唤醒可能不会都是在同一个task
上执行。这里用于唤醒的waker
会在每次poll
时以context
传递下去,
运行时
了解了调度,我们再展开说下运行时。rust
的运行时没在标准库中实现,需要依赖第三方的运行时,常用的有tokio
。
就比如如下的tokio
宏实际是添加了一个多线程(multi thread
)的运行时,会阻塞当前线程直到异步任务完成。
#[tokio::main]
async fn main() {
println!("hello");
}
// tranform to
fn main() {
tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()
.unwrap()
.block_on(async {
println!("Hello world");
})
}
当然也可以用单线程的运行时(current thread
)
#[tokio::main(flavor = "current_thread")]
async fn main() {
println!("Hello world");
}
// tranform to
fn main() {
tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap()
.block_on(async {
println!("Hello world");
})
}
async
其实一般很少直接去实现Future trait
, 直接使用async
去自动实现Future trait
就足够了。上边Delay
完全可以这么实现,简洁且高效
use std::sync::Arc;
use std::thread;
use std::time::{Duration, Instant};
use tokio::sync::Notify;
async fn delay(dur: Duration) {
let when = Instant::now() + dur;
let notify = Arc::new(Notify::new());
let notify_clone = notify.clone();
thread::spawn(move || {
let now = Instant::now();
if now < when {
thread::sleep(when - now);
}
notify_clone.notify_one();
});
notify.notified().await;
}
#[tokio::main]
async fn main() {
delay(Duration::from_secs(1)).await;
}
pin
还记得future trait
上参数有个Pin<&mut Self>
, 为什么要Pin future
的引用?
来看下边一段代码:
async fn my_async_fn() {
// async logic here
}
#[tokio::main]
async fn main() {
let mut future = my_async_fn();
(&mut future).await;
// error:
// within `impl Future<Output = ()>`, the trait `Unpin` is not implemented for `[async fn body@src/main.rs:1:24: 3:2]`
}
当尝试执行一个异步函数的引用时,编译器会报错要求其是Unpin trait
。
为什么呢?
future
本质是一个封装的状态机结构体,调度时会被移动,如果其包含引用,引用的地址要能保证生命周期至少在其完成前还存活,不然就会出现引用一个已失效的地址。
所以 Rust 引入了Unpin trait
。这个Unpin
是代表其不需要固定地址,可以安全引用。
常规的类型一般都是实现了的。对于未实现的!Unpin
类型,一般可以将其Box::pin
到堆上或用宏pin!
到栈上来确保其地址在future
移动期间是有效的。
代码如:
use tokio::pin;
async fn my_async_fn() {
// async logic here
}
#[tokio::main]
async fn main() {
let future = my_async_fn();
// option 1
pin!(future);
(&mut future).await;
// option 2
// let pinned_fut = Box::pin(future);
// pinned_fut.await;
}
好了,今天就聊到这里,下一篇我们再聊聊多个异步同时怎么处理。
对Pin
感兴趣可以看看官方更详细的文档:Pinning[1]
异步编程更深入了解的话也推荐看下 tokio 的这篇:Async in depth[2]
参考资料
[1] Pinning: https://rust-lang.github.io/async-book/04_pinning/01_chapter.html
[2] Async in depth: https://tokio.rs/tokio/tutorial/async