Browse Source

Make Client::run() yield an error when stream is closed.

wip
Titouan Rigoudy 4 years ago
parent
commit
756a236608
1 changed files with 57 additions and 2 deletions
  1. +57
    -2
      src/proto/server/client.rs

+ 57
- 2
src/proto/server/client.rs View File

@ -42,10 +42,22 @@ pub enum ClientLoginError {
/// An error that arose while running the client.
#[derive(Debug, Error)]
pub enum ClientRunError {
#[error("underlying stream was closed unexpectedly")]
StreamClosed,
#[error("i/o error: {0}")]
IOError(#[from] io::Error),
}
impl ClientRunError {
fn is_stream_closed(&self) -> bool {
match self {
ClientRunError::StreamClosed => true,
_ => false,
}
}
}
enum RunOnceResult {
Break,
Continue,
@ -126,8 +138,7 @@ impl Client {
debug!("Client: received response: {:?}", response);
Ok(RunOnceResult::Response(response))
}
// TODO: Consider returning error here.
None => Ok(RunOnceResult::Break),
None => Err(ClientRunError::StreamClosed),
}
},
)
@ -166,6 +177,7 @@ impl Client {
mod tests {
use futures::stream::{empty, StreamExt};
use tokio::net;
use tokio::sync::mpsc;
use crate::proto::server::testing::{fake_server, UserStatusMap};
use crate::proto::server::{
@ -256,4 +268,47 @@ mod tests {
handle.shutdown();
server_task.await.unwrap().unwrap();
}
#[tokio::test]
async fn stream_closed() {
init();
let (server, handle) = fake_server().await.unwrap();
let server_task = tokio::spawn(server.serve());
let stream = net::TcpStream::connect(handle.address()).await.unwrap();
let user_name = "alice".to_string();
let password = "sekrit".to_string();
let credentials = Credentials::new(user_name, password).unwrap();
let options = ClientOptions {
credentials,
version: Version::default(),
};
let client = Client::login(stream, options).await.unwrap();
let (_request_tx, mut request_rx) = mpsc::channel(1);
let outbound = Box::pin(async_stream::stream! {
while let Some(request) = request_rx.recv().await {
yield request;
}
});
let mut inbound = client.run(outbound);
// Server shuts down, closing its connection before the client has had a
// chance to send all of `outbound`.
handle.shutdown();
// Check that the client returns the correct error.
assert!(inbound
.next()
.await
.unwrap()
.unwrap_err()
.is_stream_closed());
server_task.await.unwrap().unwrap();
}
}

Loading…
Cancel
Save