|
|
|
@ -28,7 +28,7 @@ async fn forward_incoming<ReadFrame: ValueDecode + Debug>( |
|
|
|
incoming_tx: mpsc::Sender<ReadFrame>,
|
|
|
|
) -> Result<(), WorkerError> {
|
|
|
|
while let Some(frame) = reader.read().await.map_err(WorkerError::ReadError)? {
|
|
|
|
debug!("Channel: received frame: {:?}", frame);
|
|
|
|
debug!("Received frame: {:?}", frame);
|
|
|
|
|
|
|
|
if let Err(_) = incoming_tx.send(frame).await {
|
|
|
|
return Err(WorkerError::IncomingChannelClosed);
|
|
|
|
@ -44,7 +44,7 @@ async fn forward_outgoing<WriteFrame: ValueEncode + Debug>( |
|
|
|
mut writer: FrameWriter<WriteFrame, OwnedWriteHalf>,
|
|
|
|
) -> Result<(), WorkerError> {
|
|
|
|
while let Some(frame) = outgoing_rx.recv().await {
|
|
|
|
debug!("Channel: sending frame: {:?}", frame);
|
|
|
|
debug!("Sending frame: {:?}", frame);
|
|
|
|
writer
|
|
|
|
.write(&frame)
|
|
|
|
.await
|
|
|
|
@ -86,12 +86,8 @@ where |
|
|
|
|
|
|
|
async fn run(self) -> Result<(), WorkerError> {
|
|
|
|
tokio::select! {
|
|
|
|
result = forward_incoming(self.reader, self.incoming_tx) => {
|
|
|
|
debug!("{:?}", result);
|
|
|
|
},
|
|
|
|
result = forward_outgoing(self.outgoing_rx, self.writer) => {
|
|
|
|
debug!("{:?}", result);
|
|
|
|
},
|
|
|
|
result = forward_incoming(self.reader, self.incoming_tx) => result?,
|
|
|
|
result = forward_outgoing(self.outgoing_rx, self.writer) => result?,
|
|
|
|
};
|
|
|
|
|
|
|
|
Ok(())
|
|
|
|
@ -113,6 +109,8 @@ mod tests { |
|
|
|
let _ = env_logger::builder().is_test(true).try_init();
|
|
|
|
}
|
|
|
|
|
|
|
|
// TODO: test for all 3 error conditions.
|
|
|
|
|
|
|
|
#[tokio::test]
|
|
|
|
async fn forwards_incoming_frames() {
|
|
|
|
init();
|
|
|
|
|