Browse Source

Move PacketStream to proto::stream.

wip
Titouan Rigoudy 9 years ago
parent
commit
c163a065c3
4 changed files with 143 additions and 137 deletions
  1. +6
    -0
      src/proto/constants.rs
  2. +5
    -3
      src/proto/mod.rs
  3. +2
    -134
      src/proto/packet.rs
  4. +130
    -0
      src/proto/stream.rs

+ 6
- 0
src/proto/constants.rs View File

@ -0,0 +1,6 @@
pub const MAX_PACKET_SIZE: usize = 1 << 20; // 1 MiB
pub const U32_SIZE: usize = 4;
pub const MAX_MESSAGE_SIZE: usize = MAX_PACKET_SIZE - U32_SIZE;
pub const MAX_PORT: u32 = (1 << 16) - 1;

+ 5
- 3
src/proto/mod.rs View File

@ -1,17 +1,19 @@
mod constants;
mod handler;
mod packet;
pub mod server;
mod stream;
pub use self::handler::*;
pub use self::packet::*;
use self::server::{ServerRequest, ServerResponse};
pub use self::stream::*;
pub enum Request {
ServerRequest(ServerRequest),
ServerRequest(server::ServerRequest),
}
pub enum Response {
ServerResponse(ServerResponse),
ServerResponse(server::ServerResponse),
}

+ 2
- 134
src/proto/packet.rs View File

@ -1,23 +1,14 @@
use std::error;
use std::fmt;
use std::io;
use std::mem;
use std::net;
use std::iter;
use std::io::{Read, Write};
use byteorder::{ByteOrder, LittleEndian, ReadBytesExt, WriteBytesExt};
use encoding::{Encoding, DecoderTrap, EncoderTrap};
use encoding::all::ISO_8859_1;
use mio::{
Evented, EventLoop, EventSet, Handler, PollOpt, Token, TryRead
};
const MAX_PACKET_SIZE: usize = 1 << 20; // 1 MiB
const U32_SIZE: usize = 4;
const MAX_MESSAGE_SIZE: usize = MAX_PACKET_SIZE - U32_SIZE;
const MAX_PORT: u32 = (1 << 16) - 1;
use super::constants::*;
/*==================*
* READ-ONLY PACKET *
@ -47,7 +38,7 @@ impl io::Read for Packet {
impl Packet {
/// Returns a readable packet struct from the wire representation of a
/// packet.
fn from_bytes(bytes: Vec<u8>) -> Self {
pub fn from_bytes(bytes: Vec<u8>) -> Self {
// Check that the packet is long enough to contain at least a code.
assert!(bytes.len() >= 2*U32_SIZE);
// Read the purported length of the packet.
@ -339,126 +330,3 @@ impl<'a> WriteToPacket for &'a String {
packet.write_value::<&'a str>(self)
}
}
/*===============*
* PACKET STREAM *
*===============*/
/// This enum defines the possible states a PacketStream state machine can be
/// in.
#[derive(Debug, Clone, Copy)]
enum State {
/// The PacketStream is waiting to read enough bytes to determine the
/// length of the following packet.
ReadingLength,
/// The PacketStream is waiting to read enough bytes to form the entire
/// packet.
ReadingPacket,
}
/// This struct wraps around an mio byte stream and provides the ability to
/// read 32-bit-length-prefixed packets of bytes from it.
#[derive(Debug)]
pub struct PacketStream<T: Read + Write + Evented> {
stream: T,
state: State,
num_bytes_left: usize,
buffer: Vec<u8>,
}
impl<T: Read + Write + Evented> PacketStream<T> {
/// Returns a new PacketStream wrapping the provided byte stream.
pub fn new(stream: T) -> Self {
PacketStream {
stream: stream,
state: State::ReadingLength,
num_bytes_left: U32_SIZE,
buffer: vec![0; U32_SIZE],
}
}
/// Attemps to read a packet in a non-blocking fashion.
/// If enough bytes can be read from the underlying byte stream to form a
/// complete packet `p`, returns `Ok(Some(p))`.
/// If not enough bytes are available, returns `Ok(None)`.
/// If an I/O error `e` arises when trying to read the underlying stream,
/// returns `Err(e)`.
/// Note: as long as this function returns `Ok(Some(p))`, the caller is
/// responsible for calling it once more to ensure that all packets are
/// read as soon as possible.
pub fn try_read(&mut self) -> io::Result<Option<Packet>> {
// Try to read as many bytes as we currently need from the underlying
// byte stream.
let offset = self.buffer.len() - self.num_bytes_left;
match try!(self.stream.try_read(&mut self.buffer[offset..])) {
None => (),
Some(num_bytes_read) => {
assert!(num_bytes_read <= self.num_bytes_left);
self.num_bytes_left -= num_bytes_read;
},
}
// If we haven't read enough bytes, return.
if self.num_bytes_left > 0 {
return Ok(None);
}
// Otherwise, the behavior depends on what state we were in.
match self.state {
State::ReadingLength => {
// If we have finished reading the length prefix, then
// deserialize it, switch states and try to read the packet
// bytes.
let message_len =
LittleEndian::read_u32(&mut self.buffer) as usize;
if message_len > MAX_MESSAGE_SIZE {
unimplemented!();
};
self.state = State::ReadingPacket;
self.num_bytes_left = message_len;
self.buffer.extend(iter::repeat(0).take(message_len));
self.try_read()
},
State::ReadingPacket => {
// If we have finished reading the packet, swap the full buffer
// out and return the packet made from the full buffer.
self.state = State::ReadingLength;
self.num_bytes_left = U32_SIZE;
let new_buffer = vec![0;U32_SIZE];
let old_buffer = mem::replace(&mut self.buffer, new_buffer);
Ok(Some(Packet::from_bytes(old_buffer)))
}
}
}
/// Register the packet stream with the given mio event loop.
pub fn register<U: Handler>(
&self, event_loop: &mut EventLoop<U>, token: Token,
event_set: EventSet, poll_opt: PollOpt)
-> io::Result<()>
{
event_loop.register(&self.stream, token, event_set, poll_opt)
}
/// Re-register the packet stream with the given mio event loop.
pub fn reregister<U: Handler>(
&self, event_loop: &mut EventLoop<U>, token: Token,
event_set: EventSet, poll_opt: PollOpt)
-> io::Result<()>
{
event_loop.reregister(&self.stream, token, event_set, poll_opt)
}
}
impl<T: Read + Write + Evented> io::Write for PacketStream<T> {
fn write(&mut self, bytes: &[u8]) -> io::Result<usize> {
self.stream.write(bytes)
}
fn flush(&mut self) -> io::Result<()> {
self.stream.flush()
}
}

+ 130
- 0
src/proto/stream.rs View File

@ -0,0 +1,130 @@
use std::io;
use std::iter;
use std::mem;
use byteorder::{ByteOrder, LittleEndian, ReadBytesExt};
use mio;
use mio::TryRead;
use super::constants::*;
use super::packet::Packet;
/// This enum defines the possible states a PacketStream state machine can be
/// in.
#[derive(Debug, Clone, Copy)]
enum State {
/// The PacketStream is waiting to read enough bytes to determine the
/// length of the following packet.
ReadingLength,
/// The PacketStream is waiting to read enough bytes to form the entire
/// packet.
ReadingPacket,
}
/// This struct wraps around an mio byte stream and provides the ability to
/// read 32-bit-length-prefixed packets of bytes from it.
#[derive(Debug)]
pub struct PacketStream<T: io::Read + io::Write + mio::Evented> {
stream: T,
state: State,
num_bytes_left: usize,
buffer: Vec<u8>,
}
impl<T: io::Read + io::Write + mio::Evented> PacketStream<T> {
/// Returns a new PacketStream wrapping the provided byte stream.
pub fn new(stream: T) -> Self {
PacketStream {
stream: stream,
state: State::ReadingLength,
num_bytes_left: U32_SIZE,
buffer: vec![0; U32_SIZE],
}
}
/// Attemps to read a packet in a non-blocking fashion.
/// If enough bytes can be read from the underlying byte stream to form a
/// complete packet `p`, returns `Ok(Some(p))`.
/// If not enough bytes are available, returns `Ok(None)`.
/// If an I/O error `e` arises when trying to read the underlying stream,
/// returns `Err(e)`.
/// Note: as long as this function returns `Ok(Some(p))`, the caller is
/// responsible for calling it once more to ensure that all packets are
/// read as soon as possible.
pub fn try_read(&mut self) -> io::Result<Option<Packet>> {
// Try to read as many bytes as we currently need from the underlying
// byte stream.
let offset = self.buffer.len() - self.num_bytes_left;
match try!(self.stream.try_read(&mut self.buffer[offset..])) {
None => (),
Some(num_bytes_read) => {
assert!(num_bytes_read <= self.num_bytes_left);
self.num_bytes_left -= num_bytes_read;
},
}
// If we haven't read enough bytes, return.
if self.num_bytes_left > 0 {
return Ok(None);
}
// Otherwise, the behavior depends on what state we were in.
match self.state {
State::ReadingLength => {
// If we have finished reading the length prefix, then
// deserialize it, switch states and try to read the packet
// bytes.
let message_len =
LittleEndian::read_u32(&mut self.buffer) as usize;
if message_len > MAX_MESSAGE_SIZE {
unimplemented!();
};
self.state = State::ReadingPacket;
self.num_bytes_left = message_len;
self.buffer.extend(iter::repeat(0).take(message_len));
self.try_read()
},
State::ReadingPacket => {
// If we have finished reading the packet, swap the full buffer
// out and return the packet made from the full buffer.
self.state = State::ReadingLength;
self.num_bytes_left = U32_SIZE;
let new_buffer = vec![0;U32_SIZE];
let old_buffer = mem::replace(&mut self.buffer, new_buffer);
Ok(Some(Packet::from_bytes(old_buffer)))
}
}
}
/// Register the packet stream with the given mio event loop.
pub fn register<U: mio::Handler>(
&self, event_loop: &mut mio::EventLoop<U>, token: mio::Token,
event_set: mio::EventSet, poll_opt: mio::PollOpt)
-> io::Result<()>
{
event_loop.register(&self.stream, token, event_set, poll_opt)
}
/// Re-register the packet stream with the given mio event loop.
pub fn reregister<U: mio::Handler>(
&self, event_loop: &mut mio::EventLoop<U>, token: mio::Token,
event_set: mio::EventSet, poll_opt: mio::PollOpt)
-> io::Result<()>
{
event_loop.reregister(&self.stream, token, event_set, poll_opt)
}
}
impl<T: io::Read + io::Write + mio::Evented> io::Write for PacketStream<T> {
fn write(&mut self, bytes: &[u8]) -> io::Result<usize> {
self.stream.write(bytes)
}
fn flush(&mut self) -> io::Result<()> {
self.stream.flush()
}
}

Loading…
Cancel
Save