From d7dad56d3cc95e495059f373bf32a38ec70edce0 Mon Sep 17 00:00:00 2001 From: Titouan Rigoudy Date: Fri, 18 Nov 2022 18:11:36 +0000 Subject: [PATCH] Handle messages in dispatcher, remove Job abstraction. Introduce dependency injection to Dispatcher to allow testing. TODO: - introduce room/ submodule, move room.rs and room_event.rs in there - dependency injecton for other handlers --- client/src/control/request.rs | 4 +- client/src/dispatcher.rs | 404 +++++++++++++++++++++------------- client/src/main.rs | 15 +- client/src/room_event.rs | 91 ++++++++ 4 files changed, 353 insertions(+), 161 deletions(-) create mode 100644 client/src/room_event.rs diff --git a/client/src/control/request.rs b/client/src/control/request.rs index f0799b8..7af27e8 100644 --- a/client/src/control/request.rs +++ b/client/src/control/request.rs @@ -2,7 +2,7 @@ use serde::{Deserialize, Serialize}; /// This enumeration is the list of possible control requests made by the /// controller client to the client. -#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)] +#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] pub enum Request { /// The controller wants to connect to a peer. Contains the peer name. PeerConnectRequest(String), @@ -27,7 +27,7 @@ impl From for Request { } /// This structure contains the chat room message request from the controller. -#[derive(Debug, PartialEq, Eq, Deserialize, Serialize)] +#[derive(Clone, Debug, PartialEq, Eq, Deserialize, Serialize)] pub struct RoomMessageRequest { /// The name of the chat room in which to send the message. pub room_name: String, diff --git a/client/src/dispatcher.rs b/client/src/dispatcher.rs index f19bee5..ee708b5 100644 --- a/client/src/dispatcher.rs +++ b/client/src/dispatcher.rs @@ -9,6 +9,7 @@ use crate::context::Context; use crate::control::Request as ControlRequest; use crate::handlers::*; use crate::message_handler::MessageHandler; +use crate::room_event::{HandleRoomEvent, RoomEvent, RoomEventHandler}; /// The type of messages dispatched by a dispatcher. #[derive(Debug, Eq, PartialEq)] @@ -17,6 +18,7 @@ pub enum Message { ServerResponse(ServerResponse), } +// TODO: Remove? impl From for Message { fn from(response: ServerResponse) -> Self { Self::ServerResponse(response) @@ -29,229 +31,327 @@ impl From for Message { } } -/// Represents a synchronous task that can be run against a context. -pub trait Job: Send { - /// Runs this job against the given context. - fn execute(self: Box, context: &mut Context); +/// Subsystem event handlers to which the `Dispatcher` dispatches events. +pub trait DispatcherHandlers { + type RoomEventHandler: HandleRoomEvent; + fn room_event_handler(&mut self) -> &mut Self::RoomEventHandler; } -/// Pairs together a message and its handler as chosen by the dispatcher. -/// Implements Job so as to erase the exact types involved. -struct DispatchedMessage { - message: M, - handler: H, +/// Default event handlers implementing real behavior. +#[derive(Default)] +pub struct DefaultDispatcherHandlers { + room_event_handler: RoomEventHandler, } -impl Job for DispatchedMessage::Message> -where - H: MessageHandler + Send, - ::Message: Debug + Send, -{ - fn execute(self: Box, context: &mut Context) { - if let Err(error) = self.handler.run(context, &self.message) { - error!( - "Error in handler {}: {:?}\nMessage: {:?}", - H::name(), - error, - &self.message - ); - } +impl DispatcherHandlers for DefaultDispatcherHandlers { + type RoomEventHandler = RoomEventHandler; + fn room_event_handler(&mut self) -> &mut Self::RoomEventHandler { + &mut self.room_event_handler } } -/// The Dispatcher is in charge of mapping messages to their handlers. -pub struct Dispatcher; +/// Parts that make up a `Dispatcher`. +pub struct DispatcherParts { + /// The global context against which messages are handled. + pub context: Context, -impl Dispatcher { - /// Returns a new dispatcher. - pub fn new() -> Self { - Self {} + /// Subsystem event handlers. Injected for testability. + pub handlers: H, +} + +/// The `Dispatcher` is in charge of mapping messages to their handlers. +pub struct Dispatcher { + pub context: Context, + pub handlers: H, +} + +impl Dispatcher { + /// Returns a new `Dispatcher` from the given parts. + pub fn from_parts(parts: DispatcherParts) -> Self { + Self { + context: parts.context, + handlers: parts.handlers, + } + } + + /// Breaks down a `Dispatcher` into its constituent parts. + pub fn into_parts(self) -> DispatcherParts { + DispatcherParts { + context: self.context, + handlers: self.handlers, + } } - /// Dispatches the given message by wrapping it with a handler. - pub fn dispatch(&self, message: Message) -> Option> { + fn dispatch_internal(&mut self, message: Message) -> anyhow::Result<()> { match message { - Message::ControlRequest(ControlRequest::LoginStatusRequest) => { - Some(Box::new(DispatchedMessage { - message: (), - handler: LoginStatusRequestHandler::default(), - })) - } Message::ServerResponse(ServerResponse::PeerAddressResponse( response, - )) => Some(Box::new(DispatchedMessage { - message: response, - handler: PeerAddressResponseHandler::default(), - })), + )) => { + PeerAddressResponseHandler::default().run(&mut self.context, &response) + } Message::ServerResponse(ServerResponse::PrivilegedUsersResponse( response, - )) => Some(Box::new(DispatchedMessage { - message: response, - handler: PrivilegedUsersResponseHandler::default(), - })), + )) => PrivilegedUsersResponseHandler::default() + .run(&mut self.context, &response), Message::ServerResponse(ServerResponse::RoomJoinResponse(response)) => { - Some(Box::new(DispatchedMessage { - message: response, - handler: RoomJoinResponseHandler::default(), - })) + self + .handlers + .room_event_handler() + .handle(&mut self.context, RoomEvent::JoinResponse(response)) } Message::ServerResponse(ServerResponse::RoomMessageResponse( response, - )) => Some(Box::new(DispatchedMessage { - message: response, - handler: RoomMessageResponseHandler::default(), - })), + )) => self + .handlers + .room_event_handler() + .handle(&mut self.context, RoomEvent::MessageResponse(response)), Message::ServerResponse(ServerResponse::RoomListResponse(response)) => { - Some(Box::new(DispatchedMessage { - message: response, - handler: RoomListResponseHandler::default(), - })) + self + .handlers + .room_event_handler() + .handle(&mut self.context, RoomEvent::ListResponse(response)) } Message::ServerResponse(response) => { warn!("Unhandled server response: {:?}", response); - None + Ok(()) } - Message::ControlRequest(ControlRequest::PeerConnectRequest(request)) => { - Some(Box::new(DispatchedMessage { - message: request, - handler: PeerConnectRequestHandler::default(), - })) + Message::ControlRequest(ControlRequest::LoginStatusRequest) => { + LoginStatusRequestHandler::default().run(&mut self.context, &()) } - Message::ControlRequest(ControlRequest::RoomListRequest) => { - Some(Box::new(DispatchedMessage { - message: (), - handler: RoomListRequestHandler::default(), - })) + Message::ControlRequest(ControlRequest::PeerConnectRequest(request)) => { + PeerConnectRequestHandler::default().run(&mut self.context, &request) } + Message::ControlRequest(ControlRequest::RoomListRequest) => self + .handlers + .room_event_handler() + .handle(&mut self.context, RoomEvent::ListRequest), Message::ControlRequest(ControlRequest::RoomMessageRequest(request)) => { - Some(Box::new(DispatchedMessage { - message: request, - handler: RoomMessageRequestHandler::default(), - })) + self + .handlers + .room_event_handler() + .handle(&mut self.context, RoomEvent::MessageRequest(request)) } Message::ControlRequest(ControlRequest::RoomJoinRequest(room_name)) => { - Some(Box::new(DispatchedMessage { - message: room_name, - handler: RoomJoinRequestHandler::default(), - })) + self + .handlers + .room_event_handler() + .handle(&mut self.context, RoomEvent::JoinRequest(room_name)) } Message::ControlRequest(ControlRequest::UserListRequest) => { - Some(Box::new(DispatchedMessage { - message: (), - handler: UserListRequestHandler::default(), - })) + UserListRequestHandler::default().run(&mut self.context, &()) } Message::ControlRequest(request) => { warn!("Unhandled control request: {:?}", request); - None + Ok(()) } } } + + /// Dispatches the given message to the appropriate subsystem handler. + pub fn dispatch(&mut self, message: Message) { + let debug_message = format!("{:?}", &message); + if let Err(error) = self.dispatch_internal(message) { + error!( + "Error handling message: {:?}\nMessage: {}", + error, debug_message + ); + } + } } #[cfg(test)] mod tests { - use crate::control; - use crate::dispatcher::Message; + use std::net::Ipv4Addr; use solstice_proto::server; + use crate::context::ContextBundle; + use crate::control; + use crate::dispatcher::Message; + use crate::room_event::testing::FakeRoomEventHandler; + use super::*; + #[derive(Default)] + struct FakeDispatcherHandlers { + room_event_handler: FakeRoomEventHandler, + } + + impl FakeDispatcherHandlers { + // Defined here for better maintainability as new handlers are added. + fn has_events(&self) -> bool { + !self.room_event_handler.events.is_empty() + } + } + + impl DispatcherHandlers for FakeDispatcherHandlers { + type RoomEventHandler = FakeRoomEventHandler; + fn room_event_handler(&mut self) -> &mut Self::RoomEventHandler { + &mut self.room_event_handler + } + } + + /// Dispatches `message` to fake handlers using a new `Dispatcher` and + /// `Context`, then returns the handlers for inspection. + fn dispatch(message: Message) -> FakeDispatcherHandlers { + let bundle = ContextBundle::default(); + + let mut dispatcher = Dispatcher::from_parts(DispatcherParts { + context: bundle.context, + handlers: FakeDispatcherHandlers::default(), + }); + + dispatcher.dispatch(message); + + dispatcher.into_parts().handlers + } + #[test] fn does_not_dispatch_unhandled_response() { - assert!(Dispatcher::new() - .dispatch(Message::ServerResponse( - server::LoginResponse::LoginFail { - reason: "bleep bloop".to_string(), - } - .into() - )) - .is_none()); + let response = server::LoginResponse::LoginFail { + reason: "bleep bloop".to_string(), + }; + + let handlers = dispatch(Message::ServerResponse(response.into())); + + assert!(!handlers.has_events()); + } + + #[test] + fn dispatches_login_status_request() { + let request = control::Request::LoginStatusRequest; + + let _handlers = dispatch(Message::ControlRequest(request)); + + // TODO: Check that event is dispatched to a new login event handler. + } + + #[test] + fn dispatches_peer_address_response() { + let response = server::PeerAddressResponse { + user_name: "shruti".to_string(), + ip: Ipv4Addr::new(1, 2, 3, 4), + port: 1234, + }; + + let _handlers = dispatch(Message::ServerResponse(response.into())); + + // TODO: Check that event is dispatched to a new peer event handler. + } + + #[test] + fn dispatches_peer_connect_request() { + let request = control::Request::PeerConnectRequest("shruti".to_string()); + + let _handlers = dispatch(Message::ControlRequest(request)); + + // TODO: Check that event is dispatched to a new peer event handler. } #[test] fn dispatches_privileged_users_response() { - assert!(Dispatcher::new() - .dispatch(Message::ServerResponse( - server::PrivilegedUsersResponse { - users: vec!["foo".to_string(), "bar".to_string(), "baz".to_string()], - } - .into() - )) - .is_some()); + let response = server::PrivilegedUsersResponse { + users: vec!["foo".to_string(), "bar".to_string(), "baz".to_string()], + }; + + let _handlers = dispatch(Message::ServerResponse(response.into())); + + // TODO: Check that event is dispatched to a new user event handler. + } + + #[test] + fn dispatches_room_join_request() { + let request = control::Request::RoomJoinRequest("bleep".to_string()); + + let handlers = dispatch(Message::ControlRequest(request.into())); + + assert_eq!( + handlers.room_event_handler.events, + &[RoomEvent::JoinRequest("bleep".to_string())], + ); } #[test] fn dispatches_room_join_response() { - assert!(Dispatcher::new() - .dispatch(Message::ServerResponse( - server::RoomJoinResponse { - room_name: "bleep".to_string(), - owner: None, - operators: Vec::new(), - users: Vec::new(), - } - .into() - )) - .is_some()); + let response = server::RoomJoinResponse { + room_name: "bleep".to_string(), + owner: None, + operators: Vec::new(), + users: Vec::new(), + }; + + let handlers = dispatch(Message::ServerResponse(response.clone().into())); + + assert_eq!( + handlers.room_event_handler.events, + &[RoomEvent::JoinResponse(response)] + ); } #[test] - fn dispatches_room_message_response() { - assert!(Dispatcher::new() - .dispatch(Message::ServerResponse( - server::RoomMessageResponse { - room_name: "bleep".to_string(), - user_name: "shruti".to_string(), - message: "yo!".to_string(), - } - .into() - )) - .is_some()); + fn dispatches_room_list_request() { + let handlers = dispatch(control::Request::RoomListRequest.into()); + + assert_eq!( + handlers.room_event_handler.events, + &[RoomEvent::ListRequest] + ); } #[test] fn dispatches_room_list_response() { - assert!(Dispatcher::new() - .dispatch(Message::ServerResponse( - server::RoomListResponse { - rooms: Vec::new(), - owned_private_rooms: Vec::new(), - other_private_rooms: Vec::new(), - operated_private_room_names: Vec::new(), - } - .into() - )) - .is_some()); + let response = server::RoomListResponse { + rooms: Vec::new(), + owned_private_rooms: Vec::new(), + other_private_rooms: Vec::new(), + operated_private_room_names: Vec::new(), + }; + + let handlers = dispatch(Message::ServerResponse(response.clone().into())); + + assert_eq!( + handlers.room_event_handler.events, + &[RoomEvent::ListResponse(response)] + ); } #[test] - fn dispatches_room_join_request() { - assert!(Dispatcher::new() - .dispatch(Message::ControlRequest( - control::Request::RoomJoinRequest("bleep".to_string()).into() - )) - .is_some()); + fn dispatches_room_message_request() { + let request = control::RoomMessageRequest { + room_name: "bleep".to_string(), + message: "yo!".to_string(), + }; + + let handlers = dispatch(Message::ControlRequest(request.clone().into())); + + assert_eq!( + handlers.room_event_handler.events, + &[RoomEvent::MessageRequest(request)], + ); } #[test] - fn dispatches_room_message_request() { - assert!(Dispatcher::new() - .dispatch(Message::ControlRequest( - control::RoomMessageRequest { - room_name: "bleep".to_string(), - message: "yo!".to_string(), - } - .into() - )) - .is_some()); + fn dispatches_room_message_response() { + let response = server::RoomMessageResponse { + room_name: "bleep".to_string(), + user_name: "shruti".to_string(), + message: "yo!".to_string(), + }; + + let handlers = dispatch(Message::ServerResponse(response.clone().into())); + + assert_eq!( + handlers.room_event_handler.events, + &[RoomEvent::MessageResponse(response)] + ); } #[test] - fn dispatches_room_list_request() { - assert!(Dispatcher::new() - .dispatch(Message::ControlRequest(control::Request::RoomListRequest)) - .is_some()); + fn dispatches_user_list_request() { + let request = control::Request::UserListRequest; + + let _handlers = dispatch(request.into()); + + // TODO: Check that event is dispatched to a new user event handler. } } diff --git a/client/src/main.rs b/client/src/main.rs index 3aae563..a9e933f 100644 --- a/client/src/main.rs +++ b/client/src/main.rs @@ -18,13 +18,14 @@ mod login; mod message_handler; mod peer; mod room; +mod room_event; #[cfg(test)] mod testing; mod user; use config::{Config, TomlConfig}; use context::{ContextBundle, ContextOptions}; -use dispatcher::Dispatcher; +use dispatcher::{DefaultDispatcherHandlers, Dispatcher, DispatcherParts}; // Size of various channels defined below. const CHANNEL_BUFFER_SIZE: usize = 100; @@ -101,18 +102,18 @@ async fn async_main(config: Config) -> anyhow::Result<()> { dispatcher_tx.clone(), )); - let dispatcher = Dispatcher::new(); let control_task = control_listener.run(dispatcher_tx, bundle.control_response_rx); let dispatch_task = tokio::task::spawn_blocking(move || { - let mut context = bundle.context; + let mut dispatcher = Dispatcher::from_parts(DispatcherParts { + context: bundle.context, + handlers: DefaultDispatcherHandlers::default(), + }); while let Some(message) = dispatcher_rx.blocking_recv() { - if let Some(job) = dispatcher.dispatch(message) { - job.execute(&mut context); - } + dispatcher.dispatch(message); } - context + dispatcher.into_parts().context }); tokio::select! { diff --git a/client/src/room_event.rs b/client/src/room_event.rs new file mode 100644 index 0000000..28e0cc6 --- /dev/null +++ b/client/src/room_event.rs @@ -0,0 +1,91 @@ +use solstice_proto::server::{ + RoomJoinResponse, RoomListResponse, RoomMessageResponse, +}; + +use crate::context::Context; +use crate::control::RoomMessageRequest; +use crate::handlers::{ + RoomJoinRequestHandler, RoomJoinResponseHandler, RoomListRequestHandler, + RoomListResponseHandler, RoomMessageRequestHandler, + RoomMessageResponseHandler, +}; +use crate::message_handler::MessageHandler; + +#[derive(Debug, PartialEq, Eq)] +pub enum RoomEvent { + JoinRequest(String), + JoinResponse(RoomJoinResponse), + ListRequest, + ListResponse(RoomListResponse), + MessageRequest(RoomMessageRequest), + MessageResponse(RoomMessageResponse), +} + +pub trait HandleRoomEvent { + fn handle( + &mut self, + context: &mut Context, + event: RoomEvent, + ) -> anyhow::Result<()>; +} + +#[derive(Default)] +pub struct RoomEventHandler; + +impl HandleRoomEvent for RoomEventHandler { + fn handle( + &mut self, + context: &mut Context, + event: RoomEvent, + ) -> anyhow::Result<()> { + // TODO: Remove individual handlers, move code into RoomEventHandler. + match event { + RoomEvent::JoinRequest(room_name) => { + RoomJoinRequestHandler::default().run(context, &room_name) + } + RoomEvent::JoinResponse(response) => { + RoomJoinResponseHandler::default().run(context, &response) + } + RoomEvent::ListRequest => { + RoomListRequestHandler::default().run(context, &()) + } + RoomEvent::ListResponse(response) => { + RoomListResponseHandler::default().run(context, &response) + } + RoomEvent::MessageRequest(request) => { + RoomMessageRequestHandler::default().run(context, &request) + } + RoomEvent::MessageResponse(response) => { + RoomMessageResponseHandler::default().run(context, &response) + } + } + } +} + +#[cfg(test)] +pub mod testing { + use crate::context::Context; + + use super::{HandleRoomEvent, RoomEvent}; + + #[derive(Default)] + pub struct FakeRoomEventHandler { + pub events: Vec, + } + + impl HandleRoomEvent for FakeRoomEventHandler { + fn handle( + &mut self, + _context: &mut Context, + event: RoomEvent, + ) -> anyhow::Result<()> { + self.events.push(event); + Ok(()) + } + } +} + +#[cfg(test)] +mod tests { + // TODO +}