|
|
|
@ -159,12 +159,12 @@ mod tests { |
|
|
|
use std::net::SocketAddr;
|
|
|
|
|
|
|
|
use anyhow::Context;
|
|
|
|
use futures::SinkExt;
|
|
|
|
use futures::{SinkExt, StreamExt};
|
|
|
|
use tokio::sync::mpsc;
|
|
|
|
use tokio_tungstenite::connect_async;
|
|
|
|
use tokio_tungstenite::tungstenite::Message as WebSocketMessage;
|
|
|
|
|
|
|
|
use crate::control::{Request, Response};
|
|
|
|
use crate::control::{Request, Response, RoomJoinResponse};
|
|
|
|
use crate::dispatcher::Message;
|
|
|
|
|
|
|
|
struct Channels {
|
|
|
|
@ -274,4 +274,60 @@ mod tests { |
|
|
|
|
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
|
|
|
|
#[tokio::test]
|
|
|
|
async fn forwards_outgoing_responses() -> anyhow::Result<()> {
|
|
|
|
init();
|
|
|
|
|
|
|
|
let mut listener = Listener::bind("localhost:0")
|
|
|
|
.await
|
|
|
|
.context("binding listener")?;
|
|
|
|
|
|
|
|
let address = format!("ws://{}", listener.address());
|
|
|
|
let mut channels = Channels::default();
|
|
|
|
|
|
|
|
// Move individual fields out of `channels`, for capture by `tokio::spawn`.
|
|
|
|
let message_tx = channels.message_tx;
|
|
|
|
let response_rx = channels.response_rx;
|
|
|
|
let listener_task =
|
|
|
|
tokio::spawn(async move { listener.run(message_tx, response_rx).await });
|
|
|
|
|
|
|
|
let (mut ws_stream, _response) = connect_async(address).await?;
|
|
|
|
|
|
|
|
channels
|
|
|
|
.response_tx
|
|
|
|
.send(Response::RoomJoinResponse(RoomJoinResponse {
|
|
|
|
room_name: "bleep".to_string(),
|
|
|
|
}))
|
|
|
|
.await
|
|
|
|
.context("sending response")?;
|
|
|
|
|
|
|
|
let message = ws_stream
|
|
|
|
.next()
|
|
|
|
.await
|
|
|
|
.context("unwrapping next response from stream")?
|
|
|
|
.context("read result")?;
|
|
|
|
|
|
|
|
let text = message.to_text().context("non-text control message")?;
|
|
|
|
|
|
|
|
let response: Response = serde_json::from_str(&text)
|
|
|
|
.with_context(|| format!("decoding JSON message {:?}", text))?;
|
|
|
|
|
|
|
|
assert_eq!(
|
|
|
|
response,
|
|
|
|
Response::RoomJoinResponse(RoomJoinResponse {
|
|
|
|
room_name: "bleep".to_string(),
|
|
|
|
})
|
|
|
|
);
|
|
|
|
|
|
|
|
// Dropping this sender signals to the listener that it should stop.
|
|
|
|
drop(channels.response_tx);
|
|
|
|
|
|
|
|
let () = listener_task
|
|
|
|
.await
|
|
|
|
.context("joining listener task")?
|
|
|
|
.context("running listener")?;
|
|
|
|
|
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
}
|