Atomic Operations in Rust
Introduction
I have read Rust Atomics and Locks in depth up to chapter 2 and learned about atomic operations between threads. This post is a memo for reference based on what I learned. Since I usually use Rust for web applications, learning about thread operations is enjoyable.
O'REILLY: Rust Atomics and Locks
Author: Mara Bos
About Atomics
In my understanding, atomic operations manipulate values placed in memory shared between threads. The key rule is that while one thread is in the middle of an operation on a value, no other thread can interfere with it: the operation appears as a single, indivisible step. This is a crucial concept when dealing with shared values in multithreaded code, because it helps avoid the problems that otherwise require explicit exclusive control.
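As a minimal sketch of why this matters (my own example, not from the book; fetch_add is a read-modify-write operation covered later in this post), two threads increment a shared counter, and because each increment happens as a single indivisible step, no update is ever lost:
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering::Relaxed;
use std::thread;

fn main() {
    // a counter shared by all threads
    static COUNTER: AtomicUsize = AtomicUsize::new(0);

    thread::scope(|s| {
        for _ in 0..2 {
            s.spawn(|| {
                for _ in 0..1000 {
                    // each increment is one indivisible atomic operation,
                    // so concurrent updates never overwrite each other
                    COUNTER.fetch_add(1, Relaxed);
                }
            });
        }
    });

    // always prints 2000
    println!("{}", COUNTER.load(Relaxed));
}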
Atomics in Rust
Rust provides atomic types in std::sync::atomic. As an example, let's look at AtomicI32:
impl AtomicI32 {
    pub fn load(&self, ordering: Ordering) -> i32;
    pub fn store(&self, value: i32, ordering: Ordering);
}
The load and store functions read and write the value held in the atomic type.
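As a minimal usage sketch (my own, not from the book), store writes a new value into the atomic and load reads the current one; note that both take &self rather than &mut self, which is what allows sharing between threads:
use std::sync::atomic::AtomicI32;
use std::sync::atomic::Ordering::Relaxed;

fn main() {
    let x = AtomicI32::new(0);
    // write 10 into the atomic (store only needs &self, not &mut self)
    x.store(10, Relaxed);
    // read the current value back
    assert_eq!(x.load(Relaxed), 10);
}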
The book shows stop-flag logic as an example of using atomics:
use std::io::stdin;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering::Relaxed;
use std::thread;

// task_job() stands in for the actual per-iteration work; a trivial placeholder
// is assumed here so the example compiles on its own.
fn task_job() {}

fn main() {
    static STOP: AtomicBool = AtomicBool::new(false);

    // the background thread runs task_job() until STOP becomes true
    let background_thread = thread::spawn(|| {
        while !STOP.load(Relaxed) {
            task_job();
        }
    });

    // accept user input on the main thread
    for line in stdin().lines() {
        match line.unwrap().as_str() {
            "help" => println!("help"),
            "stop" => break,
            cmd => println!("unknown command: {}", cmd),
        }
    }

    // when the user enters "stop", store true into the STOP atomic
    // so the background thread exits its loop
    STOP.store(true, Relaxed);

    // join() waits until the background thread has finished
    background_thread.join().unwrap();
}
The example above waits for user input; when stop is entered, the input loop ends, the main thread stores true into the STOP atomic, and the background thread finishes, concluding the program.
We will skip the details of Ordering::Relaxed for now (I will write a separate post if I delve deeper into it).
Sharing Atomics between threads
This time, we'll try sharing an atomic named num_done, which represents the progress of a task, between the background thread and the main thread, and updating it as the work proceeds. The following code is an example of a commonly seen task progress report.
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering::Relaxed;
use std::thread;
use std::time::Duration;

fn main() {
    let num_done = AtomicUsize::new(0);
    let main_thread = thread::current();

    thread::scope(|s| {
        // the background thread runs 100 tasks
        s.spawn(|| {
            for i in 0..100 {
                // simulate a slow task
                thread::sleep(Duration::from_millis(100));
                // record the number of completed tasks in num_done
                num_done.store(i + 1, Relaxed);
                // wake up (unpark) the main thread so it reports progress right away
                main_thread.unpark();
            }
        });

        // the main thread reports progress until all tasks are done
        loop {
            let n = num_done.load(Relaxed);
            if n == 100 {
                break;
            }
            println!("{}/100 tasks done", n);
            // block for up to 1 second, or until another thread calls main_thread.unpark()
            thread::park_timeout(Duration::from_secs(1));
        }
    });

    println!("All done!");
}
The background thread simulates a time-consuming process with a for loop: every 100 ms it finishes one task and updates num_done, and the main thread displays the progress. The thread::park_timeout(Duration::from_secs(1)) call in the main thread blocks it until 1 second has passed or until main_thread.unpark() is called from another thread.
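To isolate just the parking mechanism, here is a minimal sketch (my own, not from the book): the main thread parks with a long timeout, and a spawned thread wakes it up early with unpark.
use std::thread;
use std::time::Duration;

fn main() {
    let main_thread = thread::current();

    thread::spawn(move || {
        thread::sleep(Duration::from_millis(100));
        // wake the parked main thread before its timeout expires
        main_thread.unpark();
    });

    // blocks for up to 10 seconds, but returns after about 100 ms
    // because of the unpark() call above
    thread::park_timeout(Duration::from_secs(10));
    println!("woken up early");
}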
Useful functions of Atomics
fetch_add
pub fn fetch_add(&self, val: $int_type, order: Ordering) -> $int_type {
    // SAFETY: data races are prevented by atomic intrinsics.
    unsafe { atomic_add(self.v.get(), val, order) }
}
The fetch_add operation adds a value to the atomic and returns the value it held before the update, all as a single atomic step. It is useful when you want to retrieve and increment an ID at the same time, like a NEXT_ID counter (sketched below). There are other operations such as fetch_sub and fetch_or that offer similar read-modify-write functionality.
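A minimal sketch of that NEXT_ID idea (the function name allocate_new_id is illustrative):
use std::sync::atomic::AtomicU32;
use std::sync::atomic::Ordering::Relaxed;

// Hands out a unique ID on every call, even from multiple threads:
// fetch_add increments NEXT_ID and returns the previous value in one atomic step.
fn allocate_new_id() -> u32 {
    static NEXT_ID: AtomicU32 = AtomicU32::new(0);
    NEXT_ID.fetch_add(1, Relaxed)
}

fn main() {
    println!("{}", allocate_new_id()); // 0
    println!("{}", allocate_new_id()); // 1
}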
compare_exchange
Another function introduced in the book is compare_exchange:
pub fn compare_exchange(&self,
                        current: $int_type,
                        new: $int_type,
                        success: Ordering,
                        failure: Ordering) -> Result<$int_type, $int_type> {
    // SAFETY: data races are prevented by atomic intrinsics.
    unsafe {
        atomic_compare_exchange(self.v.get(), current, new, success, failure)
    }
}
It compares the current value of the atomic with the current argument: if they are equal, it replaces the value with new and returns Ok containing the previous value; if they differ, it leaves the value unchanged and returns Err containing the actual value. It is used, for example, for delayed (lazy) initialization of a key:
fn get_key() -> u64 {
    // the default value 0 means the key has not been generated yet
    static KEY: AtomicU64 = AtomicU64::new(0);
    let key = KEY.load(Relaxed);
    if key == 0 {
        // generate_random_key() is assumed to be defined elsewhere
        let new_key = generate_random_key();
        // Due to possible contention with other threads, we use compare_exchange:
        // if another thread has written a non-zero value first, return that value;
        // otherwise, write new_key and return new_key.
        match KEY.compare_exchange(0, new_key, Relaxed, Relaxed) {
            Ok(_) => new_key,
            Err(key) => key,
        }
    } else {
        key
    }
}
Conclusion
In Rust, atomic types such as AtomicBool, AtomicI32, and AtomicUsize are provided in std::sync::atomic for atomic operations, along with operations such as load, store, fetch_add, and compare_exchange.