Rust异步编程(3)-async、Pin、Stream原理

async、Pin、Stream原理

1. async和Pin结合代码讲解原理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
use futures::executor;

async fn async_function1() {
println!("async function1 ++++ !");
}

async fn async_function2() {
println!("async function2 ++++ !");
}

async fn async_main() {
let f1 = async_function1();
let f2 = async_function2();

//重点关注这里---------
let f = async move {
f1.await;
f2.await;
};
//---------------------
f.await;
}

////分析编译器展开的过程(先后等f1和f2执行完毕,才开始f3):
//// (1)创建一个匿名结构体
//// (2)为结构体定义了对应的状态
//// (3)实现Future trait
//struct AsyncFuture {
// fut_one: FutFunction1,
// fut_two: FutFunction2,
// state: State,
//}
//
//enum State {
// AwaitFut1,
// AwaitFut2,
// Done,
//}
//
//impl Future for AsyncFuture {
// type Output = ();
//
// fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
// loop {
// match self.state {
// State::AwaitFut1 => match self.fut_one.poll(...) {
// Poll::Ready(()) => self.state = State::AwaitFut2,
// Poll::Pending => return Poll::Pending,
// }
// State::AwaitFut2 => match self.fut_two.poll(...) {
// Poll::Ready(()) => self.state = State::Done,
// Poll::Pending => return Poll::Pending,
// }
// State::Done => return Poll::Ready(());
// }
// }
// }
//}


fn main() {
executor::block_on(async_main());
}

//------------------------------------------------------
// 分析pin

async fn async_put_data_to_buf(mut buf: &[u8]) {
//to do something
}

async fn async_main() {
let f = async {
let mut x = [0: 128];
let async_put = async_put_data_to_buf(&mut x);
async_put.await;
};
}

////编译器展开
//struct AsyncFuture {
// x: [u8: 128],
// async_put: PutIntoBuf<'what_lifetime>,
//}
//
//struct PutIntoBuf {
// buf: &'a mut[u8],
//}

////如果AsyncFuture发生移动,那么x和async_put都发生移动,
//// 但是async_put.buf还是指向移动之前的x,显然不是我们期望的
//// 我们期望的是async_put.buf指向移动之后的x
//
//// Pin类型包着指针类型,保证指针类型背后的值不被移动
//// Pin<T>
//// 大多数类型都不存移动的问题,这些类型实现UnPin trait,u8
//// let f = Pin<AsyncFuture>
//
//let f = async {}
//
//fn my_function<T>(T: impl UnPin)
//
//let fut = Box::pin(f);
//pin_mut!(fut);
//my_function(fut);

2. Stream原理

Stream和Future类似,但是Future对应的是一个item的状态变化,而Stream则是类似于iterator,在结束之前能够得到多个值。或者我们可以简单的理解为,Stream是由一系列的Future组成,我们可以从Stream读取各个Future的结果,直到Stream结束。

1
2
3
4
5
//定义
trait Stream{
type Item;
fn poll_next(self:Pin<&mut Self>,lw:&LocalWaker )-> Poll<Option<Self::Item>>;
}

其中,poll_next函数有三种可能的返回值,分别如下:

  • Poll::Pending 说明下一个值还没有就绪,仍然需要等待
  • Poll::Ready(Some(val))已经就绪,成功返回一个值,程序可以通过调用poll_next再获取下一个值
  • Poll::Ready(None)表示Stream已经结束,不应再调用poll_next。

2.1 迭代

和同步的Iterator类似,Stream可以迭代处理其中的值,如使用map,filter,fold,try_map,try_filter,和 try_fold等,但Stream不支持使用for,而while let和next/try_next则是允许的。
如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
async fn sum_with_next(mut stream:Pin<&mut dyn Stream<Item=i32>>)->i32{
use futures::stream::StreamExt; //for next
let mut sum = 0;
while let Some(item)=stream.next().await{
sum += item;
}
sum
}

async fn sum_with_try_next(mut stream:Pin<&mut dyn Stream<Item = Result<i32, io::Error>>>)-> Result<i32,io::Error>{
use futures::stream::TryStreamExt;
let mut sum = 0;
while let Some(item) = stream.try_next().await?{
sum += item;
}
Ok(sum)
}

2.2 并发

上面的使用的迭代处理,如果要并发的处理流,则应该使用for_each_concurrent和try_for_each_concurrent,如下:

1
2
3
4
5
6
7
8
9
10
async fn jump_around(mut stream:Pin<&mut dyn Stream<Item = Result<u8,io::Error>>>) -> Result<(),io::Error>{
use futures::stream::TryStreamExt;
const MAX_CONCURRENT_JUMPERS: usize = 100;
stream.try_for_eacn_concurrent(MAX_CONCURRENT_JUMPERS,|num| async move {
jump_n_times(num).await?;
report_n_jumps(num).await?;
Ok(())
}).await?;
Ok(())
}

总结

本文编辑完毕

参考

[1] Rust 异步编程

  • Copyrights © 2017-2023 Jason
  • Visitors: | Views:

谢谢打赏~

微信