From c82c6d4033d952fc82a33a45f0b6abe41e492a42 Mon Sep 17 00:00:00 2001 From: Titouan Rigoudy Date: Tue, 3 May 2016 13:08:25 +0200 Subject: [PATCH] Move Parser to proto::packet. --- src/proto/packet.rs | 100 ++++++++++++++++++++++++++++++++++++++++---- src/proto/stream.rs | 98 ++----------------------------------------- 2 files changed, 96 insertions(+), 102 deletions(-) diff --git a/src/proto/packet.rs b/src/proto/packet.rs index dda517a..f009ca3 100644 --- a/src/proto/packet.rs +++ b/src/proto/packet.rs @@ -1,12 +1,14 @@ use std::error; use std::fmt; use std::io; +use std::mem; use std::net; use std::io::{Read, Write}; use byteorder::{ByteOrder, LittleEndian, ReadBytesExt, WriteBytesExt}; use encoding::{Encoding, DecoderTrap, EncoderTrap}; use encoding::all::ISO_8859_1; +use mio::TryRead; use super::constants::*; @@ -36,13 +38,8 @@ impl io::Read for Packet { impl Packet { /// Returns a readable packet struct from the wire representation of a /// packet. - pub fn from_bytes(bytes: Vec) -> Self { - // Check that the packet is long enough to contain at least a code. - assert!(bytes.len() >= U32_SIZE); - // Read the purported length of the packet. - let size = LittleEndian::read_u32(&bytes[0..U32_SIZE]) as usize; - // Check that the packet has the right length. - assert!(size + U32_SIZE == bytes.len()); + /// Assumes that the given vector is a valid length-prefixed packet. + fn from_wire(bytes: Vec) -> Self { Packet { cursor: U32_SIZE, bytes: bytes, @@ -320,3 +317,92 @@ impl WriteToPacket for String { (self as &str).write_to_packet(packet) } } + +/*========* + * PARSER * + *========*/ + +/// This enum defines the possible states of a packet parser state machine. +#[derive(Debug, Clone, Copy)] +enum State { + /// The parser is waiting to read enough bytes to determine the + /// length of the following packet. + ReadingLength, + /// The parser is waiting to read enough bytes to form the entire + /// packet. + ReadingPacket, +} + +#[derive(Debug)] +pub struct Parser { + state: State, + num_bytes_left: usize, + buffer: Vec, +} + +impl Parser { + pub fn new() -> Self { + Parser { + state: State::ReadingLength, + num_bytes_left: U32_SIZE, + buffer: vec![0; U32_SIZE], + } + } + + /// Attemps to read a packet in a non-blocking fashion. + /// If enough bytes can be read from the given byte stream to form a + /// complete packet `p`, returns `Ok(Some(p))`. + /// If not enough bytes are available, returns `Ok(None)`. + /// If an I/O error `e` arises when trying to read the underlying stream, + /// returns `Err(e)`. + /// Note: as long as this function returns `Ok(Some(p))`, the caller is + /// responsible for calling it once more to ensure that all packets are + /// read as soon as possible. + pub fn try_read(&mut self, stream: &mut U) -> io::Result> + where U: io::Read + { + // Try to read as many bytes as we currently need from the underlying + // byte stream. + let offset = self.buffer.len() - self.num_bytes_left; + match try!(stream.try_read(&mut self.buffer[offset..])) { + None => (), + + Some(num_bytes_read) => { + self.num_bytes_left -= num_bytes_read; + }, + } + + // If we haven't read enough bytes, return. + if self.num_bytes_left > 0 { + return Ok(None); + } + + // Otherwise, the behavior depends on what state we were in. + match self.state { + State::ReadingLength => { + // If we have finished reading the length prefix, then + // deserialize it, switch states and try to read the packet + // bytes. + let message_len = + LittleEndian::read_u32(&mut self.buffer) as usize; + if message_len > MAX_MESSAGE_SIZE { + unimplemented!(); + }; + self.state = State::ReadingPacket; + self.num_bytes_left = message_len; + self.buffer.resize(message_len + U32_SIZE, 0); + self.try_read(stream) + }, + + State::ReadingPacket => { + // If we have finished reading the packet, swap the full buffer + // out and return the packet made from the full buffer. + self.state = State::ReadingLength; + self.num_bytes_left = U32_SIZE; + let new_buffer = vec![0;U32_SIZE]; + let old_buffer = mem::replace(&mut self.buffer, new_buffer); + Ok(Some(Packet::from_wire(old_buffer))) + } + } + } +} diff --git a/src/proto/stream.rs b/src/proto/stream.rs index c93d4ec..c928736 100644 --- a/src/proto/stream.rs +++ b/src/proto/stream.rs @@ -1,104 +1,10 @@ use std::collections::VecDeque; use std::error; use std::io; -use std::iter; -use std::mem; -use byteorder::{ByteOrder, LittleEndian, ReadBytesExt}; use mio; -use mio::TryRead; -use super::constants::*; -use super::packet::{MutPacket, Packet, ReadFromPacket, WriteToPacket}; - -/*========* - * PARSER * - *========*/ - -/// This enum defines the possible states of a packet parser state machine. -#[derive(Debug, Clone, Copy)] -enum State { - /// The parser is waiting to read enough bytes to determine the - /// length of the following packet. - ReadingLength, - /// The parser is waiting to read enough bytes to form the entire - /// packet. - ReadingPacket, -} - -#[derive(Debug)] -struct Parser { - state: State, - num_bytes_left: usize, - buffer: Vec, -} - -impl Parser { - pub fn new() -> Self { - Parser { - state: State::ReadingLength, - num_bytes_left: U32_SIZE, - buffer: vec![0; U32_SIZE], - } - } - - /// Attemps to read a packet in a non-blocking fashion. - /// If enough bytes can be read from the given byte stream to form a - /// complete packet `p`, returns `Ok(Some(p))`. - /// If not enough bytes are available, returns `Ok(None)`. - /// If an I/O error `e` arises when trying to read the underlying stream, - /// returns `Err(e)`. - /// Note: as long as this function returns `Ok(Some(p))`, the caller is - /// responsible for calling it once more to ensure that all packets are - /// read as soon as possible. - pub fn try_read(&mut self, stream: &mut U) -> io::Result> - where U: io::Read - { - // Try to read as many bytes as we currently need from the underlying - // byte stream. - let offset = self.buffer.len() - self.num_bytes_left; - match try!(stream.try_read(&mut self.buffer[offset..])) { - None => (), - - Some(num_bytes_read) => { - self.num_bytes_left -= num_bytes_read; - }, - } - - // If we haven't read enough bytes, return. - if self.num_bytes_left > 0 { - return Ok(None); - } - - // Otherwise, the behavior depends on what state we were in. - match self.state { - State::ReadingLength => { - // If we have finished reading the length prefix, then - // deserialize it, switch states and try to read the packet - // bytes. - let message_len = - LittleEndian::read_u32(&mut self.buffer) as usize; - if message_len > MAX_MESSAGE_SIZE { - unimplemented!(); - }; - self.state = State::ReadingPacket; - self.num_bytes_left = message_len; - self.buffer.extend(iter::repeat(0).take(message_len)); - self.try_read(stream) - }, - - State::ReadingPacket => { - // If we have finished reading the packet, swap the full buffer - // out and return the packet made from the full buffer. - self.state = State::ReadingLength; - self.num_bytes_left = U32_SIZE; - let new_buffer = vec![0;U32_SIZE]; - let old_buffer = mem::replace(&mut self.buffer, new_buffer); - Ok(Some(Packet::from_bytes(old_buffer))) - } - } - } -} +use super::packet::{MutPacket, Parser, ReadFromPacket, WriteToPacket}; /*========* * OUTBUF * @@ -200,6 +106,7 @@ impl Stream &self.stream } + /// The stream is ready to be read from. fn on_readable(&mut self) -> Result<(), String> { loop { let mut packet = match self.parser.try_read(&mut self.stream) { @@ -224,6 +131,7 @@ impl Stream Ok(()) } + /// The stream is ready to be written to. fn on_writable(&mut self) -> io::Result<()> { loop { let mut outbuf = match self.queue.pop_front() {