|
|
|
@ -17,7 +17,7 @@ use crate::dispatcher::Message; |
|
|
|
|
|
|
|
async fn forward_incoming(
|
|
|
|
mut incoming: SplitStream<WebSocketStream<TcpStream>>,
|
|
|
|
client_tx: &mpsc::UnboundedSender<Message>,
|
|
|
|
message_tx: &mpsc::UnboundedSender<Message>,
|
|
|
|
) -> anyhow::Result<()> {
|
|
|
|
while let Some(result) = incoming.next().await {
|
|
|
|
if let Err(WebSocketError::ConnectionClosed) = result {
|
|
|
|
@ -31,17 +31,17 @@ async fn forward_incoming( |
|
|
|
let control_request: Request =
|
|
|
|
serde_json::from_str(&text).context("decoding JSON message")?;
|
|
|
|
|
|
|
|
client_tx.send(Message::ControlRequest(control_request));
|
|
|
|
message_tx.send(Message::ControlRequest(control_request));
|
|
|
|
}
|
|
|
|
|
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
|
|
|
|
async fn forward_outgoing(
|
|
|
|
client_rx: &mut mpsc::Receiver<Response>,
|
|
|
|
response_rx: &mut mpsc::Receiver<Response>,
|
|
|
|
outgoing: &mut SplitSink<WebSocketStream<TcpStream>, WebSocketMessage>,
|
|
|
|
) -> anyhow::Result<()> {
|
|
|
|
while let Some(response) = client_rx.recv().await {
|
|
|
|
while let Some(response) = response_rx.recv().await {
|
|
|
|
let text =
|
|
|
|
serde_json::to_string(&response).context("encoding control response")?;
|
|
|
|
|
|
|
|
@ -57,13 +57,13 @@ async fn forward_outgoing( |
|
|
|
async fn handle(
|
|
|
|
stream: WebSocketStream<TcpStream>,
|
|
|
|
remote_address: &SocketAddr,
|
|
|
|
client_tx: &mpsc::UnboundedSender<Message>,
|
|
|
|
client_rx: &mut mpsc::Receiver<Response>,
|
|
|
|
message_tx: &mpsc::UnboundedSender<Message>,
|
|
|
|
response_rx: &mut mpsc::Receiver<Response>,
|
|
|
|
) {
|
|
|
|
let (mut outgoing, incoming) = stream.split();
|
|
|
|
|
|
|
|
tokio::select! {
|
|
|
|
result = forward_incoming(incoming, client_tx) => match result {
|
|
|
|
result = forward_incoming(incoming, message_tx) => match result {
|
|
|
|
Ok(()) => info!(
|
|
|
|
"Incoming websocket handler task for {} stopped",
|
|
|
|
remote_address,
|
|
|
|
@ -73,7 +73,7 @@ async fn handle( |
|
|
|
remote_address, err,
|
|
|
|
),
|
|
|
|
},
|
|
|
|
result = forward_outgoing(client_rx, &mut outgoing) => match result {
|
|
|
|
result = forward_outgoing(response_rx, &mut outgoing) => match result {
|
|
|
|
Ok(()) => info!(
|
|
|
|
"Outgoing websocket handler for {} stopped",
|
|
|
|
remote_address,
|
|
|
|
@ -96,8 +96,8 @@ async fn handle( |
|
|
|
/// Start listening on the socket address stored in configuration, and send
|
|
|
|
/// control notifications to the client through the given channel.
|
|
|
|
pub async fn listen(
|
|
|
|
client_tx: mpsc::UnboundedSender<Message>,
|
|
|
|
mut client_rx: mpsc::Receiver<Response>,
|
|
|
|
message_tx: mpsc::UnboundedSender<Message>,
|
|
|
|
mut response_rx: mpsc::Receiver<Response>,
|
|
|
|
) -> anyhow::Result<()> {
|
|
|
|
let address = format!("{}:{}", config::CONTROL_HOST, config::CONTROL_PORT);
|
|
|
|
let listener = TcpListener::bind(&address).await?;
|
|
|
|
@ -110,7 +110,7 @@ pub async fn listen( |
|
|
|
let ws_stream = tokio_tungstenite::accept_async(raw_stream).await?;
|
|
|
|
info!("WebSocket connection established from {}", remote_address);
|
|
|
|
|
|
|
|
handle(ws_stream, &remote_address, &client_tx, &mut client_rx).await
|
|
|
|
handle(ws_stream, &remote_address, &message_tx, &mut response_rx).await
|
|
|
|
}
|
|
|
|
|
|
|
|
Ok(())
|
|
|
|
|