Rust进阶(10)-多线程、通道、互斥器

多线程和通道

1. 多线程

  1. 进程是资源分配的最小单位,线程是CPU调度的最小单位
  2. 在使用多线程时,经常会遇到一些问题:
    1. 竞争状态:多个线程以不一致的顺序访问数据或资源
    2. 死锁:两个线程相互等待对方停止使用其所拥有的资源,造成两者都永久等待;
    3. 只会发生在特定情况下且难以稳定重现和修复的bug
  3. 编程语言提供的线程叫做绿色线程,入golang,在底层实现了M:N的模型,即M个绿色线程对应N个OS线程。但是,Rust标准库只提供1:1的线程模型的实现,即一个Rust线程对应一个0s线程
  4. 运行时代表二进制文件中包含的由语言本身提供的代码,这些代码根据语言的不同可大可小,不过非汇编语言都会有一定数量的运行时代码。通常,大家说一个语言"没有运行时",是指这个语言的"运行时"很小。Rust、c都是几乎没有运行时的。

1.1 基本实现

包括join的使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
use std::thread;
use std::time::Duration;

fn main() {
let handle = thread::spawn(|| {
for i in 1..10 {
println!("number {} in spawn thread!", i);
thread::sleep(Duration::from_millis(1));
}
});

for i in 1..5 {
println!("number {}", i);
thread::sleep(Duration::from_millis(1));
}

//等待线程执行结束,有join的地方先等待线程执行结束,才执行后续内容
handle.join().unwrap();
}

1.2 move闭包

下方代码会报错:

1
2
3
4
5
6
7
8
9
10
11
use std::thread;

fn main() {
let v = vec![1, 2, 3];
let handle = thread::spawn(|| {
//线程中借用了外部的变量
//不确定v的有效期是多久
println!("v: {:?}", v);
});
handle.join().unwrap();
}

解决方案,使用move,子线程获取变量数据所有权:

1
2
3
4
5
6
7
8
fn main() {
let v = vec![1, 2, 3];
//使用move将v的所有权移动到子 线程
let handle = thread::spawn(move || {
println!("v: {:?}", v);
});
handle.join().unwrap();
}

2. 通道

  1. Rust中一个实现消息传递并发的主要工具是通道。通道由两部分组成,一个是发送端,一个是接收端,发送端用来发送消息,接收端用来接收消息。发送者或接受者任一被丢弃时就可以认为通道被关闭了
  2. 通道介绍
    1. 通过mpsc::channel,创建通道,mpsc是多个生产者,单个消费者;
    2. 通过spmc::channel,创建通道,spmc是一个生产者,多个消费者;
    3. 创建通道后返回的是发送者和消费者,示例:
      1. let (tx,rx) = mpsc::channel();
      2. let (tx,rx) = spmc::channel();
  3. 重点:
    1. 发送者的send方法返回的是一个Result<T,E>,如果接收端已经被丢弃了,将没有发送值的目标,此时发送会返回错误
    2. 接收者的recv返回值也是一个Result<T,E>,当通道发送端关闭时,返回一个错误值。
    3. 这里使用当recv方法,会阻塞到有一个消息到来。我们也可以使用try_recv(),不会阻塞,会立即返回

2.1 通道简单实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
use std::thread;
use std::sync::mpsc;

fn main() {
let (tx,rx) = mpsc::channel();
thread::spawn(move || {
let val = String::from("hi");
//发送,会将val的所有权交给通道(move)
tx.send(val).unwrap();
//val的所有权已经交给通道,因此执行下面打印会报错
//println!("val = {}",val);
});
//接收
let received = rx.recv().unwrap();
println!("Got: {}",received);
}

2.2 一个生产者连续发送多个值

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
use std::thread;
use std::sync::mpsc;
use std::time::Duration;

fn main() {
let (tx,rx) = mpsc::channel();
thread::spawn(move || {
let vals = vec![
String::from("hi"),
String::from("from"),
String::from("the"),
String::from("thread"),
];
//发送
for val in vals{
tx.send(val).unwrap();
thread::sleep(Duration::from_secs(1));
}
});
//接收
for recv in rx{
println!("Got: {}",recv);

}
}

2.3 多个生产者连续发送多个值

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
use std::thread;
use std::sync::mpsc;
use std::time::Duration;

fn main() {
let (tx,rx) = mpsc::channel();
let tx1 = mpsc::Sender::clone(&tx);

thread::spawn(move || {
let vals = vec![
String::from("hi"),
String::from("from"),
String::from("the"),
String::from("thread"),
];
//发送
for val in vals{
tx1.send(val).unwrap();
thread::sleep(Duration::from_secs(1));
}
});

thread::spawn(move || {
let vals = vec![
String::from("A"),
String::from("B"),
String::from("C"),
String::from("D"),
];
//发送
for val in vals{
tx.send(val).unwrap();
thread::sleep(Duration::from_secs(1));
}
});

//接收
for recv in rx{
println!("Got: {}",recv);

}
}

3. 互斥器

  1. 即mutex
  2. Mutex
  3. 通道类似于单所有权的方式,值传递到通道后,发送者就无法再使用这个值
  4. 任意时刻,只允许一个线程来访问某些数据
  5. 互斥器使用时,需要先获取到锁,使用后需要释放锁
  6. Mutex是一个智能指针,lock调用返回一个叫做MutexGuard的智能指针,内部提供了drop方法,实现当MutexGuard离开作用域时自动释放锁

3.1 基本实现

1
2
3
4
5
6
7
8
9
10
use std::sync::Mutex;

fn main() {
let m = Mutex::new(5);
{
let mut num = m.lock().unwrap();
*num = 6;
}//离开作用域时,自动释放锁
println!("m = {:?}",m)
}

3.2 线程中使用锁

线程中共享数据:
Rc非线程安全,不得使用;因此使用Arc智能指针,功能类似于Rc,但线程安全。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
use std::sync::{Arc, Mutex};
use std::thread;

fn main() {
let counter = Arc::new(Mutex::new(0));
let mut handles = vec![];
for _ in 0..10 {
let cnt = Arc::clone(&counter);
let handle = thread::spawn(move || {
let mut num = cnt.lock().unwrap();
*num += 1;
});
handles.push(handle)
}

for handle in handles {
handle.join().unwrap();
}
println!("result = {}", *counter.lock().unwrap());
}

3.3 RefCell/Rc 与Mutex/Arc比较

  1. Mutex提供内部可变性,类似于RefCell
  2. RefCell/Rc非线程安全,Mutex/Arc线程安全

4. 重点:send和sync trait介绍

  1. 有两个并发概念内嵌于rust语言中:std:marker中的Sync和Send trait
  2. 通过send允许在线程间转移所有权
    1. Send标记trait表明类型的所有权可以在线程间传递信息。几乎所有的Rust类型都是Send,但是例外:例如Rc是不能Send的
    2. 任何完全由Send类型组成的类型也会自动被标记为Send,就是说,一个结构体中,每个数据类型都有Send,则这个结构体也可以Send
  3. Sync允许多线程访问
    1. Sync标记trait表明一个实现了Sync的类型可以安全的在多个线程中拥有其值的引用,即对于任意类型T,如果&T(T的引用)是Send的话,T就是Sync的,这意味着其引用就可以安全的发送到另一个线程。
    2. 智能指针Rc也不是Sync的,处于其不是Send的相同原因。RefCell和Cell系列类型不是Sync的。RefCell在运行时所进行的借用检查也不是线程安全的,Mutex是线程安全的
  4. 手动实现Send和Sync是不安全的
    1. 通常并不需要手动实现Send和Sync trait,因为由Send和Sync的类型组成的类型,自动就是Send和Sync的。因为它们是标记trait,甚至都不需要实现任何方法。他们只是用来加强并发相关的不可变性的。

总结

本文编辑完毕

参考

[1] Rust 程序设计语言

  • Copyrights © 2017-2023 Jason
  • Visitors: | Views:

谢谢打赏~

微信