Browse Source

Extract logic out of control::listen().

wip
Titouan Rigoudy 4 years ago
parent
commit
231809ef2a
1 changed files with 48 additions and 33 deletions
  1. +48
    -33
      client/src/control/ws.rs

+ 48
- 33
client/src/control/ws.rs View File

@ -1,12 +1,57 @@
use futures::stream::SplitStream;
use futures::StreamExt;
use solstice_proto::config;
use tokio::net::TcpListener;
use tokio::net::{TcpListener, TcpStream};
use tokio::sync::mpsc;
use tokio_tungstenite::WebSocketStream;
use crate::control::request::*;
use crate::control::response::*;
use crate::dispatcher::Message;
async fn handle_incoming(
mut incoming: SplitStream<WebSocketStream<TcpStream>>,
address: &str,
client_tx: &mpsc::UnboundedSender<Message>,
) -> anyhow::Result<()> {
while let Some(result) = incoming.next().await {
let message = match result {
Ok(message) => message,
Err(err) => {
warn!("Error reading control message: {}", err);
break;
}
};
let text = match message.to_text() {
Ok(text) => text,
Err(err) => {
warn!("Received non-text control message: {}", err);
break;
}
};
debug!("Received a text message from {}: {}", address, text);
// Decode the json control request.
let control_request: Request = match serde_json::from_str(&text) {
Ok(control_request) => control_request,
Err(e) => {
warn!("Received invalid JSON message from controller: {}", e);
break;
}
};
debug!(
"Received control request from {}: {:?}",
address, control_request
);
client_tx.send(Message::ControlRequest(control_request))?
}
Ok(())
}
/// Start listening on the socket address stored in configuration, and send
/// control notifications to the client through the given channel.
pub async fn listen(
@ -24,38 +69,8 @@ pub async fn listen(
let ws_stream = tokio_tungstenite::accept_async(raw_stream).await?;
info!("WebSocket connection established: {}", addr);
let (_outgoing, mut incoming) = ws_stream.split();
while let Some(result) = incoming.next().await {
let message = match result {
Ok(message) => message,
Err(err) => {
warn!("Error reading control message: {}", err);
continue;
}
};
let text = match message.to_text() {
Ok(text) => text,
Err(err) => {
warn!("Received non-text control message: {}", err);
continue;
}
};
debug!("Received a text message from {}: {}", addr, text);
// Decode the json control request.
let control_request: Request = match serde_json::from_str(&text) {
Ok(control_request) => control_request,
Err(e) => {
warn!("Received invalid JSON message from controller: {}", e);
continue;
}
};
debug!("Received control request: {:?}", control_request);
client_tx.send(Message::ControlRequest(control_request))?
}
let (_outgoing, incoming) = ws_stream.split();
handle_incoming(incoming, &address, &client_tx).await?
}
Ok(())


Loading…
Cancel
Save