|
|
|
@ -66,26 +66,19 @@ impl<'a> IncomingHandler<'a> { |
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// TODO: Remove struct, replace with a free function: `forward_outgoing()`.
|
|
|
|
struct OutgoingHandler<'a> {
|
|
|
|
client_rx: &'a mut mpsc::Receiver<Response>,
|
|
|
|
}
|
|
|
|
|
|
|
|
impl<'a> OutgoingHandler<'a> {
|
|
|
|
async fn run(
|
|
|
|
&mut self,
|
|
|
|
mut outgoing: SplitSink<WebSocketStream<TcpStream>, WebSocketMessage>,
|
|
|
|
) -> anyhow::Result<()> {
|
|
|
|
while let Some(response) = self.client_rx.recv().await {
|
|
|
|
let text = serde_json::to_string(&response)
|
|
|
|
.context("Encoding control response")?;
|
|
|
|
outgoing
|
|
|
|
.send(WebSocketMessage::Text(text))
|
|
|
|
.await
|
|
|
|
.context("Sending control response to {}")?;
|
|
|
|
}
|
|
|
|
Ok(())
|
|
|
|
async fn forward_outgoing(
|
|
|
|
client_rx: &mut mpsc::Receiver<Response>,
|
|
|
|
mut outgoing: SplitSink<WebSocketStream<TcpStream>, WebSocketMessage>,
|
|
|
|
) -> anyhow::Result<()> {
|
|
|
|
while let Some(response) = client_rx.recv().await {
|
|
|
|
let text =
|
|
|
|
serde_json::to_string(&response).context("Encoding control response")?;
|
|
|
|
outgoing
|
|
|
|
.send(WebSocketMessage::Text(text))
|
|
|
|
.await
|
|
|
|
.context("Sending control response to {}")?;
|
|
|
|
}
|
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Start listening on the socket address stored in configuration, and send
|
|
|
|
@ -105,10 +98,6 @@ pub async fn listen( |
|
|
|
client_tx: &client_tx,
|
|
|
|
};
|
|
|
|
|
|
|
|
let mut outgoing_handler = OutgoingHandler {
|
|
|
|
client_rx: &mut client_rx,
|
|
|
|
};
|
|
|
|
|
|
|
|
while let Ok((raw_stream, remote_address)) = listener.accept().await {
|
|
|
|
info!("Accepted control connection from {}", remote_address);
|
|
|
|
|
|
|
|
@ -119,9 +108,14 @@ pub async fn listen( |
|
|
|
|
|
|
|
tokio::select! {
|
|
|
|
() = incoming_handler.run(incoming) => (),
|
|
|
|
result = outgoing_handler.run(outgoing) => match result {
|
|
|
|
result = forward_outgoing(&mut client_rx, outgoing) => match result {
|
|
|
|
Ok(()) => (),
|
|
|
|
Err(err) => warn!("Error in outgoing websocket handler for {}: {}", remote_address, err),
|
|
|
|
Err(err) => {
|
|
|
|
warn!(
|
|
|
|
"Error in outgoing websocket handler for {}: {}",
|
|
|
|
remote_address, err
|
|
|
|
)
|
|
|
|
},
|
|
|
|
},
|
|
|
|
};
|
|
|
|
|
|
|
|
|