Rust:スレッドとメッセージパッシングで効率的な並行処理

現代のソフトウェア開発では、複数のタスクを同時に処理する並行性が重要です。

Rustは、スレッドとメッセージパッシングのモデルを使用して、安全で効率的な並行プログラミングをサポートしています。

Rustの所有権システムにより、並行プログラムでよく見られるデータ競合やメモリの不整合を防ぐことができます。

この記事では、Rustにおけるスレッドとメッセージパッシングの基本を学び、並行処理を実現するための手法を解説します。


Rustでのスレッドの基本

スレッドは、複数のタスクを並行して実行するための基本単位です。

Rustでは、標準ライブラリにスレッドを生成するための機能が用意されています。

スレッドを使用すると、メインスレッドとは別に新しいスレッドを作成し、処理を実行することができます。

スレッドの生成

Rustでスレッドを生成するには、std::threadモジュールを使用します。

thread::spawn 関数を使用して、新しいスレッドを生成し、並行処理を行うことができます。

use std::thread;
use std::time::Duration;

fn main() {
    thread::spawn(|| {
        for i in 1..5 {
            println!("別スレッドの処理: {}", i);
            thread::sleep(Duration::from_millis(500));
        }
    });

    for i in 1..3 {
        println!("メインスレッドの処理: {}", i);
        thread::sleep(Duration::from_millis(1000));
    }
}

ポイント:

  • thread::spawn を使って新しいスレッドを生成し、無名関数(クロージャ)でその中の処理を定義しています。
  • thread::sleep を使用して、スレッドを一時停止させながら並行に処理を進めています。

スレッドの終了を待つ

スレッドが終了するまでメインスレッドが待機するためには、生成したスレッドに対して join メソッドを呼び出す必要があります。

use std::thread;

fn main() {
    let handle = thread::spawn(|| {
        for i in 1..10 {
            println!("別スレッド: {}", i);
        }
    });

    handle.join().unwrap(); // スレッドが終了するまで待機

    println!("すべてのスレッドが終了しました。");
}

ポイント:

  • join メソッドは、スレッドが終了するまでメインスレッドが待機するために使用されます。
  • unwrap はエラーハンドリングを簡略化するために使われています。スレッドが正常に終了しない場合にエラーを返します。

メッセージパッシング

Rustでは、複数のスレッド間でデータを安全に共有するために、メッセージパッシングを利用することが推奨されます。

Rustの標準ライブラリには、チャネルを使ってメッセージを送受信する機能が用意されています。

チャネルの基本

チャネルは、データを送信する「送信側」と、データを受信する「受信側」に分かれています。

std::sync::mpsc モジュールを使用して、チャネルを作成します。

use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let val = String::from("こんにちは");
        tx.send(val).unwrap(); // メッセージを送信
    });

    let received = rx.recv().unwrap(); // メッセージを受信
    println!("受信したメッセージ: {}", received);
}

ポイント:

  • mpsc::channel でチャネルを作成し、送信側と受信側が生成されます。tx は送信側、rx は受信側です。
  • tx.send(val) で別スレッドからメッセージを送信し、rx.recv() でメッセージを受信しています。

複数メッセージの送受信

チャネルを使って複数のメッセージを送信し、受信することができます。

以下の例では、複数のメッセージをスレッド間でやりとりしています。

use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let vals = vec![
            String::from("メッセージ1"),
            String::from("メッセージ2"),
            String::from("メッセージ3"),
        ];

        for val in vals {
            tx.send(val).unwrap();
            thread::sleep(Duration::from_millis(500));
        }
    });

    for received in rx {
        println!("受信したメッセージ: {}", received);
    }
}

ポイント:

  • for received in rx を使って、チャネルに到着するメッセージをすべて受信しています。
  • 別スレッドから複数のメッセージが順次送信され、受信側はそれらを逐次処理します。

スレッド間でのデータ共有

Rustの所有権システムでは、同時に複数のスレッドがデータを安全に扱うために、Arc(アトミック参照カウント)を使います。

これにより、データを複数のスレッドで安全に共有できます。

Arc<T> の基本

Arc<T> は、Rc<T> と似ていますが、スレッド間で安全に使用できる点が異なります。

スレッド間で所有権を共有する場合には Arc<T> を使用します。

use std::sync::Arc;
use std::thread;

fn main() {
    let data = Arc::new(vec![1, 2, 3]);

    let mut handles = vec![];

    for i in 0..3 {
        let data = Arc::clone(&data);
        let handle = thread::spawn(move || {
            println!("スレッド {}: {:?}", i, data);
        });
        handles.push(handle);
    }

    for handle in handles {
        handle.join().unwrap();
    }
}

ポイント:

  • Arc::new で参照カウント付きのデータを作成し、各スレッドに Arc::clone でデータを共有しています。
  • 複数のスレッドで安全にデータを参照しながら処理を行っています。

実践的な並行処理

並行計算の例

次に、並行して複数のタスクを実行し、その結果を集約する例を見てみましょう。

use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn main() {
    let (tx, rx) = mpsc::channel();
    
    for i in 1..4 {
        let tx = tx.clone();
        thread::spawn(move || {
            thread::sleep(Duration::from_millis(500 * i));
            tx.send(i).unwrap();
        });
    }

    for received in rx {
        println!("タスクからの結果: {}", received);
    }
}

ポイント:

  • 複数のスレッドが並行して計算を行い、その結果をメインスレッドで受信しています。
  • チャネルとスレッドを組み合わせることで、効率的な並行処理が実現されています。

練習問題

練習問題1: メッセージパッシングを使った並行処理

次のコードにスレッドを追加し、複数のタスクからメッセージを受け取るようにしてください。

use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx, rx) = mpsc::channel();

    // ここにスレッドを追加

    for received in rx {
        println!("受信したメッセージ: {}", received);
    }
}

まとめ

今回の記事では、Rustの並行性を支えるスレッドとメッセージパッシングについて学びました。

スレッドを使って複数のタスクを並行して実行し、チャネルを使ったメッセージパッシングでスレッド間の通信を安全に行う方法を理解しました。

これらの技術を使うことで、並行処理を効率的に行うプログラムを構築することが可能です。

次回予告のデザイン

次回予告

次回は「Rustでの非同期プログラミング:asyncとawait」をテーマに、非同期処理を効率的に行う方法を学びます。
非同期関数とその活用方法について詳しく解説します。

Author