diff --git a/src/proto/server/client.rs b/src/proto/server/client.rs index 1e253b9..a01a485 100644 --- a/src/proto/server/client.rs +++ b/src/proto/server/client.rs @@ -42,10 +42,22 @@ pub enum ClientLoginError { /// An error that arose while running the client. #[derive(Debug, Error)] pub enum ClientRunError { + #[error("underlying stream was closed unexpectedly")] + StreamClosed, + #[error("i/o error: {0}")] IOError(#[from] io::Error), } +impl ClientRunError { + fn is_stream_closed(&self) -> bool { + match self { + ClientRunError::StreamClosed => true, + _ => false, + } + } +} + enum RunOnceResult { Break, Continue, @@ -126,8 +138,7 @@ impl Client { debug!("Client: received response: {:?}", response); Ok(RunOnceResult::Response(response)) } - // TODO: Consider returning error here. - None => Ok(RunOnceResult::Break), + None => Err(ClientRunError::StreamClosed), } }, ) @@ -166,6 +177,7 @@ impl Client { mod tests { use futures::stream::{empty, StreamExt}; use tokio::net; + use tokio::sync::mpsc; use crate::proto::server::testing::{fake_server, UserStatusMap}; use crate::proto::server::{ @@ -256,4 +268,47 @@ mod tests { handle.shutdown(); server_task.await.unwrap().unwrap(); } + + #[tokio::test] + async fn stream_closed() { + init(); + + let (server, handle) = fake_server().await.unwrap(); + let server_task = tokio::spawn(server.serve()); + + let stream = net::TcpStream::connect(handle.address()).await.unwrap(); + + let user_name = "alice".to_string(); + let password = "sekrit".to_string(); + let credentials = Credentials::new(user_name, password).unwrap(); + + let options = ClientOptions { + credentials, + version: Version::default(), + }; + let client = Client::login(stream, options).await.unwrap(); + + let (_request_tx, mut request_rx) = mpsc::channel(1); + let outbound = Box::pin(async_stream::stream! { + while let Some(request) = request_rx.recv().await { + yield request; + } + }); + + let mut inbound = client.run(outbound); + + // Server shuts down, closing its connection before the client has had a + // chance to send all of `outbound`. + handle.shutdown(); + + // Check that the client returns the correct error. + assert!(inbound + .next() + .await + .unwrap() + .unwrap_err() + .is_stream_closed()); + + server_task.await.unwrap().unwrap(); + } }