tokio异步框架(1)-综合介绍

根据官方文档(官方英文) (官方文档-社区中文翻译),总结个人理解。
每节标题与官方文档标题相对应

1. 介绍

官方通过一个简易版的redis项目Mini Redis 来讲解tokio的使用方式和原理
建议先根据文档安装好这个项目

2. Hello Tokio

启动mini redis,编写客户端代码,实现互通:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
use mini_redis::{client, Result};

#[tokio::main]
pub async fn main() -> Result<()> {
// 打开链接到mini-redis的链接
let mut client = client::connect("127.0.0.1:6379").await?;

// 设置 "hello" 键的值为 "world"
client.set("hello", "world".into()).await?;

// 获取"hello"的值
let result = client.get("hello").await?;

println!("got value from the server; result={:?}", result);

Ok(())
}

通过基础案例,引入async/await概念。其中async将本段代码标记为异步,await执行并将结果和控制权返回到当前线程。也就是说,只有程序调用await时,异步代码才被真正执行。
#[tokio::main]一个特殊的异步入口,为异步环境提供一个运行时环境

3. Spawning(产生、创建)

  1. 官方案例中,使用代码来调用启动mini redis的服务器端,通过Spawning来确保可以同时接收并处理多个请求,可以支持存储或读取数据(这块在这一章目前数据不共享,就是说相同的key,客户端不同请求拿到的结果是不相同的,官方在为下一个概念做埋笔)
  2. 并发和并行不是同一个概念,并发是指多个任务交替执行任务,其实一次只执行一个,只是根据算法高频率交替执行;并行是多个任务各自同时执行,互不干扰。
  3. tokio的优点是异步代码可以让你同时处理许多并发任务, 而不必使用普通线程并行处理它们. Tokio可以在单个线程上并发处理许多任务.
  4. 为确保线程安全,外部变量必须move到异步块中,
  5. await会保存异步块中的状态,在异步块中执行的任务(主要是外部传入的变量)需要实现Send trait,如果不实现,则在调用await之前,必须要完成业务逻辑。否则程序会异常。
    • 原理,被await的任务,会找别的合适的线程来执行该任务,也就是说该任务中的变量等等,均会被移动到别的线程。如果不实现Send,则该变量不是线程安全的。

4. 共享状态

  1. 可以通过两种概念方式来共享数据:
    1. 互斥锁
    2. 通道
  2. 这里介绍的是互斥锁,通道后面专门会讲
  3. 区分:
    1. 调用Rust官方的同步互斥锁Mutex - std::sync::Mutex,可以在当前线程调用lock来锁定,请求量不大的时候,使用这个即可
      1. 同步互斥锁Mutexawait前要释放掉,否则会被await移动,造成异常。可以加一个作用域。因为.lock()返回的MutexGuard没有实现Send,无法线程之间传递
      2. 最好将该锁放到结构体中,包装一个方法来操作,避免死锁问题
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// 因为Mutex实现了Send,也就是CanIncrement也实现了Send
use std::sync::Mutex;

struct CanIncrement {
mutex: Mutex<i32>,
}
impl CanIncrement {
// 这个函数没有标识为异步函数
// 方法释放,则lock也释放结束
fn increment(&self) {
let mut lock = self.mutex.lock().unwrap();
*lock += 1;
}
}

async fn increment_and_do_stuff(can_incr: &CanIncrement) {
can_incr.increment();
do_something_async().await;
}
  1. tokio的异步互斥锁Mutex - tokio::sync::Mutex,通过调用await在内部后台来锁定,使用成本较高。一般建议用上面的方式。
1
2
3
4
5
6
7
8
9
10
use tokio::sync::Mutex; // 注意这里是使用的 tokio的 Mutex

// 这段代码是能编译的
// (但是这种情况下选择重构代码会更好,就是结构体方式)
async fn increment_and_do_stuff(mutex: &Mutex<i32>) {
let mut lock = mutex.lock().await;
*lock += 1;

do_something_async().await;
} // lock 在这里超出范围

5. 通道

  1. tokio提供了多种通道:
    1. mpsc : 多生产者(multi-producer)单消费者(single-consumer)通道. 可以发送许多的值.
    2. oneshot : 单生产者(single-producer)单消费者(single-consumer)通道. 可以发送单个值.
    3. broadcast : 多生产者多消费者(广播). 可以发送许多值,每一个接收者都能看到每一个值.
    4. watch : 单生产者多消费者. 可以发送许多值,但是不会保留历史记录. 接收者仅能看到最新的值.
  2. 如果你需要一个多生产者多消费者通道且仅仅只想让一个消费者看到所有消息, 你可以使用async-channel
  3. 这些通道通过阻塞线程来等待消息, 这在异步代码中是不允许的:std::sync::mpsc 和 crossbeam::channel
  4. rust默认库中,没有上限控制的队列,长时间下去,会占用完系统的所有内存,造成无法预测的后果。但tokio由于await机制,可以避免该问题的产生,一个await执行完后,才会执行下一个。
  5. 这一部分,官方使用通道重构了客户端连接mini redis的代码。其中,使用mpsc发送请求给mini redis,使用oneshot接收mini redis返回的响应,具体代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
use bytes::Bytes;
use tokio::sync::mpsc;
use mini_redis::client;
use tokio::sync::oneshot;

#[derive(Debug)]
enum Command {
Get {
key: String,
resp: Responder<Option<Bytes>>,
},
Set {
key: String,
val: Vec<u8>,
resp: Responder<()>,
},
}

type Responder<T> = oneshot::Sender<mini_redis::Result<T>>;

#[tokio::main]
async fn main() {
let (mut tx, mut rx) = mpsc::channel(32);
let mut tx2 = tx.clone();

let t1 = tokio::spawn(async move {
let (resp_tx, resp_rx) = oneshot::channel();
let cmd = Command::Get {
key: "hello".to_string(),
resp: resp_tx,
};
tx.send(cmd).await.unwrap();
let res = resp_rx.await;
println!("GOT = {:?}", res);
});

let t2 = tokio::spawn(async move {
let (resp_tx, resp_rx) = oneshot::channel();
let cmd = Command::Set {
key: "foo".to_string(),
val: b"bar".to_vec(),
resp: resp_tx,
};
tx2.send(cmd).await.unwrap();
let res = resp_rx.await;
println!("GOT = {:?}", res)
});

// move 关键字用来移动 rx 所有权到task中去
let manager = tokio::spawn(async move {
// 建立与Server的链接
let mut client = client::connect("127.0.0.1:6379").await.unwrap();

// 开始接收消息
while let Some(cmd) = rx.recv().await {
match cmd {
Command::Get { key, resp } => {
let res = client.get(&key).await;
// 忽略错误
let _ = resp.send(res);
}
Command::Set { key, val, resp } => {
let res = client.set(&key, val.into()).await;
// 忽略错误
let _ = resp.send(res);
}
}
}
});
}

6. I/O

  1. 异步读写,功能类似于Rust标准库的
    1. async fn read()
    2. async fn read_to_end()
    3. async fn write()
    4. async fn write_all()
  2. 辅助函数
    与 std 包中一样, tokio::io模块也包含了一些有用的实用函数和用于处理标准输入,输出,错误的API. standard input, standard output, standard error . 比如, tokio::io::copy 可以异步将reader中的全部内容复制到writer中去.
  3. 通过copy案例,展示I/O的实现

7. Frame

这个是通过mini redis的数据解析功能,进一步来展示I/O的使用,没仔细看,
这块的源码需要到:Mini Redis 来查看,有兴趣可以了解下Frame的实现

8. 深入异步

  1. 这一节将会深入讲解tokio异步的原理,从根本上解释整个逻辑。
  2. Futures
    1. 与其它语言实现的future不一样, 一个Rust的future不代表在后台发生的计算,而是Rust的future就是计算本身. future的所有者通过轮询future来推进计算. 这是通过调用 Future::poll来完成的.
    2. async返回的是future值,它实现了标准库中的 std::future::Future trait
    3. 总结
      1. Rust的异步操作是惰性的,需要调用者对其进行轮询.
      2. Wakers被传递给future,以将future与调用它的任务联系起来.
      3. 当一个资源没有准备好完成时,Poll::Pending被返回并记录任务的唤醒程序(waker).
      4. 当一个资源变为就绪状态时,就会通知任务的唤醒程序(waker).
      5. 执行器接收到通知并安排任务来执行.
      6. 任务再一次被轮询,这一次资源是就绪状态并且任务能够取得进展.

9. Select

  1. tokio::select! 宏允许等待多个异步计算且当单个计算完成时返回(多个并发或并行异步计算任务,返回最先完成的那个).
  2. select!宏能处理超过2个以上的分支. 当前最大限制64个分支.
  3. tokio::spawn 与 select! 都可以运行并发异步操作. 但是用于运行并发操作的策略有所不同. tokio::spawn 函数传入一个异步操作并产生一个 新的任务去运行它. 任务是一个tokio运行时调度的对象. Tokio独立调度两个不同的任务. 它们可以在不同的操作系统线程上同时运行. 因此产生的任务与 产生的线程都有相同的限制: 不可借用.
  4. select!宏能在同一个任务上同时运行所有分支. 因为select!宏上的所有分支被同一个任务执行,它们永远不会同时运行. select!宏的多路复用 异步操作也在单个任务上运行.

10. Streams

流是一个一系列异步值的称呼. 它与Rust的 std::iter::Iterator 异步等效且由 Stream trait表示. 流能在async函数中迭代. 它们也可以使用适配器进行 转换. Tokio在 StreamExt trait上提供了一些通用适配器.

总结

本文编辑完毕

Donate
  • Copyright: Copyright is owned by the author. For commercial reprints, please contact the author for authorization. For non-commercial reprints, please indicate the source.
  • Copyrights © 2017-2023 Jason
  • Visitors: | Views:

谢谢打赏~

微信