diff --git a/client/src/client.rs b/client/src/client.rs index 2dc0b12..62a1002 100644 --- a/client/src/client.rs +++ b/client/src/client.rs @@ -16,9 +16,6 @@ use crate::user; #[derive(Debug)] enum IncomingMessage { Proto(solstice_proto::Response), - - #[allow(dead_code)] - ControlNotification(control::Notification), } #[derive(Debug)] @@ -52,11 +49,6 @@ pub struct Client { proto_tx: mio::deprecated::Sender, proto_rx: crossbeam_channel::Receiver, - control_tx: Option, - - #[allow(dead_code)] - control_rx: crossbeam_channel::Receiver, - login_status: LoginStatus, rooms: room::RoomMap, @@ -73,15 +65,11 @@ impl Client { pub fn new( proto_tx: mio::deprecated::Sender, proto_rx: crossbeam_channel::Receiver, - control_rx: crossbeam_channel::Receiver, ) -> Self { Client { proto_tx: proto_tx, proto_rx: proto_rx, - control_tx: None, - control_rx: control_rx, - login_status: LoginStatus::Todo, rooms: room::RoomMap::new(), @@ -122,10 +110,6 @@ impl Client { IncomingMessage::Proto(response) => { self.handle_proto_response(response) } - - IncomingMessage::ControlNotification(notif) => { - self.handle_control_notification(notif) - } } } } @@ -156,53 +140,14 @@ impl Client { /// Send a response to the controller client. fn send_to_controller(&mut self, response: control::Response) { - #[allow(deprecated)] - let result = match self.control_tx { - None => { - // Silently drop control requests when controller is - // disconnected. - return; - } - Some(ref mut control_tx) => control_tx.send(response), - }; - // If we failed to send, we assume it means that the other end of the - // channel has been dropped, i.e. the controller has disconnected. - // It may be that mio has died on us, in which case we will never see - // a controller again. If that happens, there would have probably been - // a panic anyway, so we might never hit this corner case. - if let Err(_) = result { - info!("Controller has disconnected."); - self.control_tx = None; - } - } - - /*===============================* - * CONTROL NOTIFICATION HANDLING * - *===============================*/ - - fn handle_control_notification(&mut self, notif: control::Notification) { - match notif { - control::Notification::Connected(tx) => { - self.control_tx = Some(tx); - } - - control::Notification::Disconnected => { - self.control_tx = None; - } - - control::Notification::Error(e) => { - debug!("Control loop error: {}", e); - self.control_tx = None; - } - - control::Notification::Request(req) => self.handle_control_request(req), - } + warn!("Cannot send control response: {:?}", response); } /*==========================* * CONTROL REQUEST HANDLING * *==========================*/ + /* fn handle_control_request(&mut self, request: control::Request) { match request { control::Request::LoginStatusRequest => { @@ -322,6 +267,7 @@ impl Client { }, )); } + */ /*=========================* * PROTO RESPONSE HANDLING * diff --git a/client/src/context.rs b/client/src/context.rs index 7505465..94df335 100644 --- a/client/src/context.rs +++ b/client/src/context.rs @@ -23,6 +23,7 @@ pub struct Context { /// Sender half of a channel used to send requests to the server. pub server_request_tx: Sender, + // TODO: Add control response sender. } /// Convenience bundle for creating new `Context` structs. diff --git a/client/src/control/mod.rs b/client/src/control/mod.rs index 1057a8e..7850598 100644 --- a/client/src/control/mod.rs +++ b/client/src/control/mod.rs @@ -4,4 +4,4 @@ mod ws; pub use self::request::*; pub use self::response::*; -pub use self::ws::{listen, Notification, SendError, Sender}; +pub use self::ws::listen; diff --git a/client/src/control/ws.rs b/client/src/control/ws.rs index 1bb482f..393b83a 100644 --- a/client/src/control/ws.rs +++ b/client/src/control/ws.rs @@ -1,25 +1,11 @@ -use crossbeam_channel; use solstice_proto::config; use thiserror::Error; +use tokio::sync::mpsc; use ws; -use super::request::*; -use super::response::*; - -/// This enum contains the possible notifications that the control loop will -/// send to the client. -#[derive(Debug)] -pub enum Notification { - /// A new controller has connected: control messages can now be sent on the - /// given channel. - Connected(Sender), - /// The controller has disconnected. - Disconnected, - /// An irretrievable error has arisen. - Error(String), - /// The controller has sent a request. - Request(Request), -} +use crate::control::request::*; +use crate::control::response::*; +use crate::dispatcher::Message; /// This error is returned when a `Sender` fails to send a control request. #[derive(Debug, Error)] @@ -50,37 +36,32 @@ impl Sender { /// This struct handles a single websocket connection. #[derive(Debug)] struct Handler { - /// The channel on which to send notifications to the client. - client_tx: crossbeam_channel::Sender, + /// The channel on which to send requests to the client. + client_tx: mpsc::UnboundedSender, /// The channel on which to send messages to the controller. socket_tx: ws::Sender, } impl Handler { - fn send_to_client(&self, notification: Notification) -> ws::Result<()> { - match self.client_tx.send(notification) { - Ok(()) => Ok(()), - Err(e) => { - error!("Error sending notification to client: {}", e); - Err(ws::Error::new(ws::ErrorKind::Internal, "")) - } - } + fn send_to_client(&self, request: Request) -> ws::Result<()> { + self + .client_tx + .send(Message::ControlRequest(request)) + .map_err(|err| { + error!("Error sending notification to client: {}", err); + ws::Error::new(ws::ErrorKind::Internal, "") + }) } } impl ws::Handler for Handler { fn on_open(&mut self, _: ws::Handshake) -> ws::Result<()> { info!("Websocket open"); - self.send_to_client(Notification::Connected(Sender { - sender: self.socket_tx.clone(), - })) + Ok(()) } fn on_close(&mut self, code: ws::CloseCode, reason: &str) { info!("Websocket closed: code: {:?}, reason: {:?}", code, reason); - self - .send_to_client(Notification::Disconnected) - .unwrap_or(()) } fn on_message(&mut self, msg: ws::Message) -> ws::Result<()> { @@ -108,13 +89,16 @@ impl ws::Handler for Handler { debug!("Received control request: {:?}", control_request); // Send the control request to the client. - self.send_to_client(Notification::Request(control_request)) + self.send_to_client(control_request) } } /// Start listening on the socket address stored in configuration, and send /// control notifications to the client through the given channel. -pub fn listen(client_tx: crossbeam_channel::Sender) { +pub fn listen( + client_tx: mpsc::UnboundedSender, + socket_rx: mpsc::Receiver, +) { let websocket_result = ws::Builder::new() .with_settings(ws::Settings { max_connections: 1, @@ -129,16 +113,14 @@ pub fn listen(client_tx: crossbeam_channel::Sender) { Ok(websocket) => websocket, Err(e) => { error!("Unable to build websocket: {}", e); - client_tx - .send(Notification::Error(format!( - "Unable to build websocket: {}", - e - ))) - .unwrap(); return; } }; + // TODO: Read responses off `socket_rx` and send them on `client_tx`. When + // the channel is closed, we should stop listening. In the meantime, we can + // at least spawn a task to pass along the responses. + let listen_result = websocket.listen((config::CONTROL_HOST, config::CONTROL_PORT)); @@ -146,12 +128,6 @@ pub fn listen(client_tx: crossbeam_channel::Sender) { Ok(_) => (), Err(e) => { error!("Unable to listen on websocket: {}", e); - client_tx - .send(Notification::Error(format!( - "Unable to listen on websocket: {}", - e - ))) - .unwrap(); } } } diff --git a/client/src/main.rs b/client/src/main.rs index d91e938..a83a73d 100644 --- a/client/src/main.rs +++ b/client/src/main.rs @@ -40,16 +40,10 @@ fn old_main() { }; let client_to_proto_tx = proto_agent.channel(); - let (control_to_client_tx, control_to_client_rx) = - crossbeam_channel::unbounded(); - let mut client = client::Client::new( - client_to_proto_tx, - proto_to_client_rx, - control_to_client_rx, - ); + let mut client = client::Client::new(client_to_proto_tx, proto_to_client_rx); - thread::spawn(move || control::listen(control_to_client_tx)); + // Run ws server. thread::spawn(move || proto_agent.run().unwrap()); client.run(); } @@ -121,12 +115,18 @@ async fn async_main() { let (dispatcher_tx, mut dispatcher_rx) = tokio::sync::mpsc::unbounded_channel(); + let (control_tx, control_rx) = tokio::sync::mpsc::channel(100); + let client_task = - tokio::spawn(run_client(bundle.server_request_rx, dispatcher_tx)); + tokio::spawn(run_client(bundle.server_request_rx, dispatcher_tx.clone())); let dispatcher = Dispatcher::new(); let executor = Executor::new(bundle.context); + let control_task = tokio::task::spawn_blocking(move || { + control::listen(dispatcher_tx, control_rx); + }); + while let Some(message) = dispatcher_rx.recv().await { if let Some(job) = dispatcher.dispatch(message) { executor.schedule(job); @@ -141,6 +141,9 @@ async fn async_main() { .await .expect("Client task join error") .expect("Client error"); + + // TODO: Send some signal that we want to wrap up to the controller. + control_task.await.unwrap(); } #[tokio::main]