From c163a065c3b3d77346d097ff77b2fe205d954966 Mon Sep 17 00:00:00 2001 From: Titouan Rigoudy Date: Mon, 2 May 2016 15:47:06 +0200 Subject: [PATCH] Move PacketStream to proto::stream. --- src/proto/constants.rs | 6 ++ src/proto/mod.rs | 8 ++- src/proto/packet.rs | 136 +---------------------------------------- src/proto/stream.rs | 130 +++++++++++++++++++++++++++++++++++++++ 4 files changed, 143 insertions(+), 137 deletions(-) create mode 100644 src/proto/constants.rs create mode 100644 src/proto/stream.rs diff --git a/src/proto/constants.rs b/src/proto/constants.rs new file mode 100644 index 0000000..470f9d8 --- /dev/null +++ b/src/proto/constants.rs @@ -0,0 +1,6 @@ +pub const MAX_PACKET_SIZE: usize = 1 << 20; // 1 MiB +pub const U32_SIZE: usize = 4; +pub const MAX_MESSAGE_SIZE: usize = MAX_PACKET_SIZE - U32_SIZE; + +pub const MAX_PORT: u32 = (1 << 16) - 1; + diff --git a/src/proto/mod.rs b/src/proto/mod.rs index ca8f8b2..7ed9dd0 100644 --- a/src/proto/mod.rs +++ b/src/proto/mod.rs @@ -1,17 +1,19 @@ +mod constants; mod handler; mod packet; pub mod server; +mod stream; pub use self::handler::*; pub use self::packet::*; -use self::server::{ServerRequest, ServerResponse}; +pub use self::stream::*; pub enum Request { - ServerRequest(ServerRequest), + ServerRequest(server::ServerRequest), } pub enum Response { - ServerResponse(ServerResponse), + ServerResponse(server::ServerResponse), } diff --git a/src/proto/packet.rs b/src/proto/packet.rs index 986a18a..ea77856 100644 --- a/src/proto/packet.rs +++ b/src/proto/packet.rs @@ -1,23 +1,14 @@ use std::error; use std::fmt; use std::io; -use std::mem; use std::net; -use std::iter; use std::io::{Read, Write}; use byteorder::{ByteOrder, LittleEndian, ReadBytesExt, WriteBytesExt}; use encoding::{Encoding, DecoderTrap, EncoderTrap}; use encoding::all::ISO_8859_1; -use mio::{ - Evented, EventLoop, EventSet, Handler, PollOpt, Token, TryRead -}; -const MAX_PACKET_SIZE: usize = 1 << 20; // 1 MiB -const U32_SIZE: usize = 4; -const MAX_MESSAGE_SIZE: usize = MAX_PACKET_SIZE - U32_SIZE; - -const MAX_PORT: u32 = (1 << 16) - 1; +use super::constants::*; /*==================* * READ-ONLY PACKET * @@ -47,7 +38,7 @@ impl io::Read for Packet { impl Packet { /// Returns a readable packet struct from the wire representation of a /// packet. - fn from_bytes(bytes: Vec) -> Self { + pub fn from_bytes(bytes: Vec) -> Self { // Check that the packet is long enough to contain at least a code. assert!(bytes.len() >= 2*U32_SIZE); // Read the purported length of the packet. @@ -339,126 +330,3 @@ impl<'a> WriteToPacket for &'a String { packet.write_value::<&'a str>(self) } } - -/*===============* - * PACKET STREAM * - *===============*/ - -/// This enum defines the possible states a PacketStream state machine can be -/// in. -#[derive(Debug, Clone, Copy)] -enum State { - /// The PacketStream is waiting to read enough bytes to determine the - /// length of the following packet. - ReadingLength, - /// The PacketStream is waiting to read enough bytes to form the entire - /// packet. - ReadingPacket, -} - -/// This struct wraps around an mio byte stream and provides the ability to -/// read 32-bit-length-prefixed packets of bytes from it. -#[derive(Debug)] -pub struct PacketStream { - stream: T, - state: State, - num_bytes_left: usize, - buffer: Vec, -} - -impl PacketStream { - - /// Returns a new PacketStream wrapping the provided byte stream. - pub fn new(stream: T) -> Self { - PacketStream { - stream: stream, - state: State::ReadingLength, - num_bytes_left: U32_SIZE, - buffer: vec![0; U32_SIZE], - } - } - - /// Attemps to read a packet in a non-blocking fashion. - /// If enough bytes can be read from the underlying byte stream to form a - /// complete packet `p`, returns `Ok(Some(p))`. - /// If not enough bytes are available, returns `Ok(None)`. - /// If an I/O error `e` arises when trying to read the underlying stream, - /// returns `Err(e)`. - /// Note: as long as this function returns `Ok(Some(p))`, the caller is - /// responsible for calling it once more to ensure that all packets are - /// read as soon as possible. - pub fn try_read(&mut self) -> io::Result> { - // Try to read as many bytes as we currently need from the underlying - // byte stream. - let offset = self.buffer.len() - self.num_bytes_left; - match try!(self.stream.try_read(&mut self.buffer[offset..])) { - None => (), - - Some(num_bytes_read) => { - assert!(num_bytes_read <= self.num_bytes_left); - self.num_bytes_left -= num_bytes_read; - }, - } - - // If we haven't read enough bytes, return. - if self.num_bytes_left > 0 { - return Ok(None); - } - - // Otherwise, the behavior depends on what state we were in. - match self.state { - State::ReadingLength => { - // If we have finished reading the length prefix, then - // deserialize it, switch states and try to read the packet - // bytes. - let message_len = - LittleEndian::read_u32(&mut self.buffer) as usize; - if message_len > MAX_MESSAGE_SIZE { - unimplemented!(); - }; - self.state = State::ReadingPacket; - self.num_bytes_left = message_len; - self.buffer.extend(iter::repeat(0).take(message_len)); - self.try_read() - }, - - State::ReadingPacket => { - // If we have finished reading the packet, swap the full buffer - // out and return the packet made from the full buffer. - self.state = State::ReadingLength; - self.num_bytes_left = U32_SIZE; - let new_buffer = vec![0;U32_SIZE]; - let old_buffer = mem::replace(&mut self.buffer, new_buffer); - Ok(Some(Packet::from_bytes(old_buffer))) - } - } - } - - /// Register the packet stream with the given mio event loop. - pub fn register( - &self, event_loop: &mut EventLoop, token: Token, - event_set: EventSet, poll_opt: PollOpt) - -> io::Result<()> - { - event_loop.register(&self.stream, token, event_set, poll_opt) - } - - /// Re-register the packet stream with the given mio event loop. - pub fn reregister( - &self, event_loop: &mut EventLoop, token: Token, - event_set: EventSet, poll_opt: PollOpt) - -> io::Result<()> - { - event_loop.reregister(&self.stream, token, event_set, poll_opt) - } -} - -impl io::Write for PacketStream { - fn write(&mut self, bytes: &[u8]) -> io::Result { - self.stream.write(bytes) - } - - fn flush(&mut self) -> io::Result<()> { - self.stream.flush() - } -} diff --git a/src/proto/stream.rs b/src/proto/stream.rs new file mode 100644 index 0000000..609fdac --- /dev/null +++ b/src/proto/stream.rs @@ -0,0 +1,130 @@ +use std::io; +use std::iter; +use std::mem; + +use byteorder::{ByteOrder, LittleEndian, ReadBytesExt}; +use mio; +use mio::TryRead; + +use super::constants::*; +use super::packet::Packet; + +/// This enum defines the possible states a PacketStream state machine can be +/// in. +#[derive(Debug, Clone, Copy)] +enum State { + /// The PacketStream is waiting to read enough bytes to determine the + /// length of the following packet. + ReadingLength, + /// The PacketStream is waiting to read enough bytes to form the entire + /// packet. + ReadingPacket, +} + +/// This struct wraps around an mio byte stream and provides the ability to +/// read 32-bit-length-prefixed packets of bytes from it. +#[derive(Debug)] +pub struct PacketStream { + stream: T, + state: State, + num_bytes_left: usize, + buffer: Vec, +} + +impl PacketStream { + + /// Returns a new PacketStream wrapping the provided byte stream. + pub fn new(stream: T) -> Self { + PacketStream { + stream: stream, + state: State::ReadingLength, + num_bytes_left: U32_SIZE, + buffer: vec![0; U32_SIZE], + } + } + + /// Attemps to read a packet in a non-blocking fashion. + /// If enough bytes can be read from the underlying byte stream to form a + /// complete packet `p`, returns `Ok(Some(p))`. + /// If not enough bytes are available, returns `Ok(None)`. + /// If an I/O error `e` arises when trying to read the underlying stream, + /// returns `Err(e)`. + /// Note: as long as this function returns `Ok(Some(p))`, the caller is + /// responsible for calling it once more to ensure that all packets are + /// read as soon as possible. + pub fn try_read(&mut self) -> io::Result> { + // Try to read as many bytes as we currently need from the underlying + // byte stream. + let offset = self.buffer.len() - self.num_bytes_left; + match try!(self.stream.try_read(&mut self.buffer[offset..])) { + None => (), + + Some(num_bytes_read) => { + assert!(num_bytes_read <= self.num_bytes_left); + self.num_bytes_left -= num_bytes_read; + }, + } + + // If we haven't read enough bytes, return. + if self.num_bytes_left > 0 { + return Ok(None); + } + + // Otherwise, the behavior depends on what state we were in. + match self.state { + State::ReadingLength => { + // If we have finished reading the length prefix, then + // deserialize it, switch states and try to read the packet + // bytes. + let message_len = + LittleEndian::read_u32(&mut self.buffer) as usize; + if message_len > MAX_MESSAGE_SIZE { + unimplemented!(); + }; + self.state = State::ReadingPacket; + self.num_bytes_left = message_len; + self.buffer.extend(iter::repeat(0).take(message_len)); + self.try_read() + }, + + State::ReadingPacket => { + // If we have finished reading the packet, swap the full buffer + // out and return the packet made from the full buffer. + self.state = State::ReadingLength; + self.num_bytes_left = U32_SIZE; + let new_buffer = vec![0;U32_SIZE]; + let old_buffer = mem::replace(&mut self.buffer, new_buffer); + Ok(Some(Packet::from_bytes(old_buffer))) + } + } + } + + /// Register the packet stream with the given mio event loop. + pub fn register( + &self, event_loop: &mut mio::EventLoop, token: mio::Token, + event_set: mio::EventSet, poll_opt: mio::PollOpt) + -> io::Result<()> + { + event_loop.register(&self.stream, token, event_set, poll_opt) + } + + /// Re-register the packet stream with the given mio event loop. + pub fn reregister( + &self, event_loop: &mut mio::EventLoop, token: mio::Token, + event_set: mio::EventSet, poll_opt: mio::PollOpt) + -> io::Result<()> + { + event_loop.reregister(&self.stream, token, event_set, poll_opt) + } +} + +impl io::Write for PacketStream { + fn write(&mut self, bytes: &[u8]) -> io::Result { + self.stream.write(bytes) + } + + fn flush(&mut self) -> io::Result<()> { + self.stream.flush() + } +} +