copy of websocket chat example

This commit is contained in:
Peter Hart 2020-05-03 21:27:36 -04:00
parent 0c191fb691
commit b24c933e6c
5 changed files with 786 additions and 10 deletions

231
Cargo.lock generated
View File

@ -1,5 +1,30 @@
# This file is automatically @generated by Cargo.
# It is not intended for manual editing.
[[package]]
name = "actix"
version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a4af87564ff659dee8f9981540cac9418c45e910c8072fdedd643a262a38fcaf"
dependencies = [
"actix-http",
"actix-rt",
"actix_derive",
"bitflags",
"bytes",
"crossbeam-channel",
"derive_more",
"futures",
"lazy_static",
"log",
"parking_lot",
"pin-project",
"smallvec",
"tokio",
"tokio-util 0.2.0",
"trust-dns-proto",
"trust-dns-resolver",
]
[[package]]
name = "actix-codec"
version = "0.2.0"
@ -34,6 +59,26 @@ dependencies = [
"trust-dns-resolver",
]
[[package]]
name = "actix-files"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "301482841d3d74483a446ead63cb7d362e187d2c8b603f13d91995621ea53c46"
dependencies = [
"actix-http",
"actix-service",
"actix-web",
"bitflags",
"bytes",
"derive_more",
"futures",
"log",
"mime",
"mime_guess",
"percent-encoding",
"v_htmlescape",
]
[[package]]
name = "actix-http"
version = "1.0.1"
@ -248,6 +293,21 @@ dependencies = [
"url",
]
[[package]]
name = "actix-web-actors"
version = "2.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dc1bd41bd66c4e9b5274cec87aac30168e63d64e96fd19db38edef6b46ba2982"
dependencies = [
"actix",
"actix-codec",
"actix-http",
"actix-web",
"bytes",
"futures",
"pin-project",
]
[[package]]
name = "actix-web-codegen"
version = "0.2.1"
@ -259,6 +319,17 @@ dependencies = [
"syn",
]
[[package]]
name = "actix_derive"
version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b95aceadaf327f18f0df5962fedc1bde2f870566a0b9f65c89508a3b1f79334c"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "adler32"
version = "1.0.4"
@ -291,6 +362,17 @@ dependencies = [
"syn",
]
[[package]]
name = "atty"
version = "0.2.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d9b39be18770d11421cdb1b9947a45dd3f37e93092cbf377614828a319d5fee8"
dependencies = [
"hermit-abi",
"libc",
"winapi 0.3.8",
]
[[package]]
name = "autocfg"
version = "1.0.0"
@ -442,6 +524,27 @@ dependencies = [
"cfg-if",
]
[[package]]
name = "crossbeam-channel"
version = "0.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cced8691919c02aac3cb0a1bc2e9b73d89e832bf9a06fc579d4e71b68a2da061"
dependencies = [
"crossbeam-utils",
"maybe-uninit",
]
[[package]]
name = "crossbeam-utils"
version = "0.7.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c3c7c73a2d1e9fc0886a08b93e98eb643461230d5f1925e4036204d5f2e261a8"
dependencies = [
"autocfg",
"cfg-if",
"lazy_static",
]
[[package]]
name = "derive_more"
version = "0.99.5"
@ -486,6 +589,19 @@ dependencies = [
"syn",
]
[[package]]
name = "env_logger"
version = "0.7.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "44533bbbb3bb3c1fa17d9f2e4e38bbbaf8396ba82193c4cb1b6445d711445d36"
dependencies = [
"atty",
"humantime",
"log",
"regex",
"termcolor",
]
[[package]]
name = "failure"
version = "0.1.7"
@ -718,6 +834,15 @@ version = "1.3.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cd179ae861f0c2e53da70d892f5f3029f9594be0c41dc5269cd371691b1dc2f9"
[[package]]
name = "humantime"
version = "1.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "df004cfca50ef23c36850aaaa59ad52cc70d0e90243c3c7737a4dd32dc7a3c4f"
dependencies = [
"quick-error",
]
[[package]]
name = "idna"
version = "0.2.0"
@ -838,6 +963,12 @@ version = "0.1.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7ffc5c5338469d4d3ea17d269fa8ea3512ad247247c30bd2df69e68309ed0a08"
[[package]]
name = "maybe-uninit"
version = "2.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "60302e4db3a61da70c0cb7991976248362f30319e88850c487b9b95bbf059e00"
[[package]]
name = "memchr"
version = "2.3.3"
@ -850,6 +981,16 @@ version = "0.3.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2a60c7ce501c71e03a9c9c0d35b861413ae925bd979cc7a4e30d060069aaac8d"
[[package]]
name = "mime_guess"
version = "2.0.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2684d4c2e97d99848d30b324b00c8fcc7e5c897b7cbb5819b09e7c90e8baf212"
dependencies = [
"mime",
"unicase",
]
[[package]]
name = "miniz_oxide"
version = "0.3.6"
@ -912,6 +1053,16 @@ dependencies = [
"winapi 0.3.8",
]
[[package]]
name = "nom"
version = "4.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2ad2a91a8e869eeb30b9cb3119ae87773a8f4ae617f41b1eb9c154b2905f7bd6"
dependencies = [
"memchr",
"version_check 0.1.5",
]
[[package]]
name = "num-integer"
version = "0.1.42"
@ -1248,8 +1399,27 @@ dependencies = [
name = "telestrations"
version = "0.1.0"
dependencies = [
"actix",
"actix-files",
"actix-rt",
"actix-web",
"actix-web-actors",
"byteorder",
"bytes",
"env_logger",
"futures",
"rand",
"serde",
"serde_json",
]
[[package]]
name = "termcolor"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bb6bfa289a4d7c5766392812c0a1f4c1ba45afa1ad47803c11e1f407d846d75f"
dependencies = [
"winapi-util",
]
[[package]]
@ -1368,6 +1538,15 @@ dependencies = [
"trust-dns-proto",
]
[[package]]
name = "unicase"
version = "2.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "50f37be617794602aabbeee0be4f259dc1778fabe05e2d67ee8f79326d5cb4f6"
dependencies = [
"version_check 0.9.1",
]
[[package]]
name = "unicode-bidi"
version = "0.3.4"
@ -1409,6 +1588,49 @@ dependencies = [
"percent-encoding",
]
[[package]]
name = "v_escape"
version = "0.7.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "660b101c07b5d0863deb9e7fb3138777e858d6d2a79f9e6049a27d1cc77c6da6"
dependencies = [
"v_escape_derive",
]
[[package]]
name = "v_escape_derive"
version = "0.5.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c2ca2a14bc3fc5b64d188b087a7d3a927df87b152e941ccfbc66672e20c467ae"
dependencies = [
"nom",
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "v_htmlescape"
version = "0.4.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e33e939c0d8cf047514fb6ba7d5aac78bc56677a6938b2ee67000b91f2e97e41"
dependencies = [
"cfg-if",
"v_escape",
]
[[package]]
name = "version_check"
version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "914b1a6776c4c929a602fafd8bc742e06365d4bcbe48c30f9cca5824f70dc9dd"
[[package]]
name = "version_check"
version = "0.9.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "078775d0255232fb988e6fccf26ddc9d1ac274299aaedcedce21c6f72cc533ce"
[[package]]
name = "wasi"
version = "0.9.0+wasi-snapshot-preview1"
@ -1449,6 +1671,15 @@ version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6"
[[package]]
name = "winapi-util"
version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "70ec6ce85bb158151cae5e5c87f95a8e97d2c0c4b001223f33a334e3ce5de178"
dependencies = [
"winapi 0.3.8",
]
[[package]]
name = "winapi-x86_64-pc-windows-gnu"
version = "0.4.0"

View File

@ -4,8 +4,23 @@ version = "0.1.0"
authors = ["Peter Hart <cinphart@gmail.com>"]
edition = "2018"
[[bin]]
name = "telestrations-server"
path = "src/main.rs"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
actix-web = "2"
actix-rt = "1"
actix-rt = "1.0.0"
actix = "0.9.0"
actix-web = "2.0.0"
actix-web-actors = "2.0.0"
actix-files = "0.2.1"
rand = "0.7"
bytes = "0.5.3"
byteorder = "1.1"
futures = "0.3.1"
env_logger = "0.7"
serde = "1.0"
serde_json = "1.0"

View File

@ -1,14 +1,247 @@
use actix_web::{get, web, App, HttpServer, Responder};
use std::time::{Duration, Instant};
#[get("/{id}/{name}/index.html")]
async fn index(info: web::Path<(u32, String)>) -> impl Responder {
format!("Hello {}! id:{}", info.1, info.0)
use actix::*;
use actix_files as fs;
use actix_web::{web, App, Error, HttpRequest, HttpResponse, HttpServer};
use actix_web_actors::ws;
mod server;
/// How often heartbeat pings are sent
const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5);
/// How long before lack of client response causes a timeout
const CLIENT_TIMEOUT: Duration = Duration::from_secs(10);
/// Entry point for our route
async fn chat_route(
req: HttpRequest,
stream: web::Payload,
srv: web::Data<Addr<server::ChatServer>>,
) -> Result<HttpResponse, Error> {
ws::start(
WsChatSession {
id: 0,
hb: Instant::now(),
room: "Main".to_owned(),
name: None,
addr: srv.get_ref().clone(),
},
&req,
stream,
)
}
struct WsChatSession {
/// unique session id
id: usize,
/// Client must send ping at least once per 10 seconds (CLIENT_TIMEOUT),
/// otherwise we drop connection.
hb: Instant,
/// joined room
room: String,
/// peer name
name: Option<String>,
/// Chat server
addr: Addr<server::ChatServer>,
}
impl Actor for WsChatSession {
type Context = ws::WebsocketContext<Self>;
/// Method is called on actor start.
/// We register ws session with ChatServer
fn started(&mut self, ctx: &mut Self::Context) {
// we'll start heartbeat process on session start.
self.hb(ctx);
// register self in chat server. `AsyncContext::wait` register
// future within context, but context waits until this future resolves
// before processing any other events.
// HttpContext::state() is instance of WsChatSessionState, state is shared
// across all routes within application
let addr = ctx.address();
self.addr
.send(server::Connect {
addr: addr.recipient(),
})
.into_actor(self)
.then(|res, act, ctx| {
match res {
Ok(res) => act.id = res,
// something is wrong with chat server
_ => ctx.stop(),
}
fut::ready(())
})
.wait(ctx);
}
fn stopping(&mut self, _: &mut Self::Context) -> Running {
// notify chat server
self.addr.do_send(server::Disconnect { id: self.id });
Running::Stop
}
}
/// Handle messages from chat server, we simply send it to peer websocket
impl Handler<server::Message> for WsChatSession {
type Result = ();
fn handle(&mut self, msg: server::Message, ctx: &mut Self::Context) {
ctx.text(msg.0);
}
}
/// WebSocket message handler
impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for WsChatSession {
fn handle(
&mut self,
msg: Result<ws::Message, ws::ProtocolError>,
ctx: &mut Self::Context,
) {
let msg = match msg {
Err(_) => {
ctx.stop();
return;
}
Ok(msg) => msg,
};
println!("WEBSOCKET MESSAGE: {:?}", msg);
match msg {
ws::Message::Ping(msg) => {
self.hb = Instant::now();
ctx.pong(&msg);
}
ws::Message::Pong(_) => {
self.hb = Instant::now();
}
ws::Message::Text(text) => {
let m = text.trim();
// we check for /sss type of messages
if m.starts_with('/') {
let v: Vec<&str> = m.splitn(2, ' ').collect();
match v[0] {
"/list" => {
// Send ListRooms message to chat server and wait for
// response
println!("List rooms");
self.addr
.send(server::ListRooms)
.into_actor(self)
.then(|res, _, ctx| {
match res {
Ok(rooms) => {
for room in rooms {
ctx.text(room);
}
}
_ => println!("Something is wrong"),
}
fut::ready(())
})
.wait(ctx)
// .wait(ctx) pauses all events in context,
// so actor wont receive any new messages until it get list
// of rooms back
}
"/join" => {
if v.len() == 2 {
self.room = v[1].to_owned();
self.addr.do_send(server::Join {
id: self.id,
name: self.room.clone(),
});
ctx.text("joined");
} else {
ctx.text("!!! room name is required");
}
}
"/name" => {
if v.len() == 2 {
self.name = Some(v[1].to_owned());
} else {
ctx.text("!!! name is required");
}
}
_ => ctx.text(format!("!!! unknown command: {:?}", m)),
}
} else {
let msg = if let Some(ref name) = self.name {
format!("{}: {}", name, m)
} else {
m.to_owned()
};
// send message to chat server
self.addr.do_send(server::ClientMessage {
id: self.id,
msg,
room: self.room.clone(),
})
}
}
ws::Message::Binary(_) => println!("Unexpected binary"),
ws::Message::Close(_) => {
ctx.stop();
}
ws::Message::Continuation(_) => {
ctx.stop();
}
ws::Message::Nop => (),
}
}
}
impl WsChatSession {
/// helper method that sends ping to client every second.
///
/// also this method checks heartbeats from client
fn hb(&self, ctx: &mut ws::WebsocketContext<Self>) {
ctx.run_interval(HEARTBEAT_INTERVAL, |act, ctx| {
// check client heartbeats
if Instant::now().duration_since(act.hb) > CLIENT_TIMEOUT {
// heartbeat timed out
println!("Websocket Client heartbeat failed, disconnecting!");
// notify chat server
act.addr.do_send(server::Disconnect { id: act.id });
// stop actor
ctx.stop();
// don't try to send a ping
return;
}
ctx.ping(b"");
});
}
}
#[actix_rt::main]
async fn main() -> std::io::Result<()> {
HttpServer::new(|| App::new().service(index))
.bind("0.0.0.0:8080")?
.run()
.await
env_logger::init();
// Start chat server actor
let server = server::ChatServer::default().start();
// Create Http server with websocket support
HttpServer::new(move || {
App::new()
.data(server.clone())
// redirect to websocket.html
.service(web::resource("/").route(web::get().to(|| {
HttpResponse::Found()
.header("LOCATION", "/static/websocket.html")
.finish()
})))
// websocket
.service(web::resource("/ws/").to(chat_route))
// static resources
.service(fs::Files::new("/static/", "static/"))
})
.bind("0.0.0.0:8080")?
.run()
.await
}

207
src/server/mod.rs Normal file
View File

@ -0,0 +1,207 @@
//! `ChatServer` is an actor. It maintains list of connection client session.
//! And manages available rooms. Peers send messages to other peers in same
//! room through `ChatServer`.
use actix::prelude::*;
use rand::{self, rngs::ThreadRng, Rng};
use std::collections::{HashMap, HashSet};
/// Chat server sends this messages to session
#[derive(Message)]
#[rtype(result = "()")]
pub struct Message(pub String);
/// Message for chat server communications
/// New chat session is created
#[derive(Message)]
#[rtype(usize)]
pub struct Connect {
pub addr: Recipient<Message>,
}
/// Session is disconnected
#[derive(Message)]
#[rtype(result = "()")]
pub struct Disconnect {
pub id: usize,
}
/// Send message to specific room
#[derive(Message)]
#[rtype(result = "()")]
pub struct ClientMessage {
/// Id of the client session
pub id: usize,
/// Peer message
pub msg: String,
/// Room name
pub room: String,
}
/// List of available rooms
pub struct ListRooms;
impl actix::Message for ListRooms {
type Result = Vec<String>;
}
/// Join room, if room does not exists create new one.
#[derive(Message)]
#[rtype(result = "()")]
pub struct Join {
/// Client id
pub id: usize,
/// Room name
pub name: String,
}
/// `ChatServer` manages chat rooms and responsible for coordinating chat
/// session. implementation is super primitive
pub struct ChatServer {
sessions: HashMap<usize, Recipient<Message>>,
rooms: HashMap<String, HashSet<usize>>,
rng: ThreadRng,
}
impl Default for ChatServer {
fn default() -> ChatServer {
// default room
let mut rooms = HashMap::new();
rooms.insert("Main".to_owned(), HashSet::new());
ChatServer {
sessions: HashMap::new(),
rooms,
rng: rand::thread_rng(),
}
}
}
impl ChatServer {
/// Send message to all users in the room
fn send_message(&self, room: &str, message: &str, skip_id: usize) {
if let Some(sessions) = self.rooms.get(room) {
for id in sessions {
if *id != skip_id {
if let Some(addr) = self.sessions.get(id) {
let _ = addr.do_send(Message(message.to_owned()));
}
}
}
}
}
}
/// Make actor from `ChatServer`
impl Actor for ChatServer {
/// We are going to use simple Context, we just need ability to communicate
/// with other actors.
type Context = Context<Self>;
}
/// Handler for Connect message.
///
/// Register new session and assign unique id to this session
impl Handler<Connect> for ChatServer {
type Result = usize;
fn handle(&mut self, msg: Connect, _: &mut Context<Self>) -> Self::Result {
println!("Someone joined");
// notify all users in same room
self.send_message(&"Main".to_owned(), "Someone joined", 0);
// register session with random id
let id = self.rng.gen::<usize>();
self.sessions.insert(id, msg.addr);
// auto join session to Main room
self.rooms
.entry("Main".to_owned())
.or_insert(HashSet::new())
.insert(id);
// send id back
id
}
}
/// Handler for Disconnect message.
impl Handler<Disconnect> for ChatServer {
type Result = ();
fn handle(&mut self, msg: Disconnect, _: &mut Context<Self>) {
println!("Someone disconnected");
let mut rooms: Vec<String> = Vec::new();
// remove address
if self.sessions.remove(&msg.id).is_some() {
// remove session from all rooms
for (name, sessions) in &mut self.rooms {
if sessions.remove(&msg.id) {
rooms.push(name.to_owned());
}
}
}
// send message to other users
for room in rooms {
self.send_message(&room, "Someone disconnected", 0);
}
}
}
/// Handler for Message message.
impl Handler<ClientMessage> for ChatServer {
type Result = ();
fn handle(&mut self, msg: ClientMessage, _: &mut Context<Self>) {
self.send_message(&msg.room, msg.msg.as_str(), msg.id);
}
}
/// Handler for `ListRooms` message.
impl Handler<ListRooms> for ChatServer {
type Result = MessageResult<ListRooms>;
fn handle(&mut self, _: ListRooms, _: &mut Context<Self>) -> Self::Result {
let mut rooms = Vec::new();
for key in self.rooms.keys() {
rooms.push(key.to_owned())
}
MessageResult(rooms)
}
}
/// Join room, send disconnect message to old room
/// send join message to new room
impl Handler<Join> for ChatServer {
type Result = ();
fn handle(&mut self, msg: Join, _: &mut Context<Self>) {
let Join { id, name } = msg;
let mut rooms = Vec::new();
// remove session from all rooms
for (n, sessions) in &mut self.rooms {
if sessions.remove(&id) {
rooms.push(n.to_owned());
}
}
// send message to other users
for room in rooms {
self.send_message(&room, "Someone disconnected", 0);
}
self.rooms
.entry(name.clone())
.or_insert(HashSet::new())
.insert(id);
self.send_message(&name, "Someone connected", id);
}
}

90
static/websocket.html Normal file
View File

@ -0,0 +1,90 @@
<!DOCTYPE html>
<meta charset="utf-8" />
<html>
<head>
<script src="http://ajax.googleapis.com/ajax/libs/jquery/1.4.2/jquery.min.js">
</script>
<script language="javascript" type="text/javascript">
$(function() {
var conn = null;
function log(msg) {
var control = $('#log');
control.html(control.html() + msg + '<br/>');
control.scrollTop(control.scrollTop() + 1000);
}
function connect() {
disconnect();
var wsUri = (window.location.protocol=='https:'&&'wss://'||'ws://')+window.location.host + '/ws/';
conn = new WebSocket(wsUri);
log('Connecting...');
conn.onopen = function() {
log('Connected.');
update_ui();
};
conn.onmessage = function(e) {
log('Received: ' + e.data);
};
conn.onclose = function() {
log('Disconnected.');
conn = null;
update_ui();
};
}
function disconnect() {
if (conn != null) {
log('Disconnecting...');
conn.close();
conn = null;
update_ui();
}
}
function update_ui() {
var msg = '';
if (conn == null) {
$('#status').text('disconnected');
$('#connect').html('Connect');
} else {
$('#status').text('connected (' + conn.protocol + ')');
$('#connect').html('Disconnect');
}
}
$('#connect').click(function() {
if (conn == null) {
connect();
} else {
disconnect();
}
update_ui();
return false;
});
$('#send').click(function() {
var text = $('#text').val();
log('Sending: ' + text);
conn.send(text);
$('#text').val('').focus();
return false;
});
$('#text').keyup(function(e) {
if (e.keyCode === 13) {
$('#send').click();
return false;
}
});
});
</script>
</head>
<body>
<h3>Chat!</h3>
<div>
<button id="connect">Connect</button>&nbsp;|&nbsp;Status:
<span id="status">disconnected</span>
</div>
<div id="log"
style="width:20em;height:15em;overflow:auto;border:1px solid black">
</div>
<form id="chatform" onsubmit="return false;">
<input id="text" type="text" />
<input id="send" type="button" value="Send" />
</form>
</body>
</html>