Rust
的Future
是用来实现异步编程的。今天我们围绕其了解下Rust
的异步编程是如何构建。
Rust
用async
就能轻松创建开销很小的可异步执行的函数,在await
时其才会被调度执行。
其比较轻量级,有别于异步多线程,依托在操作系统线程之上,构建大量并发则需要大量的线程资源,对资源的消耗比较大。
比如下边用async
构建异步任务:
async fn async_fn() { | |
// handle async logic | |
} | |
async fn main() { | |
async_fn().await | |
} |
文章目录
- 状态机
- 调度
- 运行时
- async
- pin
状态机
async
其实也是帮你自动实现了下边的Future trait
,用结构体维护了一个状态机
trait Future { | |
type Output; | |
fn poll( | |
self: Pin<&mut Self>, | |
cx: &mut Context<'_>, | |
) -> Poll<Self::Output>; | |
} |
Future
定义一个poll
方法,可以查询异步任务状态。对于异步任务,有Pending
和Ready
两种状态,Pending
时会让出控制,等待可以处理时再被唤醒继续处理,如此重复,直到Ready
。
我们来尝试通过实现一个Delay
的Future
了解这个状态流转的过程
use std::future::Future; | |
use std::pin::Pin; | |
use std::task::{Context, Poll}; | |
use std::time::{Duration, Instant}; | |
struct Delay { | |
when: Instant, | |
} | |
impl Future for Delay { | |
type Output = &'static str; | |
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<&'static str> { | |
if Instant::now() >= self.when { | |
Poll::Ready("done") | |
} else { | |
// 还未ready,注册下一次唤醒 | |
cx.waker().wake_by_ref(); | |
Poll::Pending | |
} | |
} | |
} | |
async fn main() { | |
let when = Instant::now() + Duration::from_millis(3); | |
let future = Delay { when }; | |
let out = future.await; | |
assert_eq!(out, "done"); | |
} |
Delay
每次poll
时会检查,时间是否满足,满足则Ready
,否则 schedule 下一次执行并返回Pending
状态机是有了,Future
怎么调度呢?
调度
Rust
需要运行时runtime
来调度异步任务task
,runtime
负责调度,检查future
的状态。
调度一般在Pending
时会交出task
的控制,并schedule
下一次什么时候唤醒(wake
)。
流程处理展开来说,常规Ready
处理:
而Pending
时, future
要被schedule
下一次唤醒,而每次唤醒可能不会都是在同一个task
上执行。这里用于唤醒的waker
会在每次poll
时以context
传递下去,
运行时
了解了调度,我们再展开说下运行时。rust
的运行时没在标准库中实现,需要依赖第三方的运行时,常用的有tokio
。
就比如如下的tokio
宏实际是添加了一个多线程(multi thread
)的运行时,会阻塞当前线程直到异步任务完成。
async fn main() { | |
println!("hello"); | |
} | |
// tranform to | |
fn main() { | |
tokio::runtime::Builder::new_multi_thread() | |
.enable_all() | |
.build() | |
.unwrap() | |
.block_on(async { | |
println!("Hello world"); | |
}) | |
} |
当然也可以用单线程的运行时(current thread
)
async fn main() { | |
println!("Hello world"); | |
} | |
// tranform to | |
fn main() { | |
tokio::runtime::Builder::new_current_thread() | |
.enable_all() | |
.build() | |
.unwrap() | |
.block_on(async { | |
println!("Hello world"); | |
}) | |
} |
async
其实一般很少直接去实现Future trait
, 直接使用async
去自动实现Future trait
就足够了。上边Delay
完全可以这么实现,简洁且高效
use std::sync::Arc; | |
use std::thread; | |
use std::time::{Duration, Instant}; | |
use tokio::sync::Notify; | |
async fn delay(dur: Duration) { | |
let when = Instant::now() + dur; | |
let notify = Arc::new(Notify::new()); | |
let notify_clone = notify.clone(); | |
thread::spawn(move || { | |
let now = Instant::now(); | |
if now < when { | |
thread::sleep(when - now); | |
} | |
notify_clone.notify_one(); | |
}); | |
notify.notified().await; | |
} | |
async fn main() { | |
delay(Duration::from_secs(1)).await; | |
} |
pin
还记得future trait
上参数有个Pin<&mut Self>
, 为什么要Pin future
的引用?
来看下边一段代码:
async fn my_async_fn() { | |
// async logic here | |
} | |
async fn main() { | |
let mut future = my_async_fn(); | |
(&mut future).await; | |
// error: | |
// within `impl Future<Output = ()>`, the trait `Unpin` is not implemented for `[async fn body@src/main.rs:1:24: 3:2]` | |
} |
当尝试执行一个异步函数的引用时,编译器会报错要求其是Unpin trait
。
为什么呢?
future
本质是一个封装的状态机结构体,调度时会被移动,如果其包含引用,引用的地址要能保证生命周期至少在其完成前还存活,不然就会出现引用一个已失效的地址。
所以 Rust 引入了Unpin trait
。这个Unpin
是代表其不需要固定地址,可以安全引用。
常规的类型一般都是实现了的。对于未实现的!Unpin
类型,一般可以将其Box::pin
到堆上或用宏pin!
到栈上来确保其地址在future
移动期间是有效的。
代码如:
use tokio::pin; | |
async fn my_async_fn() { | |
// async logic here | |
} | |
async fn main() { | |
let future = my_async_fn(); | |
// option 1 | |
pin!(future); | |
(&mut future).await; | |
// option 2 | |
// let pinned_fut = Box::pin(future); | |
// pinned_fut.await; | |
} |
好了,今天就聊到这里,下一篇我们再聊聊多个异步同时怎么处理。
对Pin
感兴趣可以看看官方更详细的文档:Pinning[1]
异步编程更深入了解的话也推荐看下 tokio 的这篇:Async in depth[2]
参考资料
[1] Pinning: https://rust-lang.github.io/async-book/04_pinning/01_chapter.html
[2] Async in depth: https://tokio.rs/tokio/tutorial/async