Rust异步编程(1)-async、await、future、move

async、await、future、move

1. 概述

  1. async/.await是Rust编写异步的内置工具
  2. async将一个代码块转化为实现了future特征的状态机
    • 状态机四要素:状态、动作、事件、跳转
  3. future作用:在同步方法中调用阻塞函数(async转化的函数)会阻塞整个线程,但阻塞的future会让出线程控制权,允许其它future运行。
    1. 通俗说:当有多个任务进入同一个阻塞函数,转化为多个future,每个future执行各自的任务,如果当前任务被阻塞(时间片到期等原因),则切换成另一个future来执行任务。
    2. 就是说,多个任务在同一个线程下,高速切换来执行
  4. 定义的async方法调用的时候不会自己执行,必须要通过executor来执行
  5. async关键词,目前只支持在方法和代码块上使用,不支持闭包(不稳定版本支持,好几年了)

2. async和executor简单案例

依赖:
futures = “0.3.4”

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
use futures::executor;

async fn hello(){
println!("hello");
}

//该hello等价于:
//==>
//fn hello1() -> impl Future<Output=()> {
// async {
// println!("hello");
// }
//}

fn main() {
let f= hello();
executor::block_on(f); //阻塞,如果不使用executor,hello方法不会被执行

//由于executor的block_on,阻塞了线程,my_functiion方法必须等待
my_function();
}

fn my_function() {
println!("my function!");
}

3. await案例

await用于阻塞当前future

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
use futures::{self,executor};
use std::thread::sleep;
use std::time::Duration;

async fn learn_song(){
sleep(Duration::from_secs(5)); //这个阻塞了整个线程,非async的
println!("learn song");
}

async fn sing_song(){
println!("sing song");
}

async fn dance(){
println!("dance");
}
œ
async fn learn_and_sing_song(){
learn_song().await; //await阻塞当前执行,确保先后顺序,此处应该是future的阻塞
sing_song().await;
}

async fn async_main(){
//f1和f2并行执行
let f1 = learn_and_sing_song();
let f2 = dance();
futures::join!(f1,f2); //表示f1和f2可以并发执行
}

fn main() {
executor::block_on(async_main());
}

4. async生命周期

为确保方法传入到future的变量生命周期和async一样,可以在变量外面包一层async

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
use futures::executor;
use std::future::Future;

async fn foo(x: &u8) -> u8 {
*x
}

//上面的foo等价于该方法
/*fn foo_expand(x: &'a u8) -> impl Future<Output=u8> + 'a {
async {
*x
}
}

//由于future挂起,会导致x生命周期结束,该方法无法执行
fn bad() -> impl Future<Output=u8> {
let x = 5;
foo_expand(&x)
}

//将x包在async中
fn good() -> impl Future<Output=u8> {
async {
let x = 5;
foo_expand(&x).await
}
}*/

fn main() {
let x = 5;
let f = foo(&x);
executor::block_on(f);
}

5. move(重点)

在async中的变量,要确保能在不同线程之间传输

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
use futures::executor;

async fn move_block(){
let my_string = "my_string".to_string();
let f = async move{
println!("String:{}",my_string);
};
//已经被移动到async里了,外面不能再调用my_string
// println!("after move,String:{}",my_string);
f.await
}

fn main() {
executor::block_on(move_block());
}

6. 官方future(重点)

1
2
3
4
5
6
7
8
9
10
// //通过Pin可以创建不可移动的Future。不可移动的对象可以在它们的字段之间存储指针(就是说,堆空间移动的时候,栈指针也会跟着移动),例如:
// struct MyFut {
// a: i32,
// ptr: *const i32,
// }

// pub trait Future {
// type Output;
// fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
// }

7. 简单实现异步执行原理(future)

sleeper是一个future;reactor检测状态,然后通过wake通知future是否可执行
future中的poll是一个检测方法,用于接收wake提供的信息,并判读是否能够执行该future

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
use std::thread;
use std::time::Duration;

trait SimpleFuture {
type Output;
//fn poll(&mut self, wake: fn()) -> Poll<Self::Output>;
fn poll(&mut self, wake: u32) -> Poll<Self::Output>;
}

enum Poll<T> {
Ready(T),
Pending,
}

struct MySleeper {
polls: u64,
wake: u32,
}

static mut FINISHED: bool = false;

impl MySleeper {
fn new(wake:u32) -> Self {
MySleeper {
polls: 0,
wake,
}
}
}

impl SimpleFuture for MySleeper {
type Output = ();
fn poll(&mut self, wake: u32) -> Poll<Self::Output> {
unsafe {
if FINISHED {
Poll::Ready(())
} else {
self.wake = wake;
self.polls += 1;
println!("polls={}", self.polls);
Poll::Pending
}
}
}
}

struct MyReactor {
wake: u32,
handle: Option<thread::JoinHandle<()>>,
}

impl MyReactor {
fn new() -> Self {
MyReactor {
wake: 0,
handle: None,
}
}
fn add_wake(&mut self,wake:u32){
self.wake = wake;
}
fn check_status(&mut self){
if self.handle.is_none(){
let _wake = self.wake;
let handle = thread::spawn(|| loop{
thread::sleep(Duration::from_secs(5));
unsafe { //模拟future就绪,调用wake
FINISHED = true;
}
});
self.handle = Some(handle);
}

}
}

struct MyExecutor;

impl MyExecutor{
fn block_on<F:SimpleFuture>(mut myfuture:F,wake:u32){
loop{
match myfuture.poll(wake){
Poll::Ready(_)=>{
println!("my future is ok!");
break;
},
Poll::Pending=>{
unsafe {
while !FINISHED{
thread::sleep(Duration::from_secs(1));
}
}
}
}
}
}
}
fn main() {
let mut reactor = MyReactor::new();
let sleeper = MySleeper::new(5);
let wake = sleeper.wake;
reactor.add_wake(wake);
reactor.check_status();
MyExecutor::block_on(sleeper,wake);
}

总结

本文编辑完毕

参考

[1] Rust 异步编程

  • Copyrights © 2017-2023 Jason
  • Visitors: | Views:

谢谢打赏~

微信