上篇文章我们知道,Rust
的Future
是异步执行,await
时是阻塞在当前的异步任务task
上,直到完成。
当多个异步任务执行时,如果只能都阻塞一个个执行,那就变成同步串行执行了,当然不是我们通常希望的并发处理方式,今天就来聊聊多个异步任务的一些并发处理方式。
文章目录
- join
- try_join
- spawn
- select
- 顺序执行
- precondition
- 分支修改
- cancel
join
多个异步任务执行时,如果希望全部执行完成后统一返回,可以让他们都并发去执行,等全部完成后再一起返回。join!
宏就可以实现它。
async fn async_fn1() -> u32 { | |
1 | |
} | |
async fn async_fn2() -> u32 { | |
2 | |
} | |
async fn main() { | |
let (first, second) = tokio::join!(async_fn1(), async_fn2()); | |
assert_eq!(first, 1); | |
assert_eq!(second, 2); | |
} |
try_join
如果其中有失败的话,也会返回失败的Err
。如果想一有失败就立马返回,不等待其他任务完成,可以使用try_join!
。
async fn async_fn1() -> Result<u32, &'static str> { | |
Ok(1) | |
} | |
async fn async_fn2() -> Result<u32, &'static str> { | |
Err("async_fn2 failed") | |
} | |
async fn main() { | |
let res = tokio::try_join!(async_fn1(), async_fn2()); | |
match res { | |
Ok((first, second)) => { | |
println!("first = {}, second = {}", first, second); | |
} | |
Err(err) => { | |
println!("error: {}", err); | |
} | |
} | |
} |
spawn
上边join
虽然是让多个异步任务并发执行,但其实际还是在同一个task
上异步执行,如果想让每个异步任务都在一个新的task
上独立执行,可以用spawn
。
异步任务spawn
后会在后台立即开始运行,即便没有对其返回的JoinHandle
进行await
这就有点像多线程里的spawn
,只不过这里粒度不是线程,是task
。
use futures::future::join_all; | |
use tokio::{join, task::JoinHandle}; | |
async fn async_op(id: i32) -> String { | |
let s = format!("Start task {}", id); | |
println!("{}", s); | |
s | |
} | |
async fn main() { | |
let ops = vec![1, 2, 3]; | |
let mut tasks: Vec<JoinHandle<String>> = ops | |
.into_iter() | |
.map(|op| tokio::spawn(async_op(op))) | |
.collect(); | |
// option 1 | |
// let outputs = join!( | |
// tasks.pop().unwrap(), | |
// tasks.pop().unwrap(), | |
// tasks.pop().unwrap() | |
// ); | |
// println!("{:?}", outputs); | |
// tuple of results: | |
// (Ok("Start task 3"), Ok("Start task 2"), Ok("Start task 1")) | |
// option 2 | |
let outputs = join_all(tasks).await; | |
println!("{:?}", outputs); | |
// vector of results: | |
// [Ok("Start task 1"), Ok("Start task 2"), Ok("Start task 3")] | |
} |
select
如果是多个异步分支(branch
)有一个完成就返回,并取消(drop
来释放异步资源)其他异步分支的话,可以用select
async fn async_fn1() {} | |
async fn async_fn2() {} | |
async fn main() { | |
tokio::select! { | |
_ = async_fn1() => { | |
println!("async_fn1() completed first") | |
} | |
_ = async_fn2() => { | |
println!("async_fn2() completed first") | |
} | |
}; | |
} |
顺序执行
这里select
会对每个分支随机执行,顺序是不保证的。如果期望顺序执行,可以用biased
async fn main() { | |
let mut count = 0u8; | |
loop { | |
tokio::select! { | |
// 顺序执行 | |
biased; | |
_ = async {}, if count < 1 => { | |
count += 1; | |
assert_eq!(count, 1); | |
} | |
_ = async {}, if count < 2 => { | |
count += 1; | |
assert_eq!(count, 2); | |
} | |
else => { | |
break; | |
} | |
}; | |
} | |
} |
precondition
上边例子中,分支使用了if precondition
,如果当前select
循环中运行到该分支,条件满足则执行;不满足的话会标记分支为失效(disabled
)本次select
中不会执行。
如果在loop
中,下一次进入select
循环会重新标记disabled
状态
另外当前循环如果所以分支都被标记为disabled
状态,就必须要有else
分支,使select
仍可运行。不然就会收到panic
: all branches are disabled and there is no else branch
.
分支修改
select
的分支也可修改, 比如下边通过Pin::set
来修改Pin
住的异步任务。
use tokio::select; | |
async fn action(input: Option<i32>) -> Option<String> { | |
match input { | |
Some(input) => Some(input.to_string()), | |
None => return None, | |
} | |
} | |
async fn main() { | |
let (tx, mut rx) = tokio::sync::mpsc::channel(128); | |
let mut done = false; | |
let operation = action(None); | |
tokio::pin!(operation); | |
tokio::spawn(async move { | |
let _ = tx.send(1).await; | |
let _ = tx.send(3).await; | |
let _ = tx.send(2).await; | |
}); | |
loop { | |
select! { | |
res = &mut operation, if !done => { | |
println!("Got = {:?}", res); | |
done = true; | |
if let Some(_) = res { | |
return; | |
} | |
} | |
Some(v) = rx.recv() => { | |
if v % 2 == 0 { | |
// `.set` is a method on `Pin`. | |
operation.set(action(Some(v))); | |
done = false; | |
} | |
} | |
} | |
} | |
} |
这里第一个分支的precondition
是必须的,不然就会有可能出现多次执行一个已完成的异步任务,会panic
: async fn resumed after completion
。
cancel
最后在聊聊分支取消。
当select
有分支完成时,其他分支会被取消。取消依托于Drop
。当future
被drop
,其也会停止被异步调度。
比如下边代码,当oneshot::Receiver
被取消而Drop
时,会向Sender
发送close
通知,以便于清理sender
并中断其执行。
use tokio::sync::oneshot; | |
async fn main() { | |
let (mut tx1, rx1) = oneshot::channel::<u32>(); | |
let (tx2, rx2) = oneshot::channel(); | |
tokio::spawn(async move { | |
tokio::select! { | |
_ = tx1.closed() => { | |
// `val = rx1` is canceled | |
println!("tx1 closed"); | |
} | |
} | |
}); | |
tokio::spawn(async { | |
let _ = tx2.send("two"); | |
}); | |
tokio::select! { | |
val = rx1 => { | |
println!("rx1 completed first with {:?}", val); | |
} | |
val = rx2 => { | |
println!("rx2 completed first with {:?}", val); | |
} | |
} | |
} |
如果有用,点个 在看,让更多人看到