Browse Source

Move SendPacket out of Parser and into Stream.

wip
Titouan Rigoudy 9 years ago
parent
commit
8ddaf1e4e3
1 changed files with 45 additions and 47 deletions
  1. +45
    -47
      src/proto/stream.rs

+ 45
- 47
src/proto/stream.rs View File

@ -15,15 +15,6 @@ use super::packet::{MutPacket, Packet, ReadFromPacket, WriteToPacket};
* PARSER *
*========*/
/// This trait is implemented by packet sinks to which a parser can forward
/// the packets it reads.
pub trait SendPacket {
type Value: ReadFromPacket;
type Error: error::Error;
fn send_packet(&mut self, Self::Value) -> Result<(), Self::Error>;
}
/// This enum defines the possible states of a packet parser state machine.
#[derive(Debug, Clone, Copy)]
enum State {
@ -36,20 +27,18 @@ enum State {
}
#[derive(Debug)]
struct Parser<T: SendPacket> {
struct Parser {
state: State,
num_bytes_left: usize,
buffer: Vec<u8>,
packet_tx: T,
}
impl<T: SendPacket> Parser<T> {
pub fn new(packet_tx: T) -> Self {
impl Parser {
pub fn new() -> Self {
Parser {
state: State::ReadingLength,
num_bytes_left: U32_SIZE,
buffer: vec![0; U32_SIZE],
packet_tx: packet_tx,
}
}
@ -62,7 +51,7 @@ impl<T: SendPacket> Parser<T> {
/// Note: as long as this function returns `Ok(Some(p))`, the caller is
/// responsible for calling it once more to ensure that all packets are
/// read as soon as possible.
fn try_read<U>(&mut self, stream: &mut U) -> io::Result<Option<Packet>>
pub fn try_read<U>(&mut self, stream: &mut U) -> io::Result<Option<Packet>>
where U: io::Read
{
// Try to read as many bytes as we currently need from the underlying
@ -72,7 +61,6 @@ impl<T: SendPacket> Parser<T> {
None => (),
Some(num_bytes_read) => {
assert!(num_bytes_read <= self.num_bytes_left);
self.num_bytes_left -= num_bytes_read;
},
}
@ -110,31 +98,6 @@ impl<T: SendPacket> Parser<T> {
}
}
}
pub fn read_from<U: io::Read>(&mut self, stream: &mut U) ->
Result<(), String>
{
loop {
let mut packet = match self.try_read(stream) {
Ok(Some(packet)) => packet,
Ok(None) => {
return Ok(())
},
Err(e) => {
return Err(format!("Error reading stream: {}", e))
}
};
let value = match packet.read_value() {
Ok(value) => value,
Err(e) => {
return Err(format!("Error parsing packet: {}", e))
}
};
if let Err(e) = self.packet_tx.send_packet(value) {
return Err(format!("Error sending parsed packet: {}", e))
}
}
}
}
/*========*
@ -183,6 +146,15 @@ impl OutBuf {
* STREAM *
*========*/
/// This trait is implemented by packet sinks to which a stream can forward
/// the packets it reads.
pub trait SendPacket {
type Value: ReadFromPacket;
type Error: error::Error;
fn send_packet(&mut self, Self::Value) -> Result<(), Self::Error>;
}
/// This enum defines the possible actions the stream wants to take after
/// processing an event.
#[derive(Debug, Clone, Copy)]
@ -201,9 +173,10 @@ pub struct Stream<T, U>
where T: io::Read + io::Write + mio::Evented,
U: SendPacket
{
stream: T,
parser: Parser<U>,
parser: Parser,
queue: VecDeque<OutBuf>,
sender: U,
stream: T,
}
impl<T, U> Stream<T, U>
@ -212,11 +185,12 @@ impl<T, U> Stream<T, U>
{
/// Returns a new struct wrapping the provided byte stream, which will
/// forward packets to the provided sink.
pub fn new(stream: T, packet_tx: U) -> Self {
pub fn new(stream: T, sender: U) -> Self {
Stream {
stream: stream,
parser: Parser::new(packet_tx),
parser: Parser::new(),
queue: VecDeque::new(),
sender: sender,
stream: stream,
}
}
@ -226,6 +200,30 @@ impl<T, U> Stream<T, U>
&self.stream
}
fn on_readable(&mut self) -> Result<(), String> {
loop {
let mut packet = match self.parser.try_read(&mut self.stream) {
Ok(Some(packet)) => packet,
Ok(None) => {
break
},
Err(e) => {
return Err(format!("Error reading stream: {}", e))
}
};
let value = match packet.read_value() {
Ok(value) => value,
Err(e) => {
return Err(format!("Error parsing packet: {}", e))
}
};
if let Err(e) = self.sender.send_packet(value) {
return Err(format!("Error sending parsed packet: {}", e))
}
}
Ok(())
}
fn on_writable(&mut self) -> io::Result<()> {
loop {
let mut outbuf = match self.queue.pop_front() {
@ -253,7 +251,7 @@ impl<T, U> Stream<T, U>
/// The stream is ready to read, write, or both.
pub fn on_ready(&mut self, event_set: mio::EventSet) -> Intent {
if event_set.is_readable() {
let result = self.parser.read_from(&mut self.stream);
let result = self.on_readable();
if let Err(e) = result {
error!("Stream input error: {}", e);
return Intent::Done


Loading…
Cancel
Save