diff --git a/client/src/control/listener.rs b/client/src/control/listener.rs index 13e8eed..888ed2c 100644 --- a/client/src/control/listener.rs +++ b/client/src/control/listener.rs @@ -13,11 +13,11 @@ use tokio_tungstenite::WebSocketStream; use crate::control::request::*; use crate::control::response::*; -use crate::event::Message; +use crate::event::Event; async fn forward_incoming( mut incoming: SplitStream>, - message_tx: &mpsc::Sender, + event_tx: &mpsc::Sender, ) -> anyhow::Result<()> { while let Some(result) = incoming.next().await { if let Err(WebSocketError::ConnectionClosed) = result { @@ -31,8 +31,8 @@ async fn forward_incoming( let control_request: Request = serde_json::from_str(&text) .with_context(|| format!("decoding JSON message {:?}", text))?; - message_tx - .send(Message::ControlRequest(control_request)) + event_tx + .send(Event::ControlRequest(control_request)) .await .context("dispatcher channel closed")?; } @@ -60,7 +60,7 @@ async fn forward_outgoing( async fn handle( stream: TcpStream, remote_address: &SocketAddr, - message_tx: &mpsc::Sender, + event_tx: &mpsc::Sender, response_rx: &mut mpsc::Receiver, ) { let ws_stream = match tokio_tungstenite::accept_async(stream).await { @@ -79,7 +79,7 @@ async fn handle( let (mut outgoing, incoming) = ws_stream.split(); tokio::select! { - result = forward_incoming(incoming, message_tx) => match result { + result = forward_incoming(incoming, event_tx) => match result { Ok(()) => info!( "Incoming WebSocket handler task for {} stopped", remote_address, @@ -132,11 +132,11 @@ impl Listener { } /// Starts accepting control connections, one at a time. For each connection, - /// forwards incoming messages from the socket to `message_tx` and outgoing + /// forwards incoming messages from the socket to `event_tx` and outgoing /// responses from `response_rx` to the socket. pub async fn run( &mut self, - message_tx: mpsc::Sender, + event_tx: mpsc::Sender, mut response_rx: mpsc::Receiver, ) -> anyhow::Result<()> { info!("Accepting control connections on {}", self.address()); @@ -163,7 +163,7 @@ impl Listener { info!("Accepted control connection from {}", remote_address); - handle(stream, &remote_address, &message_tx, &mut response_rx).await + handle(stream, &remote_address, &event_tx, &mut response_rx).await } Ok(()) @@ -185,19 +185,19 @@ mod tests { use tokio_tungstenite::tungstenite::Message as WebSocketMessage; use crate::control::{Request, Response, RoomLeaveResponse}; - use crate::event::Message; + use crate::event::Event; // A bound `Listener` packaged with the channels it needs to run. // Convenient for tests. struct RunnableListener { inner: Listener, - message_tx: mpsc::Sender, + event_tx: mpsc::Sender, response_rx: mpsc::Receiver, } impl RunnableListener { async fn run(mut self) -> anyhow::Result<()> { - self.inner.run(self.message_tx, self.response_rx).await + self.inner.run(self.event_tx, self.response_rx).await } } @@ -206,7 +206,7 @@ mod tests { pub listener: RunnableListener, pub address: SocketAddr, pub websocket_address: String, - pub message_rx: mpsc::Receiver, + pub event_rx: mpsc::Receiver, pub response_tx: mpsc::Sender, } @@ -217,12 +217,12 @@ mod tests { let address = inner.address().clone(); let websocket_address = format!("ws://{}", address); - let (message_tx, message_rx) = mpsc::channel(100); + let (event_tx, event_rx) = mpsc::channel(100); let (response_tx, response_rx) = mpsc::channel(100); let listener = RunnableListener { inner, - message_tx, + event_tx, response_rx, }; @@ -230,7 +230,7 @@ mod tests { listener, address, websocket_address, - message_rx, + event_rx, response_tx, }) } @@ -335,8 +335,8 @@ mod tests { .context("sending request")?; assert_eq!( - bundle.message_rx.recv().await, - Some(Message::ControlRequest(Request::RoomListRequest)) + bundle.event_rx.recv().await, + Some(Event::ControlRequest(Request::RoomListRequest)) ); // Dropping this sender signals to the listener that it should stop. diff --git a/client/src/dispatcher.rs b/client/src/dispatcher.rs index 7bf3584..1bc8f93 100644 --- a/client/src/dispatcher.rs +++ b/client/src/dispatcher.rs @@ -1,18 +1,18 @@ -//! This module defines the central message dispatcher to the client process. +//! This module defines the central event dispatcher to the client process. use log::{error, warn}; use solstice_proto::server::ServerResponse; use crate::context::Context; use crate::control::Request as ControlRequest; -use crate::event::{Message, EventHandler}; +use crate::event::{Event, EventHandler}; use crate::handlers::*; use crate::message_handler::MessageHandler; use crate::room::{RoomEvent, RoomEventHandler}; /// Subsystem event handlers to which the `Dispatcher` dispatches events. pub trait DispatcherHandlers { - type RoomEventHandler: EventHandler; + type RoomEventHandler: EventHandler; fn room_event_handler(&mut self) -> &mut Self::RoomEventHandler; } @@ -31,14 +31,14 @@ impl DispatcherHandlers for DefaultDispatcherHandlers { /// Parts that make up a `Dispatcher`. pub struct DispatcherParts { - /// The global context against which messages are handled. + /// The global context against which events are handled. pub context: Context, /// Subsystem event handlers. Injected for testability. pub handlers: H, } -/// The `Dispatcher` is in charge of mapping messages to their handlers. +/// The `Dispatcher` is in charge of mapping events to their handlers. pub struct Dispatcher { pub context: Context, pub handlers: H, @@ -61,79 +61,68 @@ impl Dispatcher { } } - fn dispatch_internal(&mut self, message: Message) -> anyhow::Result<()> { - match message { - Message::ServerResponse(ServerResponse::PeerAddressResponse( - response, - )) => { + fn dispatch_internal(&mut self, event: Event) -> anyhow::Result<()> { + match event { + Event::ServerResponse(ServerResponse::PeerAddressResponse(response)) => { PeerAddressResponseHandler::default().run(&mut self.context, &response) } - Message::ServerResponse(ServerResponse::PrivilegedUsersResponse( + Event::ServerResponse(ServerResponse::PrivilegedUsersResponse( response, )) => PrivilegedUsersResponseHandler::default() .run(&mut self.context, &response), - Message::ServerResponse(ServerResponse::RoomJoinResponse(response)) => { - self - .handlers - .room_event_handler() - .handle(&mut self.context, RoomEvent::JoinResponse(response)) - } - Message::ServerResponse(ServerResponse::RoomMessageResponse( - response, - )) => self + Event::ServerResponse(ServerResponse::RoomJoinResponse(response)) => self .handlers .room_event_handler() - .handle(&mut self.context, RoomEvent::MessageResponse(response)), - Message::ServerResponse(ServerResponse::RoomListResponse(response)) => { + .handle(&mut self.context, RoomEvent::JoinResponse(response)), + Event::ServerResponse(ServerResponse::RoomMessageResponse(response)) => { self .handlers .room_event_handler() - .handle(&mut self.context, RoomEvent::ListResponse(response)) + .handle(&mut self.context, RoomEvent::MessageResponse(response)) } - Message::ServerResponse(response) => { + Event::ServerResponse(ServerResponse::RoomListResponse(response)) => self + .handlers + .room_event_handler() + .handle(&mut self.context, RoomEvent::ListResponse(response)), + Event::ServerResponse(response) => { warn!("Unhandled server response: {:?}", response); Ok(()) } - Message::ControlRequest(ControlRequest::LoginStatusRequest) => { + Event::ControlRequest(ControlRequest::LoginStatusRequest) => { LoginStatusRequestHandler::default().run(&mut self.context, &()) } - Message::ControlRequest(ControlRequest::PeerConnectRequest(request)) => { + Event::ControlRequest(ControlRequest::PeerConnectRequest(request)) => { PeerConnectRequestHandler::default().run(&mut self.context, &request) } - Message::ControlRequest(ControlRequest::RoomListRequest) => self + Event::ControlRequest(ControlRequest::RoomListRequest) => self .handlers .room_event_handler() .handle(&mut self.context, RoomEvent::ListRequest), - Message::ControlRequest(ControlRequest::RoomMessageRequest(request)) => { + Event::ControlRequest(ControlRequest::RoomMessageRequest(request)) => { self .handlers .room_event_handler() .handle(&mut self.context, RoomEvent::MessageRequest(request)) } - Message::ControlRequest(ControlRequest::RoomJoinRequest(room_name)) => { - self - .handlers - .room_event_handler() - .handle(&mut self.context, RoomEvent::JoinRequest(room_name)) - } - Message::ControlRequest(ControlRequest::UserListRequest) => { + Event::ControlRequest(ControlRequest::RoomJoinRequest(room_name)) => self + .handlers + .room_event_handler() + .handle(&mut self.context, RoomEvent::JoinRequest(room_name)), + Event::ControlRequest(ControlRequest::UserListRequest) => { UserListRequestHandler::default().run(&mut self.context, &()) } - Message::ControlRequest(request) => { + Event::ControlRequest(request) => { warn!("Unhandled control request: {:?}", request); Ok(()) } } } - /// Dispatches the given message to the appropriate subsystem handler. - pub fn dispatch(&mut self, message: Message) { - let debug_message = format!("{:?}", &message); - if let Err(error) = self.dispatch_internal(message) { - error!( - "Error handling message: {:?}\nMessage: {}", - error, debug_message - ); + /// Dispatches the given event to the appropriate subsystem handler. + pub fn dispatch(&mut self, event: Event) { + let debug_event = format!("{:?}", &event); + if let Err(error) = self.dispatch_internal(event) { + error!("Error handling event: {:?}\nEvent: {}", error, debug_event); } } } @@ -146,7 +135,7 @@ mod tests { use crate::context::ContextBundle; use crate::control; - use crate::dispatcher::Message; + use crate::dispatcher::Event; use crate::room::testing::FakeRoomEventHandler; use super::*; @@ -170,9 +159,9 @@ mod tests { } } - /// Dispatches `message` to fake handlers using a new `Dispatcher` and + /// Dispatches `event` to fake handlers using a new `Dispatcher` and /// `Context`, then returns the handlers for inspection. - fn dispatch(message: Message) -> FakeDispatcherHandlers { + fn dispatch(event: Event) -> FakeDispatcherHandlers { let bundle = ContextBundle::default(); let mut dispatcher = Dispatcher::from_parts(DispatcherParts { @@ -180,7 +169,7 @@ mod tests { handlers: FakeDispatcherHandlers::default(), }); - dispatcher.dispatch(message); + dispatcher.dispatch(event); dispatcher.into_parts().handlers } @@ -191,7 +180,7 @@ mod tests { reason: "bleep bloop".to_string(), }; - let handlers = dispatch(Message::ServerResponse(response.into())); + let handlers = dispatch(Event::ServerResponse(response.into())); assert!(!handlers.has_events()); } @@ -200,7 +189,7 @@ mod tests { fn dispatches_login_status_request() { let request = control::Request::LoginStatusRequest; - let _handlers = dispatch(Message::ControlRequest(request)); + let _handlers = dispatch(Event::ControlRequest(request)); // TODO: Check that event is dispatched to a new login event handler. } @@ -213,7 +202,7 @@ mod tests { port: 1234, }; - let _handlers = dispatch(Message::ServerResponse(response.into())); + let _handlers = dispatch(Event::ServerResponse(response.into())); // TODO: Check that event is dispatched to a new peer event handler. } @@ -222,7 +211,7 @@ mod tests { fn dispatches_peer_connect_request() { let request = control::Request::PeerConnectRequest("shruti".to_string()); - let _handlers = dispatch(Message::ControlRequest(request)); + let _handlers = dispatch(Event::ControlRequest(request)); // TODO: Check that event is dispatched to a new peer event handler. } @@ -233,7 +222,7 @@ mod tests { users: vec!["foo".to_string(), "bar".to_string(), "baz".to_string()], }; - let _handlers = dispatch(Message::ServerResponse(response.into())); + let _handlers = dispatch(Event::ServerResponse(response.into())); // TODO: Check that event is dispatched to a new user event handler. } @@ -242,7 +231,7 @@ mod tests { fn dispatches_room_join_request() { let request = control::Request::RoomJoinRequest("bleep".to_string()); - let handlers = dispatch(Message::ControlRequest(request.into())); + let handlers = dispatch(Event::ControlRequest(request.into())); assert_eq!( handlers.room_event_handler.events, @@ -259,7 +248,7 @@ mod tests { users: Vec::new(), }; - let handlers = dispatch(Message::ServerResponse(response.clone().into())); + let handlers = dispatch(Event::ServerResponse(response.clone().into())); assert_eq!( handlers.room_event_handler.events, @@ -286,7 +275,7 @@ mod tests { operated_private_room_names: Vec::new(), }; - let handlers = dispatch(Message::ServerResponse(response.clone().into())); + let handlers = dispatch(Event::ServerResponse(response.clone().into())); assert_eq!( handlers.room_event_handler.events, @@ -301,7 +290,7 @@ mod tests { message: "yo!".to_string(), }; - let handlers = dispatch(Message::ControlRequest(request.clone().into())); + let handlers = dispatch(Event::ControlRequest(request.clone().into())); assert_eq!( handlers.room_event_handler.events, @@ -317,7 +306,7 @@ mod tests { message: "yo!".to_string(), }; - let handlers = dispatch(Message::ServerResponse(response.clone().into())); + let handlers = dispatch(Event::ServerResponse(response.clone().into())); assert_eq!( handlers.room_event_handler.events, diff --git a/client/src/event.rs b/client/src/event.rs index ff8c3f0..ec694a4 100644 --- a/client/src/event.rs +++ b/client/src/event.rs @@ -5,21 +5,20 @@ use solstice_proto::ServerResponse; use crate::context::Context; use crate::control; -// TODO: Rename to `Event`. -/// The type of messages dispatched by a dispatcher. +/// The type of events affecting the client. #[derive(Debug, Eq, PartialEq)] -pub enum Message { +pub enum Event { ControlRequest(control::Request), ServerResponse(ServerResponse), } -impl From for Message { +impl From for Event { fn from(response: ServerResponse) -> Self { Self::ServerResponse(response) } } -impl From for Message { +impl From for Event { fn from(request: control::Request) -> Self { Self::ControlRequest(request) } diff --git a/client/src/main.rs b/client/src/main.rs index cf6dbb8..2a3c74f 100644 --- a/client/src/main.rs +++ b/client/src/main.rs @@ -26,6 +26,7 @@ mod user; use config::{Config, TomlConfig}; use context::{ContextBundle, ContextOptions}; use dispatcher::{DefaultDispatcherHandlers, Dispatcher, DispatcherParts}; +use event::Event; // Size of various channels defined below. const CHANNEL_BUFFER_SIZE: usize = 100; @@ -44,7 +45,7 @@ fn old_main(config: Config) { async fn run_client( config: Config, request_rx: mpsc::Receiver, - dispatcher_tx: mpsc::Sender, + event_tx: mpsc::Sender, ) -> anyhow::Result<()> { info!("Connecting to server at {}.", config.server_address); let stream = TcpStream::connect(&config.server_address) @@ -62,8 +63,8 @@ async fn run_client( let (response_tx, mut response_rx) = mpsc::channel(CHANNEL_BUFFER_SIZE); let forwarder_task = tokio::spawn(async move { while let Some(response) = response_rx.recv().await { - dispatcher_tx - .send(event::Message::ServerResponse(response)) + event_tx + .send(Event::ServerResponse(response)) .await .expect("dispatcher channel closed"); } @@ -89,7 +90,7 @@ async fn async_main(config: Config) -> anyhow::Result<()> { let bundle = ContextBundle::new(options); - let (dispatcher_tx, mut dispatcher_rx) = mpsc::channel(CHANNEL_BUFFER_SIZE); + let (event_tx, mut event_rx) = mpsc::channel(CHANNEL_BUFFER_SIZE); let mut control_listener = control::Listener::bind(&config.control_listen_address) @@ -99,18 +100,17 @@ async fn async_main(config: Config) -> anyhow::Result<()> { let client_task = tokio::spawn(run_client( config, bundle.server_request_rx, - dispatcher_tx.clone(), + event_tx.clone(), )); - let control_task = - control_listener.run(dispatcher_tx, bundle.control_response_rx); + let control_task = control_listener.run(event_tx, bundle.control_response_rx); let dispatch_task = tokio::task::spawn_blocking(move || { let mut dispatcher = Dispatcher::from_parts(DispatcherParts { context: bundle.context, handlers: DefaultDispatcherHandlers::default(), }); - while let Some(message) = dispatcher_rx.blocking_recv() { + while let Some(message) = event_rx.blocking_recv() { dispatcher.dispatch(message); } dispatcher.into_parts().context