diff --git a/proto/src/server/client.rs b/proto/src/server/client.rs index 617d177..aff1746 100644 --- a/proto/src/server/client.rs +++ b/proto/src/server/client.rs @@ -61,9 +61,9 @@ impl ClientRunError { } } -enum RunOnceResult { - Break, - Continue, +enum WaitResult { + Shutdown, + Request(ServerRequest), Response(ServerResponse), } @@ -115,30 +115,49 @@ impl Client { } } - async fn run_once(&mut self, request_stream: &mut S) -> Result + async fn read( + reader: &mut FrameReader, + ) -> Result { + match reader.read().await? { + Some(response) => { + debug!("Client: received response: {:?}", response); + Ok(response) + } + None => Err(ClientRunError::StreamClosed), + } + } + + async fn wait(&mut self, request_stream: &mut S) -> Result where S: Stream + Unpin, { tokio::select!( maybe_request = request_stream.next() => { - if let Some(request) = maybe_request { - debug!("Client: sending request: {:?}", request); - self.writer.write(&request).await?; - Ok(RunOnceResult::Continue) - } else { - // Sender has been dropped. - Ok(RunOnceResult::Break) - } - }, - read_result = self.reader.read() => { - match read_result? { - Some(response) => { - debug!("Client: received response: {:?}", response); - Ok(RunOnceResult::Response(response)) - } - None => Err(ClientRunError::StreamClosed), + match maybe_request { + None => Ok(WaitResult::Shutdown), // Sender has been dropped. + Some(request) => Ok(WaitResult::Request(request)), } }, + read_result = Self::read(&mut self.reader) => { + let response = read_result?; + Ok(WaitResult::Response(response)) + } + ) + } + + async fn send( + &mut self, + request: &ServerRequest, + ) -> Result, ClientRunError> { + tokio::select!( + write_result = self.writer.write(request) => { + write_result?; + Ok(None) + }, + read_result = Self::read(&mut self.reader) => { + let response = read_result?; + Ok(Some(response)) + }, ) } @@ -152,10 +171,17 @@ impl Client { Box::pin(async_stream::try_stream! { // Drive the main loop: send requests and receive responses. loop { - match self.run_once(&mut request_stream).await? { - RunOnceResult::Break => break, - RunOnceResult::Continue => continue, - RunOnceResult::Response(response) => yield response, + let request = match self.wait(&mut request_stream).await? { + WaitResult::Shutdown => break, + WaitResult::Request(request) => request, + WaitResult::Response(response) => { + yield response; + continue + }, + }; + + if let Some(response) = self.send(&request).await? { + yield response } }