From bcf05d5772219451493d5a3c75e1c41318196fd7 Mon Sep 17 00:00:00 2001 From: Titouan Rigoudy Date: Sat, 13 Jan 2018 20:05:07 -0500 Subject: [PATCH] Refactor transport module a little. --- src/proto/mod.rs | 1 - src/proto/transport.rs | 161 +++++++++++++---------------------------- 2 files changed, 50 insertions(+), 112 deletions(-) diff --git a/src/proto/mod.rs b/src/proto/mod.rs index 6fbc99b..0282c60 100644 --- a/src/proto/mod.rs +++ b/src/proto/mod.rs @@ -13,5 +13,4 @@ pub use self::handler::*; pub use self::packet::*; pub use self::stream::*; pub use self::server::{ServerResponse, ServerRequest}; -pub use self::transport::{PeerTransport, ServerTransport}; pub use self::user::{User, UserStatus}; diff --git a/src/proto/transport.rs b/src/proto/transport.rs index 1c1635f..05435d9 100644 --- a/src/proto/transport.rs +++ b/src/proto/transport.rs @@ -1,145 +1,84 @@ +use std::fmt; use std::io; use bytes::{Buf, BytesMut}; -use futures::{Async, AsyncSink, Poll, Sink, StartSend, Stream}; use tokio_io::{AsyncRead, AsyncWrite}; use tokio_io::codec::length_delimited; use proto::peer; use proto::{Decode, ProtoEncode, ProtoEncoder, ServerResponse, ServerRequest}; -/* ------- * - * Helpers * - * ------- */ - -fn new_framed(io: T) -> length_delimited::Framed { - length_delimited::Builder::new() - .length_field_length(4) - .little_endian() - .new_framed(io) -} - -fn decode_server_response(bytes: &mut BytesMut) -> io::Result { - unimplemented!(); -} - -fn encode_server_request(request: &ServerRequest) -> Result { - unimplemented!(); -} - -fn decode_peer_message(bytes: BytesMut) -> io::Result { +fn decode_frame<'a, T>(frame_type: &str, bytes: &'a mut BytesMut) -> io::Result +where + T: fmt::Debug, + io::Cursor<&'a mut BytesMut>: Decode, +{ let mut cursor = io::Cursor::new(bytes); - let message = cursor.decode()?; + let frame = cursor.decode()?; if cursor.has_remaining() { warn!( - "Received peer message with trailing bytes. Message:\n{:?}Bytes:{:?}", - message, + "Received {} with trailing bytes. Frame:\n{:?}Bytes:{:?}", + frame_type, + frame, cursor.bytes() ); } - Ok(message) + Ok(frame) } -fn encode_peer_message(message: &peer::Message) -> Result { +fn encode_frame(frame: &T) -> io::Result { let mut bytes = BytesMut::new(); - message.encode(&mut ProtoEncoder::new(&mut bytes))?; + frame.encode(&mut ProtoEncoder::new(&mut bytes))?; Ok(bytes) } -/* --------------- * - * ServerTransport * - * --------------- */ - -pub struct ServerTransport { - framed: length_delimited::Framed, -} - -impl ServerTransport { - pub fn new(io: T) -> ServerTransport { - ServerTransport { framed: new_framed(io) } - } +/// Wraps a raw byte async I/O object, providing it with the ability to read and +/// write entire frames at once. +/// The returned stream and sink of frames is intended to be combined (using +/// `Stream::and_then` and `Sink::with`) with the following decoding and +/// encoding functions to create a stream/sink of decoded messages. +pub fn new_framed(io: T) -> length_delimited::Framed { + length_delimited::Builder::new() + .length_field_length(4) + .little_endian() + .new_framed(io) } -impl Stream for ServerTransport { - type Item = ServerResponse; - type Error = io::Error; - - fn poll(&mut self) -> Poll, Self::Error> { - match self.framed.poll() { - Ok(Async::Ready(Some(mut bytes))) => { - let response = decode_server_response(&mut bytes)?; - Ok(Async::Ready(Some(response))) - } - Ok(Async::Ready(None)) => Ok(Async::Ready(None)), - Ok(Async::NotReady) => Ok(Async::NotReady), - Err(err) => Err(err), - } - } +/// Decodes a server response from the given byte buffer, which should contain +/// exactly the bytes encoding the returned response. +/// Intended to be used on the result of `new_framed` using `Stream::and_then`. +pub fn decode_server_response(bytes: &mut BytesMut) -> io::Result { + decode_frame("server response", bytes) } -impl Sink for ServerTransport { - type SinkItem = ServerRequest; - type SinkError = io::Error; - - fn start_send(&mut self, item: Self::SinkItem) -> StartSend { - let bytes = encode_server_request(&item)?; - match self.framed.start_send(bytes) { - Ok(AsyncSink::Ready) => Ok(AsyncSink::Ready), - Ok(AsyncSink::NotReady(_)) => Ok(AsyncSink::NotReady(item)), - Err(err) => Err(err), - } - } - - fn poll_complete(&mut self) -> Poll<(), Self::SinkError> { - self.framed.poll_complete() - } +/// Encodes the given server response into a byte buffer, then returns it. +/// Intended to be used on a sink of BytesMut objects, using `Sink::with`. +pub fn encode_server_response(response: &ServerResponse) -> io::Result { + encode_frame(response) } -/* ------------- * - * PeerTransport * - * ------------- */ - -pub struct PeerTransport { - framed: length_delimited::Framed, +/// Decodes a server request from the given byte buffer, which should contain +/// exactly the bytes encoding the returned response. +/// Intended to be used on the result of `new_framed` using `Stream::and_then`. +pub fn decode_server_request(bytes: &mut BytesMut) -> io::Result { + decode_frame("server request", bytes) } -impl PeerTransport { - fn new(io: T) -> PeerTransport { - PeerTransport { framed: new_framed(io) } - } +/// Encodes the given server request into a byte buffer, then returns it. +/// Intended to be used on a sink of BytesMut objects, using `Sink::with`. +pub fn encode_server_request(request: &ServerRequest) -> io::Result { + encode_frame(request) } -impl Stream for PeerTransport { - type Item = peer::Message; - type Error = io::Error; - - fn poll(&mut self) -> Poll, Self::Error> { - match self.framed.poll() { - Ok(Async::Ready(Some(bytes))) => { - let message = decode_peer_message(bytes)?; - Ok(Async::Ready(Some(message))) - } - Ok(Async::Ready(None)) => Ok(Async::Ready(None)), - Ok(Async::NotReady) => Ok(Async::NotReady), - Err(err) => Err(err), - } - } +/// Decodes a peer message from the given byte buffer, which should contain +/// exactly the bytes encoding the returned response. +/// Intended to be used on the result of `new_framed` using `Stream::and_then`. +pub fn decode_peer_message(bytes: &mut BytesMut) -> io::Result { + decode_frame("peer message", bytes) } -impl Sink for PeerTransport { - type SinkItem = peer::Message; - type SinkError = io::Error; - - fn start_send(&mut self, item: Self::SinkItem) -> StartSend { - let bytes = encode_peer_message(&item)?; - match self.framed.start_send(bytes) { - Ok(AsyncSink::Ready) => Ok(AsyncSink::Ready), - Ok(AsyncSink::NotReady(_)) => Ok(AsyncSink::NotReady(item)), - Err(err) => Err(err), - } - } - - fn poll_complete(&mut self) -> Poll<(), Self::SinkError> { - self.framed.poll_complete() - } +/// Encodes the given peer message into a byte buffer, then returns it. +/// Intended to be used on a sink of BytesMut objects, using `Sink::with`. +pub fn encode_peer_message(message: &peer::Message) -> io::Result { + encode_frame(message) }