diff --git a/client/src/control/ws.rs b/client/src/control/ws.rs index 23e0f63..3a55b5a 100644 --- a/client/src/control/ws.rs +++ b/client/src/control/ws.rs @@ -19,43 +19,43 @@ pub async fn listen( info!("Listening for control connections on {}", address); while let Ok((raw_stream, addr)) = listener.accept().await { - info!("Accepted control connection from {}", addr); - - let ws_stream = tokio_tungstenite::accept_async(raw_stream).await?; - info!("WebSocket connection established: {}", addr); - - let (_outgoing, mut incoming) = ws_stream.split(); - while let Some(result) = incoming.next().await { - let message = match result { - Ok(message) => message, - Err(err) => { - warn!("Error reading control message: {}", err); - continue - }, - }; - - let text = match message.to_text() { - Ok(text) => text, - Err(err) => { - warn!("Received non-text control message: {}", err); - continue - }, - }; - - debug!("Received a text message from {}: {}", addr, text); - - // Decode the json control request. - let control_request: Request = match serde_json::from_str(&text) { - Ok(control_request) => control_request, - Err(e) => { - warn!("Received invalid JSON message from controller: {}", e); - continue - } - }; - - debug!("Received control request: {:?}", control_request); - client_tx.send(Message::ControlRequest(control_request))? - } + info!("Accepted control connection from {}", addr); + + let ws_stream = tokio_tungstenite::accept_async(raw_stream).await?; + info!("WebSocket connection established: {}", addr); + + let (_outgoing, mut incoming) = ws_stream.split(); + while let Some(result) = incoming.next().await { + let message = match result { + Ok(message) => message, + Err(err) => { + warn!("Error reading control message: {}", err); + continue; + } + }; + + let text = match message.to_text() { + Ok(text) => text, + Err(err) => { + warn!("Received non-text control message: {}", err); + continue; + } + }; + + debug!("Received a text message from {}: {}", addr, text); + + // Decode the json control request. + let control_request: Request = match serde_json::from_str(&text) { + Ok(control_request) => control_request, + Err(e) => { + warn!("Received invalid JSON message from controller: {}", e); + continue; + } + }; + + debug!("Received control request: {:?}", control_request); + client_tx.send(Message::ControlRequest(control_request))? + } } Ok(()) diff --git a/client/src/main.rs b/client/src/main.rs index 520d244..8ed066b 100644 --- a/client/src/main.rs +++ b/client/src/main.rs @@ -125,13 +125,13 @@ async fn async_main() { let control_task = control::listen(dispatcher_tx, control_rx); let dispatch_task = async move { - while let Some(message) = dispatcher_rx.recv().await { - if let Some(job) = dispatcher.dispatch(message) { - executor.schedule(job); - } + while let Some(message) = dispatcher_rx.recv().await { + if let Some(job) = dispatcher.dispatch(message) { + executor.schedule(job); } + } - tokio::task::spawn_blocking(move || executor.join()).await + tokio::task::spawn_blocking(move || executor.join()).await }; tokio::select! {