From 0f13f774cd3639a32569d8394dcc075074c6dd6f Mon Sep 17 00:00:00 2001 From: Titouan Rigoudy Date: Sun, 11 Jul 2021 13:47:51 -0400 Subject: [PATCH] Fix handling of concurrent send and receive. --- proto/src/server/client.rs | 86 ++++++++++++++++++-------------------- 1 file changed, 41 insertions(+), 45 deletions(-) diff --git a/proto/src/server/client.rs b/proto/src/server/client.rs index aff1746..3b1a772 100644 --- a/proto/src/server/client.rs +++ b/proto/src/server/client.rs @@ -1,5 +1,6 @@ //! A client interface for remote servers. +use std::future::Future; use std::io; use futures::stream::{Stream, StreamExt}; @@ -61,12 +62,6 @@ impl ClientRunError { } } -enum WaitResult { - Shutdown, - Request(ServerRequest), - Response(ServerResponse), -} - impl Client { pub async fn login( tcp_stream: TcpStream, @@ -127,62 +122,63 @@ impl Client { } } - async fn wait(&mut self, request_stream: &mut S) -> Result + // This future sends all the requests from `request_stream` through `writer` + // until the stream is finished, then resolves. + async fn send( + writer: &mut FrameWriter, + mut request_stream: S, + ) -> io::Result<()> where S: Stream + Unpin, { - tokio::select!( - maybe_request = request_stream.next() => { - match maybe_request { - None => Ok(WaitResult::Shutdown), // Sender has been dropped. - Some(request) => Ok(WaitResult::Request(request)), - } - }, - read_result = Self::read(&mut self.reader) => { - let response = read_result?; - Ok(WaitResult::Response(response)) - } - ) + while let Some(request) = request_stream.next().await { + debug!("Client: sending request: {:?}", request); + writer.write(&request).await?; + } + Ok(()) } - async fn send( - &mut self, - request: &ServerRequest, + // It would be easier to inline this `select!` call inside `run()`, but that + // fails due to some weird, undiagnosed error due to the interaction of + // `async_stream::try_stream!`, `select!` and the `?` operator. + async fn run_once( + send: impl Future>, + reader: &mut FrameReader, ) -> Result, ClientRunError> { - tokio::select!( - write_result = self.writer.write(request) => { - write_result?; - Ok(None) - }, - read_result = Self::read(&mut self.reader) => { - let response = read_result?; - Ok(Some(response)) - }, - ) + tokio::select! { + send_result = send => { + send_result?; + Ok(None) + }, + read_result = Self::read(reader) => { + let response = read_result?; + Ok(Some(response)) + }, + } } pub fn run( mut self, - mut request_stream: S, + request_stream: S, ) -> impl Stream> + Unpin where S: Stream + Unpin, { Box::pin(async_stream::try_stream! { // Drive the main loop: send requests and receive responses. - loop { - let request = match self.wait(&mut request_stream).await? { - WaitResult::Shutdown => break, - WaitResult::Request(request) => request, - WaitResult::Response(response) => { + // + // We make a big future out of the operation of waiting for requests + // to send and from `request_stream` and sending them out through + // `self.writer`, that we can then poll repeatedly and concurrently + // with polling for responses. This allows us to concurrently write + // and read from the underlying `TcpStream` in full duplex mode. + { + let send = Self::send(&mut self.writer, request_stream); + tokio::pin!(send); + + while let Some(response) = Self::run_once(&mut send, &mut self.reader).await? { yield response; - continue - }, - }; - - if let Some(response) = self.send(&request).await? { - yield response - } + } } debug!("Client: shutting down outbound stream");