diff --git a/client/src/control/ws.rs b/client/src/control/ws.rs index ec1642f..90c8950 100644 --- a/client/src/control/ws.rs +++ b/client/src/control/ws.rs @@ -1,5 +1,6 @@ +use anyhow::Context; use futures::stream::{SplitSink, SplitStream}; -use futures::StreamExt; +use futures::{SinkExt, StreamExt}; use solstice_proto::config; use tokio::net::{TcpListener, TcpStream}; use tokio::sync::mpsc; @@ -65,18 +66,23 @@ impl<'a> IncomingHandler<'a> { } } +// TODO: Remove struct, replace with a free function: `forward_outgoing()`. struct OutgoingHandler<'a> { - address: &'a str, client_rx: &'a mut mpsc::Receiver, } impl<'a> OutgoingHandler<'a> { async fn run( &mut self, - _outgoing: SplitSink, WebSocketMessage>, + mut outgoing: SplitSink, WebSocketMessage>, ) -> anyhow::Result<()> { - while let Some(_response) = self.client_rx.recv().await { - info!("Sending control response to {}", self.address); + while let Some(response) = self.client_rx.recv().await { + let text = serde_json::to_string(&response) + .context("Encoding control response")?; + outgoing + .send(WebSocketMessage::Text(text)) + .await + .context("Sending control response to {}")?; } Ok(()) } @@ -94,20 +100,20 @@ pub async fn listen( info!("Listening for control connections on {}", address); let incoming_handler = IncomingHandler { + // TODO: This should be the address of the remote endpoint, obtained below. address: &address, client_tx: &client_tx, }; let mut outgoing_handler = OutgoingHandler { - address: &address, client_rx: &mut client_rx, }; - while let Ok((raw_stream, addr)) = listener.accept().await { - info!("Accepted control connection from {}", addr); + while let Ok((raw_stream, remote_address)) = listener.accept().await { + info!("Accepted control connection from {}", remote_address); let ws_stream = tokio_tungstenite::accept_async(raw_stream).await?; - info!("WebSocket connection established from {}", addr); + info!("WebSocket connection established from {}", remote_address); let (outgoing, incoming) = ws_stream.split(); @@ -115,11 +121,11 @@ pub async fn listen( () = incoming_handler.run(incoming) => (), result = outgoing_handler.run(outgoing) => match result { Ok(()) => (), - Err(err) => warn!("Error in outgoing websocket handler: {}", err), + Err(err) => warn!("Error in outgoing websocket handler for {}: {}", remote_address, err), }, }; - info!("WebSocket connection from {} closed", addr); + info!("WebSocket connection from {} closed", remote_address); } Ok(())