查看原文
其他

Rust并发编程之多线程

秋风不度镇南关 码农真经 2023-12-25

Rust对并发编程提供了非常丰富的支持,有传统的多线程方式,也提供流行的异步原语async, await,本篇文章主要介绍多线程方面的基本用法。以下将分为5部分进行讲解

  1. 线程的创建

  2. 原子变量

  3. 管道 , 条件变量

  4. 生产者消费者的实现


线程的创建


线程的创建非常的简单

let thread = std::thread::spawn(||{
println!("hello world");
});
thread.join(); //等待线程结束

Rust语言和其他语言不一样的地方是,如果线程里使用了外部变量,则会报错

let data = String::from("hello world");
let thread = std::thread::spawn(||{
println!("{}", data);
});
thread.join();

原因如下:
error[E0373]: closure may outlive the current function, but it borrows `data`, which is owned by the current function
--> src\main.rs:36:37
|
36 | let thread = std::thread::spawn(||{
| ^^ may outlive borrowed value `data`
37 | println!("{}", data);
| ---- `data` is borrowed here

线程中使用了其他线程的变量是不合法的,必须使用move表明线程拥有data的所有权

let data = String::from("hello world");
let thread = std::thread::spawn(move ||{ //使用move 把data的所有权转到线程内
println!("{}", data);
});
thread.join();


如果想要在多线程间读写数据,通常需要加锁,如java中的 synchornized。与之对应,在Rust中需要使用Mutex,由于Mutex是跨线程使用,线程会转移Mutex的所有权,所以必须配合Arc使用。

fn main() {
let counter = Arc::new(Mutex::new(0));
let counter2 = counter.clone();
let thread = std::thread::spawn(move ||{
let mut i = counter2.lock().unwrap();//获取锁,不需要手动释放,rust的锁和变量的生命周期一样,离开作用域时,锁会自动释放
*i = *i + 1;
});
thread.join();
let counter = counter.lock().unwrap();
assert_eq!(1, *counter);
}


原子变量


上面的例子中,我们使用锁来实现对数据的安全访问,锁作用的范围是调用lock到锁对象的scope结束,在这段范围内的代码同一时间只能被一个线程访问,从这点来看,使用锁来实现对单一数据的安全访问就有点重了(当然从锁和原子变量的实现机制来说,锁也远比原子变量重),这时候使用原子变量效率就会高很多。

fn main() {
let counter = Arc::new(AtomicU32::new(0));
let counter2 = counter.clone();
let thread = std::thread::spawn(move ||{
counter2.fetch_add( 1, Ordering::SeqCst);
});
counter.fetch_add( 1, Ordering::SeqCst);
thread.join();
counter.load(Ordering::SeqCst);
assert_eq!(2, counter.load(Ordering::SeqCst));
}

管道与条件变量


线程间的通信、协作,需要有一定的机制来支持,管道和条件变量就是这样的机制。

  1. 管道(Channel) Rust的channle包含了2个概念,发送者和接收者。发送者可以将消息放入管道,接收者则从管道中读取消息

fn main() {
use std::sync::mpsc::channel;
use std::thread;
let (sender, receiver) = channel();
let sender2 = sender.clone();
thread::spawn(move|| {
sender2.send(123).unwrap(); //线程1 发送消息
});
thread::spawn(move|| {
sender.send(456).unwrap(); //线程2 发送消息
});
while let Ok(res) = receiver.recv() { //主线程 接收消息
println!("{:?}", res);
}
}

值得注意的是接收者(receiver), 是唯一的,不像发送者(sender)那样可以有多个

  1. 条件变量 条件变量Condvar,不能单独使用,需要和监视器MutexGuard配合使用。线程之间通过调用 condvar.wait, condvar.notify_all, condvar.notify_one来实现线程间通信。

fn println(msg: &str){
use chrono::Local;
let date = Local::now();
println!("{} {}", date.format("%Y-%m-%d %H:%M:%S"), msg)
}
fn main() {
use std::sync::{Arc, Mutex, Condvar};
use std::thread;

let mutex_condva = Arc::new((Mutex::new(false), Condvar::new()));
let m_c = mutex_condva.clone();

thread::spawn(move || {
println("sub thread start..");
let (lock, cvar) = &*m_c;
let mut started = lock.lock().unwrap();
*started = true; //将业务参数设置为true
std::thread::sleep(Duration::from_secs(5));
cvar.notify_all(); // 唤醒条件变量等待者
println("sub thread finished..");
});
println("main thread start..");

let (lock, cvar) = &*mutex_condva;
let mut started = lock.lock().unwrap();
println("main thread begin wait..");
while !*started { //等待条件变量被唤醒,且等待关注的业务参数为真。这里需要注意,要在循环中判断started,因为条件变量被唤醒时,有可能业务条件并未为true
started = cvar.wait(started).unwrap();
}
println("main thread finished..");
}

实现生产者消费者


下面的例子使用条件变量Condar来实现多生产者 ,多消费者(使用管道比较容易实现,且只能由一个消费者,这里就不介绍了)

struct Queue<T>{
inner:Vec<T>,
capacity: usize
}
impl<T> Queue<T> {
fn new(cap:usize) -> Queue<T> {
Queue{
inner: Vec::new(),
capacity: cap
}
}
fn push(&mut self, data:T) -> bool {
if !self.is_full() {
self.inner.push(data);
true
} else {
false
}
}
fn pop(&mut self) -> Option<T> {
self.inner.pop()
}
fn is_empty(&self) -> bool {
self.inner.is_empty()
}
fn is_full(&self) -> bool {
if self.inner.len() == self.capacity {true} else {false}
}
}


struct Producer<T>{
inner:Arc<(Mutex<Queue<T>>, Condvar)>
}
impl<T:Display> Producer<T> {
fn new(inner:Arc<(Mutex<Queue<T>>, Condvar)>) -> Producer<T> {
Producer{inner}
}
fn produce(&self, data:T) {
let mut queue = self.inner.0.lock().unwrap();
while (*queue).is_full() {
println("[Producer] Queue is full, waiting queue to not full");
queue = self.inner.1.wait(queue).unwrap();
println("[Producer22] Queue is full, waiting queue to not full");
}
println("[Producer] Queue is not full, push data to queue");
queue.push(data);
self.inner.1.notify_all();
}
}

struct Consumer<T>{
inner: Arc<(Mutex<Queue<T>>, Condvar)>
}
impl<T:Display> Consumer<T> {
fn new(inner:Arc<(Mutex<Queue<T>>, Condvar)>) -> Consumer<T> {
Consumer{inner}
}
fn consume(&self) -> Option<T> {
let mut queue = self.inner.0.lock().unwrap();
while (*queue).is_empty() {
println("[Consumer] Queue is empty, waiting queue to have data");
queue = self.inner.1.wait(queue).unwrap();
}
println("[Consumer] Queue has data, pop data");
let data = queue.pop();
self.inner.1.notify_all();
data
}
}

fn println(msg: &str){
use chrono::Local;
let date = Local::now();
println!("{:?} {} {}", std::thread::current().id(), date.format("%Y-%m-%d %H:%M:%S"), msg)
}
fn main() {
let mc = Arc::new((Mutex::new(Queue::<usize>::new(3)), Condvar::new()));
produce(&mc);
consume(&mc);
std::thread::sleep(Duration::from_secs(1000));//主线程等待,不然程序会提早退出
}

fn produce(mc: &Arc<(Mutex<Queue<usize>>, Condvar)>){
for i in 0 .. 10 {
let mc_clone = mc.clone();
std::thread::spawn(move || {
std::thread::sleep(Duration::from_secs(random::<u64>() % 10));
let producer = Producer::new(mc_clone);
producer.produce(i);
});
}
}
fn consume(mc: &Arc<(Mutex<Queue<usize>>, Condvar)>){
for i in 0 .. 2 {
let mc_clone = mc.clone();
std::thread::spawn(move || {
std::thread::sleep(Duration::from_secs(random::<u64>() % 2));
let consumer = Consumer::new(mc_clone);
loop {
consumer.consume();
}
});
}
}

往期推荐

各领域最值得推荐的入门书籍(合集)

编程语言PK之 Go vs Rust,谁更好?【英文字幕】

2021,RUST的趋势很明显了

Rust错误处理

你心心念念的必应壁纸,可以一键打包下载了...

继续滑动看下一个

您可能也对以下帖子感兴趣

文章有问题?点此查看未经处理的缓存