From 8ddaf1e4e333a2fe8e324f035e7036e8db82a3e1 Mon Sep 17 00:00:00 2001 From: Titouan Rigoudy Date: Tue, 3 May 2016 11:16:27 +0200 Subject: [PATCH] Move SendPacket out of Parser and into Stream. --- src/proto/stream.rs | 92 ++++++++++++++++++++++----------------------- 1 file changed, 45 insertions(+), 47 deletions(-) diff --git a/src/proto/stream.rs b/src/proto/stream.rs index 8579b93..b83772a 100644 --- a/src/proto/stream.rs +++ b/src/proto/stream.rs @@ -15,15 +15,6 @@ use super::packet::{MutPacket, Packet, ReadFromPacket, WriteToPacket}; * PARSER * *========*/ -/// This trait is implemented by packet sinks to which a parser can forward -/// the packets it reads. -pub trait SendPacket { - type Value: ReadFromPacket; - type Error: error::Error; - - fn send_packet(&mut self, Self::Value) -> Result<(), Self::Error>; -} - /// This enum defines the possible states of a packet parser state machine. #[derive(Debug, Clone, Copy)] enum State { @@ -36,20 +27,18 @@ enum State { } #[derive(Debug)] -struct Parser { +struct Parser { state: State, num_bytes_left: usize, buffer: Vec, - packet_tx: T, } -impl Parser { - pub fn new(packet_tx: T) -> Self { +impl Parser { + pub fn new() -> Self { Parser { state: State::ReadingLength, num_bytes_left: U32_SIZE, buffer: vec![0; U32_SIZE], - packet_tx: packet_tx, } } @@ -62,7 +51,7 @@ impl Parser { /// Note: as long as this function returns `Ok(Some(p))`, the caller is /// responsible for calling it once more to ensure that all packets are /// read as soon as possible. - fn try_read(&mut self, stream: &mut U) -> io::Result> + pub fn try_read(&mut self, stream: &mut U) -> io::Result> where U: io::Read { // Try to read as many bytes as we currently need from the underlying @@ -72,7 +61,6 @@ impl Parser { None => (), Some(num_bytes_read) => { - assert!(num_bytes_read <= self.num_bytes_left); self.num_bytes_left -= num_bytes_read; }, } @@ -110,31 +98,6 @@ impl Parser { } } } - - pub fn read_from(&mut self, stream: &mut U) -> - Result<(), String> - { - loop { - let mut packet = match self.try_read(stream) { - Ok(Some(packet)) => packet, - Ok(None) => { - return Ok(()) - }, - Err(e) => { - return Err(format!("Error reading stream: {}", e)) - } - }; - let value = match packet.read_value() { - Ok(value) => value, - Err(e) => { - return Err(format!("Error parsing packet: {}", e)) - } - }; - if let Err(e) = self.packet_tx.send_packet(value) { - return Err(format!("Error sending parsed packet: {}", e)) - } - } - } } /*========* @@ -183,6 +146,15 @@ impl OutBuf { * STREAM * *========*/ +/// This trait is implemented by packet sinks to which a stream can forward +/// the packets it reads. +pub trait SendPacket { + type Value: ReadFromPacket; + type Error: error::Error; + + fn send_packet(&mut self, Self::Value) -> Result<(), Self::Error>; +} + /// This enum defines the possible actions the stream wants to take after /// processing an event. #[derive(Debug, Clone, Copy)] @@ -201,9 +173,10 @@ pub struct Stream where T: io::Read + io::Write + mio::Evented, U: SendPacket { - stream: T, - parser: Parser, + parser: Parser, queue: VecDeque, + sender: U, + stream: T, } impl Stream @@ -212,11 +185,12 @@ impl Stream { /// Returns a new struct wrapping the provided byte stream, which will /// forward packets to the provided sink. - pub fn new(stream: T, packet_tx: U) -> Self { + pub fn new(stream: T, sender: U) -> Self { Stream { - stream: stream, - parser: Parser::new(packet_tx), + parser: Parser::new(), queue: VecDeque::new(), + sender: sender, + stream: stream, } } @@ -226,6 +200,30 @@ impl Stream &self.stream } + fn on_readable(&mut self) -> Result<(), String> { + loop { + let mut packet = match self.parser.try_read(&mut self.stream) { + Ok(Some(packet)) => packet, + Ok(None) => { + break + }, + Err(e) => { + return Err(format!("Error reading stream: {}", e)) + } + }; + let value = match packet.read_value() { + Ok(value) => value, + Err(e) => { + return Err(format!("Error parsing packet: {}", e)) + } + }; + if let Err(e) = self.sender.send_packet(value) { + return Err(format!("Error sending parsed packet: {}", e)) + } + } + Ok(()) + } + fn on_writable(&mut self) -> io::Result<()> { loop { let mut outbuf = match self.queue.pop_front() { @@ -253,7 +251,7 @@ impl Stream /// The stream is ready to read, write, or both. pub fn on_ready(&mut self, event_set: mio::EventSet) -> Intent { if event_set.is_readable() { - let result = self.parser.read_from(&mut self.stream); + let result = self.on_readable(); if let Err(e) = result { error!("Stream input error: {}", e); return Intent::Done