From 231809ef2ad8b086c903d5ef64f01537a536f404 Mon Sep 17 00:00:00 2001 From: Titouan Rigoudy Date: Sun, 8 Aug 2021 15:56:02 -0400 Subject: [PATCH] Extract logic out of control::listen(). --- client/src/control/ws.rs | 81 ++++++++++++++++++++++++---------------- 1 file changed, 48 insertions(+), 33 deletions(-) diff --git a/client/src/control/ws.rs b/client/src/control/ws.rs index 3a55b5a..1937a73 100644 --- a/client/src/control/ws.rs +++ b/client/src/control/ws.rs @@ -1,12 +1,57 @@ +use futures::stream::SplitStream; use futures::StreamExt; use solstice_proto::config; -use tokio::net::TcpListener; +use tokio::net::{TcpListener, TcpStream}; use tokio::sync::mpsc; +use tokio_tungstenite::WebSocketStream; use crate::control::request::*; use crate::control::response::*; use crate::dispatcher::Message; +async fn handle_incoming( + mut incoming: SplitStream>, + address: &str, + client_tx: &mpsc::UnboundedSender, +) -> anyhow::Result<()> { + while let Some(result) = incoming.next().await { + let message = match result { + Ok(message) => message, + Err(err) => { + warn!("Error reading control message: {}", err); + break; + } + }; + + let text = match message.to_text() { + Ok(text) => text, + Err(err) => { + warn!("Received non-text control message: {}", err); + break; + } + }; + + debug!("Received a text message from {}: {}", address, text); + + // Decode the json control request. + let control_request: Request = match serde_json::from_str(&text) { + Ok(control_request) => control_request, + Err(e) => { + warn!("Received invalid JSON message from controller: {}", e); + break; + } + }; + + debug!( + "Received control request from {}: {:?}", + address, control_request + ); + client_tx.send(Message::ControlRequest(control_request))? + } + + Ok(()) +} + /// Start listening on the socket address stored in configuration, and send /// control notifications to the client through the given channel. pub async fn listen( @@ -24,38 +69,8 @@ pub async fn listen( let ws_stream = tokio_tungstenite::accept_async(raw_stream).await?; info!("WebSocket connection established: {}", addr); - let (_outgoing, mut incoming) = ws_stream.split(); - while let Some(result) = incoming.next().await { - let message = match result { - Ok(message) => message, - Err(err) => { - warn!("Error reading control message: {}", err); - continue; - } - }; - - let text = match message.to_text() { - Ok(text) => text, - Err(err) => { - warn!("Received non-text control message: {}", err); - continue; - } - }; - - debug!("Received a text message from {}: {}", addr, text); - - // Decode the json control request. - let control_request: Request = match serde_json::from_str(&text) { - Ok(control_request) => control_request, - Err(e) => { - warn!("Received invalid JSON message from controller: {}", e); - continue; - } - }; - - debug!("Received control request: {:?}", control_request); - client_tx.send(Message::ControlRequest(control_request))? - } + let (_outgoing, incoming) = ws_stream.split(); + handle_incoming(incoming, &address, &client_tx).await? } Ok(())