Browse Source

Move entire read/write/notify handling from Handler to Stream.

wip
Titouan Rigoudy 9 years ago
parent
commit
ea5afeb51a
2 changed files with 163 additions and 125 deletions
  1. +37
    -98
      src/proto/handler.rs
  2. +126
    -27
      src/proto/stream.rs

+ 37
- 98
src/proto/handler.rs View File

@ -1,4 +1,3 @@
use std::collections::VecDeque;
use std::io;
use std::net::ToSocketAddrs;
use std::sync::mpsc;
@ -10,42 +9,9 @@ use config;
use super::{Intent, Stream, SendPacket, Request, Response};
use super::server::*;
/// A struct used for writing bytes to a TryWrite sink.
struct OutBuf {
cursor: usize,
bytes: Vec<u8>
}
impl From<Vec<u8>> for OutBuf {
fn from(bytes: Vec<u8>) -> Self {
OutBuf {
cursor: 0,
bytes: bytes
}
}
}
impl OutBuf {
#[inline]
fn remaining(&self) -> usize {
self.bytes.len() - self.cursor
}
#[inline]
fn has_remaining(&self) -> bool {
self.remaining() > 0
}
fn try_write_to<T>(&mut self, mut writer: T) -> io::Result<Option<usize>>
where T: mio::TryWrite
{
let result = writer.try_write(&self.bytes[self.cursor..]);
if let Ok(Some(bytes_written)) = result {
self.cursor += bytes_written;
}
result
}
}
/*===============*
* TOKEN COUNTER *
*===============*/
/// This struct provides a simple way to generate different tokens.
struct TokenCounter {
@ -65,6 +31,10 @@ impl TokenCounter {
}
}
/*========================*
* SERVER RESPONSE SENDER *
*========================*/
pub struct ServerResponseSender(mpsc::Sender<Response>);
impl SendPacket for ServerResponseSender {
@ -76,6 +46,10 @@ impl SendPacket for ServerResponseSender {
}
}
/*=========*
* HANDLER *
*=========*/
/// This struct handles all the soulseek connections, to the server and to
/// peers.
struct Handler {
@ -83,7 +57,6 @@ struct Handler {
server_token: mio::Token,
server_stream: Stream<mio::tcp::TcpStream, ServerResponseSender>,
server_queue: VecDeque<OutBuf>,
client_tx: mpsc::Sender<Response>,
}
@ -106,7 +79,6 @@ impl Handler {
server_token: server_token,
server_stream: server_stream,
server_queue: VecDeque::new(),
client_tx: client_tx,
})
@ -134,54 +106,24 @@ impl Handler {
))
}
fn write_server(&mut self) {
loop {
let mut outbuf = match self.server_queue.pop_front() {
Some(outbuf) => outbuf,
None => break
};
match outbuf.try_write_to(&mut self.server_stream) {
Ok(Some(_)) => {
if outbuf.has_remaining() {
self.server_queue.push_front(outbuf)
}
// Continue looping
},
Ok(None) => {
self.server_queue.push_front(outbuf);
break
},
Err(e) => {
error!("Error writing server stream: {}", e);
break
}
fn process_server_intent(
&mut self, intent: Intent, event_loop: &mut mio::EventLoop<Self>)
{
match intent {
Intent::Done => {
error!("Server connection closed");
// TODO notify client and shut down
},
Intent::Continue(event_set) => {
event_loop.reregister(
self.server_stream.evented(),
self.server_token,
event_set,
mio::PollOpt::edge() | mio::PollOpt::oneshot()
).unwrap();
}
}
}
fn notify_server(&mut self, request: ServerRequest) -> io::Result<()> {
debug!("Sending server request: {:?}", request);
let packet = try!(request.to_packet());
self.server_queue.push_back(OutBuf::from(packet.into_bytes()));
Ok(())
}
/// Re-register the server socket with the event loop.
fn reregister_server(&mut self, event_loop: &mut mio::EventLoop<Self>) {
let event_set = if self.server_queue.len() > 0 {
mio::EventSet::readable() | mio::EventSet::writable()
} else {
mio::EventSet::readable()
};
event_loop.reregister(
self.server_stream.evented(),
self.server_token,
event_set,
mio::PollOpt::edge() | mio::PollOpt::oneshot()
).unwrap();
}
}
impl mio::Handler for Handler {
@ -192,15 +134,8 @@ impl mio::Handler for Handler {
token: mio::Token, event_set: mio::EventSet)
{
if token == self.server_token {
if event_set.is_writable() {
self.write_server();
}
match self.server_stream.on_readable() {
Intent::Done => { /* don't re-register server */ },
Intent::Continue(_) => {
self.reregister_server(event_loop);
}
}
let intent = self.server_stream.on_ready(event_set);
self.process_server_intent(intent, event_loop);
} else {
unreachable!("Unknown token!");
}
@ -211,11 +146,15 @@ impl mio::Handler for Handler {
{
match request {
Request::ServerRequest(server_request) => {
match self.notify_server(server_request) {
Ok(()) => (),
Err(e) => error!("Error processing server request: {}", e),
}
self.reregister_server(event_loop);
let packet = match server_request.to_packet() {
Ok(packet) => packet,
Err(e) => {
error!("Error writing server request to packet: {}", e);
return
}
};
let intent = self.server_stream.on_notify(packet.into_bytes());
self.process_server_intent(intent, event_loop);
}
}
}


+ 126
- 27
src/proto/stream.rs View File

@ -1,3 +1,4 @@
use std::collections::VecDeque;
use std::error;
use std::io;
use std::iter;
@ -10,15 +11,17 @@ use mio::TryRead;
use super::constants::*;
use super::packet::{Packet, ReadFromPacket};
/// This enum defines the possible actions the stream wants to take after
/// processing an event.
#[derive(Debug, Clone, Copy)]
pub enum Intent {
/// The stream is done, the event loop handler can drop it.
Done,
/// The stream wants to wait for the next event matching the given
/// `EventSet`.
Continue(mio::EventSet),
/*========*
* PARSER *
*========*/
/// This trait is implemented by packet sinks to which a parser can forward
/// the packets it reads.
pub trait SendPacket {
type Value: ReadFromPacket;
type Error: error::Error;
fn send_packet(&mut self, Self::Value) -> Result<(), Self::Error>;
}
/// This enum defines the possible states of a packet parser state machine.
@ -32,15 +35,6 @@ enum State {
ReadingPacket,
}
/// This trait is implemented by packet sinks to which a parser can forward
/// the packets it reads.
pub trait SendPacket {
type Value: ReadFromPacket;
type Error: error::Error;
fn send_packet(&mut self, Self::Value) -> Result<(), Self::Error>;
}
#[derive(Debug)]
struct Parser<T: SendPacket> {
state: State,
@ -141,18 +135,75 @@ impl<T: SendPacket> Parser<T> {
}
}
}
}
/*========*
* OUTBUF *
*========*/
/// A struct used for writing bytes to a TryWrite sink.
#[derive(Debug)]
struct OutBuf {
cursor: usize,
bytes: Vec<u8>
}
impl From<Vec<u8>> for OutBuf {
fn from(bytes: Vec<u8>) -> Self {
OutBuf {
cursor: 0,
bytes: bytes
}
}
}
impl OutBuf {
#[inline]
fn remaining(&self) -> usize {
self.bytes.len() - self.cursor
}
#[inline]
fn has_remaining(&self) -> bool {
self.remaining() > 0
}
fn try_write_to<T>(&mut self, mut writer: T) -> io::Result<Option<usize>>
where T: mio::TryWrite
{
let result = writer.try_write(&self.bytes[self.cursor..]);
if let Ok(Some(bytes_written)) = result {
self.cursor += bytes_written;
}
result
}
}
/*========*
* STREAM *
*========*/
/// This enum defines the possible actions the stream wants to take after
/// processing an event.
#[derive(Debug, Clone, Copy)]
pub enum Intent {
/// The stream is done, the event loop handler can drop it.
Done,
/// The stream wants to wait for the next event matching the given
/// `EventSet`.
Continue(mio::EventSet),
}
/// This struct wraps around an mio byte stream and reads soulseek packets
/// from it, forwarding them once parsed.
/// This struct wraps around an mio byte stream and handles packet reads and
/// writes.
#[derive(Debug)]
pub struct Stream<T, U>
where T: io::Read + io::Write + mio::Evented,
U: SendPacket
{
stream: T,
parser: Parser<U>
parser: Parser<U>,
queue: VecDeque<OutBuf>,
}
impl<T, U> Stream<T, U>
@ -165,6 +216,7 @@ impl<T, U> Stream<T, U>
Stream {
stream: stream,
parser: Parser::new(packet_tx),
queue: VecDeque::new(),
}
}
@ -174,15 +226,62 @@ impl<T, U> Stream<T, U>
&self.stream
}
/// The stream is readable.
pub fn on_readable(&mut self) -> Intent {
match self.parser.read_from(&mut self.stream) {
Ok(()) => Intent::Continue(mio::EventSet::readable()),
Err(e) => {
fn on_writable(&mut self) -> io::Result<()> {
loop {
let mut outbuf = match self.queue.pop_front() {
Some(outbuf) => outbuf,
None => break
};
let option = try!(outbuf.try_write_to(&mut self.stream));
match option {
Some(_) => {
if outbuf.has_remaining() {
self.queue.push_front(outbuf)
}
// Continue looping
},
None => {
self.queue.push_front(outbuf);
break
}
}
}
Ok(())
}
/// The stream is ready to read, write, or both.
pub fn on_ready(&mut self, event_set: mio::EventSet) -> Intent {
if event_set.is_readable() {
let result = self.parser.read_from(&mut self.stream);
if let Err(e) = result {
error!("Stream input error: {}", e);
Intent::Done
return Intent::Done
}
}
if event_set.is_writable() {
let result = self.on_writable();
if let Err(e) = result {
error!("Stream output error: {}", e);
return Intent::Done
}
}
// We're always interested in reading more.
// If there is still stuff to write in the queue, we're interested in
// the socket becoming writable too.
let event_set = if self.queue.len() > 0 {
mio::EventSet::readable() | mio::EventSet::writable()
} else {
mio::EventSet::readable()
};
Intent::Continue(event_set)
}
/// The stream has been notified.
pub fn on_notify(&mut self, mut bytes: Vec<u8>) -> Intent {
self.queue.push_back(OutBuf::from(bytes));
Intent::Continue(mio::EventSet::readable() | mio::EventSet::writable())
}
}


Loading…
Cancel
Save