From 88d9b46726dfc01ead2dcd2f47ce0066a0ca1515 Mon Sep 17 00:00:00 2001 From: Titouan Rigoudy Date: Mon, 2 May 2016 17:34:32 +0200 Subject: [PATCH] Split PacketStream into Stream and Parser. --- src/proto/handler.rs | 67 ++++++++------------ src/proto/stream.rs | 145 ++++++++++++++++++++++++++++++++----------- 2 files changed, 133 insertions(+), 79 deletions(-) diff --git a/src/proto/handler.rs b/src/proto/handler.rs index 0a586a9..39f3874 100644 --- a/src/proto/handler.rs +++ b/src/proto/handler.rs @@ -7,7 +7,7 @@ use mio; use config; -use super::{PacketStream, Request, Response}; +use super::{Intent, Stream, SendPacket, Request, Response}; use super::server::*; /// A struct used for writing bytes to a TryWrite sink. @@ -65,13 +65,24 @@ impl TokenCounter { } } +pub struct ServerResponseSender(mpsc::Sender); + +impl SendPacket for ServerResponseSender { + type Value = ServerResponse; + type Error = mpsc::SendError; + + fn send_packet(&mut self, value: Self::Value) -> Result<(), Self::Error> { + self.0.send(Response::ServerResponse(value)) + } +} + /// This struct handles all the soulseek connections, to the server and to /// peers. struct Handler { token_counter: TokenCounter, server_token: mio::Token, - server_stream: PacketStream, + server_stream: Stream, server_queue: VecDeque, client_tx: mpsc::Sender, @@ -81,8 +92,9 @@ impl Handler { fn new(client_tx: mpsc::Sender) -> io::Result { let host = config::SERVER_HOST; let port = config::SERVER_PORT; - let server_stream = PacketStream::new( - try!(Self::connect(host, port)) + let server_stream = Stream::new( + try!(Self::connect(host, port)), + ServerResponseSender(client_tx.clone()) ); info!("Connected to server at {}:{}", host, port); @@ -102,8 +114,8 @@ impl Handler { fn register(&self, event_loop: &mut mio::EventLoop) -> io::Result<()> { - self.server_stream.register( - event_loop, + event_loop.register( + self.server_stream.evented(), self.server_token, mio::EventSet::readable(), mio::PollOpt::edge() | mio::PollOpt::oneshot() @@ -122,37 +134,6 @@ impl Handler { )) } - fn read_server(&mut self) { - loop { - let mut packet = match self.server_stream.try_read() { - Ok(Some(packet)) => packet, - Ok(None) => break, - Err(err) => { - error!("Error reading server: {}", err); - break - } - }; - - debug!("Read packet with size {}", packet.bytes_remaining()); - - let response = match packet.read_value() { - Ok(resp) => { - debug!("Received server response: {:?}", resp); - Response::ServerResponse(resp) - }, - Err(err) => { - error!("Error parsing server packet: {}", err); - break - } - }; - - if let Err(err) = self.client_tx.send(response) { - error!("Error sending server response to client: {}", err); - break - } - } - } - fn write_server(&mut self) { loop { let mut outbuf = match self.server_queue.pop_front() { @@ -194,8 +175,8 @@ impl Handler { mio::EventSet::readable() }; - self.server_stream.reregister( - event_loop, + event_loop.reregister( + self.server_stream.evented(), self.server_token, event_set, mio::PollOpt::edge() | mio::PollOpt::oneshot() @@ -214,10 +195,12 @@ impl mio::Handler for Handler { if event_set.is_writable() { self.write_server(); } - if event_set.is_readable() { - self.read_server(); + match self.server_stream.on_readable() { + Intent::Done => { /* don't re-register server */ }, + Intent::Continue(_) => { + self.reregister_server(event_loop); + } } - self.reregister_server(event_loop); } else { unreachable!("Unknown token!"); } diff --git a/src/proto/stream.rs b/src/proto/stream.rs index 609fdac..905e125 100644 --- a/src/proto/stream.rs +++ b/src/proto/stream.rs @@ -1,3 +1,4 @@ +use std::error; use std::io; use std::iter; use std::mem; @@ -7,44 +8,59 @@ use mio; use mio::TryRead; use super::constants::*; -use super::packet::Packet; +use super::packet::{Packet, ReadFromPacket}; -/// This enum defines the possible states a PacketStream state machine can be -/// in. +/// This enum defines the possible actions the stream wants to take after +/// processing an event. +#[derive(Debug, Clone, Copy)] +pub enum Intent { + /// The stream is done, the event loop handler can drop it. + Done, + /// The stream wants to wait for the next event matching the given + /// `EventSet`. + Continue(mio::EventSet), +} + +/// This enum defines the possible states of a packet parser state machine. #[derive(Debug, Clone, Copy)] enum State { - /// The PacketStream is waiting to read enough bytes to determine the + /// The parser is waiting to read enough bytes to determine the /// length of the following packet. ReadingLength, - /// The PacketStream is waiting to read enough bytes to form the entire + /// The parser is waiting to read enough bytes to form the entire /// packet. ReadingPacket, } -/// This struct wraps around an mio byte stream and provides the ability to -/// read 32-bit-length-prefixed packets of bytes from it. +/// This trait is implemented by packet sinks to which a parser can forward +/// the packets it reads. +pub trait SendPacket { + type Value: ReadFromPacket; + type Error: error::Error; + + fn send_packet(&mut self, Self::Value) -> Result<(), Self::Error>; +} + #[derive(Debug)] -pub struct PacketStream { - stream: T, - state: State, +struct Parser { + state: State, num_bytes_left: usize, - buffer: Vec, + buffer: Vec, + packet_tx: T, } -impl PacketStream { - - /// Returns a new PacketStream wrapping the provided byte stream. - pub fn new(stream: T) -> Self { - PacketStream { - stream: stream, - state: State::ReadingLength, +impl Parser { + pub fn new(packet_tx: T) -> Self { + Parser { + state: State::ReadingLength, num_bytes_left: U32_SIZE, - buffer: vec![0; U32_SIZE], + buffer: vec![0; U32_SIZE], + packet_tx: packet_tx, } } /// Attemps to read a packet in a non-blocking fashion. - /// If enough bytes can be read from the underlying byte stream to form a + /// If enough bytes can be read from the given byte stream to form a /// complete packet `p`, returns `Ok(Some(p))`. /// If not enough bytes are available, returns `Ok(None)`. /// If an I/O error `e` arises when trying to read the underlying stream, @@ -52,11 +68,13 @@ impl PacketStream { /// Note: as long as this function returns `Ok(Some(p))`, the caller is /// responsible for calling it once more to ensure that all packets are /// read as soon as possible. - pub fn try_read(&mut self) -> io::Result> { + fn try_read(&mut self, stream: &mut U) -> io::Result> + where U: io::Read + { // Try to read as many bytes as we currently need from the underlying // byte stream. let offset = self.buffer.len() - self.num_bytes_left; - match try!(self.stream.try_read(&mut self.buffer[offset..])) { + match try!(stream.try_read(&mut self.buffer[offset..])) { None => (), Some(num_bytes_read) => { @@ -84,7 +102,7 @@ impl PacketStream { self.state = State::ReadingPacket; self.num_bytes_left = message_len; self.buffer.extend(iter::repeat(0).take(message_len)); - self.try_read() + self.try_read(stream) }, State::ReadingPacket => { @@ -99,26 +117,79 @@ impl PacketStream { } } - /// Register the packet stream with the given mio event loop. - pub fn register( - &self, event_loop: &mut mio::EventLoop, token: mio::Token, - event_set: mio::EventSet, poll_opt: mio::PollOpt) - -> io::Result<()> + pub fn read_from(&mut self, stream: &mut U) -> + Result<(), String> { - event_loop.register(&self.stream, token, event_set, poll_opt) + loop { + let mut packet = match self.try_read(stream) { + Ok(Some(packet)) => packet, + Ok(None) => { + return Ok(()) + }, + Err(e) => { + return Err(format!("Error reading stream: {}", e)) + } + }; + let value = match packet.read_value() { + Ok(value) => value, + Err(e) => { + return Err(format!("Error parsing packet: {}", e)) + } + }; + if let Err(e) = self.packet_tx.send_packet(value) { + return Err(format!("Error sending parsed packet: {}", e)) + } + } } - /// Re-register the packet stream with the given mio event loop. - pub fn reregister( - &self, event_loop: &mut mio::EventLoop, token: mio::Token, - event_set: mio::EventSet, poll_opt: mio::PollOpt) - -> io::Result<()> - { - event_loop.reregister(&self.stream, token, event_set, poll_opt) +} + +/// This struct wraps around an mio byte stream and reads soulseek packets +/// from it, forwarding them once parsed. +#[derive(Debug)] +pub struct Stream + where T: io::Read + io::Write + mio::Evented, + U: SendPacket +{ + stream: T, + parser: Parser +} + +impl Stream + where T: io::Read + io::Write + mio::Evented, + U: SendPacket +{ + /// Returns a new struct wrapping the provided byte stream, which will + /// forward packets to the provided sink. + pub fn new(stream: T, packet_tx: U) -> Self { + Stream { + stream: stream, + parser: Parser::new(packet_tx), + } + } + + /// Returns a reference to the underlying byte stream, to allow it to be + /// registered with an event loop. + pub fn evented(&self) -> &T { + &self.stream + } + + /// The stream is readable. + pub fn on_readable(&mut self) -> Intent { + match self.parser.read_from(&mut self.stream) { + Ok(()) => Intent::Continue(mio::EventSet::readable()), + Err(e) => { + error!("Stream input error: {}", e); + Intent::Done + } + } } } -impl io::Write for PacketStream { +impl io::Write for Stream + where T: io::Read + io::Write + mio::Evented, + U: SendPacket +{ fn write(&mut self, bytes: &[u8]) -> io::Result { self.stream.write(bytes) }