From 2f6edc3e556f9d429e152be77c93cefc81fcf635 Mon Sep 17 00:00:00 2001 From: Titouan Rigoudy Date: Thu, 21 Jan 2021 18:09:22 +0100 Subject: [PATCH] Improve ergonomics of FakeServer. --- src/proto/server/client.rs | 14 +++---- src/proto/server/testing.rs | 74 ++++++++++++++++++++++++++----------- 2 files changed, 58 insertions(+), 30 deletions(-) diff --git a/src/proto/server/client.rs b/src/proto/server/client.rs index 9e0a5ff..dec892c 100644 --- a/src/proto/server/client.rs +++ b/src/proto/server/client.rs @@ -172,22 +172,18 @@ impl Client { mod tests { use futures::stream::StreamExt; use tokio::net; - use tokio::sync::watch; - use crate::proto::server::testing::FakeServer; + use crate::proto::server::testing::fake_server; use crate::proto::server::*; use super::{Client, ClientOptions, Version}; #[tokio::test] async fn client_like_grpc() { - let server = FakeServer::new().await.unwrap(); - let address = server.address().unwrap(); + let (server, handle) = fake_server().await.unwrap(); + let server_task = tokio::spawn(server.serve()); - let (shutdown_tx, shutdown_rx) = watch::channel(()); - let server_task = tokio::spawn(server.serve(shutdown_rx)); - - let stream = net::TcpStream::connect(address).await.unwrap(); + let stream = net::TcpStream::connect(handle.address()).await.unwrap(); let options = ClientOptions { user_name: "alice".to_string(), @@ -210,7 +206,7 @@ mod tests { let _ = dbg!(result); } - drop(shutdown_tx); + handle.shutdown(); server_task.await.unwrap().unwrap(); } } diff --git a/src/proto/server/testing.rs b/src/proto/server/testing.rs index dfb838e..809ad78 100644 --- a/src/proto/server/testing.rs +++ b/src/proto/server/testing.rs @@ -69,13 +69,40 @@ impl GracefulHandler { /// A fake server for connecting to in tests. pub struct FakeServer { listener: TcpListener, + shutdown_rx: watch::Receiver<()>, +} + +/// Allows interacting with a running `FakeServer`. +pub struct FakeServerHandle { + shutdown_tx: watch::Sender<()>, + address: SocketAddr, +} + +impl FakeServerHandle { + /// Returns the address on which the server is accepting connections. + pub fn address(&self) -> SocketAddr { + self.address + } + + /// Starts shutting down the server. + /// Does nothing if the server is already shutting down or even dropped. + pub fn shutdown(self) { + // Ignore send errors, which mean that the server has been dropped. + let _ = self.shutdown_tx.send(()); + } } impl FakeServer { /// Creates a new fake server and binds it to a port on localhost. - pub async fn new() -> io::Result { + /// + /// The returned server will stop serving when `shutdown_rx` is notified, or + /// when its sender is dropped. + pub async fn bind(shutdown_rx: watch::Receiver<()>) -> io::Result { let listener = TcpListener::bind("localhost:0").await?; - Ok(FakeServer { listener }) + Ok(FakeServer { + listener, + shutdown_rx, + }) } /// Returns the address to which this server is bound. @@ -86,18 +113,12 @@ impl FakeServer { /// Runs the server: accepts incoming connections and responds to requests. /// - /// Attempts to shut down when `shutdown_rx` receives a change, or when its - /// sender is dropped. - /// /// Returns an error if: /// /// - an error was encountered while listening /// - an error was encountered while serving a request /// - pub async fn serve( - self, - mut shutdown_rx: watch::Receiver<()>, - ) -> io::Result<()> { + pub async fn serve(mut self) -> io::Result<()> { let mut handler_tasks = vec![]; loop { @@ -110,14 +131,14 @@ impl FakeServer { frame_stream: FrameStream::new(stream), peer_address, }, - shutdown_rx: shutdown_rx.clone(), + shutdown_rx: self.shutdown_rx.clone(), }; handler_tasks.push(tokio::spawn(handler.run())); }, // Ignore receive errors - if shutdown_rx's sender is dropped, we take // that as a signal to shut down too. - _ = shutdown_rx.changed() => break, + _ = self.shutdown_rx.changed() => break, ); } @@ -130,31 +151,42 @@ impl FakeServer { } } +pub async fn fake_server() -> io::Result<(FakeServer, FakeServerHandle)> { + let (shutdown_tx, shutdown_rx) = watch::channel(()); + let server = FakeServer::bind(shutdown_rx).await?; + + let address = server.address()?; + + Ok(( + server, + FakeServerHandle { + shutdown_tx, + address, + }, + )) +} + #[cfg(test)] mod tests { use tokio::net::TcpStream; - use tokio::sync::watch; - use super::FakeServer; + use super::fake_server; #[tokio::test] async fn new_binds_to_localhost() { - let server = FakeServer::new().await.unwrap(); + let (server, _handle) = fake_server().await.unwrap(); assert!(server.address().unwrap().ip().is_loopback()); } #[tokio::test] async fn accepts_incoming_connections() { - let server = FakeServer::new().await.unwrap(); - let address = server.address().unwrap(); - - let (shutdown_tx, shutdown_rx) = watch::channel(()); - let server_task = tokio::spawn(server.serve(shutdown_rx)); + let (server, handle) = fake_server().await.unwrap(); + let server_task = tokio::spawn(server.serve()); // The connection succeeds. - let _ = TcpStream::connect(address).await.unwrap(); + let _ = TcpStream::connect(handle.address()).await.unwrap(); - drop(shutdown_tx); + handle.shutdown(); server_task.await.unwrap().unwrap(); } }