P.SAF.MTH.01 并发场景下应避免数据竞争和死锁
【描述】
Safe Rust 多线程同步锁并发情况下有效地防止数据竞争,但是使用无锁编程时,需要选择合理的内存顺 序才能避免数据竞争。
另外,在 Unsafe Rust 下如果需要手工为一些自定义类型实现 Sync 和 Send auto trait 时,也需要严 格确保该类型在多线程下传递不会出现数据竞争风险。
【正例】
- 自旋锁中 Acquire / Release 搭配使用的简易示例:
Rust
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::thread;
fn main() {
let lock = Arc::new(AtomicBool::new(false)); // 这个原子类型的值用来表示 "是否获取 到锁"
// ... distribute lock to threads somehow ...
// 线程 A 尝试通过设置为 true 来获取锁 //
// Acquire 内存顺序:
// 当与 load 结合使用时,若 load 的值是由具有 Release (或更强) 排序的 store 操作写入 的,则所有后续操作
// 都将在该 store 之后进行排序。特别是,所有后续 load 都将看到在 store 之前写入的数据。
while lock.compare_and_swap(false, true, Ordering::Acquire) { }
// 在循环外,意味着已经拿到了锁! // ...访问/操作数据 ...
// 线程 A 完成了数据操作,释放锁。
// 此处用 Release 内存顺序可以确保线程 B 在获取锁时能看到线程 A 释放了锁(对内存写入 false)
//
// Release 内存顺序:
// 当与 store 结合使用时,所有先前的操作都会在使用 Acquire(或更强)排序的任何 load 此 值之前排序。
// 特别是,所有先前的写入对执行 Acquire 此值(或更强)load 的线程都可见。
lock.store(false, Ordering::Release);
}【正例】
- Unsafe Rust 中实现 auto trait
Send / Sync的示例
Rust futures 库中发现的问题,错误的手工 Send / Sync 实现破坏了线程安全保证。受影响的版本 中, MappedMutexGuard 的 Send / Sync 实现只考虑了 T 上的差异,而 MappedMutexGuard 则取 消了对 U 的引用。当 MutexGuard::map() 中使用的闭包返回与 T 无关的 U 时,这可能导致安全 Rust 代码中的数据竞争。
这个问题通过修正 Send / Sync 的实现,以及在 MappedMutexGuard 类型中添加一个 PhantomData<&'a mut U> 标记来告诉编译器,这个防护也是在 U 之上。
Rust
// CVE-2020-35905: incorrect uses of Send/Sync on Rust's futures
pub struct MappedMutexGuard<'a, T: ?Sized, U: ?Sized> {
mutex: &'a Mutex<T>,
value: *mut U,
_marker: PhantomData<&'a mut U>, // + 修复代码
}
impl<'a, T: ?Sized> MutexGuard<'a, T> {
pub fn map<U: ?Sized, F>(this: Self, f: F)
-> MappedMutexGuard<'a, T, U>
where F: FnOnce(&mut T) -> &mut U {
let mutex = this.mutex;
let value = f(unsafe { &mut *this.mutex.value.get() });
mem::forget(this);
// MappedMutexGuard { mutex, value }
MappedMutexGuard { mutex, value, _marker: PhantomData } // + 修复代码
}
}
// unsafe impl<T: ?Sized + Send, U: ?Sized> Send
unsafe impl<T: ?Sized + Send, U: ?Sized + Send> Send // + 修复代码
for MappedMutexGuard<'_, T, U> {}
//unsafe impl<T: ?Sized + Sync, U: ?Sized> Sync
unsafe impl<T: ?Sized + Sync, U: ?Sized + Sync> Sync // + 修复代码
for MappedMutexGuard<'_, T, U> {}
// PoC: this safe Rust code allows race on reference counter
* MutexGuard::map(guard, |_ | Box::leak(Box::new(Rc::new(true))));【反例】
- 在 Unsafe Rust 中手动为自定义类型实现 Sync 的反例:
Rust
fn main {
use std::cell::Cell;
use std::marker::Sync;
use std::sync::Arc;
struct Counter {
val: Cell<usize>,
}
// 不符合:Counter 在多线程共享不是安全的 // 错误的使用 unsafe impl Sync
unsafe impl Sync for Counter {}
impl Counter {
fn new() -> Counter {
Counter { val: Cell::new(0) }
}
fn increment(&self) {
self.val.set(self.val.get() + 1);
}
fn get_count(&self) -> usize {
self.val.get() }
}
let counter = Arc::new(Counter::new()); let mut handles = vec![];
for _ in 0..10 {
let counter = Arc::clone(&counter);
let handle = std::thread::spawn(move || {
for _ in 0..100 {
counter.increment();
}
});
handles.push(handle);
}
for handle in handles {
handle.join().unwrap();
}
println!("Counter value: {}", counter.get_count());
}【正例】
- 在 Unsafe Rust 中手动为自定义类型实现 Sync 的正例:
Rust
fn main() {
use std::marker::Sync;
use std::sync::{Arc, Mutex};
struct Counter {
count: Mutex<usize>,
}
// 符合:Counter 在多线程共享是安全的
unsafe impl Sync for Counter {}
impl Counter {
fn new() -> Counter {
Counter {count: Mutex::new(0), }
}
fn increment(&self) {
let mut count = self.count.lock().unwrap(); *count += 1;
}
fn get_count(&self) -> usize {
*self.count.lock().unwrap()
}
}
let counter = Arc::new(Counter::new());
let mut handles = vec![];
for _ in 0..10 {
let counter = Arc::clone(&counter);
let handle = std::thread::spawn(move || {
for _ in 0..100 {
counter.increment();
}
});
handles.push(handle);
}
for handle in handles {
handle.join().unwrap();
}
println!("Counter value: {}", counter.get_count());
}【反例】
- 多线程通过 unsafe 修改静态变量
Rust
// 不符合:数据竞争
static mut SHARED_STATE: u8 = 0;
thread::spawn(move || unsafe { SHARED_STATE = 1 };);
thread::spawn(move || unsafe { SHARED_STATE = 2 };);【正例】
- 多线程通过 unsafe 修改静态变量
Rust
// 符合
static mut SHARED_STATE: u8 = AtomicU8::new(0);
thread::spawn(move || unsafe { SHARED_STATE.store(1, Ordering::SeqCst); });
thread::spawn(move || unsafe { SHARED_STATE.store(2, Ordering::SeqCst); });