Atomic Operations in Rust

Posted on 2024/01/03
# Technology

Introduction

I read Rust Atomics and Locks up to chapter 2 and learned how atomic operations work between threads. This post is a memo of what I learned, written for my own reference. I usually use Rust for web applications, so working directly with threads was a fun change.

O'REILLY: Rust Atomics and Locks
Author: Mara Bos

About Atomics

As I understand it, an atomic operation manipulates a value in memory shared between threads as a single indivisible step. The rule is that while one thread is operating on the value, no other thread can observe or modify it halfway through: the operation has either fully happened or not happened at all. This is a crucial concept when threads share values, because it avoids data races without wrapping every access in exclusive control such as a lock.
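To make the "indivisible step" idea concrete, here is a minimal sketch in which several threads increment one shared counter. The function name count_with_threads is made up for illustration, and it uses fetch_add, which is covered later in this post. Because each increment is atomic, no update is lost and the total is always threads * per_thread.

```rust
use std::sync::atomic::{AtomicUsize, Ordering::Relaxed};
use std::thread;

// Hypothetical helper: spawns `threads` threads that each add 1 to a
// shared counter `per_thread` times, then returns the final count.
fn count_with_threads(threads: usize, per_thread: usize) -> usize {
    let counter = AtomicUsize::new(0);

    thread::scope(|s| {
        for _ in 0..threads {
            s.spawn(|| {
                for _ in 0..per_thread {
                    // One indivisible read-modify-write: another thread can
                    // never slip in between the read and the write.
                    counter.fetch_add(1, Relaxed);
                }
            });
        }
    });

    // thread::scope has joined every spawned thread at this point.
    counter.load(Relaxed)
}

fn main() {
    println!("{}", count_with_threads(4, 1000)); // prints 4000
}
```

With a plain non-atomic integer the same code would not compile in safe Rust, since the compiler rejects unsynchronized shared mutation.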

Atomics in Rust

Rust provides atomic types in std::sync::atomic. As an example, let's look at AtomicI32:

impl AtomicI32 {
  pub fn load(&self, ordering: Ordering) -> i32;
  pub fn store(&self, value: i32, ordering: Ordering);
}

The load and store functions read and write the value held in the atomic type.
The book uses a stop flag as an example of atomics.

use std::io::stdin;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering::Relaxed;
use std::thread;

fn main() {
    static STOP: AtomicBool = AtomicBool::new(false);

    // the background thread runs task_job()
    let background_thread = thread::spawn(|| {
        while !STOP.load(Relaxed) {
            task_job();
        }
    });

    // accept user input
    for line in stdin().lines() {
        match line.unwrap().as_str() {
            "help" => println!("help"),
            "stop" => {
                break;
            }
            cmd => println!("unknown command: {}", cmd),
        }
    }

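    // once the input loop ends (the user entered "stop"), set the STOP flag
    // so that the background thread leaves its loop
    STOP.store(true, Relaxed);

    // .join().unwrap() waits until the background thread has finished
    background_thread.join().unwrap();
}

// placeholder for the real work (not part of the book's example): a short sleep
fn task_job() {
    thread::sleep(std::time::Duration::from_millis(100));
}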

The above example waits for user input. When stop is entered, the main thread sets STOP to true; the background thread sees the new value on its next load, leaves its loop, and the program ends.
I will skip the details of Ordering::Relaxed for now (I will write another post if I delve deeper into it).

Share Atomics between threads

Next, we'll share an atomic named num_done, which represents the progress of a task, between a background thread and the main thread.
The following code is an example of a typical task progress report.

use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering::Relaxed;
use std::thread;
use std::time::Duration;

fn main() {
    let num_done = AtomicUsize::new(0);
    let main_thread = thread::current();

    thread::scope(|s| {
        // the background thread runs 100 tasks
        s.spawn(|| {
            for i in 0..100 {
                // stand-in for a slow task
                thread::sleep(Duration::from_millis(100));
                // record how many tasks are done so far
                num_done.store(i + 1, Relaxed);
                // wake up the main thread so it can report the progress
                main_thread.unpark();
            }
        });

        // main thread
        loop {
            let n = num_done.load(Relaxed);
            if n == 100 {
                break;
            }
            println!("{}/100 tasks done", n);

            // sleep for up to 1 second, or until another thread calls main_thread.unpark()
            thread::park_timeout(Duration::from_secs(1));
        }
    });

    println!("All done!");
}

The for loop simulates a time-consuming process. Every 100 ms, the background thread stores the new count in num_done and unparks the main thread, which then prints the progress. thread::park_timeout(Duration::from_secs(1)) blocks the main thread until 1 second has passed or main_thread.unpark() is called from another thread, so updates are shown right away and the 1-second timeout only acts as a fallback.

Useful functions of Atomics

fetch_add

pub fn fetch_add(&self, val: $int_type, order: Ordering) -> $int_type {
  // SAFETY: data races are prevented by atomic intrinsics.
  unsafe { atomic_add(self.v.get(), val, order) }
}

The fetch_add operation adds a value to the atomic and returns the value it held before the update, all as one atomic operation. It is useful when you want to read and increment in a single step, such as handing out IDs from a NEXT_ID counter. There are similar operations such as fetch_sub and fetch_or.
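As a rough sketch of the NEXT_ID idea, the function below hands out a fresh ID on every call. The name allocate_new_id and the choice of AtomicU32 are assumptions made for this illustration. Since fetch_add returns the previous value, two threads can never receive the same ID (until the counter overflows; fetch_add wraps around on overflow, which this sketch does not guard against).

```rust
use std::sync::atomic::{AtomicU32, Ordering::Relaxed};
use std::thread;

// Illustrative ID allocator: each call returns the current counter value
// and bumps the counter by one in a single atomic step.
fn allocate_new_id() -> u32 {
    static NEXT_ID: AtomicU32 = AtomicU32::new(0);
    NEXT_ID.fetch_add(1, Relaxed)
}

fn main() {
    // Four threads each request one ID.
    let handles: Vec<_> = (0..4).map(|_| thread::spawn(allocate_new_id)).collect();
    let mut ids: Vec<u32> = handles.into_iter().map(|h| h.join().unwrap()).collect();

    // The order in which threads got their IDs is not fixed, so sort first.
    ids.sort();
    println!("{:?}", ids); // prints [0, 1, 2, 3]
}
```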

compare_exchange

The book also introduces compare_exchange:

pub fn compare_exchange(&self,
  current: $int_type,
  new: $int_type,
  success: Ordering,
  failure: Ordering) -> Result<$int_type, $int_type> {
    // SAFETY: data races are prevented by atomic intrinsics.
    unsafe { 
      atomic_compare_exchange(self.v.get(), current, new, success, failure)
    }
}

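It compares the value currently stored in the atomic with the current argument. If they are equal, it replaces the stored value with new and returns Ok containing the previous value. If they differ, it leaves the stored value untouched and returns Err containing the value it actually found. In the book it is used, for example, for lazy initialization of a key, as in get_key below.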
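Before the key example, here is a small single-threaded sketch of what compare_exchange returns in the success and failure cases. The function name demo and the values 5, 10, and 20 are arbitrary choices for illustration.

```rust
use std::sync::atomic::{AtomicU32, Ordering::Relaxed};

// Returns the result of a succeeding call, the result of a failing call,
// and the value stored in the atomic afterwards.
fn demo() -> (Result<u32, u32>, Result<u32, u32>, u32) {
    let value = AtomicU32::new(5);

    // The stored value (5) equals `current` (5): 10 is written,
    // and Ok carries the previous value.
    let first = value.compare_exchange(5, 10, Relaxed, Relaxed);

    // The stored value (now 10) differs from `current` (5): nothing is
    // written, and Err carries the value that was actually found.
    let second = value.compare_exchange(5, 20, Relaxed, Relaxed);

    (first, second, value.load(Relaxed))
}

fn main() {
    println!("{:?}", demo()); // prints (Ok(5), Err(10), 10)
}
```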

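use std::sync::atomic::AtomicU64;
use std::sync::atomic::Ordering::Relaxed;

// generate_random_key() is assumed to be defined elsewhere and to never return 0
fn get_key() -> u64 {
    // 0 means "not initialized yet"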
    static KEY: AtomicU64 = AtomicU64::new(0);
    let key = KEY.load(Relaxed);
    if key == 0 {
        let new_key = generate_random_key();

        // Due to possible contention with other threads, we use compare_exchange
        // If another thread has written a non-zero value first, return that value
        // Otherwise, write new_key and return new_key
        match KEY.compare_exchange(0, new_key, Relaxed, Relaxed) {
            Ok(_) => new_key,
            Err(key) => key,
        }
    } else {
        key
    }
}
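To try this out without the book's surrounding code, the sketch below supplies a placeholder generate_random_key and calls get_key from several threads. The placeholder is my own assumption, not the book's implementation: it borrows the randomly seeded hasher from the standard library and forces the result to be non-zero, which is fine for a demo but not for real key generation.

```rust
use std::collections::hash_map::RandomState;
use std::hash::{BuildHasher, Hasher};
use std::sync::atomic::{AtomicU64, Ordering::Relaxed};
use std::thread;

// Placeholder (assumption): derives a pseudo-random u64 from the standard
// library's randomly seeded hasher; `| 1` guarantees it is never 0.
fn generate_random_key() -> u64 {
    RandomState::new().build_hasher().finish() | 1
}

fn get_key() -> u64 {
    // 0 means "not initialized yet".
    static KEY: AtomicU64 = AtomicU64::new(0);

    let key = KEY.load(Relaxed);
    if key != 0 {
        return key;
    }

    let new_key = generate_random_key();
    // Only the first thread to get here wins; every other thread
    // discards its own candidate and uses the winner's key.
    match KEY.compare_exchange(0, new_key, Relaxed, Relaxed) {
        Ok(_) => new_key,
        Err(existing) => existing,
    }
}

fn main() {
    let handles: Vec<_> = (0..8).map(|_| thread::spawn(get_key)).collect();
    let keys: Vec<u64> = handles.into_iter().map(|h| h.join().unwrap()).collect();

    // Every thread must observe the same non-zero key.
    assert!(keys.iter().all(|&k| k == keys[0] && k != 0));
    println!("all {} threads agreed on one key", keys.len());
}
```

Several threads may each generate a candidate key during the race, but compare_exchange ensures that only one of them is ever stored.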

Conclusion

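Rust provides atomic types such as AtomicBool, AtomicUsize, and AtomicU64 in std::sync::atomic. load and store cover simple cases like stop flags and progress counters, while fetch_add and compare_exchange modify a value based on its current contents in a single atomic step.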