diff --git a/src/proto/core/frame.rs b/src/proto/core/frame.rs index ead6743..3ff1aa3 100644 --- a/src/proto/core/frame.rs +++ b/src/proto/core/frame.rs @@ -182,6 +182,8 @@ where self.encoder.encode_to(frame, &mut bytes)?; self.stream.write_all(bytes.as_ref()).await } + + pub fn get_mut_stream(&mut self) -> &mut TcpStream { &mut self.stream } } mod tests { diff --git a/src/proto/server/client.rs b/src/proto/server/client.rs index e16fb0a..2116039 100644 --- a/src/proto/server/client.rs +++ b/src/proto/server/client.rs @@ -4,6 +4,7 @@ use std::io; use futures::stream::{Stream, StreamExt}; use thiserror::Error; +use tokio::io::AsyncWriteExt; use tokio::net; use crate::proto::core::frame::FrameStream; @@ -135,7 +136,8 @@ impl Client { self.frame_stream.write(&request).await?; Ok(RunOnceResult::Continue) } else { - // Sender has been dropped. + // Sender has been dropped. Shut down the write half of the stream. + self.frame_stream.get_mut_stream().shutdown().await?; Ok(RunOnceResult::Break) } }, @@ -194,19 +196,14 @@ mod tests { let client = Client::login(stream, options).await.unwrap(); let outbound = Box::pin(async_stream::stream! { - loop { - yield ServerRequest::UserStatusRequest(UserStatusRequest { - user_name: "bob".to_string(), - }); - tokio::time::sleep(std::time::Duration::from_secs(1)).await; - } + yield ServerRequest::UserStatusRequest(UserStatusRequest { + user_name: "bob".to_string(), + }); }); let mut inbound = client.run(outbound); while let Some(result) = inbound.next().await { let _ = dbg!(result); } - - // TODO: Way to run FakeServer and stop it at will? } }