|
|
|
@ -33,10 +33,39 @@ impl ChannelError { |
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
/// A wrapper around a frame reader. Logically a part of `Channel`.
|
|
|
|
///
|
|
|
|
/// Exists to provide `Channel` functionality that requires borrowing the
|
|
|
|
/// channel's reader only. This allows borrowing both the reader and the writer
|
|
|
|
/// at the same time in `Channel` without resorting to static methods.
|
|
|
|
#[derive(Debug)]
|
|
|
|
struct ChannelReader<ReadFrame> {
|
|
|
|
inner: FrameReader<ReadFrame, OwnedReadHalf>,
|
|
|
|
}
|
|
|
|
|
|
|
|
impl<ReadFrame> ChannelReader<ReadFrame>
|
|
|
|
where
|
|
|
|
ReadFrame: ValueDecode + Debug,
|
|
|
|
{
|
|
|
|
async fn read(&mut self) -> io::Result<Option<ReadFrame>> {
|
|
|
|
self.inner.read().await.map(|frame| {
|
|
|
|
debug!("Channel: received frame: {:?}", frame);
|
|
|
|
frame
|
|
|
|
})
|
|
|
|
}
|
|
|
|
|
|
|
|
async fn read_strict(&mut self) -> Result<ReadFrame, ChannelError> {
|
|
|
|
match self.read().await? {
|
|
|
|
None => Err(ChannelError::UnexpectedEof),
|
|
|
|
Some(frame) => Ok(frame),
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
/// An asynchronous bidirectional message channel over TCP.
|
|
|
|
#[derive(Debug)]
|
|
|
|
pub struct Channel<ReadFrame, WriteFrame> {
|
|
|
|
reader: FrameReader<ReadFrame, OwnedReadHalf>,
|
|
|
|
reader: ChannelReader<ReadFrame>,
|
|
|
|
writer: FrameWriter<WriteFrame, OwnedWriteHalf>,
|
|
|
|
}
|
|
|
|
|
|
|
|
@ -49,29 +78,13 @@ where |
|
|
|
pub fn new(stream: TcpStream) -> Self {
|
|
|
|
let (read_half, write_half) = stream.into_split();
|
|
|
|
Self {
|
|
|
|
reader: FrameReader::new(read_half),
|
|
|
|
reader: ChannelReader {
|
|
|
|
inner: FrameReader::new(read_half),
|
|
|
|
},
|
|
|
|
writer: FrameWriter::new(write_half),
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
async fn read(
|
|
|
|
reader: &mut FrameReader<ReadFrame, OwnedReadHalf>,
|
|
|
|
) -> io::Result<Option<ReadFrame>> {
|
|
|
|
reader.read().await.map(|frame| {
|
|
|
|
debug!("Channel: received frame: {:?}", frame);
|
|
|
|
frame
|
|
|
|
})
|
|
|
|
}
|
|
|
|
|
|
|
|
async fn read_strict(
|
|
|
|
reader: &mut FrameReader<ReadFrame, OwnedReadHalf>,
|
|
|
|
) -> Result<ReadFrame, ChannelError> {
|
|
|
|
match reader.read().await? {
|
|
|
|
None => Err(ChannelError::UnexpectedEof),
|
|
|
|
Some(frame) => Ok(frame),
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// This future sends all the requests from `request_stream` through `writer`
|
|
|
|
// until the stream is finished, then resolves.
|
|
|
|
async fn send<S: Stream<Item = WriteFrame>>(
|
|
|
|
@ -91,24 +104,24 @@ where |
|
|
|
// `async_stream::try_stream!`, `select!` and the `?` operator.
|
|
|
|
async fn run_once<S: Future<Output = io::Result<()>>>(
|
|
|
|
send_task: S,
|
|
|
|
reader: &mut FrameReader<ReadFrame, OwnedReadHalf>,
|
|
|
|
reader: &mut ChannelReader<ReadFrame>,
|
|
|
|
) -> Result<Option<ReadFrame>, ChannelError> {
|
|
|
|
tokio::select! {
|
|
|
|
send_result = send_task => {
|
|
|
|
send_result?;
|
|
|
|
Ok(None)
|
|
|
|
},
|
|
|
|
read_result = Self::read_strict(reader) => read_result.map(Some),
|
|
|
|
read_result = reader.read_strict() => read_result.map(Some),
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Attempts to read a single frame from the underlying stream.
|
|
|
|
pub async fn read_once(&mut self) -> Result<ReadFrame, ChannelError> {
|
|
|
|
Self::read_strict(&mut self.reader).await
|
|
|
|
pub async fn read(&mut self) -> Result<ReadFrame, ChannelError> {
|
|
|
|
self.reader.read_strict().await
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Attempts to write a single frame to the underlying stream.
|
|
|
|
pub async fn write_once(&mut self, frame: &WriteFrame) -> io::Result<()> {
|
|
|
|
pub async fn write(&mut self, frame: &WriteFrame) -> io::Result<()> {
|
|
|
|
self.writer.write(frame).await
|
|
|
|
}
|
|
|
|
|
|
|
|
@ -141,7 +154,7 @@ where |
|
|
|
self.writer.shutdown().await?;
|
|
|
|
|
|
|
|
// Drain the receiving end of the connection.
|
|
|
|
while let Some(frame) = Self::read(&mut self.reader).await? {
|
|
|
|
while let Some(frame) = self.reader.read().await? {
|
|
|
|
yield frame;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|