|
|
|
@ -1,8 +1,9 @@ |
|
|
|
use futures::stream::SplitStream;
|
|
|
|
use futures::StreamExt;
|
|
|
|
use futures::stream::{SplitSink, SplitStream};
|
|
|
|
use futures::{future, StreamExt};
|
|
|
|
use solstice_proto::config;
|
|
|
|
use tokio::net::{TcpListener, TcpStream};
|
|
|
|
use tokio::sync::mpsc;
|
|
|
|
use tokio_tungstenite::tungstenite::Message as WebSocketMessage;
|
|
|
|
use tokio_tungstenite::WebSocketStream;
|
|
|
|
|
|
|
|
use crate::control::request::*;
|
|
|
|
@ -15,10 +16,7 @@ struct IncomingHandler<'a> { |
|
|
|
}
|
|
|
|
|
|
|
|
impl<'a> IncomingHandler<'a> {
|
|
|
|
async fn run(
|
|
|
|
&self,
|
|
|
|
mut incoming: SplitStream<WebSocketStream<TcpStream>>,
|
|
|
|
) -> anyhow::Result<()> {
|
|
|
|
async fn run(&self, mut incoming: SplitStream<WebSocketStream<TcpStream>>) {
|
|
|
|
while let Some(result) = incoming.next().await {
|
|
|
|
let message = match result {
|
|
|
|
Ok(message) => message,
|
|
|
|
@ -62,9 +60,22 @@ impl<'a> IncomingHandler<'a> { |
|
|
|
|
|
|
|
self
|
|
|
|
.client_tx
|
|
|
|
.send(Message::ControlRequest(control_request))?
|
|
|
|
.send(Message::ControlRequest(control_request));
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
struct OutgoingHandler<'a> {
|
|
|
|
address: &'a str,
|
|
|
|
client_rx: &'a mut mpsc::Receiver<Response>,
|
|
|
|
}
|
|
|
|
|
|
|
|
impl<'a> OutgoingHandler<'a> {
|
|
|
|
async fn run(
|
|
|
|
&mut self,
|
|
|
|
outgoing: SplitSink<WebSocketStream<TcpStream>, WebSocketMessage>,
|
|
|
|
) -> anyhow::Result<()> {
|
|
|
|
let () = future::pending().await;
|
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
}
|
|
|
|
@ -73,7 +84,7 @@ impl<'a> IncomingHandler<'a> { |
|
|
|
/// control notifications to the client through the given channel.
|
|
|
|
pub async fn listen(
|
|
|
|
client_tx: mpsc::UnboundedSender<Message>,
|
|
|
|
_socket_rx: mpsc::Receiver<Response>,
|
|
|
|
mut client_rx: mpsc::Receiver<Response>,
|
|
|
|
) -> anyhow::Result<()> {
|
|
|
|
let address = format!("{}:{}", config::CONTROL_HOST, config::CONTROL_PORT);
|
|
|
|
let listener = TcpListener::bind(&address).await?;
|
|
|
|
@ -85,14 +96,28 @@ pub async fn listen( |
|
|
|
client_tx: &client_tx,
|
|
|
|
};
|
|
|
|
|
|
|
|
let mut outgoing_handler = OutgoingHandler {
|
|
|
|
address: &address,
|
|
|
|
client_rx: &mut client_rx,
|
|
|
|
};
|
|
|
|
|
|
|
|
while let Ok((raw_stream, addr)) = listener.accept().await {
|
|
|
|
info!("Accepted control connection from {}", addr);
|
|
|
|
|
|
|
|
let ws_stream = tokio_tungstenite::accept_async(raw_stream).await?;
|
|
|
|
info!("WebSocket connection established from {}", addr);
|
|
|
|
|
|
|
|
let (_outgoing, incoming) = ws_stream.split();
|
|
|
|
incoming_handler.run(incoming).await?
|
|
|
|
let (outgoing, incoming) = ws_stream.split();
|
|
|
|
|
|
|
|
tokio::select! {
|
|
|
|
() = incoming_handler.run(incoming) => (),
|
|
|
|
result = outgoing_handler.run(outgoing) => match result {
|
|
|
|
Ok(()) => (),
|
|
|
|
Err(err) => warn!("Error in outgoing websocket handler: {}", err),
|
|
|
|
},
|
|
|
|
};
|
|
|
|
|
|
|
|
info!("WebSocket connection from {} closed", addr);
|
|
|
|
}
|
|
|
|
|
|
|
|
Ok(())
|
|
|
|
|