diff --git a/src/proto/handler.rs b/src/proto/handler.rs index 39f3874..02501ac 100644 --- a/src/proto/handler.rs +++ b/src/proto/handler.rs @@ -1,4 +1,3 @@ -use std::collections::VecDeque; use std::io; use std::net::ToSocketAddrs; use std::sync::mpsc; @@ -10,42 +9,9 @@ use config; use super::{Intent, Stream, SendPacket, Request, Response}; use super::server::*; -/// A struct used for writing bytes to a TryWrite sink. -struct OutBuf { - cursor: usize, - bytes: Vec -} - -impl From> for OutBuf { - fn from(bytes: Vec) -> Self { - OutBuf { - cursor: 0, - bytes: bytes - } - } -} - -impl OutBuf { - #[inline] - fn remaining(&self) -> usize { - self.bytes.len() - self.cursor - } - - #[inline] - fn has_remaining(&self) -> bool { - self.remaining() > 0 - } - - fn try_write_to(&mut self, mut writer: T) -> io::Result> - where T: mio::TryWrite - { - let result = writer.try_write(&self.bytes[self.cursor..]); - if let Ok(Some(bytes_written)) = result { - self.cursor += bytes_written; - } - result - } -} +/*===============* + * TOKEN COUNTER * + *===============*/ /// This struct provides a simple way to generate different tokens. struct TokenCounter { @@ -65,6 +31,10 @@ impl TokenCounter { } } +/*========================* + * SERVER RESPONSE SENDER * + *========================*/ + pub struct ServerResponseSender(mpsc::Sender); impl SendPacket for ServerResponseSender { @@ -76,6 +46,10 @@ impl SendPacket for ServerResponseSender { } } +/*=========* + * HANDLER * + *=========*/ + /// This struct handles all the soulseek connections, to the server and to /// peers. struct Handler { @@ -83,7 +57,6 @@ struct Handler { server_token: mio::Token, server_stream: Stream, - server_queue: VecDeque, client_tx: mpsc::Sender, } @@ -106,7 +79,6 @@ impl Handler { server_token: server_token, server_stream: server_stream, - server_queue: VecDeque::new(), client_tx: client_tx, }) @@ -134,54 +106,24 @@ impl Handler { )) } - fn write_server(&mut self) { - loop { - let mut outbuf = match self.server_queue.pop_front() { - Some(outbuf) => outbuf, - None => break - }; - - match outbuf.try_write_to(&mut self.server_stream) { - Ok(Some(_)) => { - if outbuf.has_remaining() { - self.server_queue.push_front(outbuf) - } - // Continue looping - }, - Ok(None) => { - self.server_queue.push_front(outbuf); - break - }, - Err(e) => { - error!("Error writing server stream: {}", e); - break - } + fn process_server_intent( + &mut self, intent: Intent, event_loop: &mut mio::EventLoop) + { + match intent { + Intent::Done => { + error!("Server connection closed"); + // TODO notify client and shut down + }, + Intent::Continue(event_set) => { + event_loop.reregister( + self.server_stream.evented(), + self.server_token, + event_set, + mio::PollOpt::edge() | mio::PollOpt::oneshot() + ).unwrap(); } } } - - fn notify_server(&mut self, request: ServerRequest) -> io::Result<()> { - debug!("Sending server request: {:?}", request); - let packet = try!(request.to_packet()); - self.server_queue.push_back(OutBuf::from(packet.into_bytes())); - Ok(()) - } - - /// Re-register the server socket with the event loop. - fn reregister_server(&mut self, event_loop: &mut mio::EventLoop) { - let event_set = if self.server_queue.len() > 0 { - mio::EventSet::readable() | mio::EventSet::writable() - } else { - mio::EventSet::readable() - }; - - event_loop.reregister( - self.server_stream.evented(), - self.server_token, - event_set, - mio::PollOpt::edge() | mio::PollOpt::oneshot() - ).unwrap(); - } } impl mio::Handler for Handler { @@ -192,15 +134,8 @@ impl mio::Handler for Handler { token: mio::Token, event_set: mio::EventSet) { if token == self.server_token { - if event_set.is_writable() { - self.write_server(); - } - match self.server_stream.on_readable() { - Intent::Done => { /* don't re-register server */ }, - Intent::Continue(_) => { - self.reregister_server(event_loop); - } - } + let intent = self.server_stream.on_ready(event_set); + self.process_server_intent(intent, event_loop); } else { unreachable!("Unknown token!"); } @@ -211,11 +146,15 @@ impl mio::Handler for Handler { { match request { Request::ServerRequest(server_request) => { - match self.notify_server(server_request) { - Ok(()) => (), - Err(e) => error!("Error processing server request: {}", e), - } - self.reregister_server(event_loop); + let packet = match server_request.to_packet() { + Ok(packet) => packet, + Err(e) => { + error!("Error writing server request to packet: {}", e); + return + } + }; + let intent = self.server_stream.on_notify(packet.into_bytes()); + self.process_server_intent(intent, event_loop); } } } diff --git a/src/proto/stream.rs b/src/proto/stream.rs index 905e125..d28800d 100644 --- a/src/proto/stream.rs +++ b/src/proto/stream.rs @@ -1,3 +1,4 @@ +use std::collections::VecDeque; use std::error; use std::io; use std::iter; @@ -10,15 +11,17 @@ use mio::TryRead; use super::constants::*; use super::packet::{Packet, ReadFromPacket}; -/// This enum defines the possible actions the stream wants to take after -/// processing an event. -#[derive(Debug, Clone, Copy)] -pub enum Intent { - /// The stream is done, the event loop handler can drop it. - Done, - /// The stream wants to wait for the next event matching the given - /// `EventSet`. - Continue(mio::EventSet), +/*========* + * PARSER * + *========*/ + +/// This trait is implemented by packet sinks to which a parser can forward +/// the packets it reads. +pub trait SendPacket { + type Value: ReadFromPacket; + type Error: error::Error; + + fn send_packet(&mut self, Self::Value) -> Result<(), Self::Error>; } /// This enum defines the possible states of a packet parser state machine. @@ -32,15 +35,6 @@ enum State { ReadingPacket, } -/// This trait is implemented by packet sinks to which a parser can forward -/// the packets it reads. -pub trait SendPacket { - type Value: ReadFromPacket; - type Error: error::Error; - - fn send_packet(&mut self, Self::Value) -> Result<(), Self::Error>; -} - #[derive(Debug)] struct Parser { state: State, @@ -141,18 +135,75 @@ impl Parser { } } } +} + +/*========* + * OUTBUF * + *========*/ + +/// A struct used for writing bytes to a TryWrite sink. +#[derive(Debug)] +struct OutBuf { + cursor: usize, + bytes: Vec +} + +impl From> for OutBuf { + fn from(bytes: Vec) -> Self { + OutBuf { + cursor: 0, + bytes: bytes + } + } +} + +impl OutBuf { + #[inline] + fn remaining(&self) -> usize { + self.bytes.len() - self.cursor + } + + #[inline] + fn has_remaining(&self) -> bool { + self.remaining() > 0 + } + fn try_write_to(&mut self, mut writer: T) -> io::Result> + where T: mio::TryWrite + { + let result = writer.try_write(&self.bytes[self.cursor..]); + if let Ok(Some(bytes_written)) = result { + self.cursor += bytes_written; + } + result + } +} + +/*========* + * STREAM * + *========*/ + +/// This enum defines the possible actions the stream wants to take after +/// processing an event. +#[derive(Debug, Clone, Copy)] +pub enum Intent { + /// The stream is done, the event loop handler can drop it. + Done, + /// The stream wants to wait for the next event matching the given + /// `EventSet`. + Continue(mio::EventSet), } -/// This struct wraps around an mio byte stream and reads soulseek packets -/// from it, forwarding them once parsed. +/// This struct wraps around an mio byte stream and handles packet reads and +/// writes. #[derive(Debug)] pub struct Stream where T: io::Read + io::Write + mio::Evented, U: SendPacket { stream: T, - parser: Parser + parser: Parser, + queue: VecDeque, } impl Stream @@ -165,6 +216,7 @@ impl Stream Stream { stream: stream, parser: Parser::new(packet_tx), + queue: VecDeque::new(), } } @@ -174,15 +226,62 @@ impl Stream &self.stream } - /// The stream is readable. - pub fn on_readable(&mut self) -> Intent { - match self.parser.read_from(&mut self.stream) { - Ok(()) => Intent::Continue(mio::EventSet::readable()), - Err(e) => { + fn on_writable(&mut self) -> io::Result<()> { + loop { + let mut outbuf = match self.queue.pop_front() { + Some(outbuf) => outbuf, + None => break + }; + + let option = try!(outbuf.try_write_to(&mut self.stream)); + match option { + Some(_) => { + if outbuf.has_remaining() { + self.queue.push_front(outbuf) + } + // Continue looping + }, + None => { + self.queue.push_front(outbuf); + break + } + } + } + Ok(()) + } + + /// The stream is ready to read, write, or both. + pub fn on_ready(&mut self, event_set: mio::EventSet) -> Intent { + if event_set.is_readable() { + let result = self.parser.read_from(&mut self.stream); + if let Err(e) = result { error!("Stream input error: {}", e); - Intent::Done + return Intent::Done + } + } + if event_set.is_writable() { + let result = self.on_writable(); + if let Err(e) = result { + error!("Stream output error: {}", e); + return Intent::Done } } + + // We're always interested in reading more. + // If there is still stuff to write in the queue, we're interested in + // the socket becoming writable too. + let event_set = if self.queue.len() > 0 { + mio::EventSet::readable() | mio::EventSet::writable() + } else { + mio::EventSet::readable() + }; + Intent::Continue(event_set) + } + + /// The stream has been notified. + pub fn on_notify(&mut self, mut bytes: Vec) -> Intent { + self.queue.push_back(OutBuf::from(bytes)); + Intent::Continue(mio::EventSet::readable() | mio::EventSet::writable()) } }