From 1f94a399fc490a6a7b0f0c1de01802b17b894ba6 Mon Sep 17 00:00:00 2001 From: Titouan Rigoudy Date: Tue, 26 Dec 2017 14:13:57 -0500 Subject: [PATCH] Add ServerTransport. --- .gitignore | 3 ++ src/proto/codec.rs | 41 +++------------------------ src/proto/mod.rs | 3 ++ src/proto/transport.rs | 63 ++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 73 insertions(+), 37 deletions(-) create mode 100644 src/proto/transport.rs diff --git a/.gitignore b/.gitignore index 6c2a0d5..ad20f86 100644 --- a/.gitignore +++ b/.gitignore @@ -4,6 +4,9 @@ *.rlib *.dll +# Rustfmt backups +*.bk + # Executables *.exe diff --git a/src/proto/codec.rs b/src/proto/codec.rs index bcb3025..42038a5 100644 --- a/src/proto/codec.rs +++ b/src/proto/codec.rs @@ -5,15 +5,8 @@ use std::net; use std::u16; use bytes::{Buf, BufMut, BytesMut, LittleEndian}; -use bytes::buf::IntoBuf; use encoding::{Encoding, EncoderTrap, DecoderTrap}; use encoding::all::WINDOWS_1252; -use tokio_core::io::EasyBuf; -use tokio_io::{AsyncRead, AsyncWrite}; -use tokio_io::codec::{Decoder, Encoder}; -use tokio_io::codec::length_delimited; - -use proto::server::ServerResponse; /// Length of an encoded 32-bit integer in bytes. const U32_BYTE_LEN: usize = 4; @@ -87,9 +80,9 @@ fn unexpected_eof_error(value_type: &str) -> DecodeError { DecodeError::from(io::Error::new(io::ErrorKind::UnexpectedEof, value_type)) } -/*=================* - * DECODE / ENCODE * - *=================*/ +/*===================================* + * BASIC TYPES ENCODING AND DECODING * + *===================================*/ // The protocol is pretty basic, though quirky. Base types are serialized in // the following way: @@ -105,7 +98,7 @@ fn unexpected_eof_error(value_type: &str) -> DecodeError { /// This trait is implemented by types that can be decoded from messages with /// a `ProtoDecoder`. -/// Only here to enable ProtoDecoder::decode_vec. +/// Only here to enable `ProtoDecoder::decode_vec`. pub trait ProtoDecode: Sized { /// Attempts to decode an instance of `Self` using the given decoder. fn decode(decoder: &mut ProtoDecoder) -> Result; @@ -333,32 +326,6 @@ impl ProtoEncode for Vec { } } -/*=================* - * DECODER/ENCODER * - *=================*/ - -fn new_length_prefixed_framed(inner: T) -> length_delimited::Framed -where - T: AsyncRead + AsyncWrite, - B: IntoBuf, -{ - length_delimited::Builder::new() - .length_field_length(4) - .little_endian() - .new_framed(inner) -} - -struct ServerResponseDecoder; - -impl Decoder for ServerResponseDecoder { - type Item = ServerResponse; - type Error = DecodeError; - - fn decode(&mut self, buf: &mut BytesMut) -> Result, Self::Error> { - unimplemented!(); - } -} - /*=======* * TESTS * *=======*/ diff --git a/src/proto/mod.rs b/src/proto/mod.rs index 5aecc40..1ff8f94 100644 --- a/src/proto/mod.rs +++ b/src/proto/mod.rs @@ -5,7 +5,10 @@ pub mod peer; pub mod server; mod stream; mod codec; +mod transport; pub use self::handler::*; pub use self::packet::*; pub use self::stream::*; +pub use self::server::{ServerResponse, ServerRequest}; +pub use self::transport::ServerTransport; diff --git a/src/proto/transport.rs b/src/proto/transport.rs new file mode 100644 index 0000000..acef6d6 --- /dev/null +++ b/src/proto/transport.rs @@ -0,0 +1,63 @@ +use std::io; + +use bytes::BytesMut; +use futures::{Async, AsyncSink, Poll, Sink, StartSend, Stream}; +use tokio_io::{AsyncRead, AsyncWrite}; +use tokio_io::codec::{Decoder, Encoder, length_delimited}; + +use super::codec::DecodeError; +use super::{ServerResponse, ServerRequest}; + +pub struct ServerTransport { + framed: length_delimited::Framed, +} + +impl ServerTransport { + fn new(io: T) -> ServerTransport { + ServerTransport { + framed: length_delimited::Builder::new() + .length_field_length(4) + .little_endian() + .new_framed(io), + } + } +} + +fn decode(bytes: &mut BytesMut) -> ServerResponse { + unimplemented!(); +} + +fn encode(request: &ServerRequest) -> BytesMut { + unimplemented!(); +} + +impl Stream for ServerTransport { + type Item = ServerResponse; + type Error = DecodeError; + + fn poll(&mut self) -> Poll, Self::Error> { + match self.framed.poll() { + Ok(Async::Ready(Some(mut bytes))) => Ok(Async::Ready(Some(decode(&mut bytes)))), + Ok(Async::Ready(None)) => Ok(Async::Ready(None)), + Ok(Async::NotReady) => Ok(Async::NotReady), + Err(err) => Err(DecodeError::from(err)), + } + } +} + +impl Sink for ServerTransport { + type SinkItem = ServerRequest; + type SinkError = io::Error; + + fn start_send(&mut self, item: Self::SinkItem) -> StartSend { + match self.framed.start_send(encode(&item)) { + Ok(AsyncSink::Ready) => Ok(AsyncSink::Ready), + Ok(AsyncSink::NotReady(_)) => Ok(AsyncSink::NotReady(item)), + Err(err) => Err(err), + } + } + + fn poll_complete(&mut self) -> Poll<(), Self::SinkError> { + self.framed.poll_complete() + } +}