From 38f6f0938ab2b288ec91f26b5d2399e2233a2499 Mon Sep 17 00:00:00 2001 From: Titouan Rigoudy Date: Tue, 26 Dec 2017 16:33:18 -0500 Subject: [PATCH] Add PeerTransport. --- src/proto/mod.rs | 2 +- src/proto/transport.rs | 100 ++++++++++++++++++++++++++++++++++------- 2 files changed, 86 insertions(+), 16 deletions(-) diff --git a/src/proto/mod.rs b/src/proto/mod.rs index 1ff8f94..d2b4054 100644 --- a/src/proto/mod.rs +++ b/src/proto/mod.rs @@ -11,4 +11,4 @@ pub use self::handler::*; pub use self::packet::*; pub use self::stream::*; pub use self::server::{ServerResponse, ServerRequest}; -pub use self::transport::ServerTransport; +pub use self::transport::{PeerTransport, ServerTransport}; diff --git a/src/proto/transport.rs b/src/proto/transport.rs index acef6d6..4dda433 100644 --- a/src/proto/transport.rs +++ b/src/proto/transport.rs @@ -5,39 +5,61 @@ use futures::{Async, AsyncSink, Poll, Sink, StartSend, Stream}; use tokio_io::{AsyncRead, AsyncWrite}; use tokio_io::codec::{Decoder, Encoder, length_delimited}; +use super::peer; use super::codec::DecodeError; use super::{ServerResponse, ServerRequest}; -pub struct ServerTransport { - framed: length_delimited::Framed, +/* ------- * + * Helpers * + * ------- */ + +fn new_framed(io: T) -> length_delimited::Framed { + length_delimited::Builder::new() + .length_field_length(4) + .little_endian() + .new_framed(io) } -impl ServerTransport { - fn new(io: T) -> ServerTransport { - ServerTransport { - framed: length_delimited::Builder::new() - .length_field_length(4) - .little_endian() - .new_framed(io), - } - } +fn decode_server_response(bytes: &mut BytesMut) -> Result { + unimplemented!(); +} + +fn encode_server_request(request: &ServerRequest) -> BytesMut { + unimplemented!(); } -fn decode(bytes: &mut BytesMut) -> ServerResponse { +fn decode_peer_message(bytes: &mut BytesMut) -> Result { unimplemented!(); } -fn encode(request: &ServerRequest) -> BytesMut { +fn encode_peer_message(message: &peer::Message) -> BytesMut { unimplemented!(); } +/* --------------- * + * ServerTransport * + * --------------- */ + +pub struct ServerTransport { + framed: length_delimited::Framed, +} + +impl ServerTransport { + fn new(io: T) -> ServerTransport { + ServerTransport { framed: new_framed(io) } + } +} + impl Stream for ServerTransport { type Item = ServerResponse; type Error = DecodeError; fn poll(&mut self) -> Poll, Self::Error> { match self.framed.poll() { - Ok(Async::Ready(Some(mut bytes))) => Ok(Async::Ready(Some(decode(&mut bytes)))), + Ok(Async::Ready(Some(mut bytes))) => { + let response = decode_server_response(&mut bytes)?; + Ok(Async::Ready(Some(response))) + } Ok(Async::Ready(None)) => Ok(Async::Ready(None)), Ok(Async::NotReady) => Ok(Async::NotReady), Err(err) => Err(DecodeError::from(err)), @@ -50,7 +72,55 @@ impl Sink for ServerTransport { type SinkError = io::Error; fn start_send(&mut self, item: Self::SinkItem) -> StartSend { - match self.framed.start_send(encode(&item)) { + match self.framed.start_send(encode_server_request(&item)) { + Ok(AsyncSink::Ready) => Ok(AsyncSink::Ready), + Ok(AsyncSink::NotReady(_)) => Ok(AsyncSink::NotReady(item)), + Err(err) => Err(err), + } + } + + fn poll_complete(&mut self) -> Poll<(), Self::SinkError> { + self.framed.poll_complete() + } +} + +/* ------------- * + * PeerTransport * + * ------------- */ + +pub struct PeerTransport { + framed: length_delimited::Framed, +} + +impl PeerTransport { + fn new(io: T) -> PeerTransport { + PeerTransport { framed: new_framed(io) } + } +} + +impl Stream for PeerTransport { + type Item = peer::Message; + type Error = DecodeError; + + fn poll(&mut self) -> Poll, Self::Error> { + match self.framed.poll() { + Ok(Async::Ready(Some(mut bytes))) => { + let message = decode_peer_message(&mut bytes)?; + Ok(Async::Ready(Some(message))) + } + Ok(Async::Ready(None)) => Ok(Async::Ready(None)), + Ok(Async::NotReady) => Ok(Async::NotReady), + Err(err) => Err(DecodeError::from(err)), + } + } +} + +impl Sink for PeerTransport { + type SinkItem = peer::Message; + type SinkError = io::Error; + + fn start_send(&mut self, item: Self::SinkItem) -> StartSend { + match self.framed.start_send(encode_peer_message(&item)) { Ok(AsyncSink::Ready) => Ok(AsyncSink::Ready), Ok(AsyncSink::NotReady(_)) => Ok(AsyncSink::NotReady(item)), Err(err) => Err(err),