并发
rust 中奇怪的规定很多,当然都是为了程序的安全考虑,比如说避免数据竞争,正因如此,rust 中的并发会很容易变得安全
并发的英文是 Concurrent, 是指同时有多个任务处于执行状态的现象,是很重要的东西,由于多个任务之间可能需要联系,互相影响,并发的实现需要解决很多问题
- 数据共用
并发程序中,如果两个线程同时对一个数据对象进行修改,那么就会发生问题,解决这一问题,通常是对锁的使用,Mutex就能很好地担任这一任务,当有一个线程在使用一个变量时,只有等他使用完了,其他线程才能使用这个变量
- 数据回收
数据回收的问题是由数据共用引起的,在单线程数据的生命周期是可以被判断出来的,但是在多线程的程序里,如果一个线程结束之后会把所有的都变量都回收,而其他线程可能也需要使用那些变量的时候就会出现问题,解决这个最好的办法解释使用 Rc,计数为 0 的时候就回收数据
- 死锁
打个比方,有两个线程 X、Y,现在这个时间节点 X 对 变量 A 上锁,Y 对变量 B 上锁,过了一会,X 需要对 B 上锁,而 Y 需要对 A 上锁,但是这些变量都被对方上锁了,然后就两个程序都等待对方结束,而结束的条件是上锁之后运行完程序,所以就陷入了死循环…
这就是死锁,也是并发程序里面较为难预防的一种,目前解决方案主要有两种:一种是计时法,给上锁过程加上时间限制,超过了时间限制就放弃执行,但是这会吧把可恢复错误演变成不可恢复错误,影响系统稳定性;另一种方式是避免单线程对多个互斥锁上锁,这种方法就会稳定很多
- 线程通信
线程通信也是很重要的一点,目前可靠的方式是消息传递,不断监听来自消息通道的消息,类似发电报
多线程
进程的英文是 Process,线程的英文是 Thread,计算机中的一个进程可以包含若干线程,大多数应用都是单进程多线程的,多线程机制可以实现单进程应用的并发
rust 中使用的是 std::thread 标准库的 spawn 函数
use std::thread;
use std::time::Duration;
fn sub_thread() {
for i in 1..5 {
println!("From Sub :{i}");
}
}
fn main() {
thread::spawn(sub_thread).join().unwrap();
for i in 1..5 {
println!("From Main : {i}");
}
}
spawn 函数接受一个无参函数,这个函数也可以是个 lambda 表达式
thread::spawn(||{
for i in 1..5 {
println!("From SUb :{i}");
}
}).join().unwrap();
join 方法是 spawn 返回的类的方法,他的作用是吧子线程和当前线程连接起来,效果就是会等到子线程结束之后再继续进行,如果不使用 join 方法就能让子线程独立于当前线程了,不过要注意不能让当前线程比子线程先结束了
use std::thread;
use std::time::Duration;
fn main() {
thread::spawn(||{
for i in 1..500 {
println!("From SUb :{i}");
}
});
for i in 1..2 {
println!("From Main : {i}");
thread::sleep(Duration::from_millis(1));
}
}
std::thread::sleep 能让线程延时,好让主线程不结束的那么快,虽然这段程序用我的电脑还是没有跑完
线程通讯
上面那个例子是通过线程延时来防止主线程比子线程结束早的,但是通常情况下,这是不可行的,应该采取联系父线程的方式,在完成任务的时候通知父线程结束,线程通信就能实现这一点
use std::thread;
use std::sync::mpsc;
use std::time::Duration;
fn main() {
let (sender, receiver) = mpsc::channel();
thread::spawn(move || {
println!("sub sub");
thread::sleep(Duration::from_secs(3));
sender.send("sub finish").unwrap();
});
println!("bebefore");
let received = receiver.recv().unwrap();
println!("receive: {}", received);
}
std::sync::mpsc::channel 方法能够创建一个消息通道,返回一个元组,包含一个发送者和一个接收者,发送者主动发送消息给接收者,move 关键字用来转移 sender 的所有权,主线程会等待 receiver 受到消息为止,所以会等待到子线程里 sleep 三秒后给消息通道发消息
消息通道可以单纯给主线程用来等待子线程结束
use std::thread;
use std::sync::mpsc;
fn main() {
let (s1, r1) = mpsc::channel();
let (s2, r2) = mpsc::channel();
thread::spawn(move || {
for i in 1..5 {
println!("sub 1 : {i}");
}
s1.send(0).unwrap();
});
thread::spawn(move || {
for i in 1..5 {
println!("sub 2 : {i}");
}
s2.send(0).unwrap();
});
r1.recv().unwrap();
r2.recv().unwrap();
}
Arc
之前介绍的 Rc 允许一个数据有多个所有者,它是使用一个计数器来计变量的被引用数,但是在多线程环境下,多个线程可能同时增加或减少计数器,从而导致计数器不正确地更新。这可能会导致内存泄漏或使用-after-free 错误等问题,因此需要在多线程环境下使用更加线程安全的智能指针类型,Arc,Atomic Reference Counted,基于原子操作实现计数器的增减操作,从而保证了线程安全
什么是原子操作呢?
GPT:
原子操作是一个计算机系统中的基本概念,它们是通过硬件机制来保证线程安全和数据一致性的。在现代 CPU 中,原子操作通常是使用 CPU 缓存和总线锁定等硬件机制实现的,这些机制可以保证一个操作在执行期间不会被中断或干扰,以保证它的原子性。
在并发编程中,原子操作是一种重要的同步机制,可以避免多个线程同时访问和修改共享的变量或资源而导致的竞态条件和数据不一致问题。因此,了解原子操作的工作原理和使用方法对于编写高效、线程安全的并发程序至关重要。
使用方法和 Rc 差不多
use std::thread;
use std::sync::Arc;
use std::sync::mpsc;
fn main() {
let array = Arc::new(vec![1, 2, 3, 4, 5]);
let copy_1 = array.clone();
let (s1, r1) = mpsc::channel::<i32>();
let (s2, r2) = mpsc::channel::<i32>();
thread::spawn(move || {
println!("t1: {:?}", copy_1);
s1.send(0).unwrap();
});
let copy_2 = array.clone();
thread::spawn(move || {
println!("t2: {:?}", copy_2);
s2.send(0).unwrap();
});
r1.recv().unwrap();
r2.recv().unwrap();
}
Arc 和 Rc 一样,都是不可变的,如果希望子线程能够更改某个变量必须用 Mutex 互斥锁和 Arc 一起使用
use std::thread;
use std::sync::{Arc, Mutex, mpsc};
fn main() {
let (s1, r1) = mpsc::channel();
let (s2, r2) = mpsc::channel();
let sum = Arc::new(Mutex::new(0_u32));
let copy_1 = sum.clone();
let copy_2 = sum.clone();
thread::spawn(move || {
let mut sum_value = copy_1.lock().unwrap();
for i in 1..11 {
*sum_value += i;
}
s1.send(0).unwrap();
});
thread::spawn(move || {
let mut sum_value = copy_2.lock().unwrap();
for i in 11..21 {
*sum_value += i;
}
s2.send(0).unwrap();
});
r1.recv().unwrap();
r2.recv().unwrap();
let locked_sum = sum.lock().unwrap();
println!("{locked_sum}");
}
在多线程编程中,Mutex 一般 都是和 Arc 一起食用的
死锁
下面是一个为了死锁而实现的死锁
use std::thread;
use std::sync::{Mutex, mpsc, Arc};
fn main() {
let (s1, r1) = mpsc::channel::<i32>();
let (s2, r2) = mpsc::channel::<i32>();
let (ss1, rr1) = mpsc::channel::<i32>();
let (ss2, rr2) = mpsc::channel::<i32>();
let a = Arc::new(Mutex::new(1));
let b = Arc::new(Mutex::new(1));
let a_c1 = a.clone();
let a_c2 = a.clone();
let b_c1 = b.clone();
let b_c2 = b.clone();
thread::spawn(move || {
println!("Thread 1 try to get the lock of a...");
let p = a_c1.lock().unwrap();
println!("Thread 1 get the lock of a!");
ss1.send(0).unwrap();
rr2.recv().unwrap();
println!("Thread 1 try to get the lock of b...");
let p = b_c1.lock().unwrap();
println!("Thread 1 to get the lock of b!");
s1.send(0).unwrap();
});
thread::spawn(move || {
println!("Thread 2 try to get the lock of b...");
let p = b_c2.lock().unwrap();
println!("Thread 2 get the lock of b!");
rr1.recv().unwrap();
ss2.send(0).unwrap();
println!("Thread 2 try to get the lock of a...");
let p = a_c2.lock().unwrap();
println!("Thread 2 get the lock of a!");
s2.send(0).unwrap();
});
r1.recv().unwrap();
r2.recv().unwrap();
}
首先声明,正常程序不会这么设计的,但是在运行过程中,难免会发生这种情况,这是一个概率问题,当经常出现死锁问题的时候,就可以考虑优化程序了,书中给出了两种保障死锁情况的方法,一种是用一个互斥锁来保护这些数据,另一个方法是使用标志互斥锁来保护
用一个互斥锁
其实就是吧多个变量放在元组里用同一个锁来管理 (23333
use std::thread;
use std::sync::{Arc, Mutex, mpsc};
fn main() {
let (s1, r1) = mpsc::channel();
let (s2, r2) = mpsc::channel();
let data = Arc::new(Mutex::new((1, 3)));
let copy_1 = data.clone();
thread::spawn(move || {
let mut data = copy_1.lock().unwrap();
(*data).0 += (*data).1;
s1.send(0).unwrap();
});
let copy_2 = data.clone();
thread::spawn(move || {
let mut data = copy_2.lock().unwrap();
(*data).1 += (*data).0;
s2.send(0).unwrap();
});
r1.recv().unwrap();
r2.recv().unwrap();
let lock_data = data.lock().unwrap();
println!("value of data is {:?}", lock_data);
}
你在期待什么
用标志互斥锁
也不是什么高级的东西,很朴素,就是在可能会发生死锁的进程执行之前都上一个无意义的互斥锁,这样每次只有等一个线程执行完全部操作之后其他线程才能工作
use std::thread;
use std::sync::{Arc, Mutex, mpsc};
fn main() {
let (s1, r1) = mpsc::channel();
let (s2, r2) = mpsc::channel();
let flag_mutex = Arc::new(Mutex::new(0));
let data_1 = Arc::new(Mutex::new(1_u32));
let data_2 = Arc::new(Mutex::new(2_u32));
let flag = flag_mutex.clone();
let copy_1_data_1 = data_1.clone();
let copy_1_data_2 = data_2.clone();
thread::spawn(move || {
let flag = flag.lock().unwrap();
let mut data_1 = copy_1_data_1.lock().unwrap();
let data_2 = copy_1_data_2.lock().unwrap();
*data_1 += *data_2;
s1.send(*flag).unwrap();
});
let flag = flag_mutex.clone();
let copy_2_data_1 = data_1.clone();
let copy_2_data_2 = data_2.clone();
thread::spawn(move || {
let flag = flag.lock().unwrap();
let mut data_2 = copy_2_data_2.lock().unwrap();
let data_1 = copy_2_data_1.lock().unwrap();
*data_2 += *data_1;
s2.send(*flag).unwrap();
});
r1.recv().unwrap();
r2.recv().unwrap();
let locked_data_1 = data_1.lock().unwrap();
let locked_data_2 = data_2.lock().unwrap();
println!("data_1 is {locked_data_1}, data_2 is {locked_data_2}");
}
比前一个效率相对低一点,但也更灵活,更依赖开发者的习惯一点
2023/04/26