diff --git a/client/src/context.rs b/client/src/context.rs index 94df335..4520593 100644 --- a/client/src/context.rs +++ b/client/src/context.rs @@ -5,6 +5,7 @@ use parking_lot::Mutex; use solstice_proto::ServerRequest; use tokio::sync::mpsc::{channel, Receiver, Sender}; +use crate::control::Response as ControlResponse; use crate::room::RoomMap; use crate::user::UserMap; @@ -23,7 +24,9 @@ pub struct Context { /// Sender half of a channel used to send requests to the server. pub server_request_tx: Sender, - // TODO: Add control response sender. + + /// Sender half of a channel used to send responses to the controller. + pub control_response_tx: Sender, } /// Convenience bundle for creating new `Context` structs. @@ -34,6 +37,9 @@ pub struct ContextBundle { /// The receiver corresponding to `context.server_request_tx`. pub server_request_rx: Receiver, + + /// The receiver corresponsing to `context.control_response_tx`. + pub control_response_rx: Receiver, } /// Specifies options for new `ContextBundle` structs. @@ -44,13 +50,17 @@ pub struct ContextOptions { /// The buffer size of the server request channel. pub server_request_buffer: usize, + + /// The buffer size of the control response channel. + pub control_response_buffer: usize, } impl Default for ContextOptions { fn default() -> Self { Self { - server_request_buffer: 100, initial_state: State::default(), + server_request_buffer: 100, + control_response_buffer: 100, } } } @@ -60,12 +70,16 @@ impl ContextBundle { fn new(options: ContextOptions) -> Self { let (server_request_tx, server_request_rx) = channel(options.server_request_buffer); + let (control_response_tx, control_response_rx) = + channel(options.control_response_buffer); Self { context: Context { state: Mutex::new(options.initial_state), server_request_tx, + control_response_tx, }, server_request_rx, + control_response_rx, } } } diff --git a/client/src/control/ws.rs b/client/src/control/ws.rs index 6846554..ec1642f 100644 --- a/client/src/control/ws.rs +++ b/client/src/control/ws.rs @@ -1,5 +1,5 @@ use futures::stream::{SplitSink, SplitStream}; -use futures::{future, StreamExt}; +use futures::StreamExt; use solstice_proto::config; use tokio::net::{TcpListener, TcpStream}; use tokio::sync::mpsc; @@ -73,9 +73,11 @@ struct OutgoingHandler<'a> { impl<'a> OutgoingHandler<'a> { async fn run( &mut self, - outgoing: SplitSink, WebSocketMessage>, + _outgoing: SplitSink, WebSocketMessage>, ) -> anyhow::Result<()> { - let () = future::pending().await; + while let Some(_response) = self.client_rx.recv().await { + info!("Sending control response to {}", self.address); + } Ok(()) } } diff --git a/client/src/handlers/set_room_list_handler.rs b/client/src/handlers/set_room_list_handler.rs index abea6eb..c8683de 100644 --- a/client/src/handlers/set_room_list_handler.rs +++ b/client/src/handlers/set_room_list_handler.rs @@ -3,6 +3,7 @@ use std::io; use solstice_proto::server::RoomListResponse; use crate::context::Context; +use crate::control; use crate::message_handler::MessageHandler; #[derive(Debug, Default)] @@ -15,7 +16,20 @@ impl MessageHandler for SetRoomListHandler { message: &RoomListResponse, ) -> io::Result<()> { let response = (*message).clone(); - context.state.lock().rooms.set_room_list(response); + { + let mut guard = context.state.lock(); + guard.rooms.set_room_list(response); + + // Send under lock to avoid out-of-order sends. + let rooms = guard.rooms.get_room_list(); + let control_response = + control::Response::RoomListResponse(control::RoomListResponse { + rooms, + }); + + // TODO: decide what to do with errors here. + context.control_response_tx.blocking_send(control_response); + } Ok(()) } diff --git a/client/src/main.rs b/client/src/main.rs index 8ed066b..2be787e 100644 --- a/client/src/main.rs +++ b/client/src/main.rs @@ -114,15 +114,13 @@ async fn async_main() { let (dispatcher_tx, mut dispatcher_rx) = tokio::sync::mpsc::unbounded_channel(); - let (_control_tx, control_rx) = tokio::sync::mpsc::channel(100); - let client_task = tokio::spawn(run_client(bundle.server_request_rx, dispatcher_tx.clone())); let dispatcher = Dispatcher::new(); let executor = Executor::new(bundle.context); - let control_task = control::listen(dispatcher_tx, control_rx); + let control_task = control::listen(dispatcher_tx, bundle.control_response_rx); let dispatch_task = async move { while let Some(message) = dispatcher_rx.recv().await {