| @ -1,145 +1,84 @@ | |||
| use std::fmt; | |||
| use std::io; | |||
| use bytes::{Buf, BytesMut}; | |||
| use futures::{Async, AsyncSink, Poll, Sink, StartSend, Stream}; | |||
| use tokio_io::{AsyncRead, AsyncWrite}; | |||
| use tokio_io::codec::length_delimited; | |||
| use proto::peer; | |||
| use proto::{Decode, ProtoEncode, ProtoEncoder, ServerResponse, ServerRequest}; | |||
| /* ------- * | |||
| * Helpers * | |||
| * ------- */ | |||
| fn new_framed<T: AsyncRead + AsyncWrite>(io: T) -> length_delimited::Framed<T, BytesMut> { | |||
| length_delimited::Builder::new() | |||
| .length_field_length(4) | |||
| .little_endian() | |||
| .new_framed(io) | |||
| } | |||
| fn decode_server_response(bytes: &mut BytesMut) -> io::Result<ServerResponse> { | |||
| unimplemented!(); | |||
| } | |||
| fn encode_server_request(request: &ServerRequest) -> Result<BytesMut, io::Error> { | |||
| unimplemented!(); | |||
| } | |||
| fn decode_peer_message(bytes: BytesMut) -> io::Result<peer::Message> { | |||
| fn decode_frame<'a, T>(frame_type: &str, bytes: &'a mut BytesMut) -> io::Result<T> | |||
| where | |||
| T: fmt::Debug, | |||
| io::Cursor<&'a mut BytesMut>: Decode<T>, | |||
| { | |||
| let mut cursor = io::Cursor::new(bytes); | |||
| let message = cursor.decode()?; | |||
| let frame = cursor.decode()?; | |||
| if cursor.has_remaining() { | |||
| warn!( | |||
| "Received peer message with trailing bytes. Message:\n{:?}Bytes:{:?}", | |||
| message, | |||
| "Received {} with trailing bytes. Frame:\n{:?}Bytes:{:?}", | |||
| frame_type, | |||
| frame, | |||
| cursor.bytes() | |||
| ); | |||
| } | |||
| Ok(message) | |||
| Ok(frame) | |||
| } | |||
| fn encode_peer_message(message: &peer::Message) -> Result<BytesMut, io::Error> { | |||
| fn encode_frame<T: ProtoEncode>(frame: &T) -> io::Result<BytesMut> { | |||
| let mut bytes = BytesMut::new(); | |||
| message.encode(&mut ProtoEncoder::new(&mut bytes))?; | |||
| frame.encode(&mut ProtoEncoder::new(&mut bytes))?; | |||
| Ok(bytes) | |||
| } | |||
| /* --------------- * | |||
| * ServerTransport * | |||
| * --------------- */ | |||
| pub struct ServerTransport<T> { | |||
| framed: length_delimited::Framed<T, BytesMut>, | |||
| } | |||
| impl<T: AsyncRead + AsyncWrite> ServerTransport<T> { | |||
| pub fn new(io: T) -> ServerTransport<T> { | |||
| ServerTransport { framed: new_framed(io) } | |||
| } | |||
| /// Wraps a raw byte async I/O object, providing it with the ability to read and | |||
| /// write entire frames at once. | |||
| /// The returned stream and sink of frames is intended to be combined (using | |||
| /// `Stream::and_then` and `Sink::with`) with the following decoding and | |||
| /// encoding functions to create a stream/sink of decoded messages. | |||
| pub fn new_framed<T: AsyncRead + AsyncWrite>(io: T) -> length_delimited::Framed<T, BytesMut> { | |||
| length_delimited::Builder::new() | |||
| .length_field_length(4) | |||
| .little_endian() | |||
| .new_framed(io) | |||
| } | |||
| impl<T: AsyncRead> Stream for ServerTransport<T> { | |||
| type Item = ServerResponse; | |||
| type Error = io::Error; | |||
| fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> { | |||
| match self.framed.poll() { | |||
| Ok(Async::Ready(Some(mut bytes))) => { | |||
| let response = decode_server_response(&mut bytes)?; | |||
| Ok(Async::Ready(Some(response))) | |||
| } | |||
| Ok(Async::Ready(None)) => Ok(Async::Ready(None)), | |||
| Ok(Async::NotReady) => Ok(Async::NotReady), | |||
| Err(err) => Err(err), | |||
| } | |||
| } | |||
| /// Decodes a server response from the given byte buffer, which should contain | |||
| /// exactly the bytes encoding the returned response. | |||
| /// Intended to be used on the result of `new_framed` using `Stream::and_then`. | |||
| pub fn decode_server_response(bytes: &mut BytesMut) -> io::Result<ServerResponse> { | |||
| decode_frame("server response", bytes) | |||
| } | |||
| impl<T: AsyncWrite> Sink for ServerTransport<T> { | |||
| type SinkItem = ServerRequest; | |||
| type SinkError = io::Error; | |||
| fn start_send(&mut self, item: Self::SinkItem) -> StartSend<Self::SinkItem, Self::SinkError> { | |||
| let bytes = encode_server_request(&item)?; | |||
| match self.framed.start_send(bytes) { | |||
| Ok(AsyncSink::Ready) => Ok(AsyncSink::Ready), | |||
| Ok(AsyncSink::NotReady(_)) => Ok(AsyncSink::NotReady(item)), | |||
| Err(err) => Err(err), | |||
| } | |||
| } | |||
| fn poll_complete(&mut self) -> Poll<(), Self::SinkError> { | |||
| self.framed.poll_complete() | |||
| } | |||
| /// Encodes the given server response into a byte buffer, then returns it. | |||
| /// Intended to be used on a sink of BytesMut objects, using `Sink::with`. | |||
| pub fn encode_server_response(response: &ServerResponse) -> io::Result<BytesMut> { | |||
| encode_frame(response) | |||
| } | |||
| /* ------------- * | |||
| * PeerTransport * | |||
| * ------------- */ | |||
| pub struct PeerTransport<T> { | |||
| framed: length_delimited::Framed<T, BytesMut>, | |||
| /// Decodes a server request from the given byte buffer, which should contain | |||
| /// exactly the bytes encoding the returned response. | |||
| /// Intended to be used on the result of `new_framed` using `Stream::and_then`. | |||
| pub fn decode_server_request(bytes: &mut BytesMut) -> io::Result<ServerRequest> { | |||
| decode_frame("server request", bytes) | |||
| } | |||
| impl<T: AsyncRead + AsyncWrite> PeerTransport<T> { | |||
| fn new(io: T) -> PeerTransport<T> { | |||
| PeerTransport { framed: new_framed(io) } | |||
| } | |||
| /// Encodes the given server request into a byte buffer, then returns it. | |||
| /// Intended to be used on a sink of BytesMut objects, using `Sink::with`. | |||
| pub fn encode_server_request(request: &ServerRequest) -> io::Result<BytesMut> { | |||
| encode_frame(request) | |||
| } | |||
| impl<T: AsyncRead> Stream for PeerTransport<T> { | |||
| type Item = peer::Message; | |||
| type Error = io::Error; | |||
| fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> { | |||
| match self.framed.poll() { | |||
| Ok(Async::Ready(Some(bytes))) => { | |||
| let message = decode_peer_message(bytes)?; | |||
| Ok(Async::Ready(Some(message))) | |||
| } | |||
| Ok(Async::Ready(None)) => Ok(Async::Ready(None)), | |||
| Ok(Async::NotReady) => Ok(Async::NotReady), | |||
| Err(err) => Err(err), | |||
| } | |||
| } | |||
| /// Decodes a peer message from the given byte buffer, which should contain | |||
| /// exactly the bytes encoding the returned response. | |||
| /// Intended to be used on the result of `new_framed` using `Stream::and_then`. | |||
| pub fn decode_peer_message(bytes: &mut BytesMut) -> io::Result<peer::Message> { | |||
| decode_frame("peer message", bytes) | |||
| } | |||
| impl<T: AsyncWrite> Sink for PeerTransport<T> { | |||
| type SinkItem = peer::Message; | |||
| type SinkError = io::Error; | |||
| fn start_send(&mut self, item: Self::SinkItem) -> StartSend<Self::SinkItem, Self::SinkError> { | |||
| let bytes = encode_peer_message(&item)?; | |||
| match self.framed.start_send(bytes) { | |||
| Ok(AsyncSink::Ready) => Ok(AsyncSink::Ready), | |||
| Ok(AsyncSink::NotReady(_)) => Ok(AsyncSink::NotReady(item)), | |||
| Err(err) => Err(err), | |||
| } | |||
| } | |||
| fn poll_complete(&mut self) -> Poll<(), Self::SinkError> { | |||
| self.framed.poll_complete() | |||
| } | |||
| /// Encodes the given peer message into a byte buffer, then returns it. | |||
| /// Intended to be used on a sink of BytesMut objects, using `Sink::with`. | |||
| pub fn encode_peer_message(message: &peer::Message) -> io::Result<BytesMut> { | |||
| encode_frame(message) | |||
| } | |||