Browse Source

Refactor IncomingHandler.

wip
Titouan Rigoudy 4 years ago
parent
commit
45e1b5fa24
1 changed files with 43 additions and 59 deletions
  1. +43
    -59
      client/src/control/ws.rs

+ 43
- 59
client/src/control/ws.rs View File

@ -4,71 +4,41 @@ use futures::{SinkExt, StreamExt};
use solstice_proto::config; use solstice_proto::config;
use tokio::net::{TcpListener, TcpStream}; use tokio::net::{TcpListener, TcpStream};
use tokio::sync::mpsc; use tokio::sync::mpsc;
use tokio_tungstenite::tungstenite::Message as WebSocketMessage;
use tokio_tungstenite::tungstenite::{
Error as WebSocketError, Message as WebSocketMessage,
};
use tokio_tungstenite::WebSocketStream; use tokio_tungstenite::WebSocketStream;
use crate::control::request::*; use crate::control::request::*;
use crate::control::response::*; use crate::control::response::*;
use crate::dispatcher::Message; use crate::dispatcher::Message;
struct IncomingHandler<'a> {
address: &'a str,
client_tx: &'a mpsc::UnboundedSender<Message>,
}
async fn forward_incoming(
mut incoming: SplitStream<WebSocketStream<TcpStream>>,
client_tx: &mpsc::UnboundedSender<Message>,
) -> anyhow::Result<()> {
// TODO: close incoming on error? notify forward_outgoing somehow.
while let Some(result) = incoming.next().await {
if let Err(WebSocketError::ConnectionClosed) = result {
break;
}
impl<'a> IncomingHandler<'a> {
async fn run(&self, mut incoming: SplitStream<WebSocketStream<TcpStream>>) {
while let Some(result) = incoming.next().await {
let message = match result {
Ok(message) => message,
Err(err) => {
warn!(
"Error reading control message from {}: {}",
self.address, err
);
break;
}
};
let text = match message.to_text() {
Ok(text) => text,
Err(err) => {
warn!(
"Received non-text control message from {}: {}",
self.address, err
);
break;
}
};
let message = result.context("reading control message")?;
debug!("Received a text message from {}: {}", self.address, text);
let text = message.to_text().context("non-text control message")?;
let control_request: Request = match serde_json::from_str(&text) {
Ok(control_request) => control_request,
Err(err) => {
warn!(
"Received invalid JSON message from {}: {}",
self.address, err
);
break;
}
};
debug!(
"Received control request from {}: {:?}",
self.address, control_request
);
self
.client_tx
.send(Message::ControlRequest(control_request));
}
let control_request: Request =
serde_json::from_str(&text).context("decoding JSON message")?;
client_tx.send(Message::ControlRequest(control_request));
} }
Ok(())
} }
async fn forward_outgoing(
async fn forward_outgoing_inner(
client_rx: &mut mpsc::Receiver<Response>, client_rx: &mut mpsc::Receiver<Response>,
mut outgoing: SplitSink<WebSocketStream<TcpStream>, WebSocketMessage>,
outgoing: &mut SplitSink<WebSocketStream<TcpStream>, WebSocketMessage>,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
while let Some(response) = client_rx.recv().await { while let Some(response) = client_rx.recv().await {
let text = let text =
@ -81,6 +51,16 @@ async fn forward_outgoing(
Ok(()) Ok(())
} }
async fn forward_outgoing(
client_rx: &mut mpsc::Receiver<Response>,
mut outgoing: SplitSink<WebSocketStream<TcpStream>, WebSocketMessage>,
) -> anyhow::Result<()> {
let result = forward_outgoing_inner(client_rx, &mut outgoing).await;
// TODO: handle error.
outgoing.close().await;
result
}
/// Start listening on the socket address stored in configuration, and send /// Start listening on the socket address stored in configuration, and send
/// control notifications to the client through the given channel. /// control notifications to the client through the given channel.
pub async fn listen( pub async fn listen(
@ -92,12 +72,6 @@ pub async fn listen(
info!("Listening for control connections on {}", address); info!("Listening for control connections on {}", address);
let incoming_handler = IncomingHandler {
// TODO: This should be the address of the remote endpoint, obtained below.
address: &address,
client_tx: &client_tx,
};
while let Ok((raw_stream, remote_address)) = listener.accept().await { while let Ok((raw_stream, remote_address)) = listener.accept().await {
info!("Accepted control connection from {}", remote_address); info!("Accepted control connection from {}", remote_address);
@ -106,8 +80,18 @@ pub async fn listen(
let (outgoing, incoming) = ws_stream.split(); let (outgoing, incoming) = ws_stream.split();
// Instead of selecting, spawn a task and wait for both to resolve. This
// works because tunstenite communicates the "closed" state to both ends.
tokio::select! { tokio::select! {
() = incoming_handler.run(incoming) => (),
result = forward_incoming(incoming, &client_tx) => match result {
Ok(()) => (),
Err(err) => {
warn!(
"Error in incoming websocket handler for {}: {}",
remote_address, err
)
},
},
result = forward_outgoing(&mut client_rx, outgoing) => match result { result = forward_outgoing(&mut client_rx, outgoing) => match result {
Ok(()) => (), Ok(()) => (),
Err(err) => { Err(err) => {


Loading…
Cancel
Save