Rustで気軽にWebSocketサーバを作ったら意外と大変だった件

Rustその2 Advent Calendar 2019

目次

概要

この記事はRustその2 Advent Calendar 2019の16日目です。17日に若干時間はみ出ていますが気にせずいきましょう()

誰?

Rustは今年の夏ぐらいから興味持ってちょこちょこやってる morifuji です。actix-webちょこっと触ったりとかスクレイピングで遊んだりぐらいしかないです。実務未経験の初心者です。(下記のスライドはこの前のLTで発表した内容です)

https://speakerdeck.com/diggymo/she-nei-turuwo-rustnishu-kihuan-etemitajian

やりたいこと

ブラウザーのgetDisplay というAPIを最近ちょこちょこ見て、WebRTCと組み合わせたらオモシロソウダナーと思ったので、Rustで気軽にWebSocketサーバを作ってみようかと思ったのでした。

NodejsのSocket.ioがとても有名ですが、Rustにも ws-rsというライブラリがあったので勉強がてらに使ってみました。

やりたいこととしてはこれだけです。

  • ユーザーAが接続する・ユーザーBが接続する
  • ユーザーAが「みなさんこんにちは」と送信する
  • →Bに「みなさんこんにちは」が届く
  • ユーザーCも接続する
  • ユーザーAが「みなさんこんにちは」と送信する
  • →BとCに「みなさんこんにちは」が届く

ですが、ws-rsは思った以上にシンプルで、メッセージを送信する処理は2種類のみです。

  1. 1接続クライアントにメッセージを送る(send()
  2. 全接続クライアントにメッセージを送る(broadcast()

なので、今回やりたい「自分以外のユーザーのみに送信」ができない問題がありました。今回はそれを泥臭く実装する方法について書きました。

クライアントの識別子がない

ws-rsでは、 一つのクライアントを、一つの構造体で表します。

struct Client {
    out: Sender,
}

impl Handler for Client {

    fn on_open(&mut self, _: Handshake) -> Result<()> {
        self.out.send("Hello WebSocket")
    }

    fn on_message(&mut self, msg: Message) -> Result<()> {
        println!("Got message: {}", msg);
        self.out.close(CloseCode::Normal)
    }
}

fn main() {
  listen("ws://127.0.0.1:3012", |out| Client { out: out } ).unwrap()
}

なので、このサンプルでは、on_messageが発火しても、ユーザーが誰なのか全く区別がつかない問題がありました。

ドキュメントには token() のようなそれらしい関数はあったのですが、連番でクライアントを割り振り、途中でクライアントが減った場合はその穴抜けの番号が次に入るクライアントに使いまわされていたため、今回のクライアントを識別する目的では使用できませんでした。

なので、自力でClientにuser_idを手動で付与してやりました

struct Client {
  out: Sender,
  user_id: u32
}

impl Handler for Client {
  # ...

  fn on_message(&mut self, msg: Message) -> Result<()> {
    println!("My id is {:?}", self.user_id);
  }
}

fn main() {
  listen("127.0.0.1:3012", move |out| { 
    let user_id = rand::thread_rng().gen();
    Client {
      out: out,
      user_id: user_id
    }
  }).unwrap()
}

これで、メッセージを送ってきたクライアントを識別することができました。

他のクライアントに対してメッセージを送る

上記の実装でメッセージを送ってきたクライアントは識別できたので、あとは接続している全ユーザーのidを取得して、そこから自分以外のクライアントにメッセージを送るだけ!…と思っていたのですがまだまだ大変でした。

接続しているクライアントの一覧は、おそらくwebsocketのルートのインスタンスから取得できるはず!と思ってドキュメントを眺めましたが、全く見当たらず。。。

これも自分で実装する形になりました。

理想ではこんな感じ(このコードは実際には動きません)

struct Client {
  out: Sender,
  user_id: u32,
  client_list: Vec<Client>
}

impl Handler for Client {

    fn on_open(&mut self, _: Handshake) -> Result<()> {
        // ①接続されたら、リストに追加する
        self.client_list.push(self);
    }

    fn on_message(&mut self, msg: Message) -> Result<()> {
        // 自分以外に送信
        self.client_list.iter().filter(|*v| v.user_id == self.user_id).foreach(|v|{
          v.out.send("Joined {:?}!", self.user_id);
        });
    }
}

fn main() {
  // 最初に一度初期化して、
  let client_list = vec!()
  listen("ws://127.0.0.1:3012", |out| {
    Client {
      out: out,
      user_id: rand::thread_rng().gen(),
      // ②各ユーザーのプロパティにvecを持たせる
      client_list: client_list
    }
  }).unwrap()
}

これをそのまま動かそうとすると selfのプロパティにselfを突っ込むことは可能なのかどうかが怪しいですね。。確かRustでフィボナッチ数列を表現するときも、無限にサイズが決まらなくてエラーになったはずで、それと似た危険な香りがしました。

ダメ元で試しましたがやっぱり無理でした。こんな感じで書き殴ってみましたが見事にエラーが出ました

struct A {
    id: u32,
    classmates: Vec<A>
}

impl A {
    fn add_friends(self) {
        self.classmates.push(self);
    }
}
fn main() {
    let yamada = A {
        id: 1,
        classmates: vec!()
    };
    
    yamada.add_friends();
}
error[E0596]: cannot borrow `self.classmates` as mutable, as `self` is not declared as mutable
 --> src/test.rs:9:9
  |
8 |     fn add_friends(self) {
  |                    ---- help: consider changing this to be mutable: `mut self`
9 |         self.classmates.push(self);
  |         ^^^^^^^^^^^^^^^ cannot borrow as mutable

error[E0505]: cannot move out of `self` because it is borrowed
 --> src/test.rs:9:30
  |
9 |         self.classmates.push(self);
  |         --------------- ---- ^^^^ move out of `self` occurs here
  |         |               |
  |         |               borrow later used by call
  |         borrow of `self.classmates` occurs here

なので、listへの追加処理は on_open ではなくlistenに書くことにしました。ですが今度はclientが二重で借用されてエラーが出ました。

fn main() {
  // 最初に一度初期化して、
  let client_list = vec!()
  listen("ws://127.0.0.1:3012", |out| {
    let client = Client {
      out: out,
      user_id: rand::thread_rng().gen(),
      // ②各ユーザーのプロパティにvecを持たせる
      client_list: client_list
    };
    client_list.push(client);

    client;
  }).unwrap()
}

前に非同期のサーバーをactix-webで実装した際は、借用規則を破るのなら Arc<Mutex<>>が使えるだろうと思い込んで実装しましたが、結局無駄でした。今回の場合はシングルスレッドなので Rc<RefCell<>>でいけるみたいです(多分RefCellはMutexでも代用可能?だとは思いますが…おそらくシングルスレッドならばRefCellが最善の選択のはず?..)

結局こんな感じのコードになりました。

#[derive(Clone, Debug)]
struct Client {
    out: Sender,
    user_id: u32,
    client_list: Rc<RefCell<Vec<Client>>>,
}

impl Handler for Client {
    fn on_message(&mut self, msg: Message) -> ws::Result<()> {
        self.client_list.borrow_mut().iter().filter(|user| user.user_id != self.user_id)
        .for_each(|user|{
             user.out.send(msg.clone());
        });
        Ok(())
    }

    fn on_open<'a>(&'a mut self, _: Handshake) -> ws::Result<()> {
      print!("current user is below.");
      self.client_list.borrow_mut().iter().for_each(|client|{
          println!("{:?}", client.user_id);
      });
    })
}

fn main() {
  let client_list = Rc::new(RefCell::new(vec!()));

  listen("127.0.0.1:3012", move |out| { 
    let client = Client { 
      out: out.clone(),
      user_id: rand::thread_rng().gen(),
      client_list: client_list.clone(),
    };
    client_list.borrow_mut().push(client.clone());

    client
  }).unwrap()
} 

追記

Arc<Mutex>でも実装可能でした。borrow_mut()の箇所を lock().unwrap()にするだけでした。ただ、中身の処理は MutexRefCellで別物だと思うのでもう少しお勉強しないといけないですね…

#[derive(Clone, Debug)]
struct Client {
    out: Sender,
    user_id: u32,
    // client_list: Rc<RefCell<Vec<Client>>>,
    client_list: Arc<Mutex<Vec<Client>>>,
}

impl Handler for Client {
    fn on_message(&mut self, msg: Message) -> ws::Result<()> {
        self.client_list.lock().unwrap().iter().filter(|user| user.user_id != self.user_id)
        .for_each(|user|{
             user.out.send(msg.clone());
        });
        Ok(())
    }

    fn on_open<'a>(&'a mut self, _: Handshake) -> ws::Result<()> {
      println!("current user is below.");
      self.client_list.lock().unwrap().iter().for_each(|client|{
          print!("{:?}, ", client.user_id);
      });
      println!("");

      Ok(())
    }
}

fn main() {
  let client_list = Arc::new(Mutex::new(vec!()));

  listen("127.0.0.1:3012", move |out| { 
    let client = Client { 
      out: out.clone(),
      user_id: rand::thread_rng().gen(),
      client_list: client_list.clone(),
    };
    client_list.lock().unwrap().push(client.clone());

    client
  }).unwrap()
} 

このさいせっかくなんで、 Arc<Mutex>Rc<RefCell> での両者のパフォーマンスの測定をしようと思い、9999回websocketでサーバーにメッセージを送って、別のクライアントに届くまでの時間を測定しようと思いましたが、なぜか499回あたりでWebSocketの送信が止まってしまいました、、、

Macのネットワークの問題なような気もするんですが、原因がわからず,,詳しい方ご意見ください 🙇‍♂️

use ws::{connect, CloseCode, Sender};

 fn main() {
     connect("ws://127.0.0.1:3012", |out| {
        for i in 0..9999 {
            println!("{}", i);  # 出力が499で止まる
            out.send("hello");
        }
 
         move |msg: ws::Message| {
             Ok(())
         }
     });
 }

まとめ

  • ws-rsはシンプルなAPIのみ提供していた。
  • シングルスレッドで複数所有のポインタを使いたい場合は Rc<RefCell> を使うことを覚えた
  • マルチスレッドで複数所有のポインタを使いたい場合は Arc<Mutex> を使うことを覚えた
  • ただし仕組みは違うので、使い所を正しく理解して使ったほうが良さそう
    • ポインタ・保証範囲についてのドキュメント→https://ykomatsu.github.io/rust/book-ja/choosing-your-guarantees.html
  • Rustに時間が取られすぎて肝心のgetDisplayは触れなかった

明日17日はblockさんです!!


See also