Browse Source

Fix stream_closed looping forever.

wip
Titouan Rigoudy 4 years ago
parent
commit
dd1a660c33
2 changed files with 136 additions and 35 deletions
  1. +9
    -6
      src/proto/server/client.rs
  2. +127
    -29
      src/proto/server/testing.rs

+ 9
- 6
src/proto/server/client.rs View File

@ -179,7 +179,9 @@ mod tests {
use tokio::net;
use tokio::sync::mpsc;
use crate::proto::server::testing::{ServerBuilder, UserStatusMap};
use crate::proto::server::testing::{
ServerBuilder, ShutdownType, UserStatusMap,
};
use crate::proto::server::{
Credentials, ServerRequest, ServerResponse, UserStatusRequest,
UserStatusResponse,
@ -220,7 +222,7 @@ mod tests {
let mut inbound = client.run(empty());
assert!(inbound.next().await.is_none());
handle.shutdown();
handle.shutdown(ShutdownType::LameDuck);
server_task.await.unwrap().unwrap();
}
@ -264,7 +266,7 @@ mod tests {
);
assert!(inbound.next().await.is_none());
handle.shutdown();
handle.shutdown(ShutdownType::LameDuck);
server_task.await.unwrap().unwrap();
}
@ -290,7 +292,10 @@ mod tests {
// Server shuts down, closing its connection before the client has had a
// chance to send all of `outbound`.
handle.shutdown();
handle.shutdown(ShutdownType::Immediate);
// Wait for the server to terminate, to avoid race conditions.
server_task.await.unwrap().unwrap();
// Check that the client returns the correct error, then stops running.
assert!(inbound
@ -300,7 +305,5 @@ mod tests {
.unwrap_err()
.is_stream_closed());
assert!(inbound.next().await.is_none());
server_task.await.unwrap().unwrap();
}
}

+ 127
- 29
src/proto/server/testing.rs View File

@ -7,7 +7,9 @@ use std::sync::Arc;
use log::{info, warn};
use parking_lot::Mutex;
use tokio::net::TcpListener;
use tokio::net::{TcpListener, TcpStream};
use tokio::sync::mpsc;
use tokio::sync::oneshot;
use tokio::sync::watch;
use crate::proto::core::frame::FrameStream;
@ -137,6 +139,18 @@ impl GracefulHandler {
}
}
struct SenderHandler {
handler: GracefulHandler,
result_tx: mpsc::Sender<io::Result<()>>,
}
impl SenderHandler {
async fn run(self) {
let result = self.handler.run().await;
let _ = self.result_tx.send(result).await;
}
}
/// A builder for Server instances.
#[derive(Default)]
pub struct ServerBuilder {
@ -161,12 +175,20 @@ impl ServerBuilder {
None => Arc::new(Mutex::new(UserStatusMap::default())),
};
let (shutdown_tx, shutdown_rx) = watch::channel(());
let (shutdown_tx, shutdown_rx) = oneshot::channel();
let (handler_shutdown_tx, handler_shutdown_rx) = watch::channel(());
// TODO: configurable channel capacity.
let (result_tx, result_rx) = mpsc::channel(100);
Ok((
Server {
listener,
shutdown_rx,
handler_shutdown_tx,
handler_shutdown_rx,
result_tx,
result_rx,
user_status_map,
},
ServerHandle {
@ -177,16 +199,38 @@ impl ServerBuilder {
}
}
/// Specifies how to shut down a server.
pub enum ShutdownType {
/// Shut down immediately, sever open connections.
Immediate,
/// Stop accepting new connections, wait for open connections to close.
LameDuck,
}
/// A simple server for connecting to in tests.
pub struct Server {
// Listener for new connections.
listener: TcpListener,
shutdown_rx: watch::Receiver<()>,
// Receiver for ServerHandle shutdown() notification.
shutdown_rx: oneshot::Receiver<ShutdownType>,
// Watch channel for signalling immediate termination to handlers.
handler_shutdown_tx: watch::Sender<()>,
handler_shutdown_rx: watch::Receiver<()>,
// Channel for receiving results back from handlers.
result_tx: mpsc::Sender<io::Result<()>>,
result_rx: mpsc::Receiver<io::Result<()>>,
// Shared state for handlers to use when serving responses.
user_status_map: Arc<Mutex<UserStatusMap>>,
}
/// Allows interacting with a running `Server`.
pub struct ServerHandle {
shutdown_tx: watch::Sender<()>,
shutdown_tx: oneshot::Sender<ShutdownType>,
address: SocketAddr,
}
@ -198,9 +242,9 @@ impl ServerHandle {
/// Starts shutting down the server.
/// Does nothing if the server is already shutting down or even dropped.
pub fn shutdown(self) {
pub fn shutdown(self, how: ShutdownType) {
// Ignore send errors, which mean that the server has been dropped.
let _ = self.shutdown_tx.send(());
let _ = self.shutdown_tx.send(how);
}
}
@ -211,6 +255,33 @@ impl Server {
self.listener.local_addr()
}
/// Spawns a handler for the given new stream, initiated by a remote peer.
fn spawn_handler(&mut self, stream: TcpStream, peer_address: SocketAddr) {
let handler = SenderHandler {
handler: GracefulHandler {
handler: Handler {
frame_stream: FrameStream::new(stream),
peer_address,
user_status_map: self.user_status_map.clone(),
},
shutdown_rx: self.handler_shutdown_rx.clone(),
},
result_tx: self.result_tx.clone(),
};
tokio::spawn(handler.run());
}
/// Accepts a single connection, spawns a handler for it and returns.
///
/// Useful for tests who need to guarantee a handler is spawned before the
/// server shuts down.
async fn accept(&mut self) -> io::Result<()> {
let (stream, peer_address) = self.listener.accept().await?;
self.spawn_handler(stream, peer_address);
Ok(())
}
/// Runs the server: accepts incoming connections and responds to requests.
///
/// Returns an error if:
@ -219,35 +290,32 @@ impl Server {
/// - an error was encountered while serving a request
///
pub async fn serve(mut self) -> io::Result<()> {
let mut handler_tasks = vec![];
loop {
tokio::select!(
result = self.listener.accept() => {
let (stream, peer_address) = result?;
let handler = GracefulHandler {
handler: Handler {
frame_stream: FrameStream::new(stream),
peer_address,
user_status_map: self.user_status_map.clone(),
},
shutdown_rx: self.shutdown_rx.clone(),
};
handler_tasks.push(tokio::spawn(handler.run()));
self.spawn_handler(stream, peer_address);
},
// Ignore receive errors - if shutdown_rx's sender is dropped, we take
// that as a signal to shut down too.
_ = self.shutdown_rx.changed() => break,
// If shutdown_rx's sender is dropped and we receive an error, we take
// that as a signal to shut down immediately too.
result = &mut self.shutdown_rx => {
match result.unwrap_or(ShutdownType::Immediate) {
ShutdownType::LameDuck => break,
ShutdownType::Immediate => {
// Send errors cannot happen, since we hold onto a receiver in
// `self.handler_shutdown_rx`.
self.handler_shutdown_tx.send(()).unwrap();
break
}
}
}
);
}
info!("Server: shutting down");
// TODO: pass results back instead through an mpsc channel.
for task in handler_tasks {
task.await??;
drop(self.result_tx);
while let Some(result) = self.result_rx.recv().await {
result?;
}
Ok(())
@ -256,9 +324,11 @@ impl Server {
#[cfg(test)]
mod tests {
use std::io;
use tokio::net::TcpStream;
use super::ServerBuilder;
use super::{ServerBuilder, ShutdownType};
// Enable capturing logs in tests.
fn init() {
@ -284,7 +354,35 @@ mod tests {
// The connection succeeds.
let _ = TcpStream::connect(handle.address()).await.unwrap();
handle.shutdown();
server_task.await.unwrap().unwrap();
handle.shutdown(ShutdownType::Immediate);
// Ignore errors, which can happen when the handler task is spawned right
// before we call `handle.shutdown()`. See `serve_yields_handler_error`.
let _ = server_task.await.unwrap();
}
// This test verifies that when a handler encounters an error, it is
// reflected in `Server::serve()`'s return value.
#[tokio::test]
async fn serve_yields_handler_error() {
init();
let (mut server, handle) = ServerBuilder::default().bind().await.unwrap();
// The connection is accepted, then immediately closed.
let address = handle.address();
tokio::spawn(async move {
let _ = TcpStream::connect(address).await.unwrap();
});
// Accept the connection on the server and spawn a handler for it.
server.accept().await.unwrap();
// Signal that the server should stop accepting incoming connections.
handle.shutdown(ShutdownType::LameDuck);
// Drain outstanding requests, encountering the error.
let error = server.serve().await.unwrap_err();
assert_eq!(error.kind(), io::ErrorKind::UnexpectedEof);
}
}

Loading…
Cancel
Save