|
|
|
@ -165,6 +165,8 @@ mod tests { |
|
|
|
|
|
|
|
use anyhow::Context;
|
|
|
|
use futures::{SinkExt, StreamExt};
|
|
|
|
use tokio::io::{AsyncReadExt, AsyncWriteExt};
|
|
|
|
use tokio::net::TcpStream;
|
|
|
|
use tokio::sync::mpsc;
|
|
|
|
use tokio_tungstenite::connect_async;
|
|
|
|
use tokio_tungstenite::tungstenite::Message as WebSocketMessage;
|
|
|
|
@ -230,6 +232,48 @@ mod tests { |
|
|
|
|
|
|
|
let (_ws_stream, _response) = connect_async(address).await?;
|
|
|
|
|
|
|
|
// dropping this sender signals to the listener that it should stop.
|
|
|
|
drop(channels.response_tx);
|
|
|
|
|
|
|
|
let () = listener_task
|
|
|
|
.await
|
|
|
|
.context("joining listener task")?
|
|
|
|
.context("running listener")?;
|
|
|
|
|
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
|
|
|
|
#[tokio::test]
|
|
|
|
async fn keeps_listening_after_failed_handshake() -> anyhow::Result<()> {
|
|
|
|
init();
|
|
|
|
|
|
|
|
let mut listener = Listener::bind("localhost:0")
|
|
|
|
.await
|
|
|
|
.context("binding listener")?;
|
|
|
|
|
|
|
|
let address = listener.address().clone();
|
|
|
|
let channels = Channels::default();
|
|
|
|
|
|
|
|
// Move individual fields out of `channels`, for capture by `tokio::spawn`.
|
|
|
|
let message_tx = channels.message_tx;
|
|
|
|
let response_rx = channels.response_rx;
|
|
|
|
let listener_task =
|
|
|
|
tokio::spawn(async move { listener.run(message_tx, response_rx).await });
|
|
|
|
|
|
|
|
let mut stream = TcpStream::connect(address).await.context("connecting")?;
|
|
|
|
|
|
|
|
// Write some invalid data, causing the listener to drop the connection.
|
|
|
|
stream.write_all(&[0]).await.context("writing")?;
|
|
|
|
|
|
|
|
// Expect that the stream immediately closes.
|
|
|
|
let mut buf = Vec::new();
|
|
|
|
stream.read_to_end(&mut buf).await.context("reading")?;
|
|
|
|
assert_eq!(buf, Vec::<u8>::new());
|
|
|
|
|
|
|
|
// Connect a second time.
|
|
|
|
let (_ws_stream, _response) =
|
|
|
|
connect_async(format!("ws://{}", address)).await?;
|
|
|
|
|
|
|
|
// Dropping this sender signals to the listener that it should stop.
|
|
|
|
drop(channels.response_tx);
|
|
|
|
|
|
|
|
|