|
|
|
@ -61,9 +61,9 @@ impl ClientRunError { |
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
enum RunOnceResult {
|
|
|
|
Break,
|
|
|
|
Continue,
|
|
|
|
enum WaitResult {
|
|
|
|
Shutdown,
|
|
|
|
Request(ServerRequest),
|
|
|
|
Response(ServerResponse),
|
|
|
|
}
|
|
|
|
|
|
|
|
@ -115,30 +115,49 @@ impl Client { |
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
async fn run_once<S>(&mut self, request_stream: &mut S) -> Result<RunOnceResult, ClientRunError>
|
|
|
|
async fn read(
|
|
|
|
reader: &mut FrameReader<ServerResponse, OwnedReadHalf>,
|
|
|
|
) -> Result<ServerResponse, ClientRunError> {
|
|
|
|
match reader.read().await? {
|
|
|
|
Some(response) => {
|
|
|
|
debug!("Client: received response: {:?}", response);
|
|
|
|
Ok(response)
|
|
|
|
}
|
|
|
|
None => Err(ClientRunError::StreamClosed),
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
async fn wait<S>(&mut self, request_stream: &mut S) -> Result<WaitResult, ClientRunError>
|
|
|
|
where
|
|
|
|
S: Stream<Item = ServerRequest> + Unpin,
|
|
|
|
{
|
|
|
|
tokio::select!(
|
|
|
|
maybe_request = request_stream.next() => {
|
|
|
|
if let Some(request) = maybe_request {
|
|
|
|
debug!("Client: sending request: {:?}", request);
|
|
|
|
self.writer.write(&request).await?;
|
|
|
|
Ok(RunOnceResult::Continue)
|
|
|
|
} else {
|
|
|
|
// Sender has been dropped.
|
|
|
|
Ok(RunOnceResult::Break)
|
|
|
|
}
|
|
|
|
},
|
|
|
|
read_result = self.reader.read() => {
|
|
|
|
match read_result? {
|
|
|
|
Some(response) => {
|
|
|
|
debug!("Client: received response: {:?}", response);
|
|
|
|
Ok(RunOnceResult::Response(response))
|
|
|
|
}
|
|
|
|
None => Err(ClientRunError::StreamClosed),
|
|
|
|
match maybe_request {
|
|
|
|
None => Ok(WaitResult::Shutdown), // Sender has been dropped.
|
|
|
|
Some(request) => Ok(WaitResult::Request(request)),
|
|
|
|
}
|
|
|
|
},
|
|
|
|
read_result = Self::read(&mut self.reader) => {
|
|
|
|
let response = read_result?;
|
|
|
|
Ok(WaitResult::Response(response))
|
|
|
|
}
|
|
|
|
)
|
|
|
|
}
|
|
|
|
|
|
|
|
async fn send(
|
|
|
|
&mut self,
|
|
|
|
request: &ServerRequest,
|
|
|
|
) -> Result<Option<ServerResponse>, ClientRunError> {
|
|
|
|
tokio::select!(
|
|
|
|
write_result = self.writer.write(request) => {
|
|
|
|
write_result?;
|
|
|
|
Ok(None)
|
|
|
|
},
|
|
|
|
read_result = Self::read(&mut self.reader) => {
|
|
|
|
let response = read_result?;
|
|
|
|
Ok(Some(response))
|
|
|
|
},
|
|
|
|
)
|
|
|
|
}
|
|
|
|
|
|
|
|
@ -152,10 +171,17 @@ impl Client { |
|
|
|
Box::pin(async_stream::try_stream! {
|
|
|
|
// Drive the main loop: send requests and receive responses.
|
|
|
|
loop {
|
|
|
|
match self.run_once(&mut request_stream).await? {
|
|
|
|
RunOnceResult::Break => break,
|
|
|
|
RunOnceResult::Continue => continue,
|
|
|
|
RunOnceResult::Response(response) => yield response,
|
|
|
|
let request = match self.wait(&mut request_stream).await? {
|
|
|
|
WaitResult::Shutdown => break,
|
|
|
|
WaitResult::Request(request) => request,
|
|
|
|
WaitResult::Response(response) => {
|
|
|
|
yield response;
|
|
|
|
continue
|
|
|
|
},
|
|
|
|
};
|
|
|
|
|
|
|
|
if let Some(response) = self.send(&request).await? {
|
|
|
|
yield response
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|