Browse Source

Fix handling of concurrent send and receive.

wip
Titouan Rigoudy 4 years ago
parent
commit
0f13f774cd
1 changed files with 41 additions and 45 deletions
  1. +41
    -45
      proto/src/server/client.rs

+ 41
- 45
proto/src/server/client.rs View File

@ -1,5 +1,6 @@
//! A client interface for remote servers. //! A client interface for remote servers.
use std::future::Future;
use std::io; use std::io;
use futures::stream::{Stream, StreamExt}; use futures::stream::{Stream, StreamExt};
@ -61,12 +62,6 @@ impl ClientRunError {
} }
} }
enum WaitResult {
Shutdown,
Request(ServerRequest),
Response(ServerResponse),
}
impl Client { impl Client {
pub async fn login( pub async fn login(
tcp_stream: TcpStream, tcp_stream: TcpStream,
@ -127,62 +122,63 @@ impl Client {
} }
} }
async fn wait<S>(&mut self, request_stream: &mut S) -> Result<WaitResult, ClientRunError>
// This future sends all the requests from `request_stream` through `writer`
// until the stream is finished, then resolves.
async fn send<S>(
writer: &mut FrameWriter<ServerRequest, OwnedWriteHalf>,
mut request_stream: S,
) -> io::Result<()>
where where
S: Stream<Item = ServerRequest> + Unpin, S: Stream<Item = ServerRequest> + Unpin,
{ {
tokio::select!(
maybe_request = request_stream.next() => {
match maybe_request {
None => Ok(WaitResult::Shutdown), // Sender has been dropped.
Some(request) => Ok(WaitResult::Request(request)),
}
},
read_result = Self::read(&mut self.reader) => {
let response = read_result?;
Ok(WaitResult::Response(response))
}
)
while let Some(request) = request_stream.next().await {
debug!("Client: sending request: {:?}", request);
writer.write(&request).await?;
}
Ok(())
} }
async fn send(
&mut self,
request: &ServerRequest,
// It would be easier to inline this `select!` call inside `run()`, but that
// fails due to some weird, undiagnosed error due to the interaction of
// `async_stream::try_stream!`, `select!` and the `?` operator.
async fn run_once(
send: impl Future<Output = io::Result<()>>,
reader: &mut FrameReader<ServerResponse, OwnedReadHalf>,
) -> Result<Option<ServerResponse>, ClientRunError> { ) -> Result<Option<ServerResponse>, ClientRunError> {
tokio::select!(
write_result = self.writer.write(request) => {
write_result?;
Ok(None)
},
read_result = Self::read(&mut self.reader) => {
let response = read_result?;
Ok(Some(response))
},
)
tokio::select! {
send_result = send => {
send_result?;
Ok(None)
},
read_result = Self::read(reader) => {
let response = read_result?;
Ok(Some(response))
},
}
} }
pub fn run<S>( pub fn run<S>(
mut self, mut self,
mut request_stream: S,
request_stream: S,
) -> impl Stream<Item = Result<ServerResponse, ClientRunError>> + Unpin ) -> impl Stream<Item = Result<ServerResponse, ClientRunError>> + Unpin
where where
S: Stream<Item = ServerRequest> + Unpin, S: Stream<Item = ServerRequest> + Unpin,
{ {
Box::pin(async_stream::try_stream! { Box::pin(async_stream::try_stream! {
// Drive the main loop: send requests and receive responses. // Drive the main loop: send requests and receive responses.
loop {
let request = match self.wait(&mut request_stream).await? {
WaitResult::Shutdown => break,
WaitResult::Request(request) => request,
WaitResult::Response(response) => {
//
// We make a big future out of the operation of waiting for requests
// to send and from `request_stream` and sending them out through
// `self.writer`, that we can then poll repeatedly and concurrently
// with polling for responses. This allows us to concurrently write
// and read from the underlying `TcpStream` in full duplex mode.
{
let send = Self::send(&mut self.writer, request_stream);
tokio::pin!(send);
while let Some(response) = Self::run_once(&mut send, &mut self.reader).await? {
yield response; yield response;
continue
},
};
if let Some(response) = self.send(&request).await? {
yield response
}
}
} }
debug!("Client: shutting down outbound stream"); debug!("Client: shutting down outbound stream");


Loading…
Cancel
Save