From b24c933e6cbb53a0ef5b8279d18e9fa7b0a3bc49 Mon Sep 17 00:00:00 2001 From: Peter Hart Date: Sun, 3 May 2020 21:27:36 -0400 Subject: [PATCH] copy of websocket chat example --- Cargo.lock | 231 +++++++++++++++++++++++++++++++++++++++ Cargo.toml | 19 +++- src/main.rs | 249 ++++++++++++++++++++++++++++++++++++++++-- src/server/mod.rs | 207 +++++++++++++++++++++++++++++++++++ static/websocket.html | 90 +++++++++++++++ 5 files changed, 786 insertions(+), 10 deletions(-) create mode 100644 src/server/mod.rs create mode 100644 static/websocket.html diff --git a/Cargo.lock b/Cargo.lock index 96f0765..78404b6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1,5 +1,30 @@ # This file is automatically @generated by Cargo. # It is not intended for manual editing. +[[package]] +name = "actix" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a4af87564ff659dee8f9981540cac9418c45e910c8072fdedd643a262a38fcaf" +dependencies = [ + "actix-http", + "actix-rt", + "actix_derive", + "bitflags", + "bytes", + "crossbeam-channel", + "derive_more", + "futures", + "lazy_static", + "log", + "parking_lot", + "pin-project", + "smallvec", + "tokio", + "tokio-util 0.2.0", + "trust-dns-proto", + "trust-dns-resolver", +] + [[package]] name = "actix-codec" version = "0.2.0" @@ -34,6 +59,26 @@ dependencies = [ "trust-dns-resolver", ] +[[package]] +name = "actix-files" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "301482841d3d74483a446ead63cb7d362e187d2c8b603f13d91995621ea53c46" +dependencies = [ + "actix-http", + "actix-service", + "actix-web", + "bitflags", + "bytes", + "derive_more", + "futures", + "log", + "mime", + "mime_guess", + "percent-encoding", + "v_htmlescape", +] + [[package]] name = "actix-http" version = "1.0.1" @@ -248,6 +293,21 @@ dependencies = [ "url", ] +[[package]] +name = "actix-web-actors" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dc1bd41bd66c4e9b5274cec87aac30168e63d64e96fd19db38edef6b46ba2982" +dependencies = [ + "actix", + "actix-codec", + "actix-http", + "actix-web", + "bytes", + "futures", + "pin-project", +] + [[package]] name = "actix-web-codegen" version = "0.2.1" @@ -259,6 +319,17 @@ dependencies = [ "syn", ] +[[package]] +name = "actix_derive" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b95aceadaf327f18f0df5962fedc1bde2f870566a0b9f65c89508a3b1f79334c" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "adler32" version = "1.0.4" @@ -291,6 +362,17 @@ dependencies = [ "syn", ] +[[package]] +name = "atty" +version = "0.2.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d9b39be18770d11421cdb1b9947a45dd3f37e93092cbf377614828a319d5fee8" +dependencies = [ + "hermit-abi", + "libc", + "winapi 0.3.8", +] + [[package]] name = "autocfg" version = "1.0.0" @@ -442,6 +524,27 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "crossbeam-channel" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cced8691919c02aac3cb0a1bc2e9b73d89e832bf9a06fc579d4e71b68a2da061" +dependencies = [ + "crossbeam-utils", + "maybe-uninit", +] + +[[package]] +name = "crossbeam-utils" +version = "0.7.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c3c7c73a2d1e9fc0886a08b93e98eb643461230d5f1925e4036204d5f2e261a8" +dependencies = [ + "autocfg", + "cfg-if", + "lazy_static", +] + [[package]] name = "derive_more" version = "0.99.5" @@ -486,6 +589,19 @@ dependencies = [ "syn", ] +[[package]] +name = "env_logger" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "44533bbbb3bb3c1fa17d9f2e4e38bbbaf8396ba82193c4cb1b6445d711445d36" +dependencies = [ + "atty", + "humantime", + "log", + "regex", + "termcolor", +] + [[package]] name = "failure" version = "0.1.7" @@ -718,6 +834,15 @@ version = "1.3.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cd179ae861f0c2e53da70d892f5f3029f9594be0c41dc5269cd371691b1dc2f9" +[[package]] +name = "humantime" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "df004cfca50ef23c36850aaaa59ad52cc70d0e90243c3c7737a4dd32dc7a3c4f" +dependencies = [ + "quick-error", +] + [[package]] name = "idna" version = "0.2.0" @@ -838,6 +963,12 @@ version = "0.1.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7ffc5c5338469d4d3ea17d269fa8ea3512ad247247c30bd2df69e68309ed0a08" +[[package]] +name = "maybe-uninit" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "60302e4db3a61da70c0cb7991976248362f30319e88850c487b9b95bbf059e00" + [[package]] name = "memchr" version = "2.3.3" @@ -850,6 +981,16 @@ version = "0.3.16" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2a60c7ce501c71e03a9c9c0d35b861413ae925bd979cc7a4e30d060069aaac8d" +[[package]] +name = "mime_guess" +version = "2.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2684d4c2e97d99848d30b324b00c8fcc7e5c897b7cbb5819b09e7c90e8baf212" +dependencies = [ + "mime", + "unicase", +] + [[package]] name = "miniz_oxide" version = "0.3.6" @@ -912,6 +1053,16 @@ dependencies = [ "winapi 0.3.8", ] +[[package]] +name = "nom" +version = "4.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2ad2a91a8e869eeb30b9cb3119ae87773a8f4ae617f41b1eb9c154b2905f7bd6" +dependencies = [ + "memchr", + "version_check 0.1.5", +] + [[package]] name = "num-integer" version = "0.1.42" @@ -1248,8 +1399,27 @@ dependencies = [ name = "telestrations" version = "0.1.0" dependencies = [ + "actix", + "actix-files", "actix-rt", "actix-web", + "actix-web-actors", + "byteorder", + "bytes", + "env_logger", + "futures", + "rand", + "serde", + "serde_json", +] + +[[package]] +name = "termcolor" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bb6bfa289a4d7c5766392812c0a1f4c1ba45afa1ad47803c11e1f407d846d75f" +dependencies = [ + "winapi-util", ] [[package]] @@ -1368,6 +1538,15 @@ dependencies = [ "trust-dns-proto", ] +[[package]] +name = "unicase" +version = "2.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "50f37be617794602aabbeee0be4f259dc1778fabe05e2d67ee8f79326d5cb4f6" +dependencies = [ + "version_check 0.9.1", +] + [[package]] name = "unicode-bidi" version = "0.3.4" @@ -1409,6 +1588,49 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "v_escape" +version = "0.7.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "660b101c07b5d0863deb9e7fb3138777e858d6d2a79f9e6049a27d1cc77c6da6" +dependencies = [ + "v_escape_derive", +] + +[[package]] +name = "v_escape_derive" +version = "0.5.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c2ca2a14bc3fc5b64d188b087a7d3a927df87b152e941ccfbc66672e20c467ae" +dependencies = [ + "nom", + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "v_htmlescape" +version = "0.4.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e33e939c0d8cf047514fb6ba7d5aac78bc56677a6938b2ee67000b91f2e97e41" +dependencies = [ + "cfg-if", + "v_escape", +] + +[[package]] +name = "version_check" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "914b1a6776c4c929a602fafd8bc742e06365d4bcbe48c30f9cca5824f70dc9dd" + +[[package]] +name = "version_check" +version = "0.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "078775d0255232fb988e6fccf26ddc9d1ac274299aaedcedce21c6f72cc533ce" + [[package]] name = "wasi" version = "0.9.0+wasi-snapshot-preview1" @@ -1449,6 +1671,15 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6" +[[package]] +name = "winapi-util" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "70ec6ce85bb158151cae5e5c87f95a8e97d2c0c4b001223f33a334e3ce5de178" +dependencies = [ + "winapi 0.3.8", +] + [[package]] name = "winapi-x86_64-pc-windows-gnu" version = "0.4.0" diff --git a/Cargo.toml b/Cargo.toml index aadffa0..e2821ad 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,8 +4,23 @@ version = "0.1.0" authors = ["Peter Hart "] edition = "2018" +[[bin]] +name = "telestrations-server" +path = "src/main.rs" + # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -actix-web = "2" -actix-rt = "1" \ No newline at end of file +actix-rt = "1.0.0" +actix = "0.9.0" +actix-web = "2.0.0" +actix-web-actors = "2.0.0" +actix-files = "0.2.1" + +rand = "0.7" +bytes = "0.5.3" +byteorder = "1.1" +futures = "0.3.1" +env_logger = "0.7" +serde = "1.0" +serde_json = "1.0" \ No newline at end of file diff --git a/src/main.rs b/src/main.rs index 00b6ede..9ce0635 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,14 +1,247 @@ -use actix_web::{get, web, App, HttpServer, Responder}; +use std::time::{Duration, Instant}; -#[get("/{id}/{name}/index.html")] -async fn index(info: web::Path<(u32, String)>) -> impl Responder { - format!("Hello {}! id:{}", info.1, info.0) +use actix::*; +use actix_files as fs; +use actix_web::{web, App, Error, HttpRequest, HttpResponse, HttpServer}; +use actix_web_actors::ws; + +mod server; + +/// How often heartbeat pings are sent +const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5); +/// How long before lack of client response causes a timeout +const CLIENT_TIMEOUT: Duration = Duration::from_secs(10); + +/// Entry point for our route +async fn chat_route( + req: HttpRequest, + stream: web::Payload, + srv: web::Data>, +) -> Result { + ws::start( + WsChatSession { + id: 0, + hb: Instant::now(), + room: "Main".to_owned(), + name: None, + addr: srv.get_ref().clone(), + }, + &req, + stream, + ) +} + +struct WsChatSession { + /// unique session id + id: usize, + /// Client must send ping at least once per 10 seconds (CLIENT_TIMEOUT), + /// otherwise we drop connection. + hb: Instant, + /// joined room + room: String, + /// peer name + name: Option, + /// Chat server + addr: Addr, +} + +impl Actor for WsChatSession { + type Context = ws::WebsocketContext; + + /// Method is called on actor start. + /// We register ws session with ChatServer + fn started(&mut self, ctx: &mut Self::Context) { + // we'll start heartbeat process on session start. + self.hb(ctx); + + // register self in chat server. `AsyncContext::wait` register + // future within context, but context waits until this future resolves + // before processing any other events. + // HttpContext::state() is instance of WsChatSessionState, state is shared + // across all routes within application + let addr = ctx.address(); + self.addr + .send(server::Connect { + addr: addr.recipient(), + }) + .into_actor(self) + .then(|res, act, ctx| { + match res { + Ok(res) => act.id = res, + // something is wrong with chat server + _ => ctx.stop(), + } + fut::ready(()) + }) + .wait(ctx); + } + + fn stopping(&mut self, _: &mut Self::Context) -> Running { + // notify chat server + self.addr.do_send(server::Disconnect { id: self.id }); + Running::Stop + } +} + +/// Handle messages from chat server, we simply send it to peer websocket +impl Handler for WsChatSession { + type Result = (); + + fn handle(&mut self, msg: server::Message, ctx: &mut Self::Context) { + ctx.text(msg.0); + } +} + +/// WebSocket message handler +impl StreamHandler> for WsChatSession { + fn handle( + &mut self, + msg: Result, + ctx: &mut Self::Context, + ) { + let msg = match msg { + Err(_) => { + ctx.stop(); + return; + } + Ok(msg) => msg, + }; + + println!("WEBSOCKET MESSAGE: {:?}", msg); + match msg { + ws::Message::Ping(msg) => { + self.hb = Instant::now(); + ctx.pong(&msg); + } + ws::Message::Pong(_) => { + self.hb = Instant::now(); + } + ws::Message::Text(text) => { + let m = text.trim(); + // we check for /sss type of messages + if m.starts_with('/') { + let v: Vec<&str> = m.splitn(2, ' ').collect(); + match v[0] { + "/list" => { + // Send ListRooms message to chat server and wait for + // response + println!("List rooms"); + self.addr + .send(server::ListRooms) + .into_actor(self) + .then(|res, _, ctx| { + match res { + Ok(rooms) => { + for room in rooms { + ctx.text(room); + } + } + _ => println!("Something is wrong"), + } + fut::ready(()) + }) + .wait(ctx) + // .wait(ctx) pauses all events in context, + // so actor wont receive any new messages until it get list + // of rooms back + } + "/join" => { + if v.len() == 2 { + self.room = v[1].to_owned(); + self.addr.do_send(server::Join { + id: self.id, + name: self.room.clone(), + }); + + ctx.text("joined"); + } else { + ctx.text("!!! room name is required"); + } + } + "/name" => { + if v.len() == 2 { + self.name = Some(v[1].to_owned()); + } else { + ctx.text("!!! name is required"); + } + } + _ => ctx.text(format!("!!! unknown command: {:?}", m)), + } + } else { + let msg = if let Some(ref name) = self.name { + format!("{}: {}", name, m) + } else { + m.to_owned() + }; + // send message to chat server + self.addr.do_send(server::ClientMessage { + id: self.id, + msg, + room: self.room.clone(), + }) + } + } + ws::Message::Binary(_) => println!("Unexpected binary"), + ws::Message::Close(_) => { + ctx.stop(); + } + ws::Message::Continuation(_) => { + ctx.stop(); + } + ws::Message::Nop => (), + } + } +} + +impl WsChatSession { + /// helper method that sends ping to client every second. + /// + /// also this method checks heartbeats from client + fn hb(&self, ctx: &mut ws::WebsocketContext) { + ctx.run_interval(HEARTBEAT_INTERVAL, |act, ctx| { + // check client heartbeats + if Instant::now().duration_since(act.hb) > CLIENT_TIMEOUT { + // heartbeat timed out + println!("Websocket Client heartbeat failed, disconnecting!"); + + // notify chat server + act.addr.do_send(server::Disconnect { id: act.id }); + + // stop actor + ctx.stop(); + + // don't try to send a ping + return; + } + + ctx.ping(b""); + }); + } } #[actix_rt::main] async fn main() -> std::io::Result<()> { - HttpServer::new(|| App::new().service(index)) - .bind("0.0.0.0:8080")? - .run() - .await + env_logger::init(); + + // Start chat server actor + let server = server::ChatServer::default().start(); + + // Create Http server with websocket support + HttpServer::new(move || { + App::new() + .data(server.clone()) + // redirect to websocket.html + .service(web::resource("/").route(web::get().to(|| { + HttpResponse::Found() + .header("LOCATION", "/static/websocket.html") + .finish() + }))) + // websocket + .service(web::resource("/ws/").to(chat_route)) + // static resources + .service(fs::Files::new("/static/", "static/")) + }) + .bind("0.0.0.0:8080")? + .run() + .await } \ No newline at end of file diff --git a/src/server/mod.rs b/src/server/mod.rs new file mode 100644 index 0000000..9e9fae4 --- /dev/null +++ b/src/server/mod.rs @@ -0,0 +1,207 @@ + +//! `ChatServer` is an actor. It maintains list of connection client session. +//! And manages available rooms. Peers send messages to other peers in same +//! room through `ChatServer`. + +use actix::prelude::*; +use rand::{self, rngs::ThreadRng, Rng}; +use std::collections::{HashMap, HashSet}; + +/// Chat server sends this messages to session +#[derive(Message)] +#[rtype(result = "()")] +pub struct Message(pub String); + +/// Message for chat server communications + +/// New chat session is created +#[derive(Message)] +#[rtype(usize)] +pub struct Connect { + pub addr: Recipient, +} + +/// Session is disconnected +#[derive(Message)] +#[rtype(result = "()")] +pub struct Disconnect { + pub id: usize, +} + +/// Send message to specific room +#[derive(Message)] +#[rtype(result = "()")] +pub struct ClientMessage { + /// Id of the client session + pub id: usize, + /// Peer message + pub msg: String, + /// Room name + pub room: String, +} + +/// List of available rooms +pub struct ListRooms; + +impl actix::Message for ListRooms { + type Result = Vec; +} + +/// Join room, if room does not exists create new one. +#[derive(Message)] +#[rtype(result = "()")] +pub struct Join { + /// Client id + pub id: usize, + /// Room name + pub name: String, +} + +/// `ChatServer` manages chat rooms and responsible for coordinating chat +/// session. implementation is super primitive +pub struct ChatServer { + sessions: HashMap>, + rooms: HashMap>, + rng: ThreadRng, +} + +impl Default for ChatServer { + fn default() -> ChatServer { + // default room + let mut rooms = HashMap::new(); + rooms.insert("Main".to_owned(), HashSet::new()); + + ChatServer { + sessions: HashMap::new(), + rooms, + rng: rand::thread_rng(), + } + } +} + +impl ChatServer { + /// Send message to all users in the room + fn send_message(&self, room: &str, message: &str, skip_id: usize) { + if let Some(sessions) = self.rooms.get(room) { + for id in sessions { + if *id != skip_id { + if let Some(addr) = self.sessions.get(id) { + let _ = addr.do_send(Message(message.to_owned())); + } + } + } + } + } +} + +/// Make actor from `ChatServer` +impl Actor for ChatServer { + /// We are going to use simple Context, we just need ability to communicate + /// with other actors. + type Context = Context; +} + +/// Handler for Connect message. +/// +/// Register new session and assign unique id to this session +impl Handler for ChatServer { + type Result = usize; + + fn handle(&mut self, msg: Connect, _: &mut Context) -> Self::Result { + println!("Someone joined"); + + // notify all users in same room + self.send_message(&"Main".to_owned(), "Someone joined", 0); + + // register session with random id + let id = self.rng.gen::(); + self.sessions.insert(id, msg.addr); + + // auto join session to Main room + self.rooms + .entry("Main".to_owned()) + .or_insert(HashSet::new()) + .insert(id); + + // send id back + id + } +} + +/// Handler for Disconnect message. +impl Handler for ChatServer { + type Result = (); + + fn handle(&mut self, msg: Disconnect, _: &mut Context) { + println!("Someone disconnected"); + + let mut rooms: Vec = Vec::new(); + + // remove address + if self.sessions.remove(&msg.id).is_some() { + // remove session from all rooms + for (name, sessions) in &mut self.rooms { + if sessions.remove(&msg.id) { + rooms.push(name.to_owned()); + } + } + } + // send message to other users + for room in rooms { + self.send_message(&room, "Someone disconnected", 0); + } + } +} + +/// Handler for Message message. +impl Handler for ChatServer { + type Result = (); + + fn handle(&mut self, msg: ClientMessage, _: &mut Context) { + self.send_message(&msg.room, msg.msg.as_str(), msg.id); + } +} + +/// Handler for `ListRooms` message. +impl Handler for ChatServer { + type Result = MessageResult; + + fn handle(&mut self, _: ListRooms, _: &mut Context) -> Self::Result { + let mut rooms = Vec::new(); + + for key in self.rooms.keys() { + rooms.push(key.to_owned()) + } + + MessageResult(rooms) + } +} + +/// Join room, send disconnect message to old room +/// send join message to new room +impl Handler for ChatServer { + type Result = (); + + fn handle(&mut self, msg: Join, _: &mut Context) { + let Join { id, name } = msg; + let mut rooms = Vec::new(); + + // remove session from all rooms + for (n, sessions) in &mut self.rooms { + if sessions.remove(&id) { + rooms.push(n.to_owned()); + } + } + // send message to other users + for room in rooms { + self.send_message(&room, "Someone disconnected", 0); + } + + self.rooms + .entry(name.clone()) + .or_insert(HashSet::new()) + .insert(id); + + self.send_message(&name, "Someone connected", id); + } +} diff --git a/static/websocket.html b/static/websocket.html new file mode 100644 index 0000000..b2fbfcd --- /dev/null +++ b/static/websocket.html @@ -0,0 +1,90 @@ + + + + + + + + +

Chat!

+
+  | Status: + disconnected +
+
+
+
+ + +
+ + \ No newline at end of file