Browse Source

Improve ergonomics of FakeServer.

wip
Titouan Rigoudy 4 years ago
parent
commit
2f6edc3e55
2 changed files with 58 additions and 30 deletions
  1. +5
    -9
      src/proto/server/client.rs
  2. +53
    -21
      src/proto/server/testing.rs

+ 5
- 9
src/proto/server/client.rs View File

@ -172,22 +172,18 @@ impl Client {
mod tests {
use futures::stream::StreamExt;
use tokio::net;
use tokio::sync::watch;
use crate::proto::server::testing::FakeServer;
use crate::proto::server::testing::fake_server;
use crate::proto::server::*;
use super::{Client, ClientOptions, Version};
#[tokio::test]
async fn client_like_grpc() {
let server = FakeServer::new().await.unwrap();
let address = server.address().unwrap();
let (server, handle) = fake_server().await.unwrap();
let server_task = tokio::spawn(server.serve());
let (shutdown_tx, shutdown_rx) = watch::channel(());
let server_task = tokio::spawn(server.serve(shutdown_rx));
let stream = net::TcpStream::connect(address).await.unwrap();
let stream = net::TcpStream::connect(handle.address()).await.unwrap();
let options = ClientOptions {
user_name: "alice".to_string(),
@ -210,7 +206,7 @@ mod tests {
let _ = dbg!(result);
}
drop(shutdown_tx);
handle.shutdown();
server_task.await.unwrap().unwrap();
}
}

+ 53
- 21
src/proto/server/testing.rs View File

@ -69,13 +69,40 @@ impl GracefulHandler {
/// A fake server for connecting to in tests.
pub struct FakeServer {
listener: TcpListener,
shutdown_rx: watch::Receiver<()>,
}
/// Allows interacting with a running `FakeServer`.
pub struct FakeServerHandle {
shutdown_tx: watch::Sender<()>,
address: SocketAddr,
}
impl FakeServerHandle {
/// Returns the address on which the server is accepting connections.
pub fn address(&self) -> SocketAddr {
self.address
}
/// Starts shutting down the server.
/// Does nothing if the server is already shutting down or even dropped.
pub fn shutdown(self) {
// Ignore send errors, which mean that the server has been dropped.
let _ = self.shutdown_tx.send(());
}
}
impl FakeServer {
/// Creates a new fake server and binds it to a port on localhost.
pub async fn new() -> io::Result<Self> {
///
/// The returned server will stop serving when `shutdown_rx` is notified, or
/// when its sender is dropped.
pub async fn bind(shutdown_rx: watch::Receiver<()>) -> io::Result<Self> {
let listener = TcpListener::bind("localhost:0").await?;
Ok(FakeServer { listener })
Ok(FakeServer {
listener,
shutdown_rx,
})
}
/// Returns the address to which this server is bound.
@ -86,18 +113,12 @@ impl FakeServer {
/// Runs the server: accepts incoming connections and responds to requests.
///
/// Attempts to shut down when `shutdown_rx` receives a change, or when its
/// sender is dropped.
///
/// Returns an error if:
///
/// - an error was encountered while listening
/// - an error was encountered while serving a request
///
pub async fn serve(
self,
mut shutdown_rx: watch::Receiver<()>,
) -> io::Result<()> {
pub async fn serve(mut self) -> io::Result<()> {
let mut handler_tasks = vec![];
loop {
@ -110,14 +131,14 @@ impl FakeServer {
frame_stream: FrameStream::new(stream),
peer_address,
},
shutdown_rx: shutdown_rx.clone(),
shutdown_rx: self.shutdown_rx.clone(),
};
handler_tasks.push(tokio::spawn(handler.run()));
},
// Ignore receive errors - if shutdown_rx's sender is dropped, we take
// that as a signal to shut down too.
_ = shutdown_rx.changed() => break,
_ = self.shutdown_rx.changed() => break,
);
}
@ -130,31 +151,42 @@ impl FakeServer {
}
}
pub async fn fake_server() -> io::Result<(FakeServer, FakeServerHandle)> {
let (shutdown_tx, shutdown_rx) = watch::channel(());
let server = FakeServer::bind(shutdown_rx).await?;
let address = server.address()?;
Ok((
server,
FakeServerHandle {
shutdown_tx,
address,
},
))
}
#[cfg(test)]
mod tests {
use tokio::net::TcpStream;
use tokio::sync::watch;
use super::FakeServer;
use super::fake_server;
#[tokio::test]
async fn new_binds_to_localhost() {
let server = FakeServer::new().await.unwrap();
let (server, _handle) = fake_server().await.unwrap();
assert!(server.address().unwrap().ip().is_loopback());
}
#[tokio::test]
async fn accepts_incoming_connections() {
let server = FakeServer::new().await.unwrap();
let address = server.address().unwrap();
let (shutdown_tx, shutdown_rx) = watch::channel(());
let server_task = tokio::spawn(server.serve(shutdown_rx));
let (server, handle) = fake_server().await.unwrap();
let server_task = tokio::spawn(server.serve());
// The connection succeeds.
let _ = TcpStream::connect(address).await.unwrap();
let _ = TcpStream::connect(handle.address()).await.unwrap();
drop(shutdown_tx);
handle.shutdown();
server_task.await.unwrap().unwrap();
}
}

Loading…
Cancel
Save