Browse Source

Split PacketStream into Stream and Parser.

wip
Titouan Rigoudy 9 years ago
parent
commit
88d9b46726
2 changed files with 133 additions and 79 deletions
  1. +25
    -42
      src/proto/handler.rs
  2. +108
    -37
      src/proto/stream.rs

+ 25
- 42
src/proto/handler.rs View File

@ -7,7 +7,7 @@ use mio;
use config;
use super::{PacketStream, Request, Response};
use super::{Intent, Stream, SendPacket, Request, Response};
use super::server::*;
/// A struct used for writing bytes to a TryWrite sink.
@ -65,13 +65,24 @@ impl TokenCounter {
}
}
pub struct ServerResponseSender(mpsc::Sender<Response>);
impl SendPacket for ServerResponseSender {
type Value = ServerResponse;
type Error = mpsc::SendError<Response>;
fn send_packet(&mut self, value: Self::Value) -> Result<(), Self::Error> {
self.0.send(Response::ServerResponse(value))
}
}
/// This struct handles all the soulseek connections, to the server and to
/// peers.
struct Handler {
token_counter: TokenCounter,
server_token: mio::Token,
server_stream: PacketStream<mio::tcp::TcpStream>,
server_stream: Stream<mio::tcp::TcpStream, ServerResponseSender>,
server_queue: VecDeque<OutBuf>,
client_tx: mpsc::Sender<Response>,
@ -81,8 +92,9 @@ impl Handler {
fn new(client_tx: mpsc::Sender<Response>) -> io::Result<Self> {
let host = config::SERVER_HOST;
let port = config::SERVER_PORT;
let server_stream = PacketStream::new(
try!(Self::connect(host, port))
let server_stream = Stream::new(
try!(Self::connect(host, port)),
ServerResponseSender(client_tx.clone())
);
info!("Connected to server at {}:{}", host, port);
@ -102,8 +114,8 @@ impl Handler {
fn register(&self, event_loop: &mut mio::EventLoop<Self>) -> io::Result<()>
{
self.server_stream.register(
event_loop,
event_loop.register(
self.server_stream.evented(),
self.server_token,
mio::EventSet::readable(),
mio::PollOpt::edge() | mio::PollOpt::oneshot()
@ -122,37 +134,6 @@ impl Handler {
))
}
fn read_server(&mut self) {
loop {
let mut packet = match self.server_stream.try_read() {
Ok(Some(packet)) => packet,
Ok(None) => break,
Err(err) => {
error!("Error reading server: {}", err);
break
}
};
debug!("Read packet with size {}", packet.bytes_remaining());
let response = match packet.read_value() {
Ok(resp) => {
debug!("Received server response: {:?}", resp);
Response::ServerResponse(resp)
},
Err(err) => {
error!("Error parsing server packet: {}", err);
break
}
};
if let Err(err) = self.client_tx.send(response) {
error!("Error sending server response to client: {}", err);
break
}
}
}
fn write_server(&mut self) {
loop {
let mut outbuf = match self.server_queue.pop_front() {
@ -194,8 +175,8 @@ impl Handler {
mio::EventSet::readable()
};
self.server_stream.reregister(
event_loop,
event_loop.reregister(
self.server_stream.evented(),
self.server_token,
event_set,
mio::PollOpt::edge() | mio::PollOpt::oneshot()
@ -214,10 +195,12 @@ impl mio::Handler for Handler {
if event_set.is_writable() {
self.write_server();
}
if event_set.is_readable() {
self.read_server();
match self.server_stream.on_readable() {
Intent::Done => { /* don't re-register server */ },
Intent::Continue(_) => {
self.reregister_server(event_loop);
}
}
self.reregister_server(event_loop);
} else {
unreachable!("Unknown token!");
}


+ 108
- 37
src/proto/stream.rs View File

@ -1,3 +1,4 @@
use std::error;
use std::io;
use std::iter;
use std::mem;
@ -7,44 +8,59 @@ use mio;
use mio::TryRead;
use super::constants::*;
use super::packet::Packet;
use super::packet::{Packet, ReadFromPacket};
/// This enum defines the possible states a PacketStream state machine can be
/// in.
/// This enum defines the possible actions the stream wants to take after
/// processing an event.
#[derive(Debug, Clone, Copy)]
pub enum Intent {
/// The stream is done, the event loop handler can drop it.
Done,
/// The stream wants to wait for the next event matching the given
/// `EventSet`.
Continue(mio::EventSet),
}
/// This enum defines the possible states of a packet parser state machine.
#[derive(Debug, Clone, Copy)]
enum State {
/// The PacketStream is waiting to read enough bytes to determine the
/// The parser is waiting to read enough bytes to determine the
/// length of the following packet.
ReadingLength,
/// The PacketStream is waiting to read enough bytes to form the entire
/// The parser is waiting to read enough bytes to form the entire
/// packet.
ReadingPacket,
}
/// This struct wraps around an mio byte stream and provides the ability to
/// read 32-bit-length-prefixed packets of bytes from it.
/// This trait is implemented by packet sinks to which a parser can forward
/// the packets it reads.
pub trait SendPacket {
type Value: ReadFromPacket;
type Error: error::Error;
fn send_packet(&mut self, Self::Value) -> Result<(), Self::Error>;
}
#[derive(Debug)]
pub struct PacketStream<T: io::Read + io::Write + mio::Evented> {
stream: T,
state: State,
struct Parser<T: SendPacket> {
state: State,
num_bytes_left: usize,
buffer: Vec<u8>,
buffer: Vec<u8>,
packet_tx: T,
}
impl<T: io::Read + io::Write + mio::Evented> PacketStream<T> {
/// Returns a new PacketStream wrapping the provided byte stream.
pub fn new(stream: T) -> Self {
PacketStream {
stream: stream,
state: State::ReadingLength,
impl<T: SendPacket> Parser<T> {
pub fn new(packet_tx: T) -> Self {
Parser {
state: State::ReadingLength,
num_bytes_left: U32_SIZE,
buffer: vec![0; U32_SIZE],
buffer: vec![0; U32_SIZE],
packet_tx: packet_tx,
}
}
/// Attemps to read a packet in a non-blocking fashion.
/// If enough bytes can be read from the underlying byte stream to form a
/// If enough bytes can be read from the given byte stream to form a
/// complete packet `p`, returns `Ok(Some(p))`.
/// If not enough bytes are available, returns `Ok(None)`.
/// If an I/O error `e` arises when trying to read the underlying stream,
@ -52,11 +68,13 @@ impl<T: io::Read + io::Write + mio::Evented> PacketStream<T> {
/// Note: as long as this function returns `Ok(Some(p))`, the caller is
/// responsible for calling it once more to ensure that all packets are
/// read as soon as possible.
pub fn try_read(&mut self) -> io::Result<Option<Packet>> {
fn try_read<U>(&mut self, stream: &mut U) -> io::Result<Option<Packet>>
where U: io::Read
{
// Try to read as many bytes as we currently need from the underlying
// byte stream.
let offset = self.buffer.len() - self.num_bytes_left;
match try!(self.stream.try_read(&mut self.buffer[offset..])) {
match try!(stream.try_read(&mut self.buffer[offset..])) {
None => (),
Some(num_bytes_read) => {
@ -84,7 +102,7 @@ impl<T: io::Read + io::Write + mio::Evented> PacketStream<T> {
self.state = State::ReadingPacket;
self.num_bytes_left = message_len;
self.buffer.extend(iter::repeat(0).take(message_len));
self.try_read()
self.try_read(stream)
},
State::ReadingPacket => {
@ -99,26 +117,79 @@ impl<T: io::Read + io::Write + mio::Evented> PacketStream<T> {
}
}
/// Register the packet stream with the given mio event loop.
pub fn register<U: mio::Handler>(
&self, event_loop: &mut mio::EventLoop<U>, token: mio::Token,
event_set: mio::EventSet, poll_opt: mio::PollOpt)
-> io::Result<()>
pub fn read_from<U: io::Read>(&mut self, stream: &mut U) ->
Result<(), String>
{
event_loop.register(&self.stream, token, event_set, poll_opt)
loop {
let mut packet = match self.try_read(stream) {
Ok(Some(packet)) => packet,
Ok(None) => {
return Ok(())
},
Err(e) => {
return Err(format!("Error reading stream: {}", e))
}
};
let value = match packet.read_value() {
Ok(value) => value,
Err(e) => {
return Err(format!("Error parsing packet: {}", e))
}
};
if let Err(e) = self.packet_tx.send_packet(value) {
return Err(format!("Error sending parsed packet: {}", e))
}
}
}
/// Re-register the packet stream with the given mio event loop.
pub fn reregister<U: mio::Handler>(
&self, event_loop: &mut mio::EventLoop<U>, token: mio::Token,
event_set: mio::EventSet, poll_opt: mio::PollOpt)
-> io::Result<()>
{
event_loop.reregister(&self.stream, token, event_set, poll_opt)
}
/// This struct wraps around an mio byte stream and reads soulseek packets
/// from it, forwarding them once parsed.
#[derive(Debug)]
pub struct Stream<T, U>
where T: io::Read + io::Write + mio::Evented,
U: SendPacket
{
stream: T,
parser: Parser<U>
}
impl<T, U> Stream<T, U>
where T: io::Read + io::Write + mio::Evented,
U: SendPacket
{
/// Returns a new struct wrapping the provided byte stream, which will
/// forward packets to the provided sink.
pub fn new(stream: T, packet_tx: U) -> Self {
Stream {
stream: stream,
parser: Parser::new(packet_tx),
}
}
/// Returns a reference to the underlying byte stream, to allow it to be
/// registered with an event loop.
pub fn evented(&self) -> &T {
&self.stream
}
/// The stream is readable.
pub fn on_readable(&mut self) -> Intent {
match self.parser.read_from(&mut self.stream) {
Ok(()) => Intent::Continue(mio::EventSet::readable()),
Err(e) => {
error!("Stream input error: {}", e);
Intent::Done
}
}
}
}
impl<T: io::Read + io::Write + mio::Evented> io::Write for PacketStream<T> {
impl<T, U> io::Write for Stream<T, U>
where T: io::Read + io::Write + mio::Evented,
U: SendPacket
{
fn write(&mut self, bytes: &[u8]) -> io::Result<usize> {
self.stream.write(bytes)
}


Loading…
Cancel
Save