Browse Source

Connect outgoing channel to websocket.

wip
Titouan Rigoudy 4 years ago
parent
commit
064bdedf1d
1 changed files with 17 additions and 11 deletions
  1. +17
    -11
      client/src/control/ws.rs

+ 17
- 11
client/src/control/ws.rs View File

@ -1,5 +1,6 @@
use anyhow::Context;
use futures::stream::{SplitSink, SplitStream};
use futures::StreamExt;
use futures::{SinkExt, StreamExt};
use solstice_proto::config;
use tokio::net::{TcpListener, TcpStream};
use tokio::sync::mpsc;
@ -65,18 +66,23 @@ impl<'a> IncomingHandler<'a> {
}
}
// TODO: Remove struct, replace with a free function: `forward_outgoing()`.
struct OutgoingHandler<'a> {
address: &'a str,
client_rx: &'a mut mpsc::Receiver<Response>,
}
impl<'a> OutgoingHandler<'a> {
async fn run(
&mut self,
_outgoing: SplitSink<WebSocketStream<TcpStream>, WebSocketMessage>,
mut outgoing: SplitSink<WebSocketStream<TcpStream>, WebSocketMessage>,
) -> anyhow::Result<()> {
while let Some(_response) = self.client_rx.recv().await {
info!("Sending control response to {}", self.address);
while let Some(response) = self.client_rx.recv().await {
let text = serde_json::to_string(&response)
.context("Encoding control response")?;
outgoing
.send(WebSocketMessage::Text(text))
.await
.context("Sending control response to {}")?;
}
Ok(())
}
@ -94,20 +100,20 @@ pub async fn listen(
info!("Listening for control connections on {}", address);
let incoming_handler = IncomingHandler {
// TODO: This should be the address of the remote endpoint, obtained below.
address: &address,
client_tx: &client_tx,
};
let mut outgoing_handler = OutgoingHandler {
address: &address,
client_rx: &mut client_rx,
};
while let Ok((raw_stream, addr)) = listener.accept().await {
info!("Accepted control connection from {}", addr);
while let Ok((raw_stream, remote_address)) = listener.accept().await {
info!("Accepted control connection from {}", remote_address);
let ws_stream = tokio_tungstenite::accept_async(raw_stream).await?;
info!("WebSocket connection established from {}", addr);
info!("WebSocket connection established from {}", remote_address);
let (outgoing, incoming) = ws_stream.split();
@ -115,11 +121,11 @@ pub async fn listen(
() = incoming_handler.run(incoming) => (),
result = outgoing_handler.run(outgoing) => match result {
Ok(()) => (),
Err(err) => warn!("Error in outgoing websocket handler: {}", err),
Err(err) => warn!("Error in outgoing websocket handler for {}: {}", remote_address, err),
},
};
info!("WebSocket connection from {} closed", addr);
info!("WebSocket connection from {} closed", remote_address);
}
Ok(())


Loading…
Cancel
Save