Solstice client.
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 

126 lines
3.3 KiB

use futures::stream::{SplitSink, SplitStream};
use futures::StreamExt;
use solstice_proto::config;
use tokio::net::{TcpListener, TcpStream};
use tokio::sync::mpsc;
use tokio_tungstenite::tungstenite::Message as WebSocketMessage;
use tokio_tungstenite::WebSocketStream;
use crate::control::request::*;
use crate::control::response::*;
use crate::dispatcher::Message;
/// Reads control requests from a WebSocket stream and forwards them over a
/// channel as `Message::ControlRequest` values.
struct IncomingHandler<'a> {
/// Address string interpolated into this handler's log messages.
address: &'a str,
/// Channel on which parsed control requests are sent.
client_tx: &'a mpsc::UnboundedSender<Message>,
}
impl IncomingHandler<'_> {
    /// Reads control requests from `incoming` and forwards each one on
    /// `client_tx` as a `Message::ControlRequest`.
    ///
    /// Returns when any of the following happens:
    ///
    /// - the stream ends or yields a read error,
    /// - the peer sends a close frame,
    /// - a data frame cannot be decoded as text or parsed as a JSON `Request`,
    /// - the receiving side of `client_tx` has been dropped.
    ///
    /// Every failure is logged; none is reported to the caller.
    async fn run(&self, mut incoming: SplitStream<WebSocketStream<TcpStream>>) {
        while let Some(result) = incoming.next().await {
            let message = match result {
                Ok(message) => message,
                Err(err) => {
                    warn!(
                        "Error reading control message from {}: {}",
                        self.address, err
                    );
                    break;
                }
            };

            // Control frames are not requests. Handle them before the text
            // decoding below, which would otherwise hand their payload to the
            // JSON parser and drop the connection with a misleading warning.
            if message.is_close() {
                debug!("Received close frame from {}", self.address);
                break;
            }
            if message.is_ping() || message.is_pong() {
                continue;
            }

            let text = match message.to_text() {
                Ok(text) => text,
                Err(err) => {
                    warn!(
                        "Received non-text control message from {}: {}",
                        self.address, err
                    );
                    break;
                }
            };
            debug!("Received a text message from {}: {}", self.address, text);

            let control_request: Request = match serde_json::from_str(text) {
                Ok(control_request) => control_request,
                Err(err) => {
                    warn!(
                        "Received invalid JSON message from {}: {}",
                        self.address, err
                    );
                    break;
                }
            };
            debug!(
                "Received control request from {}: {:?}",
                self.address, control_request
            );

            // A send error means the receiver is gone, so nobody is left to
            // act on further requests: stop reading instead of discarding them.
            if let Err(err) = self
                .client_tx
                .send(Message::ControlRequest(control_request))
            {
                warn!(
                    "Failed to forward control request from {}: {}",
                    self.address, err
                );
                break;
            }
        }
    }
}
/// Drains control responses from a channel on behalf of a WebSocket
/// connection.
struct OutgoingHandler<'a> {
/// Address string interpolated into this handler's log messages.
address: &'a str,
/// Channel from which control responses are received.
client_rx: &'a mut mpsc::Receiver<Response>,
}
impl OutgoingHandler<'_> {
    /// Drains `client_rx`, logging one line per response received, and
    /// returns `Ok(())` once the channel yields `None`.
    ///
    /// NOTE(review): responses are only logged here. Nothing is written to
    /// `_outgoing`; the sink is merely held for the lifetime of this call.
    async fn run(
        &mut self,
        _outgoing: SplitSink<WebSocketStream<TcpStream>, WebSocketMessage>,
    ) -> anyhow::Result<()> {
        loop {
            match self.client_rx.recv().await {
                Some(_response) => {
                    info!("Sending control response to {}", self.address);
                }
                None => return Ok(()),
            }
        }
    }
}
/// Start listening on the socket address stored in configuration, and send
/// control notifications to the client through the given channel.
pub async fn listen(
client_tx: mpsc::UnboundedSender<Message>,
mut client_rx: mpsc::Receiver<Response>,
) -> anyhow::Result<()> {
let address = format!("{}:{}", config::CONTROL_HOST, config::CONTROL_PORT);
let listener = TcpListener::bind(&address).await?;
info!("Listening for control connections on {}", address);
let incoming_handler = IncomingHandler {
address: &address,
client_tx: &client_tx,
};
let mut outgoing_handler = OutgoingHandler {
address: &address,
client_rx: &mut client_rx,
};
while let Ok((raw_stream, addr)) = listener.accept().await {
info!("Accepted control connection from {}", addr);
let ws_stream = tokio_tungstenite::accept_async(raw_stream).await?;
info!("WebSocket connection established from {}", addr);
let (outgoing, incoming) = ws_stream.split();
tokio::select! {
() = incoming_handler.run(incoming) => (),
result = outgoing_handler.run(outgoing) => match result {
Ok(()) => (),
Err(err) => warn!("Error in outgoing websocket handler: {}", err),
},
};
info!("WebSocket connection from {} closed", addr);
}
Ok(())
}