From 52838efeb13f5a102ed830ee081db6bea43cf80a Mon Sep 17 00:00:00 2001 From: Titouan Rigoudy Date: Tue, 10 Aug 2021 21:54:20 +0200 Subject: [PATCH] Introduce useless OutgoingHandler. --- client/src/control/ws.rs | 45 +++++++++++++++++++++++++++++++--------- 1 file changed, 35 insertions(+), 10 deletions(-) diff --git a/client/src/control/ws.rs b/client/src/control/ws.rs index 0f79f74..6846554 100644 --- a/client/src/control/ws.rs +++ b/client/src/control/ws.rs @@ -1,8 +1,9 @@ -use futures::stream::SplitStream; -use futures::StreamExt; +use futures::stream::{SplitSink, SplitStream}; +use futures::{future, StreamExt}; use solstice_proto::config; use tokio::net::{TcpListener, TcpStream}; use tokio::sync::mpsc; +use tokio_tungstenite::tungstenite::Message as WebSocketMessage; use tokio_tungstenite::WebSocketStream; use crate::control::request::*; @@ -15,10 +16,7 @@ struct IncomingHandler<'a> { } impl<'a> IncomingHandler<'a> { - async fn run( - &self, - mut incoming: SplitStream>, - ) -> anyhow::Result<()> { + async fn run(&self, mut incoming: SplitStream>) { while let Some(result) = incoming.next().await { let message = match result { Ok(message) => message, @@ -62,9 +60,22 @@ impl<'a> IncomingHandler<'a> { self .client_tx - .send(Message::ControlRequest(control_request))? + .send(Message::ControlRequest(control_request)); } + } +} + +struct OutgoingHandler<'a> { + address: &'a str, + client_rx: &'a mut mpsc::Receiver, +} +impl<'a> OutgoingHandler<'a> { + async fn run( + &mut self, + outgoing: SplitSink, WebSocketMessage>, + ) -> anyhow::Result<()> { + let () = future::pending().await; Ok(()) } } @@ -73,7 +84,7 @@ impl<'a> IncomingHandler<'a> { /// control notifications to the client through the given channel. pub async fn listen( client_tx: mpsc::UnboundedSender, - _socket_rx: mpsc::Receiver, + mut client_rx: mpsc::Receiver, ) -> anyhow::Result<()> { let address = format!("{}:{}", config::CONTROL_HOST, config::CONTROL_PORT); let listener = TcpListener::bind(&address).await?; @@ -85,14 +96,28 @@ pub async fn listen( client_tx: &client_tx, }; + let mut outgoing_handler = OutgoingHandler { + address: &address, + client_rx: &mut client_rx, + }; + while let Ok((raw_stream, addr)) = listener.accept().await { info!("Accepted control connection from {}", addr); let ws_stream = tokio_tungstenite::accept_async(raw_stream).await?; info!("WebSocket connection established from {}", addr); - let (_outgoing, incoming) = ws_stream.split(); - incoming_handler.run(incoming).await? + let (outgoing, incoming) = ws_stream.split(); + + tokio::select! { + () = incoming_handler.run(incoming) => (), + result = outgoing_handler.run(outgoing) => match result { + Ok(()) => (), + Err(err) => warn!("Error in outgoing websocket handler: {}", err), + }, + }; + + info!("WebSocket connection from {} closed", addr); } Ok(())