From 7b34d10754cc70b227a13600105882b36ffe736a Mon Sep 17 00:00:00 2001 From: Titouan Rigoudy Date: Sun, 29 Aug 2021 17:32:41 +0200 Subject: [PATCH] Delete old unused proto code. --- proto/src/packet.rs | 413 -------------------------------------------- proto/src/stream.rs | 230 ------------------------ 2 files changed, 643 deletions(-) delete mode 100644 proto/src/packet.rs delete mode 100644 proto/src/stream.rs diff --git a/proto/src/packet.rs b/proto/src/packet.rs deleted file mode 100644 index 085749e..0000000 --- a/proto/src/packet.rs +++ /dev/null @@ -1,413 +0,0 @@ -use std::error; -use std::fmt; -use std::io; -use std::io::{Read, Write}; -use std::mem; -use std::net; - -use byteorder::{ByteOrder, LittleEndian, ReadBytesExt, WriteBytesExt}; -use encoding::all::ISO_8859_1; -use encoding::{DecoderTrap, EncoderTrap, Encoding}; -#[allow(deprecated)] -use mio::deprecated::TryRead; - -use crate::core::constants::*; - -/*==================* - * READ-ONLY PACKET * - *==================*/ - -#[derive(Debug)] -pub struct Packet { - /// The current read position in the byte buffer. - cursor: usize, - /// The underlying bytes. - bytes: Vec, -} - -impl io::Read for Packet { - fn read(&mut self, buf: &mut [u8]) -> io::Result { - let bytes_read = { - let mut slice = &self.bytes[self.cursor..]; - slice.read(buf)? - }; - self.cursor += bytes_read; - Ok(bytes_read) - } -} - -impl Packet { - /// Returns a readable packet struct from the wire representation of a - /// packet. - /// Assumes that the given vector is a valid length-prefixed packet. - fn from_wire(bytes: Vec) -> Self { - Packet { - cursor: U32_SIZE, - bytes: bytes, - } - } - - /// Provides the main way to read data out of a binary packet. - pub fn read_value(&mut self) -> Result - where - T: ReadFromPacket, - { - T::read_from_packet(self) - } - - /// Returns the number of unread bytes remaining in the packet. - pub fn bytes_remaining(&self) -> usize { - self.bytes.len() - self.cursor - } -} - -/*===================* - * WRITE-ONLY PACKET * - *===================*/ - -#[derive(Debug)] -pub struct MutPacket { - bytes: Vec, -} - -impl MutPacket { - /// Returns an empty packet with the given packet code. - pub fn new() -> Self { - // Leave space for the eventual size of the packet. - MutPacket { - bytes: vec![0; U32_SIZE], - } - } - - /// Provides the main way to write data into a binary packet. - pub fn write_value(&mut self, val: &T) -> io::Result<()> - where - T: WriteToPacket + ?Sized, - { - val.write_to_packet(self) - } - - /// Consumes the mutable packet and returns its wire representation. - pub fn into_bytes(mut self) -> Vec { - let length = (self.bytes.len() - U32_SIZE) as u32; - { - let mut first_word = &mut self.bytes[..U32_SIZE]; - first_word.write_u32::(length).unwrap(); - } - self.bytes - } -} - -impl io::Write for MutPacket { - fn write(&mut self, buf: &[u8]) -> io::Result { - self.bytes.write(buf) - } - - fn flush(&mut self) -> io::Result<()> { - self.bytes.flush() - } -} - -/*===================* - * PACKET READ ERROR * - *===================*/ - -/// This enum contains an error that arose when reading data out of a Packet. -#[derive(Debug)] -pub enum PacketReadError { - /// Attempted to read a boolean, but the value was not 0 nor 1. - InvalidBoolError(u8), - /// Attempted to read an unsigned 16-bit integer, but the value was too - /// large. - InvalidU16Error(u32), - /// Attempted to read a string, but a character was invalid. - InvalidStringError(Vec), - /// Attempted to read a user::Status, but the value was not a valid - /// representation of an enum variant. - InvalidUserStatusError(u32), - /// Encountered an I/O error while reading. - IOError(io::Error), -} - -impl fmt::Display for PacketReadError { - fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { - match *self { - PacketReadError::InvalidBoolError(n) => { - write!(fmt, "InvalidBoolError: {}", n) - } - PacketReadError::InvalidU16Error(n) => { - write!(fmt, "InvalidU16Error: {}", n) - } - PacketReadError::InvalidStringError(ref bytes) => { - write!(fmt, "InvalidStringError: {:?}", bytes) - } - PacketReadError::InvalidUserStatusError(n) => { - write!(fmt, "InvalidUserStatusError: {}", n) - } - PacketReadError::IOError(ref err) => { - write!(fmt, "IOError: {}", err) - } - } - } -} - -impl error::Error for PacketReadError { - fn description(&self) -> &str { - match *self { - PacketReadError::InvalidBoolError(_) => "InvalidBoolError", - PacketReadError::InvalidU16Error(_) => "InvalidU16Error", - PacketReadError::InvalidStringError(_) => "InvalidStringError", - PacketReadError::InvalidUserStatusError(_) => "InvalidUserStatusError", - PacketReadError::IOError(_) => "IOError", - } - } - - fn cause(&self) -> Option<&dyn error::Error> { - match *self { - PacketReadError::InvalidBoolError(_) => None, - PacketReadError::InvalidU16Error(_) => None, - PacketReadError::InvalidStringError(_) => None, - PacketReadError::InvalidUserStatusError(_) => None, - PacketReadError::IOError(ref err) => Some(err), - } - } -} - -impl From for PacketReadError { - fn from(err: io::Error) -> Self { - PacketReadError::IOError(err) - } -} - -/*==================* - * READ FROM PACKET * - *==================*/ - -/// This trait is implemented by types that can be deserialized from binary -/// Packets. -pub trait ReadFromPacket: Sized { - fn read_from_packet(_: &mut Packet) -> Result; -} - -/// 32-bit integers are serialized in 4 bytes, little-endian. -impl ReadFromPacket for u32 { - fn read_from_packet(packet: &mut Packet) -> Result { - Ok(packet.read_u32::()?) - } -} - -/// For convenience, usize's are deserialized as u32's then casted. -impl ReadFromPacket for usize { - fn read_from_packet(packet: &mut Packet) -> Result { - Ok(u32::read_from_packet(packet)? as usize) - } -} - -/// Booleans are serialized as single bytes, containing either 0 or 1. -impl ReadFromPacket for bool { - fn read_from_packet(packet: &mut Packet) -> Result { - match packet.read_u8()? { - 0 => Ok(false), - 1 => Ok(true), - n => Err(PacketReadError::InvalidBoolError(n)), - } - } -} - -/// 16-bit integers are serialized as 32-bit integers. -impl ReadFromPacket for u16 { - fn read_from_packet(packet: &mut Packet) -> Result { - let n = u32::read_from_packet(packet)?; - if n > MAX_PORT { - return Err(PacketReadError::InvalidU16Error(n)); - } - Ok(n as u16) - } -} - -/// IPv4 addresses are serialized directly as 32-bit integers. -impl ReadFromPacket for net::Ipv4Addr { - fn read_from_packet(packet: &mut Packet) -> Result { - let ip = u32::read_from_packet(packet)?; - Ok(net::Ipv4Addr::from(ip)) - } -} - -/// Strings are serialized as length-prefixed arrays of ISO-8859-1 encoded -/// characters. -impl ReadFromPacket for String { - fn read_from_packet(packet: &mut Packet) -> Result { - let len = usize::read_from_packet(packet)?; - - let mut buffer = vec![0; len]; - packet.read_exact(&mut buffer)?; - - match ISO_8859_1.decode(&buffer, DecoderTrap::Strict) { - Ok(string) => Ok(string), - Err(_) => Err(PacketReadError::InvalidStringError(buffer)), - } - } -} - -/// Vectors are serialized as length-prefixed arrays of values. -impl ReadFromPacket for Vec { - fn read_from_packet(packet: &mut Packet) -> Result { - let len = usize::read_from_packet(packet)?; - - let mut vec = Vec::new(); - for _ in 0..len { - vec.push(T::read_from_packet(packet)?); - } - - Ok(vec) - } -} - -/*=================* - * WRITE TO PACKET * - *=================*/ - -/// This trait is implemented by types that can be serialized to a binary -/// MutPacket. -pub trait WriteToPacket { - fn write_to_packet(&self, _: &mut MutPacket) -> io::Result<()>; -} - -/// 32-bit integers are serialized in 4 bytes, little-endian. -impl WriteToPacket for u32 { - fn write_to_packet(&self, packet: &mut MutPacket) -> io::Result<()> { - packet.write_u32::(*self) - } -} - -/// Booleans are serialized as single bytes, containing either 0 or 1. -impl WriteToPacket for bool { - fn write_to_packet(&self, packet: &mut MutPacket) -> io::Result<()> { - packet.write_u8(*self as u8)?; - Ok(()) - } -} - -/// 16-bit integers are serialized as 32-bit integers. -impl WriteToPacket for u16 { - fn write_to_packet(&self, packet: &mut MutPacket) -> io::Result<()> { - (*self as u32).write_to_packet(packet) - } -} - -/// Strings are serialized as a length-prefixed array of ISO-8859-1 encoded -/// characters. -impl WriteToPacket for str { - fn write_to_packet(&self, packet: &mut MutPacket) -> io::Result<()> { - // Encode the string. - let bytes = match ISO_8859_1.encode(self, EncoderTrap::Strict) { - Ok(bytes) => bytes, - Err(_) => { - let copy = self.to_string(); - return Err(io::Error::new(io::ErrorKind::Other, copy)); - } - }; - // Then write the bytes to the packet. - (bytes.len() as u32).write_to_packet(packet)?; - packet.write(&bytes)?; - Ok(()) - } -} - -/// Deref coercion does not happen for trait methods apparently. -impl WriteToPacket for String { - fn write_to_packet(&self, packet: &mut MutPacket) -> io::Result<()> { - (self as &str).write_to_packet(packet) - } -} - -/*========* - * PARSER * - *========*/ - -/// This enum defines the possible states of a packet parser state machine. -#[derive(Debug, Clone, Copy)] -enum State { - /// The parser is waiting to read enough bytes to determine the - /// length of the following packet. - ReadingLength, - /// The parser is waiting to read enough bytes to form the entire - /// packet. - ReadingPacket, -} - -#[derive(Debug)] -pub struct Parser { - state: State, - num_bytes_left: usize, - buffer: Vec, -} - -impl Parser { - pub fn new() -> Self { - Parser { - state: State::ReadingLength, - num_bytes_left: U32_SIZE, - buffer: vec![0; U32_SIZE], - } - } - - /// Attemps to read a packet in a non-blocking fashion. - /// If enough bytes can be read from the given byte stream to form a - /// complete packet `p`, returns `Ok(Some(p))`. - /// If not enough bytes are available, returns `Ok(None)`. - /// If an I/O error `e` arises when trying to read the underlying stream, - /// returns `Err(e)`. - /// Note: as long as this function returns `Ok(Some(p))`, the caller is - /// responsible for calling it once more to ensure that all packets are - /// read as soon as possible. - pub fn try_read(&mut self, stream: &mut U) -> io::Result> - where - U: io::Read, - { - // Try to read as many bytes as we currently need from the underlying - // byte stream. - let offset = self.buffer.len() - self.num_bytes_left; - - #[allow(deprecated)] - match stream.try_read(&mut self.buffer[offset..])? { - None => (), - - Some(num_bytes_read) => { - self.num_bytes_left -= num_bytes_read; - } - } - - // If we haven't read enough bytes, return. - if self.num_bytes_left > 0 { - return Ok(None); - } - - // Otherwise, the behavior depends on what state we were in. - match self.state { - State::ReadingLength => { - // If we have finished reading the length prefix, then - // deserialize it, switch states and try to read the packet - // bytes. - let message_len = LittleEndian::read_u32(&mut self.buffer) as usize; - if message_len > MAX_MESSAGE_SIZE { - unimplemented!(); - }; - self.state = State::ReadingPacket; - self.num_bytes_left = message_len; - self.buffer.resize(message_len + U32_SIZE, 0); - self.try_read(stream) - } - - State::ReadingPacket => { - // If we have finished reading the packet, swap the full buffer - // out and return the packet made from the full buffer. - self.state = State::ReadingLength; - self.num_bytes_left = U32_SIZE; - let new_buffer = vec![0; U32_SIZE]; - let old_buffer = mem::replace(&mut self.buffer, new_buffer); - Ok(Some(Packet::from_wire(old_buffer))) - } - } - } -} diff --git a/proto/src/stream.rs b/proto/src/stream.rs deleted file mode 100644 index 11835b5..0000000 --- a/proto/src/stream.rs +++ /dev/null @@ -1,230 +0,0 @@ -use std::collections::VecDeque; -use std::error; -use std::fmt; -use std::io; -use std::net::ToSocketAddrs; - -use log::error; -use mio; - -use super::packet::{MutPacket, Parser, ReadFromPacket, WriteToPacket}; - -/*========* - * OUTBUF * - *========*/ - -/// A struct used for writing bytes to a TryWrite sink. -#[derive(Debug)] -struct OutBuf { - cursor: usize, - bytes: Vec, -} - -impl From> for OutBuf { - fn from(bytes: Vec) -> Self { - OutBuf { - cursor: 0, - bytes: bytes, - } - } -} - -impl OutBuf { - #[inline] - fn remaining(&self) -> usize { - self.bytes.len() - self.cursor - } - - #[inline] - fn has_remaining(&self) -> bool { - self.remaining() > 0 - } - - #[allow(deprecated)] - fn try_write_to(&mut self, mut writer: T) -> io::Result> - where - T: mio::deprecated::TryWrite, - { - let result = writer.try_write(&self.bytes[self.cursor..]); - if let Ok(Some(bytes_written)) = result { - self.cursor += bytes_written; - } - result - } -} - -/*========* - * STREAM * - *========*/ - -/// This trait is implemented by packet sinks to which a stream can forward -/// the packets it reads. -pub trait SendPacket { - type Value: ReadFromPacket; - type Error: error::Error; - - fn send_packet(&mut self, _: Self::Value) -> Result<(), Self::Error>; - - fn notify_open(&mut self) -> Result<(), Self::Error>; -} - -/// This enum defines the possible actions the stream wants to take after -/// processing an event. -#[derive(Debug, Clone, Copy)] -pub enum Intent { - /// The stream is done, the event loop handler can drop it. - Done, - /// The stream wants to wait for the next event matching the given - /// `EventSet`. - Continue(mio::Ready), -} - -/// This struct wraps around an mio tcp stream and handles packet reads and -/// writes. -#[derive(Debug)] -pub struct Stream { - parser: Parser, - queue: VecDeque, - sender: T, - stream: mio::tcp::TcpStream, - - is_connected: bool, -} - -impl Stream { - /// Returns a new stream, asynchronously connected to the given address, - /// which forwards incoming packets to the given sender. - /// If an error occurs when connecting, returns an error. - pub fn new(addr_spec: U, sender: T) -> io::Result - where - U: ToSocketAddrs + fmt::Debug, - { - for sock_addr in addr_spec.to_socket_addrs()? { - if let Ok(stream) = mio::tcp::TcpStream::connect(&sock_addr) { - return Ok(Stream { - parser: Parser::new(), - queue: VecDeque::new(), - sender: sender, - stream: stream, - - is_connected: false, - }); - } - } - Err(io::Error::new( - io::ErrorKind::Other, - format!("Cannot connect to {:?}", addr_spec), - )) - } - - /// Returns a reference to the underlying byte stream, to allow it to be - /// registered with an event loop. - pub fn evented(&self) -> &mio::tcp::TcpStream { - &self.stream - } - - /// The stream is ready to be read from. - fn on_readable(&mut self) -> Result<(), String> { - loop { - let mut packet = match self.parser.try_read(&mut self.stream) { - Ok(Some(packet)) => packet, - Ok(None) => break, - Err(e) => return Err(format!("Error reading stream: {}", e)), - }; - let value = match packet.read_value() { - Ok(value) => value, - Err(e) => return Err(format!("Error parsing packet: {}", e)), - }; - if let Err(e) = self.sender.send_packet(value) { - return Err(format!("Error sending parsed packet: {}", e)); - } - } - Ok(()) - } - - /// The stream is ready to be written to. - fn on_writable(&mut self) -> io::Result<()> { - loop { - let mut outbuf = match self.queue.pop_front() { - Some(outbuf) => outbuf, - None => break, - }; - - let option = outbuf.try_write_to(&mut self.stream)?; - match option { - Some(_) => { - if outbuf.has_remaining() { - self.queue.push_front(outbuf) - } - // Continue looping - } - None => { - self.queue.push_front(outbuf); - break; - } - } - } - Ok(()) - } - - /// The stream is ready to read, write, or both. - pub fn on_ready(&mut self, event_set: mio::Ready) -> Intent { - #[allow(deprecated)] - if event_set.is_hup() || event_set.is_error() { - return Intent::Done; - } - if event_set.is_readable() { - let result = self.on_readable(); - if let Err(e) = result { - error!("Stream input error: {}", e); - return Intent::Done; - } - } - if event_set.is_writable() { - let result = self.on_writable(); - if let Err(e) = result { - error!("Stream output error: {}", e); - return Intent::Done; - } - } - - // We must have read or written something succesfully if we're here, - // so the stream must be connected. - if !self.is_connected { - // If we weren't already connected, notify the sink. - if let Err(err) = self.sender.notify_open() { - error!("Cannot notify client that stream is open: {}", err); - return Intent::Done; - } - // And record the fact that we are now connected. - self.is_connected = true; - } - - // We're always interested in reading more. - #[allow(deprecated)] - let mut event_set = - mio::Ready::readable() | mio::Ready::hup() | mio::Ready::error(); - // If there is still stuff to write in the queue, we're interested in - // the socket becoming writable too. - if self.queue.len() > 0 { - event_set = event_set | mio::Ready::writable(); - } - - Intent::Continue(event_set) - } - - /// The stream has been notified. - pub fn on_notify(&mut self, payload: &V) -> Intent - where - V: WriteToPacket, - { - let mut packet = MutPacket::new(); - let result = packet.write_value(payload); - if let Err(e) = result { - error!("Error writing payload to packet: {}", e); - return Intent::Done; - } - self.queue.push_back(OutBuf::from(packet.into_bytes())); - Intent::Continue(mio::Ready::readable() | mio::Ready::writable()) - } -}