Browse Source

Introduce IncomingHandler.

wip
Titouan Rigoudy 4 years ago
parent
commit
896b99a1fe
1 changed files with 64 additions and 42 deletions
  1. +64
    -42
      client/src/control/ws.rs

+ 64
- 42
client/src/control/ws.rs View File

@ -9,47 +9,64 @@ use crate::control::request::*;
use crate::control::response::*;
use crate::dispatcher::Message;
async fn handle_incoming(
mut incoming: SplitStream<WebSocketStream<TcpStream>>,
address: &str,
client_tx: &mpsc::UnboundedSender<Message>,
) -> anyhow::Result<()> {
while let Some(result) = incoming.next().await {
let message = match result {
Ok(message) => message,
Err(err) => {
warn!("Error reading control message: {}", err);
break;
}
};
let text = match message.to_text() {
Ok(text) => text,
Err(err) => {
warn!("Received non-text control message: {}", err);
break;
}
};
debug!("Received a text message from {}: {}", address, text);
// Decode the json control request.
let control_request: Request = match serde_json::from_str(&text) {
Ok(control_request) => control_request,
Err(e) => {
warn!("Received invalid JSON message from controller: {}", e);
break;
}
};
debug!(
"Received control request from {}: {:?}",
address, control_request
);
client_tx.send(Message::ControlRequest(control_request))?
}
struct IncomingHandler<'a> {
address: &'a str,
client_tx: &'a mpsc::UnboundedSender<Message>,
}
Ok(())
impl<'a> IncomingHandler<'a> {
async fn run(
&self,
mut incoming: SplitStream<WebSocketStream<TcpStream>>,
) -> anyhow::Result<()> {
while let Some(result) = incoming.next().await {
let message = match result {
Ok(message) => message,
Err(err) => {
warn!(
"Error reading control message from {}: {}",
self.address, err
);
break;
}
};
let text = match message.to_text() {
Ok(text) => text,
Err(err) => {
warn!(
"Received non-text control message from {}: {}",
self.address, err
);
break;
}
};
debug!("Received a text message from {}: {}", self.address, text);
let control_request: Request = match serde_json::from_str(&text) {
Ok(control_request) => control_request,
Err(err) => {
warn!(
"Received invalid JSON message from {}: {}",
self.address, err
);
break;
}
};
debug!(
"Received control request from {}: {:?}",
self.address, control_request
);
self
.client_tx
.send(Message::ControlRequest(control_request))?
}
Ok(())
}
}
/// Start listening on the socket address stored in configuration, and send
@ -63,14 +80,19 @@ pub async fn listen(
info!("Listening for control connections on {}", address);
let incoming_handler = IncomingHandler {
address: &address,
client_tx: &client_tx,
};
while let Ok((raw_stream, addr)) = listener.accept().await {
info!("Accepted control connection from {}", addr);
let ws_stream = tokio_tungstenite::accept_async(raw_stream).await?;
info!("WebSocket connection established: {}", addr);
info!("WebSocket connection established from {}", addr);
let (_outgoing, incoming) = ws_stream.split();
handle_incoming(incoming, &address, &client_tx).await?
incoming_handler.run(incoming).await?
}
Ok(())


Loading…
Cancel
Save