Browse Source

Move Parser to proto::packet.

wip
Titouan Rigoudy 9 years ago
parent
commit
c82c6d4033
2 changed files with 96 additions and 102 deletions
  1. +93
    -7
      src/proto/packet.rs
  2. +3
    -95
      src/proto/stream.rs

+ 93
- 7
src/proto/packet.rs View File

@ -1,12 +1,14 @@
use std::error;
use std::fmt;
use std::io;
use std::mem;
use std::net;
use std::io::{Read, Write};
use byteorder::{ByteOrder, LittleEndian, ReadBytesExt, WriteBytesExt};
use encoding::{Encoding, DecoderTrap, EncoderTrap};
use encoding::all::ISO_8859_1;
use mio::TryRead;
use super::constants::*;
@ -36,13 +38,8 @@ impl io::Read for Packet {
impl Packet {
/// Returns a readable packet struct from the wire representation of a
/// packet.
pub fn from_bytes(bytes: Vec<u8>) -> Self {
// Check that the packet is long enough to contain at least a code.
assert!(bytes.len() >= U32_SIZE);
// Read the purported length of the packet.
let size = LittleEndian::read_u32(&bytes[0..U32_SIZE]) as usize;
// Check that the packet has the right length.
assert!(size + U32_SIZE == bytes.len());
/// Assumes that the given vector is a valid length-prefixed packet.
fn from_wire(bytes: Vec<u8>) -> Self {
Packet {
cursor: U32_SIZE,
bytes: bytes,
@ -320,3 +317,92 @@ impl WriteToPacket for String {
(self as &str).write_to_packet(packet)
}
}
/*========*
* PARSER *
*========*/
/// This enum defines the possible states of a packet parser state machine.
#[derive(Debug, Clone, Copy)]
enum State {
/// The parser is waiting to read enough bytes to determine the
/// length of the following packet.
ReadingLength,
/// The parser is waiting to read enough bytes to form the entire
/// packet.
ReadingPacket,
}
#[derive(Debug)]
pub struct Parser {
state: State,
num_bytes_left: usize,
buffer: Vec<u8>,
}
impl Parser {
pub fn new() -> Self {
Parser {
state: State::ReadingLength,
num_bytes_left: U32_SIZE,
buffer: vec![0; U32_SIZE],
}
}
/// Attemps to read a packet in a non-blocking fashion.
/// If enough bytes can be read from the given byte stream to form a
/// complete packet `p`, returns `Ok(Some(p))`.
/// If not enough bytes are available, returns `Ok(None)`.
/// If an I/O error `e` arises when trying to read the underlying stream,
/// returns `Err(e)`.
/// Note: as long as this function returns `Ok(Some(p))`, the caller is
/// responsible for calling it once more to ensure that all packets are
/// read as soon as possible.
pub fn try_read<U>(&mut self, stream: &mut U) -> io::Result<Option<Packet>>
where U: io::Read
{
// Try to read as many bytes as we currently need from the underlying
// byte stream.
let offset = self.buffer.len() - self.num_bytes_left;
match try!(stream.try_read(&mut self.buffer[offset..])) {
None => (),
Some(num_bytes_read) => {
self.num_bytes_left -= num_bytes_read;
},
}
// If we haven't read enough bytes, return.
if self.num_bytes_left > 0 {
return Ok(None);
}
// Otherwise, the behavior depends on what state we were in.
match self.state {
State::ReadingLength => {
// If we have finished reading the length prefix, then
// deserialize it, switch states and try to read the packet
// bytes.
let message_len =
LittleEndian::read_u32(&mut self.buffer) as usize;
if message_len > MAX_MESSAGE_SIZE {
unimplemented!();
};
self.state = State::ReadingPacket;
self.num_bytes_left = message_len;
self.buffer.resize(message_len + U32_SIZE, 0);
self.try_read(stream)
},
State::ReadingPacket => {
// If we have finished reading the packet, swap the full buffer
// out and return the packet made from the full buffer.
self.state = State::ReadingLength;
self.num_bytes_left = U32_SIZE;
let new_buffer = vec![0;U32_SIZE];
let old_buffer = mem::replace(&mut self.buffer, new_buffer);
Ok(Some(Packet::from_wire(old_buffer)))
}
}
}
}

+ 3
- 95
src/proto/stream.rs View File

@ -1,104 +1,10 @@
use std::collections::VecDeque;
use std::error;
use std::io;
use std::iter;
use std::mem;
use byteorder::{ByteOrder, LittleEndian, ReadBytesExt};
use mio;
use mio::TryRead;
use super::constants::*;
use super::packet::{MutPacket, Packet, ReadFromPacket, WriteToPacket};
/*========*
* PARSER *
*========*/
/// This enum defines the possible states of a packet parser state machine.
#[derive(Debug, Clone, Copy)]
enum State {
/// The parser is waiting to read enough bytes to determine the
/// length of the following packet.
ReadingLength,
/// The parser is waiting to read enough bytes to form the entire
/// packet.
ReadingPacket,
}
#[derive(Debug)]
struct Parser {
state: State,
num_bytes_left: usize,
buffer: Vec<u8>,
}
impl Parser {
pub fn new() -> Self {
Parser {
state: State::ReadingLength,
num_bytes_left: U32_SIZE,
buffer: vec![0; U32_SIZE],
}
}
/// Attemps to read a packet in a non-blocking fashion.
/// If enough bytes can be read from the given byte stream to form a
/// complete packet `p`, returns `Ok(Some(p))`.
/// If not enough bytes are available, returns `Ok(None)`.
/// If an I/O error `e` arises when trying to read the underlying stream,
/// returns `Err(e)`.
/// Note: as long as this function returns `Ok(Some(p))`, the caller is
/// responsible for calling it once more to ensure that all packets are
/// read as soon as possible.
pub fn try_read<U>(&mut self, stream: &mut U) -> io::Result<Option<Packet>>
where U: io::Read
{
// Try to read as many bytes as we currently need from the underlying
// byte stream.
let offset = self.buffer.len() - self.num_bytes_left;
match try!(stream.try_read(&mut self.buffer[offset..])) {
None => (),
Some(num_bytes_read) => {
self.num_bytes_left -= num_bytes_read;
},
}
// If we haven't read enough bytes, return.
if self.num_bytes_left > 0 {
return Ok(None);
}
// Otherwise, the behavior depends on what state we were in.
match self.state {
State::ReadingLength => {
// If we have finished reading the length prefix, then
// deserialize it, switch states and try to read the packet
// bytes.
let message_len =
LittleEndian::read_u32(&mut self.buffer) as usize;
if message_len > MAX_MESSAGE_SIZE {
unimplemented!();
};
self.state = State::ReadingPacket;
self.num_bytes_left = message_len;
self.buffer.extend(iter::repeat(0).take(message_len));
self.try_read(stream)
},
State::ReadingPacket => {
// If we have finished reading the packet, swap the full buffer
// out and return the packet made from the full buffer.
self.state = State::ReadingLength;
self.num_bytes_left = U32_SIZE;
let new_buffer = vec![0;U32_SIZE];
let old_buffer = mem::replace(&mut self.buffer, new_buffer);
Ok(Some(Packet::from_bytes(old_buffer)))
}
}
}
}
use super::packet::{MutPacket, Parser, ReadFromPacket, WriteToPacket};
/*========*
* OUTBUF *
@ -200,6 +106,7 @@ impl<T, U> Stream<T, U>
&self.stream
}
/// The stream is ready to be read from.
fn on_readable(&mut self) -> Result<(), String> {
loop {
let mut packet = match self.parser.try_read(&mut self.stream) {
@ -224,6 +131,7 @@ impl<T, U> Stream<T, U>
Ok(())
}
/// The stream is ready to be written to.
fn on_writable(&mut self) -> io::Result<()> {
loop {
let mut outbuf = match self.queue.pop_front() {


Loading…
Cancel
Save