From dd1a660c3348f37eb2b2749ebdad7dc4a09a0214 Mon Sep 17 00:00:00 2001 From: Titouan Rigoudy Date: Sat, 23 Jan 2021 16:31:13 +0100 Subject: [PATCH] Fix stream_closed looping forever. --- src/proto/server/client.rs | 15 ++-- src/proto/server/testing.rs | 156 +++++++++++++++++++++++++++++------- 2 files changed, 136 insertions(+), 35 deletions(-) diff --git a/src/proto/server/client.rs b/src/proto/server/client.rs index acaea6d..15f12f8 100644 --- a/src/proto/server/client.rs +++ b/src/proto/server/client.rs @@ -179,7 +179,9 @@ mod tests { use tokio::net; use tokio::sync::mpsc; - use crate::proto::server::testing::{ServerBuilder, UserStatusMap}; + use crate::proto::server::testing::{ + ServerBuilder, ShutdownType, UserStatusMap, + }; use crate::proto::server::{ Credentials, ServerRequest, ServerResponse, UserStatusRequest, UserStatusResponse, @@ -220,7 +222,7 @@ mod tests { let mut inbound = client.run(empty()); assert!(inbound.next().await.is_none()); - handle.shutdown(); + handle.shutdown(ShutdownType::LameDuck); server_task.await.unwrap().unwrap(); } @@ -264,7 +266,7 @@ mod tests { ); assert!(inbound.next().await.is_none()); - handle.shutdown(); + handle.shutdown(ShutdownType::LameDuck); server_task.await.unwrap().unwrap(); } @@ -290,7 +292,10 @@ mod tests { // Server shuts down, closing its connection before the client has had a // chance to send all of `outbound`. - handle.shutdown(); + handle.shutdown(ShutdownType::Immediate); + + // Wait for the server to terminate, to avoid race conditions. + server_task.await.unwrap().unwrap(); // Check that the client returns the correct error, then stops running. assert!(inbound @@ -300,7 +305,5 @@ mod tests { .unwrap_err() .is_stream_closed()); assert!(inbound.next().await.is_none()); - - server_task.await.unwrap().unwrap(); } } diff --git a/src/proto/server/testing.rs b/src/proto/server/testing.rs index 843e210..710a2a5 100644 --- a/src/proto/server/testing.rs +++ b/src/proto/server/testing.rs @@ -7,7 +7,9 @@ use std::sync::Arc; use log::{info, warn}; use parking_lot::Mutex; -use tokio::net::TcpListener; +use tokio::net::{TcpListener, TcpStream}; +use tokio::sync::mpsc; +use tokio::sync::oneshot; use tokio::sync::watch; use crate::proto::core::frame::FrameStream; @@ -137,6 +139,18 @@ impl GracefulHandler { } } +struct SenderHandler { + handler: GracefulHandler, + result_tx: mpsc::Sender>, +} + +impl SenderHandler { + async fn run(self) { + let result = self.handler.run().await; + let _ = self.result_tx.send(result).await; + } +} + /// A builder for Server instances. #[derive(Default)] pub struct ServerBuilder { @@ -161,12 +175,20 @@ impl ServerBuilder { None => Arc::new(Mutex::new(UserStatusMap::default())), }; - let (shutdown_tx, shutdown_rx) = watch::channel(()); + let (shutdown_tx, shutdown_rx) = oneshot::channel(); + let (handler_shutdown_tx, handler_shutdown_rx) = watch::channel(()); + + // TODO: configurable channel capacity. + let (result_tx, result_rx) = mpsc::channel(100); Ok(( Server { listener, shutdown_rx, + handler_shutdown_tx, + handler_shutdown_rx, + result_tx, + result_rx, user_status_map, }, ServerHandle { @@ -177,16 +199,38 @@ impl ServerBuilder { } } +/// Specifies how to shut down a server. +pub enum ShutdownType { + /// Shut down immediately, sever open connections. + Immediate, + + /// Stop accepting new connections, wait for open connections to close. + LameDuck, +} + /// A simple server for connecting to in tests. pub struct Server { + // Listener for new connections. listener: TcpListener, - shutdown_rx: watch::Receiver<()>, + + // Receiver for ServerHandle shutdown() notification. + shutdown_rx: oneshot::Receiver, + + // Watch channel for signalling immediate termination to handlers. + handler_shutdown_tx: watch::Sender<()>, + handler_shutdown_rx: watch::Receiver<()>, + + // Channel for receiving results back from handlers. + result_tx: mpsc::Sender>, + result_rx: mpsc::Receiver>, + + // Shared state for handlers to use when serving responses. user_status_map: Arc>, } /// Allows interacting with a running `Server`. pub struct ServerHandle { - shutdown_tx: watch::Sender<()>, + shutdown_tx: oneshot::Sender, address: SocketAddr, } @@ -198,9 +242,9 @@ impl ServerHandle { /// Starts shutting down the server. /// Does nothing if the server is already shutting down or even dropped. - pub fn shutdown(self) { + pub fn shutdown(self, how: ShutdownType) { // Ignore send errors, which mean that the server has been dropped. - let _ = self.shutdown_tx.send(()); + let _ = self.shutdown_tx.send(how); } } @@ -211,6 +255,33 @@ impl Server { self.listener.local_addr() } + /// Spawns a handler for the given new stream, initiated by a remote peer. + fn spawn_handler(&mut self, stream: TcpStream, peer_address: SocketAddr) { + let handler = SenderHandler { + handler: GracefulHandler { + handler: Handler { + frame_stream: FrameStream::new(stream), + peer_address, + user_status_map: self.user_status_map.clone(), + }, + shutdown_rx: self.handler_shutdown_rx.clone(), + }, + result_tx: self.result_tx.clone(), + }; + + tokio::spawn(handler.run()); + } + + /// Accepts a single connection, spawns a handler for it and returns. + /// + /// Useful for tests who need to guarantee a handler is spawned before the + /// server shuts down. + async fn accept(&mut self) -> io::Result<()> { + let (stream, peer_address) = self.listener.accept().await?; + self.spawn_handler(stream, peer_address); + Ok(()) + } + /// Runs the server: accepts incoming connections and responds to requests. /// /// Returns an error if: @@ -219,35 +290,32 @@ impl Server { /// - an error was encountered while serving a request /// pub async fn serve(mut self) -> io::Result<()> { - let mut handler_tasks = vec![]; - loop { tokio::select!( result = self.listener.accept() => { let (stream, peer_address) = result?; - - let handler = GracefulHandler { - handler: Handler { - frame_stream: FrameStream::new(stream), - peer_address, - user_status_map: self.user_status_map.clone(), - }, - shutdown_rx: self.shutdown_rx.clone(), - }; - - handler_tasks.push(tokio::spawn(handler.run())); + self.spawn_handler(stream, peer_address); }, - // Ignore receive errors - if shutdown_rx's sender is dropped, we take - // that as a signal to shut down too. - _ = self.shutdown_rx.changed() => break, + // If shutdown_rx's sender is dropped and we receive an error, we take + // that as a signal to shut down immediately too. + result = &mut self.shutdown_rx => { + match result.unwrap_or(ShutdownType::Immediate) { + ShutdownType::LameDuck => break, + ShutdownType::Immediate => { + // Send errors cannot happen, since we hold onto a receiver in + // `self.handler_shutdown_rx`. + self.handler_shutdown_tx.send(()).unwrap(); + break + } + } + } ); } info!("Server: shutting down"); - - // TODO: pass results back instead through an mpsc channel. - for task in handler_tasks { - task.await??; + drop(self.result_tx); + while let Some(result) = self.result_rx.recv().await { + result?; } Ok(()) @@ -256,9 +324,11 @@ impl Server { #[cfg(test)] mod tests { + use std::io; + use tokio::net::TcpStream; - use super::ServerBuilder; + use super::{ServerBuilder, ShutdownType}; // Enable capturing logs in tests. fn init() { @@ -284,7 +354,35 @@ mod tests { // The connection succeeds. let _ = TcpStream::connect(handle.address()).await.unwrap(); - handle.shutdown(); - server_task.await.unwrap().unwrap(); + handle.shutdown(ShutdownType::Immediate); + + // Ignore errors, which can happen when the handler task is spawned right + // before we call `handle.shutdown()`. See `serve_yields_handler_error`. + let _ = server_task.await.unwrap(); + } + + // This test verifies that when a handler encounters an error, it is + // reflected in `Server::serve()`'s return value. + #[tokio::test] + async fn serve_yields_handler_error() { + init(); + + let (mut server, handle) = ServerBuilder::default().bind().await.unwrap(); + + // The connection is accepted, then immediately closed. + let address = handle.address(); + tokio::spawn(async move { + let _ = TcpStream::connect(address).await.unwrap(); + }); + + // Accept the connection on the server and spawn a handler for it. + server.accept().await.unwrap(); + + // Signal that the server should stop accepting incoming connections. + handle.shutdown(ShutdownType::LameDuck); + + // Drain outstanding requests, encountering the error. + let error = server.serve().await.unwrap_err(); + assert_eq!(error.kind(), io::ErrorKind::UnexpectedEof); } }