eyecatch

RustにおけるAtomic操作入門

Posted on 2024/01/03
# Technology

はじめに

詳解Rustアトミック操作とロックを読んでいて2章までの学びで Thread間のAtomic操作について学んだので参考程度にメモブログを書く。
普段 Rust は web application で使ってるので thread 操作の学びは楽しい。

Amazon: 詳解Rustアトミック操作とロック
Mara Bos 著
中田 秀基 訳

Atomic操作とは

自分の理解では Atomic操作とは Thread間で共有のメモリに置かれている値の操作を行うこと。日本語だと不可分操作と呼ぶ。
ルールの1つとして1つのプロセスが値の操作中の場合は他のプロセスからの操作が不可能な状態であること。
マルチスレッドなどで共有された値を操作する時に必須となる概念で、これにより排他制御の問題を回避することができる。

RustでのAtomic操作

Rustは std::sync::atomic を提供している。例として AtomicI32 をみてみる。

impl AtomicI32 {
  pub fn load(&self, ordering: Ordering) -> i32;
  pub fn store(&self, value: i32, ordering: Ordering);
}

load, store 関数は atomic変数に値を保存と読み込みを行う。
使い方としては例としてストップフラグのような実装がある。

use std::io::stdin;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering::Relaxed;
use std::thread;

fn main() {
    static STOP: AtomicBool = AtomicBool::new(false);

    // task_job()を実行するスレッド
    let background_thread = thread::spawn(|| {
        while !STOP.load(Relaxed) {
            task_job();
        }
    });

    // ユーザー入力を受け付ける
    for line in stdin().lines() {
        match line.unwrap().as_str() {
            "help" => println!("help"),
            "stop" => {
                break;
            }
            cmd => println!("unknown command: {}", cmd),
        }
    }

    // stopと入力されたら、STOPをtrueにして、スレッドを終了させる
    STOP.store(true, Relaxed);

    // .join().unwrap()でスレッドが終了するまで待つ
    background_thread.join().unwrap();
}

上記の例はユーザーの入力をまち stop と入力されたら STOP atomic の値を true に変更して処理を終える。
OrderingのRelaxedは今回割愛する。(より深く学んだらブログ書く)

Thread間でのAtomic共有

今回はタスクの進捗を表す num_done というAtomicをバックグラウンドとメインのスレッドで共有して操作してみる。下記のコードはよくみるタスクの進捗レポートの例である。

use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering::Relaxed;
use std::thread;
use std::time::Duration;

fn main() {
    let num_done = AtomicUsize::new(0);
    let main_thread = thread::current();

    thread::scope(|s| {
        // background threadで100回処理する
        s.spawn(|| {
            for i in 0..100 {
                // 擬似的に処理を遅くする
                thread::sleep(Duration::from_millis(100));
                // num_doneをインクリメントする
                num_done.store(i + 1, Relaxed);
                // main threadを起こす
                main_thread.unpark();
            }
        });

        // main thread
        loop {
            let n = num_done.load(Relaxed);
            if n == 100 {
                break;
            }
            println!("{}/100 tasks done", n);

            // 1秒ごとまたはmain_thread.unpark()されるまで待つ
            thread::park_timeout(Duration::from_secs(1));
        }
    });

    println!("All done!");
}

for loop で擬似的に時間がかかる処理を表現してる。 100ms 毎に background thread で num_done をインクリメントして行ってそれを main thread で進捗表示する。
main thread の中の thread::park_timeout(Duration::from_secs(1)); は1秒毎またはどこかで main_thread.unpark(); が実行されるまで処理を止める関数。

Atomicの便利な関数

fetch_add

pub fn fetch_add(&self, val: $int_type, order: Ordering) -> $int_type {
  // SAFETY: data races are prevented by atomic intrinsics.
  unsafe { atomic_add(self.v.get(), val, order) }
}

fetch_add はAtomicに値をストアしながら 更新前のデータを返し値とします。NEXT_ID など、IDを取得しながら同時にインクリメントさせたりしたい時に便利。 fetch_subfetch_or など他にもいろいろありそう。

compare_exchange

他にも紹介されてたのは compare_exchange だった。

pub fn compare_exchange(&self,
  current: $int_type,
  new: $int_type,
  success: Ordering,
  failure: Ordering) -> Result<$int_type, $int_type> {
    // SAFETY: data races are prevented by atomic intrinsics.
    unsafe { 
      atomic_compare_exchange(self.v.get(), current, new, success, failure)
    }
}

引数の currentnew を比較して同じであれば Ok 違う値であれば Err を返す。例ではkeyの遅延初期化などで使用されるそう。

fn get_key() -> u64 {
    // 初期値は0
    static KEY: AtomicU64 = AtomicU64::new(0);
    let key = KEY.load(Relaxed);
    if key == 0 {
        let new_key = generate_random_key();
        // 他のスレッドとの競合があるため、compare_exchangeを使う
        // 他のスレッドが先に0以外の値を書き込んでいたら、その値を返す
        // そうでなければ、new_keyを書き込んでnew_keyを返す
        match KEY.compare_exchange(0, new_key, Relaxed, Relaxed) {
            Ok(_) => new_key,
            Err(key) => key,
        }
    } else {
        key
    }
}

まとめ

RustではAtomic操作のために Atomic型が提供されている。