Browse Source

Rename Message to Event.

main
Titouan Rigoudy 2 years ago
parent
commit
edfb46dd01
4 changed files with 77 additions and 89 deletions
  1. +18
    -18
      client/src/control/listener.rs
  2. +47
    -58
      client/src/dispatcher.rs
  3. +4
    -5
      client/src/event.rs
  4. +8
    -8
      client/src/main.rs

+ 18
- 18
client/src/control/listener.rs View File

@ -13,11 +13,11 @@ use tokio_tungstenite::WebSocketStream;
use crate::control::request::*; use crate::control::request::*;
use crate::control::response::*; use crate::control::response::*;
use crate::event::Message;
use crate::event::Event;
async fn forward_incoming( async fn forward_incoming(
mut incoming: SplitStream<WebSocketStream<TcpStream>>, mut incoming: SplitStream<WebSocketStream<TcpStream>>,
message_tx: &mpsc::Sender<Message>,
event_tx: &mpsc::Sender<Event>,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
while let Some(result) = incoming.next().await { while let Some(result) = incoming.next().await {
if let Err(WebSocketError::ConnectionClosed) = result { if let Err(WebSocketError::ConnectionClosed) = result {
@ -31,8 +31,8 @@ async fn forward_incoming(
let control_request: Request = serde_json::from_str(&text) let control_request: Request = serde_json::from_str(&text)
.with_context(|| format!("decoding JSON message {:?}", text))?; .with_context(|| format!("decoding JSON message {:?}", text))?;
message_tx
.send(Message::ControlRequest(control_request))
event_tx
.send(Event::ControlRequest(control_request))
.await .await
.context("dispatcher channel closed")?; .context("dispatcher channel closed")?;
} }
@ -60,7 +60,7 @@ async fn forward_outgoing(
async fn handle( async fn handle(
stream: TcpStream, stream: TcpStream,
remote_address: &SocketAddr, remote_address: &SocketAddr,
message_tx: &mpsc::Sender<Message>,
event_tx: &mpsc::Sender<Event>,
response_rx: &mut mpsc::Receiver<Response>, response_rx: &mut mpsc::Receiver<Response>,
) { ) {
let ws_stream = match tokio_tungstenite::accept_async(stream).await { let ws_stream = match tokio_tungstenite::accept_async(stream).await {
@ -79,7 +79,7 @@ async fn handle(
let (mut outgoing, incoming) = ws_stream.split(); let (mut outgoing, incoming) = ws_stream.split();
tokio::select! { tokio::select! {
result = forward_incoming(incoming, message_tx) => match result {
result = forward_incoming(incoming, event_tx) => match result {
Ok(()) => info!( Ok(()) => info!(
"Incoming WebSocket handler task for {} stopped", "Incoming WebSocket handler task for {} stopped",
remote_address, remote_address,
@ -132,11 +132,11 @@ impl Listener {
} }
/// Starts accepting control connections, one at a time. For each connection, /// Starts accepting control connections, one at a time. For each connection,
/// forwards incoming messages from the socket to `message_tx` and outgoing
/// forwards incoming messages from the socket to `event_tx` and outgoing
/// responses from `response_rx` to the socket. /// responses from `response_rx` to the socket.
pub async fn run( pub async fn run(
&mut self, &mut self,
message_tx: mpsc::Sender<Message>,
event_tx: mpsc::Sender<Event>,
mut response_rx: mpsc::Receiver<Response>, mut response_rx: mpsc::Receiver<Response>,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
info!("Accepting control connections on {}", self.address()); info!("Accepting control connections on {}", self.address());
@ -163,7 +163,7 @@ impl Listener {
info!("Accepted control connection from {}", remote_address); info!("Accepted control connection from {}", remote_address);
handle(stream, &remote_address, &message_tx, &mut response_rx).await
handle(stream, &remote_address, &event_tx, &mut response_rx).await
} }
Ok(()) Ok(())
@ -185,19 +185,19 @@ mod tests {
use tokio_tungstenite::tungstenite::Message as WebSocketMessage; use tokio_tungstenite::tungstenite::Message as WebSocketMessage;
use crate::control::{Request, Response, RoomLeaveResponse}; use crate::control::{Request, Response, RoomLeaveResponse};
use crate::event::Message;
use crate::event::Event;
// A bound `Listener` packaged with the channels it needs to run. // A bound `Listener` packaged with the channels it needs to run.
// Convenient for tests. // Convenient for tests.
struct RunnableListener { struct RunnableListener {
inner: Listener, inner: Listener,
message_tx: mpsc::Sender<Message>,
event_tx: mpsc::Sender<Event>,
response_rx: mpsc::Receiver<Response>, response_rx: mpsc::Receiver<Response>,
} }
impl RunnableListener { impl RunnableListener {
async fn run(mut self) -> anyhow::Result<()> { async fn run(mut self) -> anyhow::Result<()> {
self.inner.run(self.message_tx, self.response_rx).await
self.inner.run(self.event_tx, self.response_rx).await
} }
} }
@ -206,7 +206,7 @@ mod tests {
pub listener: RunnableListener, pub listener: RunnableListener,
pub address: SocketAddr, pub address: SocketAddr,
pub websocket_address: String, pub websocket_address: String,
pub message_rx: mpsc::Receiver<Message>,
pub event_rx: mpsc::Receiver<Event>,
pub response_tx: mpsc::Sender<Response>, pub response_tx: mpsc::Sender<Response>,
} }
@ -217,12 +217,12 @@ mod tests {
let address = inner.address().clone(); let address = inner.address().clone();
let websocket_address = format!("ws://{}", address); let websocket_address = format!("ws://{}", address);
let (message_tx, message_rx) = mpsc::channel(100);
let (event_tx, event_rx) = mpsc::channel(100);
let (response_tx, response_rx) = mpsc::channel(100); let (response_tx, response_rx) = mpsc::channel(100);
let listener = RunnableListener { let listener = RunnableListener {
inner, inner,
message_tx,
event_tx,
response_rx, response_rx,
}; };
@ -230,7 +230,7 @@ mod tests {
listener, listener,
address, address,
websocket_address, websocket_address,
message_rx,
event_rx,
response_tx, response_tx,
}) })
} }
@ -335,8 +335,8 @@ mod tests {
.context("sending request")?; .context("sending request")?;
assert_eq!( assert_eq!(
bundle.message_rx.recv().await,
Some(Message::ControlRequest(Request::RoomListRequest))
bundle.event_rx.recv().await,
Some(Event::ControlRequest(Request::RoomListRequest))
); );
// Dropping this sender signals to the listener that it should stop. // Dropping this sender signals to the listener that it should stop.


+ 47
- 58
client/src/dispatcher.rs View File

@ -1,18 +1,18 @@
//! This module defines the central message dispatcher to the client process.
//! This module defines the central event dispatcher to the client process.
use log::{error, warn}; use log::{error, warn};
use solstice_proto::server::ServerResponse; use solstice_proto::server::ServerResponse;
use crate::context::Context; use crate::context::Context;
use crate::control::Request as ControlRequest; use crate::control::Request as ControlRequest;
use crate::event::{Message, EventHandler};
use crate::event::{Event, EventHandler};
use crate::handlers::*; use crate::handlers::*;
use crate::message_handler::MessageHandler; use crate::message_handler::MessageHandler;
use crate::room::{RoomEvent, RoomEventHandler}; use crate::room::{RoomEvent, RoomEventHandler};
/// Subsystem event handlers to which the `Dispatcher` dispatches events. /// Subsystem event handlers to which the `Dispatcher` dispatches events.
pub trait DispatcherHandlers { pub trait DispatcherHandlers {
type RoomEventHandler: EventHandler<Event=RoomEvent>;
type RoomEventHandler: EventHandler<Event = RoomEvent>;
fn room_event_handler(&mut self) -> &mut Self::RoomEventHandler; fn room_event_handler(&mut self) -> &mut Self::RoomEventHandler;
} }
@ -31,14 +31,14 @@ impl DispatcherHandlers for DefaultDispatcherHandlers {
/// Parts that make up a `Dispatcher`. /// Parts that make up a `Dispatcher`.
pub struct DispatcherParts<H> { pub struct DispatcherParts<H> {
/// The global context against which messages are handled.
/// The global context against which events are handled.
pub context: Context, pub context: Context,
/// Subsystem event handlers. Injected for testability. /// Subsystem event handlers. Injected for testability.
pub handlers: H, pub handlers: H,
} }
/// The `Dispatcher` is in charge of mapping messages to their handlers.
/// The `Dispatcher` is in charge of mapping events to their handlers.
pub struct Dispatcher<H> { pub struct Dispatcher<H> {
pub context: Context, pub context: Context,
pub handlers: H, pub handlers: H,
@ -61,79 +61,68 @@ impl<H: DispatcherHandlers> Dispatcher<H> {
} }
} }
fn dispatch_internal(&mut self, message: Message) -> anyhow::Result<()> {
match message {
Message::ServerResponse(ServerResponse::PeerAddressResponse(
response,
)) => {
fn dispatch_internal(&mut self, event: Event) -> anyhow::Result<()> {
match event {
Event::ServerResponse(ServerResponse::PeerAddressResponse(response)) => {
PeerAddressResponseHandler::default().run(&mut self.context, &response) PeerAddressResponseHandler::default().run(&mut self.context, &response)
} }
Message::ServerResponse(ServerResponse::PrivilegedUsersResponse(
Event::ServerResponse(ServerResponse::PrivilegedUsersResponse(
response, response,
)) => PrivilegedUsersResponseHandler::default() )) => PrivilegedUsersResponseHandler::default()
.run(&mut self.context, &response), .run(&mut self.context, &response),
Message::ServerResponse(ServerResponse::RoomJoinResponse(response)) => {
self
.handlers
.room_event_handler()
.handle(&mut self.context, RoomEvent::JoinResponse(response))
}
Message::ServerResponse(ServerResponse::RoomMessageResponse(
response,
)) => self
Event::ServerResponse(ServerResponse::RoomJoinResponse(response)) => self
.handlers .handlers
.room_event_handler() .room_event_handler()
.handle(&mut self.context, RoomEvent::MessageResponse(response)),
Message::ServerResponse(ServerResponse::RoomListResponse(response)) => {
.handle(&mut self.context, RoomEvent::JoinResponse(response)),
Event::ServerResponse(ServerResponse::RoomMessageResponse(response)) => {
self self
.handlers .handlers
.room_event_handler() .room_event_handler()
.handle(&mut self.context, RoomEvent::ListResponse(response))
.handle(&mut self.context, RoomEvent::MessageResponse(response))
} }
Message::ServerResponse(response) => {
Event::ServerResponse(ServerResponse::RoomListResponse(response)) => self
.handlers
.room_event_handler()
.handle(&mut self.context, RoomEvent::ListResponse(response)),
Event::ServerResponse(response) => {
warn!("Unhandled server response: {:?}", response); warn!("Unhandled server response: {:?}", response);
Ok(()) Ok(())
} }
Message::ControlRequest(ControlRequest::LoginStatusRequest) => {
Event::ControlRequest(ControlRequest::LoginStatusRequest) => {
LoginStatusRequestHandler::default().run(&mut self.context, &()) LoginStatusRequestHandler::default().run(&mut self.context, &())
} }
Message::ControlRequest(ControlRequest::PeerConnectRequest(request)) => {
Event::ControlRequest(ControlRequest::PeerConnectRequest(request)) => {
PeerConnectRequestHandler::default().run(&mut self.context, &request) PeerConnectRequestHandler::default().run(&mut self.context, &request)
} }
Message::ControlRequest(ControlRequest::RoomListRequest) => self
Event::ControlRequest(ControlRequest::RoomListRequest) => self
.handlers .handlers
.room_event_handler() .room_event_handler()
.handle(&mut self.context, RoomEvent::ListRequest), .handle(&mut self.context, RoomEvent::ListRequest),
Message::ControlRequest(ControlRequest::RoomMessageRequest(request)) => {
Event::ControlRequest(ControlRequest::RoomMessageRequest(request)) => {
self self
.handlers .handlers
.room_event_handler() .room_event_handler()
.handle(&mut self.context, RoomEvent::MessageRequest(request)) .handle(&mut self.context, RoomEvent::MessageRequest(request))
} }
Message::ControlRequest(ControlRequest::RoomJoinRequest(room_name)) => {
self
.handlers
.room_event_handler()
.handle(&mut self.context, RoomEvent::JoinRequest(room_name))
}
Message::ControlRequest(ControlRequest::UserListRequest) => {
Event::ControlRequest(ControlRequest::RoomJoinRequest(room_name)) => self
.handlers
.room_event_handler()
.handle(&mut self.context, RoomEvent::JoinRequest(room_name)),
Event::ControlRequest(ControlRequest::UserListRequest) => {
UserListRequestHandler::default().run(&mut self.context, &()) UserListRequestHandler::default().run(&mut self.context, &())
} }
Message::ControlRequest(request) => {
Event::ControlRequest(request) => {
warn!("Unhandled control request: {:?}", request); warn!("Unhandled control request: {:?}", request);
Ok(()) Ok(())
} }
} }
} }
/// Dispatches the given message to the appropriate subsystem handler.
pub fn dispatch(&mut self, message: Message) {
let debug_message = format!("{:?}", &message);
if let Err(error) = self.dispatch_internal(message) {
error!(
"Error handling message: {:?}\nMessage: {}",
error, debug_message
);
/// Dispatches the given event to the appropriate subsystem handler.
pub fn dispatch(&mut self, event: Event) {
let debug_event = format!("{:?}", &event);
if let Err(error) = self.dispatch_internal(event) {
error!("Error handling event: {:?}\nEvent: {}", error, debug_event);
} }
} }
} }
@ -146,7 +135,7 @@ mod tests {
use crate::context::ContextBundle; use crate::context::ContextBundle;
use crate::control; use crate::control;
use crate::dispatcher::Message;
use crate::dispatcher::Event;
use crate::room::testing::FakeRoomEventHandler; use crate::room::testing::FakeRoomEventHandler;
use super::*; use super::*;
@ -170,9 +159,9 @@ mod tests {
} }
} }
/// Dispatches `message` to fake handlers using a new `Dispatcher` and
/// Dispatches `event` to fake handlers using a new `Dispatcher` and
/// `Context`, then returns the handlers for inspection. /// `Context`, then returns the handlers for inspection.
fn dispatch(message: Message) -> FakeDispatcherHandlers {
fn dispatch(event: Event) -> FakeDispatcherHandlers {
let bundle = ContextBundle::default(); let bundle = ContextBundle::default();
let mut dispatcher = Dispatcher::from_parts(DispatcherParts { let mut dispatcher = Dispatcher::from_parts(DispatcherParts {
@ -180,7 +169,7 @@ mod tests {
handlers: FakeDispatcherHandlers::default(), handlers: FakeDispatcherHandlers::default(),
}); });
dispatcher.dispatch(message);
dispatcher.dispatch(event);
dispatcher.into_parts().handlers dispatcher.into_parts().handlers
} }
@ -191,7 +180,7 @@ mod tests {
reason: "bleep bloop".to_string(), reason: "bleep bloop".to_string(),
}; };
let handlers = dispatch(Message::ServerResponse(response.into()));
let handlers = dispatch(Event::ServerResponse(response.into()));
assert!(!handlers.has_events()); assert!(!handlers.has_events());
} }
@ -200,7 +189,7 @@ mod tests {
fn dispatches_login_status_request() { fn dispatches_login_status_request() {
let request = control::Request::LoginStatusRequest; let request = control::Request::LoginStatusRequest;
let _handlers = dispatch(Message::ControlRequest(request));
let _handlers = dispatch(Event::ControlRequest(request));
// TODO: Check that event is dispatched to a new login event handler. // TODO: Check that event is dispatched to a new login event handler.
} }
@ -213,7 +202,7 @@ mod tests {
port: 1234, port: 1234,
}; };
let _handlers = dispatch(Message::ServerResponse(response.into()));
let _handlers = dispatch(Event::ServerResponse(response.into()));
// TODO: Check that event is dispatched to a new peer event handler. // TODO: Check that event is dispatched to a new peer event handler.
} }
@ -222,7 +211,7 @@ mod tests {
fn dispatches_peer_connect_request() { fn dispatches_peer_connect_request() {
let request = control::Request::PeerConnectRequest("shruti".to_string()); let request = control::Request::PeerConnectRequest("shruti".to_string());
let _handlers = dispatch(Message::ControlRequest(request));
let _handlers = dispatch(Event::ControlRequest(request));
// TODO: Check that event is dispatched to a new peer event handler. // TODO: Check that event is dispatched to a new peer event handler.
} }
@ -233,7 +222,7 @@ mod tests {
users: vec!["foo".to_string(), "bar".to_string(), "baz".to_string()], users: vec!["foo".to_string(), "bar".to_string(), "baz".to_string()],
}; };
let _handlers = dispatch(Message::ServerResponse(response.into()));
let _handlers = dispatch(Event::ServerResponse(response.into()));
// TODO: Check that event is dispatched to a new user event handler. // TODO: Check that event is dispatched to a new user event handler.
} }
@ -242,7 +231,7 @@ mod tests {
fn dispatches_room_join_request() { fn dispatches_room_join_request() {
let request = control::Request::RoomJoinRequest("bleep".to_string()); let request = control::Request::RoomJoinRequest("bleep".to_string());
let handlers = dispatch(Message::ControlRequest(request.into()));
let handlers = dispatch(Event::ControlRequest(request.into()));
assert_eq!( assert_eq!(
handlers.room_event_handler.events, handlers.room_event_handler.events,
@ -259,7 +248,7 @@ mod tests {
users: Vec::new(), users: Vec::new(),
}; };
let handlers = dispatch(Message::ServerResponse(response.clone().into()));
let handlers = dispatch(Event::ServerResponse(response.clone().into()));
assert_eq!( assert_eq!(
handlers.room_event_handler.events, handlers.room_event_handler.events,
@ -286,7 +275,7 @@ mod tests {
operated_private_room_names: Vec::new(), operated_private_room_names: Vec::new(),
}; };
let handlers = dispatch(Message::ServerResponse(response.clone().into()));
let handlers = dispatch(Event::ServerResponse(response.clone().into()));
assert_eq!( assert_eq!(
handlers.room_event_handler.events, handlers.room_event_handler.events,
@ -301,7 +290,7 @@ mod tests {
message: "yo!".to_string(), message: "yo!".to_string(),
}; };
let handlers = dispatch(Message::ControlRequest(request.clone().into()));
let handlers = dispatch(Event::ControlRequest(request.clone().into()));
assert_eq!( assert_eq!(
handlers.room_event_handler.events, handlers.room_event_handler.events,
@ -317,7 +306,7 @@ mod tests {
message: "yo!".to_string(), message: "yo!".to_string(),
}; };
let handlers = dispatch(Message::ServerResponse(response.clone().into()));
let handlers = dispatch(Event::ServerResponse(response.clone().into()));
assert_eq!( assert_eq!(
handlers.room_event_handler.events, handlers.room_event_handler.events,


+ 4
- 5
client/src/event.rs View File

@ -5,21 +5,20 @@ use solstice_proto::ServerResponse;
use crate::context::Context; use crate::context::Context;
use crate::control; use crate::control;
// TODO: Rename to `Event`.
/// The type of messages dispatched by a dispatcher.
/// The type of events affecting the client.
#[derive(Debug, Eq, PartialEq)] #[derive(Debug, Eq, PartialEq)]
pub enum Message {
pub enum Event {
ControlRequest(control::Request), ControlRequest(control::Request),
ServerResponse(ServerResponse), ServerResponse(ServerResponse),
} }
impl From<ServerResponse> for Message {
impl From<ServerResponse> for Event {
fn from(response: ServerResponse) -> Self { fn from(response: ServerResponse) -> Self {
Self::ServerResponse(response) Self::ServerResponse(response)
} }
} }
impl From<control::Request> for Message {
impl From<control::Request> for Event {
fn from(request: control::Request) -> Self { fn from(request: control::Request) -> Self {
Self::ControlRequest(request) Self::ControlRequest(request)
} }


+ 8
- 8
client/src/main.rs View File

@ -26,6 +26,7 @@ mod user;
use config::{Config, TomlConfig}; use config::{Config, TomlConfig};
use context::{ContextBundle, ContextOptions}; use context::{ContextBundle, ContextOptions};
use dispatcher::{DefaultDispatcherHandlers, Dispatcher, DispatcherParts}; use dispatcher::{DefaultDispatcherHandlers, Dispatcher, DispatcherParts};
use event::Event;
// Size of various channels defined below. // Size of various channels defined below.
const CHANNEL_BUFFER_SIZE: usize = 100; const CHANNEL_BUFFER_SIZE: usize = 100;
@ -44,7 +45,7 @@ fn old_main(config: Config) {
async fn run_client( async fn run_client(
config: Config, config: Config,
request_rx: mpsc::Receiver<ServerRequest>, request_rx: mpsc::Receiver<ServerRequest>,
dispatcher_tx: mpsc::Sender<event::Message>,
event_tx: mpsc::Sender<Event>,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
info!("Connecting to server at {}.", config.server_address); info!("Connecting to server at {}.", config.server_address);
let stream = TcpStream::connect(&config.server_address) let stream = TcpStream::connect(&config.server_address)
@ -62,8 +63,8 @@ async fn run_client(
let (response_tx, mut response_rx) = mpsc::channel(CHANNEL_BUFFER_SIZE); let (response_tx, mut response_rx) = mpsc::channel(CHANNEL_BUFFER_SIZE);
let forwarder_task = tokio::spawn(async move { let forwarder_task = tokio::spawn(async move {
while let Some(response) = response_rx.recv().await { while let Some(response) = response_rx.recv().await {
dispatcher_tx
.send(event::Message::ServerResponse(response))
event_tx
.send(Event::ServerResponse(response))
.await .await
.expect("dispatcher channel closed"); .expect("dispatcher channel closed");
} }
@ -89,7 +90,7 @@ async fn async_main(config: Config) -> anyhow::Result<()> {
let bundle = ContextBundle::new(options); let bundle = ContextBundle::new(options);
let (dispatcher_tx, mut dispatcher_rx) = mpsc::channel(CHANNEL_BUFFER_SIZE);
let (event_tx, mut event_rx) = mpsc::channel(CHANNEL_BUFFER_SIZE);
let mut control_listener = let mut control_listener =
control::Listener::bind(&config.control_listen_address) control::Listener::bind(&config.control_listen_address)
@ -99,18 +100,17 @@ async fn async_main(config: Config) -> anyhow::Result<()> {
let client_task = tokio::spawn(run_client( let client_task = tokio::spawn(run_client(
config, config,
bundle.server_request_rx, bundle.server_request_rx,
dispatcher_tx.clone(),
event_tx.clone(),
)); ));
let control_task =
control_listener.run(dispatcher_tx, bundle.control_response_rx);
let control_task = control_listener.run(event_tx, bundle.control_response_rx);
let dispatch_task = tokio::task::spawn_blocking(move || { let dispatch_task = tokio::task::spawn_blocking(move || {
let mut dispatcher = Dispatcher::from_parts(DispatcherParts { let mut dispatcher = Dispatcher::from_parts(DispatcherParts {
context: bundle.context, context: bundle.context,
handlers: DefaultDispatcherHandlers::default(), handlers: DefaultDispatcherHandlers::default(),
}); });
while let Some(message) = dispatcher_rx.blocking_recv() {
while let Some(message) = event_rx.blocking_recv() {
dispatcher.dispatch(message); dispatcher.dispatch(message);
} }
dispatcher.into_parts().context dispatcher.into_parts().context


Loading…
Cancel
Save