1 Commits

Author SHA1 Message Date
  Titouan Rigoudy 214278b418 Handle messages in dispatcher, remove Job abstraction. 3 years ago
4 changed files with 352 additions and 161 deletions
Split View
  1. +2
    -2
      client/src/control/request.rs
  2. +251
    -152
      client/src/dispatcher.rs
  3. +8
    -7
      client/src/main.rs
  4. +91
    -0
      client/src/room_event.rs

+ 2
- 2
client/src/control/request.rs View File

@ -2,7 +2,7 @@ use serde::{Deserialize, Serialize};
/// This enumeration is the list of possible control requests made by the
/// controller client to the client.
#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)]
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum Request {
/// The controller wants to connect to a peer. Contains the peer name.
PeerConnectRequest(String),
@ -27,7 +27,7 @@ impl From<RoomMessageRequest> for Request {
}
/// This structure contains the chat room message request from the controller.
#[derive(Debug, PartialEq, Eq, Deserialize, Serialize)]
#[derive(Clone, Debug, PartialEq, Eq, Deserialize, Serialize)]
pub struct RoomMessageRequest {
/// The name of the chat room in which to send the message.
pub room_name: String,


+ 251
- 152
client/src/dispatcher.rs View File

@ -9,6 +9,7 @@ use crate::context::Context;
use crate::control::Request as ControlRequest;
use crate::handlers::*;
use crate::message_handler::MessageHandler;
use crate::room_event::{HandleRoomEvent, RoomEvent, RoomEventHandler};
/// The type of messages dispatched by a dispatcher.
#[derive(Debug, Eq, PartialEq)]
@ -29,229 +30,327 @@ impl From<ControlRequest> for Message {
}
}
/// Represents a synchronous task that can be run against a context.
pub trait Job: Send {
/// Runs this job against the given context.
fn execute(self: Box<Self>, context: &mut Context);
/// Subsystem event handlers to which the `Dispatcher` dispatches events.
pub trait DispatcherHandlers {
type RoomEventHandler: HandleRoomEvent;
fn room_event_handler(&mut self) -> &mut Self::RoomEventHandler;
}
/// Pairs together a message and its handler as chosen by the dispatcher.
/// Implements Job so as to erase the exact types involved.
struct DispatchedMessage<H, M> {
message: M,
handler: H,
/// Default event handlers implementing real behavior.
#[derive(Default)]
pub struct DefaultDispatcherHandlers {
room_event_handler: RoomEventHandler,
}
impl<H> Job for DispatchedMessage<H, <H as MessageHandler>::Message>
where
H: MessageHandler + Send,
<H as MessageHandler>::Message: Debug + Send,
{
fn execute(self: Box<Self>, context: &mut Context) {
if let Err(error) = self.handler.run(context, &self.message) {
error!(
"Error in handler {}: {:?}\nMessage: {:?}",
H::name(),
error,
&self.message
);
}
impl DispatcherHandlers for DefaultDispatcherHandlers {
type RoomEventHandler = RoomEventHandler;
fn room_event_handler(&mut self) -> &mut Self::RoomEventHandler {
&mut self.room_event_handler
}
}
/// The Dispatcher is in charge of mapping messages to their handlers.
pub struct Dispatcher;
/// Parts that make up a `Dispatcher`.
pub struct DispatcherParts<H> {
/// The global context against which messages are handled.
pub context: Context,
impl Dispatcher {
/// Returns a new dispatcher.
pub fn new() -> Self {
Self {}
/// Subsystem event handlers. Injected for testability.
pub handlers: H,
}
/// The `Dispatcher` is in charge of mapping messages to their handlers.
pub struct Dispatcher<H> {
pub context: Context,
pub handlers: H,
}
impl<H: DispatcherHandlers> Dispatcher<H> {
/// Returns a new `Dispatcher` from the given parts.
pub fn from_parts(parts: DispatcherParts<H>) -> Self {
Self {
context: parts.context,
handlers: parts.handlers,
}
}
/// Breaks down a `Dispatcher` into its constituent parts.
pub fn into_parts(self) -> DispatcherParts<H> {
DispatcherParts {
context: self.context,
handlers: self.handlers,
}
}
/// Dispatches the given message by wrapping it with a handler.
pub fn dispatch(&self, message: Message) -> Option<Box<dyn Job>> {
fn dispatch_internal(&mut self, message: Message) -> anyhow::Result<()> {
match message {
Message::ControlRequest(ControlRequest::LoginStatusRequest) => {
Some(Box::new(DispatchedMessage {
message: (),
handler: LoginStatusRequestHandler::default(),
}))
}
Message::ServerResponse(ServerResponse::PeerAddressResponse(
response,
)) => Some(Box::new(DispatchedMessage {
message: response,
handler: PeerAddressResponseHandler::default(),
})),
)) => {
PeerAddressResponseHandler::default().run(&mut self.context, &response)
}
Message::ServerResponse(ServerResponse::PrivilegedUsersResponse(
response,
)) => Some(Box::new(DispatchedMessage {
message: response,
handler: PrivilegedUsersResponseHandler::default(),
})),
)) => PrivilegedUsersResponseHandler::default()
.run(&mut self.context, &response),
Message::ServerResponse(ServerResponse::RoomJoinResponse(response)) => {
Some(Box::new(DispatchedMessage {
message: response,
handler: RoomJoinResponseHandler::default(),
}))
self
.handlers
.room_event_handler()
.handle(&mut self.context, RoomEvent::JoinResponse(response))
}
Message::ServerResponse(ServerResponse::RoomMessageResponse(
response,
)) => Some(Box::new(DispatchedMessage {
message: response,
handler: RoomMessageResponseHandler::default(),
})),
)) => self
.handlers
.room_event_handler()
.handle(&mut self.context, RoomEvent::MessageResponse(response)),
Message::ServerResponse(ServerResponse::RoomListResponse(response)) => {
Some(Box::new(DispatchedMessage {
message: response,
handler: RoomListResponseHandler::default(),
}))
self
.handlers
.room_event_handler()
.handle(&mut self.context, RoomEvent::ListResponse(response))
}
Message::ServerResponse(response) => {
warn!("Unhandled server response: {:?}", response);
None
Ok(())
}
Message::ControlRequest(ControlRequest::PeerConnectRequest(request)) => {
Some(Box::new(DispatchedMessage {
message: request,
handler: PeerConnectRequestHandler::default(),
}))
Message::ControlRequest(ControlRequest::LoginStatusRequest) => {
LoginStatusRequestHandler::default().run(&mut self.context, &())
}
Message::ControlRequest(ControlRequest::RoomListRequest) => {
Some(Box::new(DispatchedMessage {
message: (),
handler: RoomListRequestHandler::default(),
}))
Message::ControlRequest(ControlRequest::PeerConnectRequest(request)) => {
PeerConnectRequestHandler::default().run(&mut self.context, &request)
}
Message::ControlRequest(ControlRequest::RoomListRequest) => self
.handlers
.room_event_handler()
.handle(&mut self.context, RoomEvent::ListRequest),
Message::ControlRequest(ControlRequest::RoomMessageRequest(request)) => {
Some(Box::new(DispatchedMessage {
message: request,
handler: RoomMessageRequestHandler::default(),
}))
self
.handlers
.room_event_handler()
.handle(&mut self.context, RoomEvent::MessageRequest(request))
}
Message::ControlRequest(ControlRequest::RoomJoinRequest(room_name)) => {
Some(Box::new(DispatchedMessage {
message: room_name,
handler: RoomJoinRequestHandler::default(),
}))
self
.handlers
.room_event_handler()
.handle(&mut self.context, RoomEvent::JoinRequest(room_name))
}
Message::ControlRequest(ControlRequest::UserListRequest) => {
Some(Box::new(DispatchedMessage {
message: (),
handler: UserListRequestHandler::default(),
}))
UserListRequestHandler::default().run(&mut self.context, &())
}
Message::ControlRequest(request) => {
warn!("Unhandled control request: {:?}", request);
None
Ok(())
}
}
}
/// Dispatches the given message to the appropriate subsystem handler.
pub fn dispatch(&mut self, message: Message) {
let debug_message = format!("{:?}", &message);
if let Err(error) = self.dispatch_internal(message) {
error!(
"Error handling message: {:?}\nMessage: {}",
error, debug_message
);
}
}
}
#[cfg(test)]
mod tests {
use crate::control;
use crate::dispatcher::Message;
use std::net::Ipv4Addr;
use solstice_proto::server;
use crate::context::ContextBundle;
use crate::control;
use crate::dispatcher::Message;
use crate::room_event::testing::FakeRoomEventHandler;
use super::*;
#[derive(Default)]
struct FakeDispatcherHandlers {
room_event_handler: FakeRoomEventHandler,
}
impl FakeDispatcherHandlers {
// Defined here for better maintainability as new handlers are added.
fn has_events(&self) -> bool {
!self.room_event_handler.events.is_empty()
}
}
impl DispatcherHandlers for FakeDispatcherHandlers {
type RoomEventHandler = FakeRoomEventHandler;
fn room_event_handler(&mut self) -> &mut Self::RoomEventHandler {
&mut self.room_event_handler
}
}
/// Dispatches `message` to fake handlers using a new `Dispatcher` and
/// `Context`, then returns the handlers for inspection.
fn dispatch(message: Message) -> FakeDispatcherHandlers {
let bundle = ContextBundle::default();
let mut dispatcher = Dispatcher::from_parts(DispatcherParts {
context: bundle.context,
handlers: FakeDispatcherHandlers::default(),
});
dispatcher.dispatch(message);
dispatcher.into_parts().handlers
}
#[test]
fn does_not_dispatch_unhandled_response() {
assert!(Dispatcher::new()
.dispatch(Message::ServerResponse(
server::LoginResponse::LoginFail {
reason: "bleep bloop".to_string(),
}
.into()
))
.is_none());
let response = server::LoginResponse::LoginFail {
reason: "bleep bloop".to_string(),
};
let handlers = dispatch(Message::ServerResponse(response.into()));
assert!(!handlers.has_events());
}
#[test]
fn dispatches_login_status_request() {
let request = control::Request::LoginStatusRequest;
let _handlers = dispatch(Message::ControlRequest(request));
// TODO: Check that event is dispatched to a new login event handler.
}
#[test]
fn dispatches_peer_address_response() {
let response = server::PeerAddressResponse {
user_name: "shruti".to_string(),
ip: Ipv4Addr::new(1, 2, 3, 4),
port: 1234,
};
let _handlers = dispatch(Message::ServerResponse(response.into()));
// TODO: Check that event is dispatched to a new peer event handler.
}
#[test]
fn dispatches_peer_connect_request() {
let request = control::Request::PeerConnectRequest("shruti".to_string());
let _handlers = dispatch(Message::ControlRequest(request));
// TODO: Check that event is dispatched to a new peer event handler.
}
#[test]
fn dispatches_privileged_users_response() {
assert!(Dispatcher::new()
.dispatch(Message::ServerResponse(
server::PrivilegedUsersResponse {
users: vec!["foo".to_string(), "bar".to_string(), "baz".to_string()],
}
.into()
))
.is_some());
let response = server::PrivilegedUsersResponse {
users: vec!["foo".to_string(), "bar".to_string(), "baz".to_string()],
};
let _handlers = dispatch(Message::ServerResponse(response.into()));
// TODO: Check that event is dispatched to a new user event handler.
}
#[test]
fn dispatches_room_join_request() {
let request = control::Request::RoomJoinRequest("bleep".to_string());
let handlers = dispatch(Message::ControlRequest(request.into()));
assert_eq!(
handlers.room_event_handler.events,
&[RoomEvent::JoinRequest("bleep".to_string())],
);
}
#[test]
fn dispatches_room_join_response() {
assert!(Dispatcher::new()
.dispatch(Message::ServerResponse(
server::RoomJoinResponse {
room_name: "bleep".to_string(),
owner: None,
operators: Vec::new(),
users: Vec::new(),
}
.into()
))
.is_some());
let response = server::RoomJoinResponse {
room_name: "bleep".to_string(),
owner: None,
operators: Vec::new(),
users: Vec::new(),
};
let handlers = dispatch(Message::ServerResponse(response.clone().into()));
assert_eq!(
handlers.room_event_handler.events,
&[RoomEvent::JoinResponse(response)]
);
}
#[test]
fn dispatches_room_message_response() {
assert!(Dispatcher::new()
.dispatch(Message::ServerResponse(
server::RoomMessageResponse {
room_name: "bleep".to_string(),
user_name: "shruti".to_string(),
message: "yo!".to_string(),
}
.into()
))
.is_some());
fn dispatches_room_list_request() {
let handlers = dispatch(control::Request::RoomListRequest.into());
assert_eq!(
handlers.room_event_handler.events,
&[RoomEvent::ListRequest]
);
}
#[test]
fn dispatches_room_list_response() {
assert!(Dispatcher::new()
.dispatch(Message::ServerResponse(
server::RoomListResponse {
rooms: Vec::new(),
owned_private_rooms: Vec::new(),
other_private_rooms: Vec::new(),
operated_private_room_names: Vec::new(),
}
.into()
))
.is_some());
let response = server::RoomListResponse {
rooms: Vec::new(),
owned_private_rooms: Vec::new(),
other_private_rooms: Vec::new(),
operated_private_room_names: Vec::new(),
};
let handlers = dispatch(Message::ServerResponse(response.clone().into()));
assert_eq!(
handlers.room_event_handler.events,
&[RoomEvent::ListResponse(response)]
);
}
#[test]
fn dispatches_room_join_request() {
assert!(Dispatcher::new()
.dispatch(Message::ControlRequest(
control::Request::RoomJoinRequest("bleep".to_string()).into()
))
.is_some());
fn dispatches_room_message_request() {
let request = control::RoomMessageRequest {
room_name: "bleep".to_string(),
message: "yo!".to_string(),
};
let handlers = dispatch(Message::ControlRequest(request.clone().into()));
assert_eq!(
handlers.room_event_handler.events,
&[RoomEvent::MessageRequest(request)],
);
}
#[test]
fn dispatches_room_message_request() {
assert!(Dispatcher::new()
.dispatch(Message::ControlRequest(
control::RoomMessageRequest {
room_name: "bleep".to_string(),
message: "yo!".to_string(),
}
.into()
))
.is_some());
fn dispatches_room_message_response() {
let response = server::RoomMessageResponse {
room_name: "bleep".to_string(),
user_name: "shruti".to_string(),
message: "yo!".to_string(),
};
let handlers = dispatch(Message::ServerResponse(response.clone().into()));
assert_eq!(
handlers.room_event_handler.events,
&[RoomEvent::MessageResponse(response)]
);
}
#[test]
fn dispatches_room_list_request() {
assert!(Dispatcher::new()
.dispatch(Message::ControlRequest(control::Request::RoomListRequest))
.is_some());
fn dispatches_user_list_request() {
let request = control::Request::UserListRequest;
let _handlers = dispatch(request.into());
// TODO: Check that event is dispatched to a new user event handler.
}
}

+ 8
- 7
client/src/main.rs View File

@ -18,13 +18,14 @@ mod login;
mod message_handler;
mod peer;
mod room;
mod room_event;
#[cfg(test)]
mod testing;
mod user;
use config::{Config, TomlConfig};
use context::{ContextBundle, ContextOptions};
use dispatcher::Dispatcher;
use dispatcher::{DefaultDispatcherHandlers, Dispatcher, DispatcherParts};
// Size of various channels defined below.
const CHANNEL_BUFFER_SIZE: usize = 100;
@ -101,18 +102,18 @@ async fn async_main(config: Config) -> anyhow::Result<()> {
dispatcher_tx.clone(),
));
let dispatcher = Dispatcher::new();
let control_task =
control_listener.run(dispatcher_tx, bundle.control_response_rx);
let dispatch_task = tokio::task::spawn_blocking(move || {
let mut context = bundle.context;
let mut dispatcher = Dispatcher::from_parts(DispatcherParts {
context: bundle.context,
handlers: DefaultDispatcherHandlers::default(),
});
while let Some(message) = dispatcher_rx.blocking_recv() {
if let Some(job) = dispatcher.dispatch(message) {
job.execute(&mut context);
}
dispatcher.dispatch(message);
}
context
dispatcher.into_parts().context
});
tokio::select! {


+ 91
- 0
client/src/room_event.rs View File

@ -0,0 +1,91 @@
use solstice_proto::server::{
RoomJoinResponse, RoomListResponse, RoomMessageResponse,
};
use crate::context::Context;
use crate::control::RoomMessageRequest;
use crate::handlers::{
RoomJoinRequestHandler, RoomJoinResponseHandler, RoomListRequestHandler,
RoomListResponseHandler, RoomMessageRequestHandler,
RoomMessageResponseHandler,
};
use crate::message_handler::MessageHandler;
#[derive(Debug, PartialEq, Eq)]
pub enum RoomEvent {
JoinRequest(String),
JoinResponse(RoomJoinResponse),
ListRequest,
ListResponse(RoomListResponse),
MessageRequest(RoomMessageRequest),
MessageResponse(RoomMessageResponse),
}
pub trait HandleRoomEvent {
fn handle(
&mut self,
context: &mut Context,
event: RoomEvent,
) -> anyhow::Result<()>;
}
#[derive(Default)]
pub struct RoomEventHandler;
impl HandleRoomEvent for RoomEventHandler {
fn handle(
&mut self,
context: &mut Context,
event: RoomEvent,
) -> anyhow::Result<()> {
// TODO: Remove individual handlers, move code into RoomEventHandler.
match event {
RoomEvent::JoinRequest(room_name) => {
RoomJoinRequestHandler::default().run(context, &room_name)
}
RoomEvent::JoinResponse(response) => {
RoomJoinResponseHandler::default().run(context, &response)
}
RoomEvent::ListRequest => {
RoomListRequestHandler::default().run(context, &())
}
RoomEvent::ListResponse(response) => {
RoomListResponseHandler::default().run(context, &response)
}
RoomEvent::MessageRequest(request) => {
RoomMessageRequestHandler::default().run(context, &request)
}
RoomEvent::MessageResponse(response) => {
RoomMessageResponseHandler::default().run(context, &response)
}
}
}
}
#[cfg(test)]
pub mod testing {
use crate::context::Context;
use super::{HandleRoomEvent, RoomEvent};
#[derive(Default)]
pub struct FakeRoomEventHandler {
pub events: Vec<RoomEvent>,
}
impl HandleRoomEvent for FakeRoomEventHandler {
fn handle(
&mut self,
_context: &mut Context,
event: RoomEvent,
) -> anyhow::Result<()> {
self.events.push(event);
Ok(())
}
}
}
#[cfg(test)]
mod tests {
// TODO
}

Loading…
Cancel
Save