|
|
@ -27,8 +27,8 @@ async fn forward_incoming( |
|
|
|
|
|
|
|
|
let text = message.to_text().context("non-text control message")?;
|
|
|
let text = message.to_text().context("non-text control message")?;
|
|
|
|
|
|
|
|
|
let control_request: Request =
|
|
|
|
|
|
serde_json::from_str(&text).context("decoding JSON message")?;
|
|
|
|
|
|
|
|
|
let control_request: Request = serde_json::from_str(&text)
|
|
|
|
|
|
.with_context(|| format!("decoding JSON message {:?}", text))?;
|
|
|
|
|
|
|
|
|
message_tx.send(Message::ControlRequest(control_request));
|
|
|
message_tx.send(Message::ControlRequest(control_request));
|
|
|
}
|
|
|
}
|
|
|
@ -159,10 +159,12 @@ mod tests { |
|
|
use std::net::SocketAddr;
|
|
|
use std::net::SocketAddr;
|
|
|
|
|
|
|
|
|
use anyhow::Context;
|
|
|
use anyhow::Context;
|
|
|
|
|
|
use futures::SinkExt;
|
|
|
use tokio::sync::mpsc;
|
|
|
use tokio::sync::mpsc;
|
|
|
use tokio_tungstenite::connect_async;
|
|
|
use tokio_tungstenite::connect_async;
|
|
|
|
|
|
use tokio_tungstenite::tungstenite::Message as WebSocketMessage;
|
|
|
|
|
|
|
|
|
use crate::control::Response;
|
|
|
|
|
|
|
|
|
use crate::control::{Request, Response};
|
|
|
use crate::dispatcher::Message;
|
|
|
use crate::dispatcher::Message;
|
|
|
|
|
|
|
|
|
struct Channels {
|
|
|
struct Channels {
|
|
|
@ -213,7 +215,6 @@ mod tests { |
|
|
.context("binding listener")?;
|
|
|
.context("binding listener")?;
|
|
|
|
|
|
|
|
|
let address = format!("ws://{}", listener.address());
|
|
|
let address = format!("ws://{}", listener.address());
|
|
|
|
|
|
|
|
|
let channels = Channels::default();
|
|
|
let channels = Channels::default();
|
|
|
|
|
|
|
|
|
// Move individual fields out of `channels`, for capture by `tokio::spawn`.
|
|
|
// Move individual fields out of `channels`, for capture by `tokio::spawn`.
|
|
|
@ -234,4 +235,43 @@ mod tests { |
|
|
|
|
|
|
|
|
Ok(())
|
|
|
Ok(())
|
|
|
}
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
#[tokio::test]
|
|
|
|
|
|
async fn forwards_incoming_requests() -> anyhow::Result<()> {
|
|
|
|
|
|
init();
|
|
|
|
|
|
|
|
|
|
|
|
let mut listener = Listener::bind("localhost:0")
|
|
|
|
|
|
.await
|
|
|
|
|
|
.context("binding listener")?;
|
|
|
|
|
|
|
|
|
|
|
|
let address = format!("ws://{}", listener.address());
|
|
|
|
|
|
let mut channels = Channels::default();
|
|
|
|
|
|
|
|
|
|
|
|
// Move individual fields out of `channels`, for capture by `tokio::spawn`.
|
|
|
|
|
|
let message_tx = channels.message_tx;
|
|
|
|
|
|
let response_rx = channels.response_rx;
|
|
|
|
|
|
let listener_task =
|
|
|
|
|
|
tokio::spawn(async move { listener.run(message_tx, response_rx).await });
|
|
|
|
|
|
|
|
|
|
|
|
let (mut ws_stream, _response) = connect_async(address).await?;
|
|
|
|
|
|
|
|
|
|
|
|
let request = serde_json::to_string(&Request::RoomListRequest)
|
|
|
|
|
|
.context("serializing request")?;
|
|
|
|
|
|
ws_stream.send(WebSocketMessage::Text(request)).await?;
|
|
|
|
|
|
|
|
|
|
|
|
assert_eq!(
|
|
|
|
|
|
channels.message_rx.recv().await,
|
|
|
|
|
|
Some(Message::ControlRequest(Request::RoomListRequest))
|
|
|
|
|
|
);
|
|
|
|
|
|
|
|
|
|
|
|
// Dropping this sender signals to the listener that it should stop.
|
|
|
|
|
|
drop(channels.response_tx);
|
|
|
|
|
|
|
|
|
|
|
|
let () = listener_task
|
|
|
|
|
|
.await
|
|
|
|
|
|
.context("joining listener task")?
|
|
|
|
|
|
.context("running listener")?;
|
|
|
|
|
|
|
|
|
|
|
|
Ok(())
|
|
|
|
|
|
}
|
|
|
}
|
|
|
}
|