Rust异步编程(4)-join!、try_jion!、select

join!、try_jion!、select

1. join!

  1. join!本身不返回结果
  2. 等待所有其中的future执行完毕,才返回Result告诉程序执行是否异常
  3. futures::join!可以将futures汇总到执行队列中。另外,下方代码,由于使用的tokio的time进行线程延迟管理,
    因此,执行future时,需要使用tokio的runtime,而不能直接使用标注库的executor
    依赖:
    futures = “0.3.5”
    tokio = {version = “0.2”,features= [“full”]}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
use futures;
use tokio::runtime::Runtime;
async fn func1(){
tokio::time::delay_for(tokio::time::Duration::from_secs(1)).await;
println!("func1 finished!");
}
async fn func2(){
println!("func2 finished!");
}
async fn async_main(){
let f1 = func1();
let f2 = func2();
futures::join!(f1,f2);
}
fn main() {
let mut runtime = Runtime::new().unwrap();
runtime.block_on(async_main()); //因为上面的time延迟使用的是tokio的,这里必须用runtime配套使用,不能再用标准库的executor
}

2. try_join!

  1. try_join!会返回Result信息,因此每个异步async函数需要加上返回Result类型,具体使用如下:
  2. try_join!中,只要有一个future执行异常,则立马返回Result告诉程序执行是否异常
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
use futures;
use tokio::runtime::Runtime;
use std::io::Result;

async fn func1() -> Result<()>{
tokio::time::delay_for(tokio::time::Duration::from_secs(1)).await;
println!("func1 finished!");
Ok(())
}

async fn func2() -> Result<()>{
println!("func2 finished!");
Ok(())
}

async fn async_main(){
let f1 = func1();
let f2 = func2();
if let Err(_) = futures::try_join!(f1,f2){
println!("Err");
}
}

fn main() {
let mut runtime = Runtime::new().unwrap();
runtime.block_on(async_main()); //因为上面的time延迟使用的是tokio的,这里必须用runtime配套使用,不能再用标准库的executor
}

3. select

  1. 异步执行future,只要有一个future执行完,就会返回执行结果,
  2. 需要用select接收执行结果并做进一步处理
  3. select中使用的Future必须实现Unpin trait和FusedFuture trait
    • 必须实现unpin的原因是,select中使用的future不是按值获取的,而是按照可变引用获取的,也就是没有获取future的所有权,所以在调用select后,未完成的futrue可以继续使用,如下 代码,a_fut是引用,因此每次select!都能获取到指向的数据,进而继续操作
1
2
3
4
select! {
a = a_fut => total += a,
b = b_fut => total += b,
}
  1. 必须实现FusedFuture的原因,select完成后不会再轮询future,因此需要实现FusedFuture来跟踪future是否完成
  2. 同样的,对应到stream上,会有一个FusedStream trait’
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
use futures::{
stream::{Stream, StreamExt, FusedStream},
select,
};
async fn add_two_streams(
mut s1: impl Stream<Item = u8> + FusedStream + Unpin,
mut s2: impl Stream<Item = u8> + FusedStream + Unpin,
) -> u8 {
let mut total = 0;
loop {
let item = select! {
x = s1.next() => x,
x = s2.next() => x,
complete => break,
};
if let Some(next_num) = item {
total += next_num;
}
}
total
}
  1. Fuse::terminated()允许构建一个已经终止的空的Future;
  2. 当需要同时运行同一Future的多个副本时,可以使用FuturesUnordered类型

3.1 基本使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
use futures::{select,future::FutureExt,pin_mut};
use tokio::runtime::Runtime;
use std::io::Result;

async fn func1()->Result<()>{
tokio::time::delay_for(tokio::time::Duration::from_secs(2)).await;
println!("func1 finished!");
Ok(())
}

async fn func2()->Result<()>{
println!("func2 finished!");
Ok(())
}

async fn async_main(){
let f1 = func1().fuse();
let f2 = func2().fuse();
pin_mut!(f1,f2);
select!{

_ = f1 => println!("func1 finished+++++++++"),
_ = f2 => println!("func2 finished+++++++++"),
}
}


fn main() {
let mut runtime= Runtime::new().unwrap();
runtime.block_on(async_main());
println!("Hello, world!");
}

3.1 select中default、complete的使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
use futures::{future, select, executor};

async fn count() {
//快速创建两个future
let mut a_fut = future::ready(4);
let mut b_fut = future::ready(6);
let mut total = 0;
loop {
select! {
//表示a或者b分支就绪
a = a_fut => total += a,
b = b_fut => total += b,
//表示所有分支全部就绪,并且不会再取得进展的情况
complete => break,
//若极端情况,默认处理方式,这里表示没有分支完成
default => unreachable!(),
}
assert_eq!(total, 10);
}
}

fn main() {
executor::block_on(count());
}
  • Copyrights © 2017-2023 Jason
  • Visitors: | Views:

谢谢打赏~

微信