diff --git a/proto/src/core/channel.rs b/proto/src/core/channel.rs index f0379b5..66324a0 100644 --- a/proto/src/core/channel.rs +++ b/proto/src/core/channel.rs @@ -33,10 +33,39 @@ impl ChannelError { } } +/// A wrapper around a frame reader. Logically a part of `Channel`. +/// +/// Exists to provide `Channel` functionality that requires borrowing the +/// channel's reader only. This allows borrowing both the reader and the writer +/// at the same time in `Channel` without resorting to static methods. +#[derive(Debug)] +struct ChannelReader { + inner: FrameReader, +} + +impl ChannelReader +where + ReadFrame: ValueDecode + Debug, +{ + async fn read(&mut self) -> io::Result> { + self.inner.read().await.map(|frame| { + debug!("Channel: received frame: {:?}", frame); + frame + }) + } + + async fn read_strict(&mut self) -> Result { + match self.read().await? { + None => Err(ChannelError::UnexpectedEof), + Some(frame) => Ok(frame), + } + } +} + /// An asynchronous bidirectional message channel over TCP. #[derive(Debug)] pub struct Channel { - reader: FrameReader, + reader: ChannelReader, writer: FrameWriter, } @@ -49,29 +78,13 @@ where pub fn new(stream: TcpStream) -> Self { let (read_half, write_half) = stream.into_split(); Self { - reader: FrameReader::new(read_half), + reader: ChannelReader { + inner: FrameReader::new(read_half), + }, writer: FrameWriter::new(write_half), } } - async fn read( - reader: &mut FrameReader, - ) -> io::Result> { - reader.read().await.map(|frame| { - debug!("Channel: received frame: {:?}", frame); - frame - }) - } - - async fn read_strict( - reader: &mut FrameReader, - ) -> Result { - match reader.read().await? { - None => Err(ChannelError::UnexpectedEof), - Some(frame) => Ok(frame), - } - } - // This future sends all the requests from `request_stream` through `writer` // until the stream is finished, then resolves. async fn send>( @@ -91,24 +104,24 @@ where // `async_stream::try_stream!`, `select!` and the `?` operator. async fn run_once>>( send_task: S, - reader: &mut FrameReader, + reader: &mut ChannelReader, ) -> Result, ChannelError> { tokio::select! { send_result = send_task => { send_result?; Ok(None) }, - read_result = Self::read_strict(reader) => read_result.map(Some), + read_result = reader.read_strict() => read_result.map(Some), } } /// Attempts to read a single frame from the underlying stream. - pub async fn read_once(&mut self) -> Result { - Self::read_strict(&mut self.reader).await + pub async fn read(&mut self) -> Result { + self.reader.read_strict().await } /// Attempts to write a single frame to the underlying stream. - pub async fn write_once(&mut self, frame: &WriteFrame) -> io::Result<()> { + pub async fn write(&mut self, frame: &WriteFrame) -> io::Result<()> { self.writer.write(frame).await } @@ -141,7 +154,7 @@ where self.writer.shutdown().await?; // Drain the receiving end of the connection. - while let Some(frame) = Self::read(&mut self.reader).await? { + while let Some(frame) = self.reader.read().await? { yield frame; } } diff --git a/proto/src/server/client.rs b/proto/src/server/client.rs index 39fbd10..dbcb410 100644 --- a/proto/src/server/client.rs +++ b/proto/src/server/client.rs @@ -61,9 +61,9 @@ impl Client { debug!("Client: sending login request: {:?}", login_request); let request = login_request.into(); - self.channel.write_once(&request).await?; + self.channel.write(&request).await?; - let response = self.channel.read_once().await?; + let response = self.channel.read().await?; debug!("Client: received first response: {:?}", response); match response {