難しすぎてついていけない。もうただのコピペマン。
成果物
参考
シングルスレッドの問題
最初の接続が処理し終えるまで、2番目の接続を処理しない。リクエスト量が増えると処理するまでの遅延が膨大になってしまう。
use std::io::prelude::*; use std::net::TcpStream; use std::net::TcpListener; use std::fs::File; use std::thread; use std::time::Duration; fn main() { let listener = TcpListener::bind("127.0.0.1:7878").unwrap(); for stream in listener.incoming() { let stream = stream.unwrap(); handle_connection(stream); } } fn handle_connection(mut stream: TcpStream) { let mut buffer = [0; 512]; stream.read(&mut buffer).unwrap(); let get = b"GET / HTTP/1.1\r\n"; let sleep = b"GET /sleep HTTP/1.1\r\n"; let (status_line, filename) = if buffer.starts_with(get) { ("HTTP/1.1 200 OK\r\n\r\n", "hello.html") } else if buffer.starts_with(sleep) { thread::sleep(Duration::from_secs(5)); ("HTTP/1.1 200 OK\r\n\r\n", "hello.html") } else { ("HTTP/1.1 404 NOT FOUND\r\n\r\n", "404.html") }; let mut file = File::open(filename).unwrap(); let mut contents = String::new(); file.read_to_string(&mut contents).unwrap(); let response = format!("{}{}", status_line, contents); stream.write(response.as_bytes()).unwrap(); stream.flush().unwrap(); }
cargo run
127.0.0.1:7878/sleep
にアクセス127.0.0.1:7878
にアクセス- 1から5秒間経過しないと2の応答が帰ってこない!
スレッドプール
スレッドプールは、待機し、タスクを処理する準備のできた一塊りの大量に生成されたスレッドです。
fn main() { let listener = TcpListener::bind("127.0.0.1:7878").unwrap(); for stream in listener.incoming() { let stream = stream.unwrap(); thread::spawn(|| { handle_connection(stream); }); } }
これはスレッドを無数に作るのでメモリを食いつぶしてシステムがフリーズしてしまう。
有限数のスレッド
以下のようなインタフェースで有限数のスレッドを作成したい。
fn main() { let listener = TcpListener::bind("127.0.0.1:7878").unwrap(); let pool = ThreadPool::new(4); for stream in listener.incoming() { let stream = stream.unwrap(); pool.execute(|| { handle_connection(stream); }); } }
コンパイラ駆動開発でThreadPool
構造体を実装する
ThreadPool
の実装はないのでエラー。
$ cargo run Compiling hello v0.1.0 (/tmp/work/Rust.HttpServer.MultiThread.20190709092539/src/2/hello) error[E0433]: failed to resolve: use of undeclared type or module `ThreadPool` --> src/main.rs:13:16 | 13 | let pool = ThreadPool::new(4); | ^^^^^^^^^^ use of undeclared type or module `ThreadPool`
コンパイルエラーを元に開発を進めていく。
src/lib.rs
pub struct ThreadPool;
src/main.rs
use hello::ThreadPool;
$ cargo run ... error[E0599]: no function or associated item named `new` found for type `hello::ThreadPool` in the current scope --> src/main.rs:14:28 | 14 | let pool = ThreadPool::new(4); | ------------^^^ | | | function or associated item not found in `hello::ThreadPool`
new
という関連関数がないと怒られる。以下を追記。
impl ThreadPool { pub fn new(size: usize) -> ThreadPool { ThreadPool } }
$ cargo run ... warning: unused variable: `size` --> src/lib.rs:3:16 | 3 | pub fn new(size: usize) -> ThreadPool { | ^^^^ help: consider prefixing with an underscore: `_size` | = note: #[warn(unused_variables)] on by default error[E0599]: no method named `execute` found for type `hello::ThreadPool` in the current scope --> src/main.rs:17:14 | 17 | pool.execute(|| { | ^^^^^^^
impl ThreadPool { pub fn execute<F>(&self, f: F) where F: FnOnce() + Send + 'static { }
エラーが消えて警告のみになる。
$ cargo run ... warning: unused variable: `size` --> src/lib.rs:3:16 | 3 | pub fn new(size: usize) -> ThreadPool { | ^^^^ help: consider prefixing with an underscore: `_size` | = note: #[warn(unused_variables)] on by default warning: unused variable: `f` --> src/lib.rs:6:30 | 6 | pub fn execute<F>(&self, f: F) | ^ help: consider prefixing with an underscore: `_f`
new
でスレッド数を検査する
impl ThreadPool { /// スレッドプールを生成する。 /// /// * size: プール数 /// /// # panic /// /// sizeが0ならパニックする。 pub fn new(size: usize) -> ThreadPool { assert!(size > 0); ThreadPool }
スレッドを格納するスペースを生成する
pub fn spawn<F, T>(f: F) -> JoinHandle<T> where F: FnOnce() -> T + Send + 'static, T: Send + 'static
use std::thread; pub struct ThreadPool { threads: Vec<thread::JoinHandle<()>>, } impl ThreadPool { // --snip-- pub fn new(size: usize) -> ThreadPool { assert!(size > 0); let mut threads = Vec::with_capacity(size); for _ in 0..size { // スレッドを生成してベクタに格納する // create some threads and store them in the vector } ThreadPool { threads } } // --snip-- }
Worker
構造体: ThreadPool
からスレッドにコードを送信する
src/lib.rs
pub struct ThreadPool { workers: Vec<Worker>, } impl ThreadPool { /// スレッドプールを生成する。 /// /// * size: プール数 /// /// # panic /// /// sizeが0ならパニックする。 pub fn new(size: usize) -> ThreadPool { assert!(size > 0); let mut workers = Vec::with_capacity(size); for id in 0..size { workers.push(Worker::new(id)); } ThreadPool { workers } }
struct Worker { id: usize, thread: thread::JoinHandle<()>, } impl Worker { fn new(id: usize) -> Worker { let thread = thread::spawn(|| {}); Worker { id, thread, } } }
チャンネル経由でスレッドにリクエストを送信する
use std::thread; use std::sync::mpsc; pub struct ThreadPool { workers: Vec<Worker>, sender: mpsc::Sender<Job>, } struct Job; impl ThreadPool { /// スレッドプールを生成する。 /// /// * size: プール数 /// /// # panic /// /// sizeが0ならパニックする。 pub fn new(size: usize) -> ThreadPool { assert!(size > 0); let (sender, receiver) = mpsc::channel(); let mut workers = Vec::with_capacity(size); for id in 0..size { workers.push(Worker::new(id, receiver)); } ThreadPool { workers, sender, } } pub fn execute<F>(&self, f: F) // pub fn execute<F>(&self, f: F) -> JoinHandle<T> where F: FnOnce() + Send + 'static { } } struct Worker { id: usize, thread: thread::JoinHandle<()>, } impl Worker { fn new(id: usize, receiver: mpsc::Receiever<Job>) -> Worker { let thread = thread::spawn(|| {}); Worker { id, thread, } } }
$ cargo run ... error[E0382]: use of moved value: `receiver` --> src/lib.rs:21:42 | 18 | let (sender, receiver) = mpsc::channel(); | -------- move occurs because `receiver` has type `std::sync::mpsc::Receiver<Job>`, which does not implement the `Copy` trait ... 21 | workers.push(Worker::new(id, receiver)); | ^^^^^^^^ value moved here, in previous iteration of loop
receiver
を複数のWorker
インスタンスに渡そうとしているためエラー。そこで、複数のスレッドに所有権をもたせつつ可変性ももたせるためArc<Mutex<T>>
を使う。
use std::sync::Arc; use std::sync::Mutex; // --snip-- impl ThreadPool { // --snip-- pub fn new(size: usize) -> ThreadPool { assert!(size > 0); let (sender, receiver) = mpsc::channel(); let receiver = Arc::new(Mutex::new(receiver)); let mut workers = Vec::with_capacity(size); for id in 0..size { workers.push(Worker::new(id, Arc::clone(&receiver))); } ThreadPool { workers, sender, } } // --snip-- } impl Worker { fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker { // --snip-- } }
execute
メソッドを実装する
型エイリアスで型を短くする。
type Job = Box<FnOnce() + Send + 'static>; impl ThreadPool { // --snip-- pub fn execute<F>(&self, f: F) where F: FnOnce() + Send + 'static { let job = Box::new(f); self.sender.send(job).unwrap(); } }
impl Worker { fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker { let thread = thread::spawn(move || { loop { let job = receiver.lock().unwrap().recv().unwrap(); println!("Worker {} got a job; executing.", id); (*job)(); } }); Worker { id, thread, } } }
$ cargo run ... error[E0161]: cannot move a value of type dyn std::ops::FnOnce() + std::marker::Send: the size of dyn std::ops::FnOnce() + std::marker::Send cannot be statically determined --> src/lib.rs:52:17 | 52 | (*job)(); | ^^^^^^
use std::thread; use std::sync::mpsc; use std::sync::Arc; use std::sync::Mutex; pub struct ThreadPool { workers: Vec<Worker>, sender: mpsc::Sender<Job>, } trait FnBox { fn call_box(self: Box<Self>); } impl<F: FnOnce()> FnBox for F { fn call_box(self: Box<F>) { (*self)() } } type Job = Box<FnBox + Send + 'static>; impl ThreadPool { /// スレッドプールを生成する。 /// /// * size: プール数 /// /// # panic /// /// sizeが0ならパニックする。 pub fn new(size: usize) -> ThreadPool { assert!(size > 0); let (sender, receiver) = mpsc::channel(); let receiver = Arc::new(Mutex::new(receiver)); let mut workers = Vec::with_capacity(size); for id in 0..size { workers.push(Worker::new(id, Arc::clone(&receiver))); } ThreadPool { workers, sender, } } pub fn execute<F>(&self, f: F) where F: FnOnce() + Send + 'static { let job = Box::new(f); self.sender.send(job).unwrap(); } } struct Worker { id: usize, thread: thread::JoinHandle<()>, } impl Worker { fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker { let thread = thread::spawn(move || { loop { let job = receiver.lock().unwrap().recv().unwrap(); println!("Worker {} got a job; executing.", id); job.call_box(); } }); Worker { id, thread, } } }
$ cargo run
ブラウザでhttp://127.0.0.1:7878
をリロードしまくる。以下のような出力が得られた。
Worker 2 got a job; executing. Worker 1 got a job; executing. Worker 3 got a job; executing. Worker 0 got a job; executing. Worker 2 got a job; executing. Worker 1 got a job; executing. Worker 3 got a job; executing. Worker 0 got a job; executing.
所感
前回からもうまったく完全に一切これっぽっちも、ついていけない。難易度高すぎ。Rustってこんなムズいのか。吐きそう。
対象環境
- Raspbierry pi 3 Model B+
- Raspbian stretch 9.0 2018-11-13
- bash 4.4.12(1)-release
- rustc 1.34.2 (6c2484dc3 2019-05-13)
- cargo 1.34.0 (6789d8a0a 2019-04-01)
$ uname -a Linux raspberrypi 4.19.42-v7+ #1219 SMP Tue May 14 21:20:58 BST 2019 armv7l GNU/Linux
前回まで
- Rustを学んでみたい(プログラミング言語)
- Rustの環境構築
- RustでHelloWorld
- Rustの和訳ドキュメント
- Cargoでプロジェクト作成・ビルド・実行
- クレートとは?
- Rustで関数を使ってみる
- Rustでモジュールを使ってみる
- Rustで乱数を生成する(rand)
- Rustで標準入力する(std::io::stdin().read_line())
- RustでMatch判定する(match)
- Rustでprintとread_lineを1行にする方法
- Rustで数当てゲーム
- クレート名にドット.が使えない
- Rustの変数と可変性(let, mut) error[E0384]: cannot assign twice to immutable variable
x
- Rustのimmutable束縛とconst定数の違い
- RustのREPL、evcxrのインストールに失敗した
- Rustでコンパイルするときの変数未使用warningを消す
- Rustの変数(再代入、再宣言(シャドーイング))
- Rustのシャドーイングについて
- イミュータブルについて(副作用)
- Rustの定数(const)
- Rustのデータ型(数値)
- Rustのデータ型(論理)
- Rustのデータ型(文字)
- Rustのデータ型(タプル)
- Rustのデータ型(配列)
- Rustの関数
- Rustのif式
- Rustのくりかえし文(loop)
- Rustのくりかえし文(while)
- Rustのくりかえし文(for)
- Rustの所有権(ムーブ)
- Rustの所有権(関数)
- Rustの所有権(スライス)
- Rustの構造体(定義とインスタンス化)
- Rustの構造体(プログラム例)
- Rustの構造体(メソッド)
- Rustの列挙型(enum)
- Rustの列挙型(enum)
- Rustの列挙型(enum)
- Rustのmatch(制御フロー演算子)
- RustでNULLを扱う(Option, Some, None)
- NULL参照は10億ドルの失敗だった
- Rustの列挙型に独自表示を実装する(E0277 対策 std::fmt::Display 実装)
- RustのIfLet(matchの糖衣構文)
- Rustのプロジェクト構造
- Rustのcargoでライブラリ&テスト(単体、結合)
- Rustのモジュール(mod)
- Rustのモジュール(pub)
- Rustのmod参照方法(
mod 子モジュール名;
,use 要素名;
,extern crate クレート名;
,super
) - Rustのインポートまとめ(Rust2018)
- RustのコレクションVec型
- RustのコレクションString型
- RustのコレクションHashMap型
- Rustのコレクション(練習問題)
- Rustのエラー処理
- Rustのジェネリクス
- Rustのトレイト
- Rustのライフタイム1
- Rustのライフタイム2(構造体の定義)
- Rustのライフタイム3(ライフタイム省略)
- Rustのライフタイム4(impl定義)
- Rustの静的ライフタイム5('static)
- Rustのライフタイム6(ジェネリクス、トレイト境界とともに)
- Rustのテストコードを書く
- Rustのテスト実行
- Rustのテスト体系化
- Rustでコマンドライン引数を受け取る
- Rustのファイル読込
- Rustでリファクタリング(モジュール性とエラー処理の向上)
- Rustでテスト駆動開発
- Rustで環境変数を取得する
- RustでStdErr出力
- Rustのクロージャ
- Rustのイテレータ
- Rustのイテレータ(Minigrep改善)
- Rustのイテレータ(パフォーマンス)
- Rustのイテレータ(Minigrep改善)
- Rustのcargo(ビルドのカスタマイズ)
- Rustのcargo(cargo docでドキュメント生成)
- Rustのエクスポート(pub use)
- Rustのクレートを公開する方法(crates.io)
- Rustのcargoでワークスペースをつくる
- Rustのcargo installでバイナリをインストールする
- Rustのcargoを拡張する方法
- Rustのスマートポインタ
- スマートポインタBox
- Rustのスマートポインタ(Derefトレイト)
- Rustのスマートポインタ(Dropトレイト)
- Rustのスマートポインタ(Rc
) - Rustのスマートポインタ(RefCell
) - Rustのスマートポインタ(Weak
) - Rustのスレッド
- Rustのスレッド(メッセージ送受信)
- Rustのスレッド(Mutex、Arc)
- Rustのスレッド(Send、Syncトレイト)
- Rustのオブジェクト指向
- RustのOOP(トレイトオブジェクト)
- Rustのオブジェクト指向(デザインパターン)
- Rustのパターン
- Rustのパターン(論駁可能性)
- Rustパターン(記法)
- Rustの高度な機能(Unsafe Rust)
- Rustの高度な機能(ライフタイム)
- Rustの高度な機能(トレイト)
- Rustの高度な機能(型)
- Rustの高度な機能(関数、クロージャ)
- Rustのサーバ(シングルスレッド)