From 896b99a1fe3bb12edd145af99afd08b08412d2ac Mon Sep 17 00:00:00 2001 From: Titouan Rigoudy Date: Tue, 10 Aug 2021 21:37:55 +0200 Subject: [PATCH] Introduce IncomingHandler. --- client/src/control/ws.rs | 106 +++++++++++++++++++++++---------------- 1 file changed, 64 insertions(+), 42 deletions(-) diff --git a/client/src/control/ws.rs b/client/src/control/ws.rs index 1937a73..0f79f74 100644 --- a/client/src/control/ws.rs +++ b/client/src/control/ws.rs @@ -9,47 +9,64 @@ use crate::control::request::*; use crate::control::response::*; use crate::dispatcher::Message; -async fn handle_incoming( - mut incoming: SplitStream>, - address: &str, - client_tx: &mpsc::UnboundedSender, -) -> anyhow::Result<()> { - while let Some(result) = incoming.next().await { - let message = match result { - Ok(message) => message, - Err(err) => { - warn!("Error reading control message: {}", err); - break; - } - }; - - let text = match message.to_text() { - Ok(text) => text, - Err(err) => { - warn!("Received non-text control message: {}", err); - break; - } - }; - - debug!("Received a text message from {}: {}", address, text); - - // Decode the json control request. - let control_request: Request = match serde_json::from_str(&text) { - Ok(control_request) => control_request, - Err(e) => { - warn!("Received invalid JSON message from controller: {}", e); - break; - } - }; - - debug!( - "Received control request from {}: {:?}", - address, control_request - ); - client_tx.send(Message::ControlRequest(control_request))? - } +struct IncomingHandler<'a> { + address: &'a str, + client_tx: &'a mpsc::UnboundedSender, +} - Ok(()) +impl<'a> IncomingHandler<'a> { + async fn run( + &self, + mut incoming: SplitStream>, + ) -> anyhow::Result<()> { + while let Some(result) = incoming.next().await { + let message = match result { + Ok(message) => message, + Err(err) => { + warn!( + "Error reading control message from {}: {}", + self.address, err + ); + break; + } + }; + + let text = match message.to_text() { + Ok(text) => text, + Err(err) => { + warn!( + "Received non-text control message from {}: {}", + self.address, err + ); + break; + } + }; + + debug!("Received a text message from {}: {}", self.address, text); + + let control_request: Request = match serde_json::from_str(&text) { + Ok(control_request) => control_request, + Err(err) => { + warn!( + "Received invalid JSON message from {}: {}", + self.address, err + ); + break; + } + }; + + debug!( + "Received control request from {}: {:?}", + self.address, control_request + ); + + self + .client_tx + .send(Message::ControlRequest(control_request))? + } + + Ok(()) + } } /// Start listening on the socket address stored in configuration, and send @@ -63,14 +80,19 @@ pub async fn listen( info!("Listening for control connections on {}", address); + let incoming_handler = IncomingHandler { + address: &address, + client_tx: &client_tx, + }; + while let Ok((raw_stream, addr)) = listener.accept().await { info!("Accepted control connection from {}", addr); let ws_stream = tokio_tungstenite::accept_async(raw_stream).await?; - info!("WebSocket connection established: {}", addr); + info!("WebSocket connection established from {}", addr); let (_outgoing, incoming) = ws_stream.split(); - handle_incoming(incoming, &address, &client_tx).await? + incoming_handler.run(incoming).await? } Ok(())