やってみる

アウトプットすべく己を導くためのブログ。その試行錯誤すらたれ流す。

Rustのサーバ(マルチスレッド)

 難しすぎてついていけない。もうただのコピペマン。

成果物

参考

シングルスレッドの問題

 最初の接続が処理し終えるまで、2番目の接続を処理しない。リクエスト量が増えると処理するまでの遅延が膨大になってしまう。

use std::io::prelude::*;
use std::net::TcpStream;
use std::net::TcpListener;
use std::fs::File;
use std::thread;
use std::time::Duration;
fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
    for stream in listener.incoming() {
        let stream = stream.unwrap();
        handle_connection(stream);
    }
}
fn handle_connection(mut stream: TcpStream) {
    let mut buffer = [0; 512];
    stream.read(&mut buffer).unwrap();

    let get = b"GET / HTTP/1.1\r\n";
    let sleep = b"GET /sleep HTTP/1.1\r\n";

    let (status_line, filename) = if buffer.starts_with(get) {
        ("HTTP/1.1 200 OK\r\n\r\n", "hello.html")
    } else if buffer.starts_with(sleep) {
        thread::sleep(Duration::from_secs(5));
        ("HTTP/1.1 200 OK\r\n\r\n", "hello.html")
    } else {
        ("HTTP/1.1 404 NOT FOUND\r\n\r\n", "404.html")
    };
    let mut file = File::open(filename).unwrap();
    let mut contents = String::new();
    file.read_to_string(&mut contents).unwrap();
    let response = format!("{}{}", status_line, contents);

    stream.write(response.as_bytes()).unwrap();
    stream.flush().unwrap();
}
cargo run
  1. 127.0.0.1:7878/sleepにアクセス
  2. 127.0.0.1:7878にアクセス
  3. 1から5秒間経過しないと2の応答が帰ってこない!

スレッドプール

スレッドプールは、待機し、タスクを処理する準備のできた一塊りの大量に生成されたスレッドです。

fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
    for stream in listener.incoming() {
        let stream = stream.unwrap();
        thread::spawn(|| {
            handle_connection(stream);
        });
    }
}

 これはスレッドを無数に作るのでメモリを食いつぶしてシステムがフリーズしてしまう。

有限数のスレッド

 以下のようなインタフェースで有限数のスレッドを作成したい。

fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
    let pool = ThreadPool::new(4);
    for stream in listener.incoming() {
        let stream = stream.unwrap();
        pool.execute(|| {
            handle_connection(stream);
        });
    }
}

コンパイラ駆動開発でThreadPool構造体を実装する

 ThreadPoolの実装はないのでエラー。

$ cargo run
   Compiling hello v0.1.0 (/tmp/work/Rust.HttpServer.MultiThread.20190709092539/src/2/hello)
error[E0433]: failed to resolve: use of undeclared type or module `ThreadPool`
  --> src/main.rs:13:16
   |
13 |     let pool = ThreadPool::new(4);
   |                ^^^^^^^^^^ use of undeclared type or module `ThreadPool`

 コンパイルエラーを元に開発を進めていく。

src/lib.rs

pub struct ThreadPool;

src/main.rs

use hello::ThreadPool;
$ cargo run
...
error[E0599]: no function or associated item named `new` found for type `hello::ThreadPool` in the current scope
  --> src/main.rs:14:28
   |
14 |     let pool = ThreadPool::new(4);
   |                ------------^^^
   |                |
   |                function or associated item not found in `hello::ThreadPool`

 newという関連関数がないと怒られる。以下を追記。

impl ThreadPool {
    pub fn new(size: usize) -> ThreadPool {
        ThreadPool 
    }
}
$ cargo run
...
warning: unused variable: `size`
 --> src/lib.rs:3:16
  |
3 |     pub fn new(size: usize) -> ThreadPool {
  |                ^^^^ help: consider prefixing with an underscore: `_size`
  |
  = note: #[warn(unused_variables)] on by default

error[E0599]: no method named `execute` found for type `hello::ThreadPool` in the current scope
  --> src/main.rs:17:14
   |
17 |         pool.execute(|| {
   |              ^^^^^^^
impl ThreadPool {
    pub fn execute<F>(&self, f: F)
        where
            F: FnOnce() + Send + 'static
    {

    }

 エラーが消えて警告のみになる。

$ cargo run
...
warning: unused variable: `size`
 --> src/lib.rs:3:16
  |
3 |     pub fn new(size: usize) -> ThreadPool {
  |                ^^^^ help: consider prefixing with an underscore: `_size`
  |
  = note: #[warn(unused_variables)] on by default

warning: unused variable: `f`
 --> src/lib.rs:6:30
  |
6 |     pub fn execute<F>(&self, f: F)
  |                              ^ help: consider prefixing with an underscore: `_f`

newでスレッド数を検査する

impl ThreadPool {
    /// スレッドプールを生成する。
    ///
    /// * size: プール数
    ///
    /// # panic
    ///
    /// sizeが0ならパニックする。
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);
        ThreadPool 
    }

スレッドを格納するスペースを生成する

pub fn spawn<F, T>(f: F) -> JoinHandle<T>
    where
        F: FnOnce() -> T + Send + 'static,
        T: Send + 'static
use std::thread;

pub struct ThreadPool {
    threads: Vec<thread::JoinHandle<()>>,
}

impl ThreadPool {
    // --snip--
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);
        let mut threads = Vec::with_capacity(size);
        for _ in 0..size {
            // スレッドを生成してベクタに格納する
            // create some threads and store them in the vector
        }
        ThreadPool {
            threads
        }
    }
    // --snip--
}

Worker構造体: ThreadPoolからスレッドにコードを送信する

src/lib.rs

pub struct ThreadPool {
    workers: Vec<Worker>,
}
impl ThreadPool {
    /// スレッドプールを生成する。
    ///
    /// * size: プール数
    ///
    /// # panic
    ///
    /// sizeが0ならパニックする。
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);
        let mut workers = Vec::with_capacity(size);
        for id in 0..size {
            workers.push(Worker::new(id));
        }
        ThreadPool {
            workers
        }
    }
struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}
impl Worker {
    fn new(id: usize) -> Worker {
        let thread = thread::spawn(|| {});
        Worker {
            id,
            thread,
        }
    }
}
チャンネル経由でスレッドにリクエストを送信する
use std::thread;
use std::sync::mpsc;
pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: mpsc::Sender<Job>,
}
struct Job;
impl ThreadPool {
    /// スレッドプールを生成する。
    ///
    /// * size: プール数
    ///
    /// # panic
    ///
    /// sizeが0ならパニックする。
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);
        let (sender, receiver) = mpsc::channel();
        let mut workers = Vec::with_capacity(size);
        for id in 0..size {
            workers.push(Worker::new(id, receiver));
        }
        ThreadPool {
            workers,
            sender,
        }
    }
    pub fn execute<F>(&self, f: F)
//    pub fn execute<F>(&self, f: F) -> JoinHandle<T>
        where
            F: FnOnce() + Send + 'static
    {

    }
}
struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}
impl Worker {
    fn new(id: usize, receiver: mpsc::Receiever<Job>) -> Worker {
        let thread = thread::spawn(|| {});
        Worker {
            id,
            thread,
        }
    }
}
$ cargo run
...
error[E0382]: use of moved value: `receiver`
  --> src/lib.rs:21:42
   |
18 |         let (sender, receiver) = mpsc::channel();
   |                      -------- move occurs because `receiver` has type `std::sync::mpsc::Receiver<Job>`, which does not implement the `Copy` trait
...
21 |             workers.push(Worker::new(id, receiver));
   |                                          ^^^^^^^^ value moved here, in previous iteration of loop

 receiverを複数のWorkerインスタンスに渡そうとしているためエラー。そこで、複数のスレッドに所有権をもたせつつ可変性ももたせるためArc<Mutex<T>>を使う。

use std::sync::Arc;
use std::sync::Mutex;
// --snip--
impl ThreadPool {
    // --snip--
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);
        let (sender, receiver) = mpsc::channel();
        let receiver = Arc::new(Mutex::new(receiver));
        let mut workers = Vec::with_capacity(size);
        for id in 0..size {
            workers.push(Worker::new(id, Arc::clone(&receiver)));
        }
        ThreadPool {
            workers,
            sender,
        }
    }
    // --snip--
}
impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        // --snip--
    }
}
executeメソッドを実装する

 型エイリアスで型を短くする。

type Job = Box<FnOnce() + Send + 'static>;

impl ThreadPool {
    // --snip--
    pub fn execute<F>(&self, f: F)
        where
            F: FnOnce() + Send + 'static
    {
        let job = Box::new(f);
        self.sender.send(job).unwrap();
    }
}
impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        let thread = thread::spawn(move || {
            loop {
                let job = receiver.lock().unwrap().recv().unwrap();
                println!("Worker {} got a job; executing.", id);
                (*job)();
            }
        });
        Worker {
            id,
            thread,
        }
    }
}
$ cargo run
...
error[E0161]: cannot move a value of type dyn std::ops::FnOnce() + std::marker::Send: the size of dyn std::ops::FnOnce() + std::marker::Send cannot be statically determined
  --> src/lib.rs:52:17
   |
52 |                 (*job)();
   |                 ^^^^^^
use std::thread;
use std::sync::mpsc;
use std::sync::Arc;
use std::sync::Mutex;

pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: mpsc::Sender<Job>,
}
trait FnBox {
    fn call_box(self: Box<Self>);
}
impl<F: FnOnce()> FnBox for F {
    fn call_box(self: Box<F>) {
        (*self)()
    }
}
type Job = Box<FnBox + Send + 'static>;
impl ThreadPool {
    /// スレッドプールを生成する。
    ///
    /// * size: プール数
    ///
    /// # panic
    ///
    /// sizeが0ならパニックする。
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);
        let (sender, receiver) = mpsc::channel();
        let receiver = Arc::new(Mutex::new(receiver));
        let mut workers = Vec::with_capacity(size);
        for id in 0..size {
            workers.push(Worker::new(id, Arc::clone(&receiver)));
        }
        ThreadPool {
            workers,
            sender,
        }
    }
    pub fn execute<F>(&self, f: F)
        where
            F: FnOnce() + Send + 'static
    {
        let job = Box::new(f);
        self.sender.send(job).unwrap();
    }
}
struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}
impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        let thread = thread::spawn(move || {
            loop {
                let job = receiver.lock().unwrap().recv().unwrap();
                println!("Worker {} got a job; executing.", id);
                job.call_box();
            }
        });
        Worker {
            id,
            thread,
        }
    }
}
$ cargo run

 ブラウザでhttp://127.0.0.1:7878をリロードしまくる。以下のような出力が得られた。

Worker 2 got a job; executing.
Worker 1 got a job; executing.
Worker 3 got a job; executing.
Worker 0 got a job; executing.
Worker 2 got a job; executing.
Worker 1 got a job; executing.
Worker 3 got a job; executing.
Worker 0 got a job; executing.

所感

 前回からもうまったく完全に一切これっぽっちも、ついていけない。難易度高すぎ。Rustってこんなムズいのか。吐きそう。

対象環境

$ uname -a
Linux raspberrypi 4.19.42-v7+ #1219 SMP Tue May 14 21:20:58 BST 2019 armv7l GNU/Linux

前回まで