RustにおけるAtomic操作入門
はじめに
詳解Rustアトミック操作とロックを読んでいて2章までの学びで Thread間のAtomic操作について学んだので参考程度にメモブログを書く。
普段 Rust は web application で使ってるので thread 操作の学びは楽しい。
Amazon: 詳解Rustアトミック操作とロック
Mara Bos 著
中田 秀基 訳
Atomic操作とは
自分の理解では Atomic操作とは Thread間で共有のメモリに置かれている値の操作を行うこと。日本語だと不可分操作と呼ぶ。
ルールの1つとして1つのプロセスが値の操作中の場合は他のプロセスからの操作が不可能な状態であること。
マルチスレッドなどで共有された値を操作する時に必須となる概念で、これにより排他制御の問題を回避することができる。
RustでのAtomic操作
Rustは std::sync::atomic を提供している。例として AtomicI32
をみてみる。
impl AtomicI32 {
pub fn load(&self, ordering: Ordering) -> i32;
pub fn store(&self, value: i32, ordering: Ordering);
}
load, store 関数は atomic変数に値を保存と読み込みを行う。
使い方としては例としてストップフラグのような実装がある。
use std::io::stdin;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering::Relaxed;
use std::thread;
fn main() {
static STOP: AtomicBool = AtomicBool::new(false);
// task_job()を実行するスレッド
let background_thread = thread::spawn(|| {
while !STOP.load(Relaxed) {
task_job();
}
});
// ユーザー入力を受け付ける
for line in stdin().lines() {
match line.unwrap().as_str() {
"help" => println!("help"),
"stop" => {
break;
}
cmd => println!("unknown command: {}", cmd),
}
}
// stopと入力されたら、STOPをtrueにして、スレッドを終了させる
STOP.store(true, Relaxed);
// .join().unwrap()でスレッドが終了するまで待つ
background_thread.join().unwrap();
}
上記の例はユーザーの入力をまち stop
と入力されたら STOP atomic
の値を true
に変更して処理を終える。
OrderingのRelaxedは今回割愛する。(より深く学んだらブログ書く)
Thread間でのAtomic共有
今回はタスクの進捗を表す num_done
というAtomicをバックグラウンドとメインのスレッドで共有して操作してみる。下記のコードはよくみるタスクの進捗レポートの例である。
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering::Relaxed;
use std::thread;
use std::time::Duration;
fn main() {
let num_done = AtomicUsize::new(0);
let main_thread = thread::current();
thread::scope(|s| {
// background threadで100回処理する
s.spawn(|| {
for i in 0..100 {
// 擬似的に処理を遅くする
thread::sleep(Duration::from_millis(100));
// num_doneをインクリメントする
num_done.store(i + 1, Relaxed);
// main threadを起こす
main_thread.unpark();
}
});
// main thread
loop {
let n = num_done.load(Relaxed);
if n == 100 {
break;
}
println!("{}/100 tasks done", n);
// 1秒ごとまたはmain_thread.unpark()されるまで待つ
thread::park_timeout(Duration::from_secs(1));
}
});
println!("All done!");
}
for loop で擬似的に時間がかかる処理を表現してる。 100ms 毎に background thread で num_done をインクリメントして行ってそれを main thread で進捗表示する。
main thread の中の thread::park_timeout(Duration::from_secs(1));
は1秒毎またはどこかで main_thread.unpark();
が実行されるまで処理を止める関数。
Atomicの便利な関数
fetch_add
pub fn fetch_add(&self, val: $int_type, order: Ordering) -> $int_type {
// SAFETY: data races are prevented by atomic intrinsics.
unsafe { atomic_add(self.v.get(), val, order) }
}
fetch_add
はAtomicに値をストアしながら 更新前のデータを返し値とします。NEXT_ID など、IDを取得しながら同時にインクリメントさせたりしたい時に便利。 fetch_sub
や fetch_or
など他にもいろいろありそう。
compare_exchange
他にも紹介されてたのは compare_exchange
だった。
pub fn compare_exchange(&self,
current: $int_type,
new: $int_type,
success: Ordering,
failure: Ordering) -> Result<$int_type, $int_type> {
// SAFETY: data races are prevented by atomic intrinsics.
unsafe {
atomic_compare_exchange(self.v.get(), current, new, success, failure)
}
}
引数の current
と new
を比較して同じであれば Ok
違う値であれば Err
を返す。例ではkeyの遅延初期化などで使用されるそう。
fn get_key() -> u64 {
// 初期値は0
static KEY: AtomicU64 = AtomicU64::new(0);
let key = KEY.load(Relaxed);
if key == 0 {
let new_key = generate_random_key();
// 他のスレッドとの競合があるため、compare_exchangeを使う
// 他のスレッドが先に0以外の値を書き込んでいたら、その値を返す
// そうでなければ、new_keyを書き込んでnew_keyを返す
match KEY.compare_exchange(0, new_key, Relaxed, Relaxed) {
Ok(_) => new_key,
Err(key) => key,
}
} else {
key
}
}
まとめ
RustではAtomic操作のために Atomic型が提供されている。