From b279b6996c115862eb7b6329df2fe61db7f491f7 Mon Sep 17 00:00:00 2001 From: Titouan Rigoudy Date: Fri, 4 Mar 2016 12:35:04 +0100 Subject: [PATCH] Cleanly shut down websocket when client leaves. --- src/control.rs | 155 +++++++++++++++++++++++++++++++++++-------------- src/main.rs | 2 + 2 files changed, 115 insertions(+), 42 deletions(-) diff --git a/src/control.rs b/src/control.rs index 704403f..9315dde 100644 --- a/src/control.rs +++ b/src/control.rs @@ -1,10 +1,9 @@ use std::io; -use std::io::{Read, Write}; use std::sync::mpsc; +use std::str; use std::thread; use rustc_serialize::json; -use mio::tcp::TcpStream; use websocket; use websocket::{Receiver, Sender}; @@ -20,6 +19,17 @@ type WebSocketSender = type WebSocketClient = websocket::Client; +enum Error { + IOError(io::Error), + WebSocketError(websocket::result::WebSocketError), +} + +impl From for Error { + fn from(err: io::Error) -> Self { + Error::IOError(err) + } +} + pub struct Controller { client_tx: mpsc::Sender, client_rx: mpsc::Receiver, @@ -39,27 +49,30 @@ impl Controller { pub fn run(&mut self) { let host = config::CONTROL_HOST; let port = config::CONTROL_PORT; + let mut server = websocket::Server::bind((host, port)).unwrap(); + info!("Controller bound to {}:{}", host, port); + loop { - let client = Self::get_client(host, port); + let client = match Self::try_get_client(&mut server) { + Ok(client) => client, + Err(e) => { + error!("Error accepting control connection: {}", e); + continue; + } + }; info!("Controller client connected"); - let (mut sender, mut receiver) = client.split(); + + let (sender, receiver) = client.split(); + let (sender_tx, sender_rx) = mpsc::channel(); + let tx = self.client_tx.clone(); - thread::spawn(move || { - Self::receiver_loop(receiver, tx); + let handle = thread::spawn(move || { + Self::receiver_loop(receiver, tx, sender_tx); }); - Self::sender_loop(sender, &mut self.client_rx); - } - } - fn get_client(host: &str, port: u16) -> WebSocketClient - { - let mut server = websocket::Server::bind((host, port)).unwrap(); - info!("Controller bound to {}:{}", host, port); - loop { - match Self::try_get_client(&mut server) { - Ok(client) => return client, - Err(e) => error!("Error accepting control connection: {}", e), - } + Self::sender_loop(sender, &mut self.client_rx, sender_rx); + + handle.join(); } } @@ -76,43 +89,101 @@ impl Controller { fn receiver_loop( mut receiver: WebSocketReceiver, - tx: mpsc::Sender) + client_tx: mpsc::Sender, + sender_tx: mpsc::Sender<()>) { for message_result in receiver.incoming_messages() { - let message: websocket::Message = match message_result { + let message : websocket::message::Message = match message_result { Ok(message) => message, Err(e) => { - warn!("Error receiving control message {}", e); + warn!("Error receving websocket message: {}", e); continue; - }, + } }; - let payload = match message.opcode { + match message.opcode { websocket::message::Type::Text => - String::from_utf8(message.payload.into_owned()).unwrap(), + Self::handle_text_message(&message.payload, &client_tx), - code => { - warn!("Unhandled websocket message type: {:?}", code); - continue; - }, - }; - match json::decode(&payload) { - Ok(control_request) => { - debug!("Received control request: {:?}", control_request); - tx.send(client::IncomingMessage::ControlRequest( - control_request)); - }, - Err(e) => warn!("Error decoding control request: {}", e), - }; + websocket::message::Type::Close => break, + + code => warn!("Unhandled websocket message with code {:?}", + code), + } + } + info!("Shutting down websocket receiver"); + receiver.shutdown().unwrap(); + // Notify sender that the websocket is closed + sender_tx.send(()); + } + + fn handle_text_message( + payload_bytes: &[u8], + client_tx: &mpsc::Sender) + { + let payload = match str::from_utf8(payload_bytes) { + Ok(payload) => payload, + Err(e) => { + warn!("Invalid UTF8 payload: {}", e); + return; + }, + }; + + let control_request = match json::decode(payload) { + Ok(control_request) => control_request, + Err(e) => { + warn!("Invalid JSON payload: {}", e); + return; + } + }; + + let message = client::IncomingMessage::ControlRequest(control_request); + match client_tx.send(message) { + Ok(()) => (), + Err(e) => { + warn!("Error sending control request to client: {}", e); + } } } fn sender_loop( - mut sender: WebSocketSender, rx: &mut mpsc::Receiver) + mut sender: WebSocketSender, + client_rx: &mut mpsc::Receiver, + sender_rx: mpsc::Receiver<()>) { - for control_response in rx.iter() { - let encoded = json::encode(&control_response).unwrap(); - let message = websocket::Message::text(encoded); - sender.send_message(&message).unwrap(); + loop { + select! { + _ = sender_rx.recv() => break, + + response_result = client_rx.recv() => { + match response_result { + Ok(response) => + Self::send_response(&mut sender, response), + Err(e) => { + error!("Error receving from client channel: {}", e); + break; + } + } + } + } + } + info!("Shutting down websocket sender"); + sender.shutdown_all().unwrap(); + } + + fn send_response(sender: &mut WebSocketSender, response: ControlResponse) { + let message = match json::encode(&response) { + Ok(encoded) => websocket::Message::text(encoded), + Err(e) => { + error!("Error encoding control_response to JSON: {}", e); + return; + } + }; + match sender.send_message(&message) { + Ok(()) => (), + Err(e) => { + error!("Error sending message to control client: {}", e); + return; + } } } } diff --git a/src/main.rs b/src/main.rs index db0bee0..2c6b8c2 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,3 +1,5 @@ +#![feature(mpsc_select)] + mod client; mod config; mod control;