diff --git a/src/proto/handler.rs b/src/proto/handler.rs index 02501ac..99fcdef 100644 --- a/src/proto/handler.rs +++ b/src/proto/handler.rs @@ -141,19 +141,12 @@ impl mio::Handler for Handler { } } - fn notify( - &mut self, event_loop: &mut mio::EventLoop, request: Request) + fn notify(&mut self, event_loop: &mut mio::EventLoop, + request: Request) { match request { Request::ServerRequest(server_request) => { - let packet = match server_request.to_packet() { - Ok(packet) => packet, - Err(e) => { - error!("Error writing server request to packet: {}", e); - return - } - }; - let intent = self.server_stream.on_notify(packet.into_bytes()); + let intent = self.server_stream.on_notify(&server_request); self.process_server_intent(intent, event_loop); } } diff --git a/src/proto/packet.rs b/src/proto/packet.rs index ea77856..1483c10 100644 --- a/src/proto/packet.rs +++ b/src/proto/packet.rs @@ -16,8 +16,6 @@ use super::constants::*; #[derive(Debug)] pub struct Packet { - /// The packet code. - code: u32, /// The current read position in the byte buffer. cursor: usize, /// The underlying bytes. @@ -40,25 +38,17 @@ impl Packet { /// packet. pub fn from_bytes(bytes: Vec) -> Self { // Check that the packet is long enough to contain at least a code. - assert!(bytes.len() >= 2*U32_SIZE); + assert!(bytes.len() >= U32_SIZE); // Read the purported length of the packet. let size = LittleEndian::read_u32(&bytes[0..U32_SIZE]) as usize; // Check that the packet has the right length. assert!(size + U32_SIZE == bytes.len()); - // Read the packet code. - let code = LittleEndian::read_u32(&bytes[U32_SIZE..2*U32_SIZE]); Packet { - code: code, - cursor: 2*U32_SIZE, + cursor: U32_SIZE, bytes: bytes, } } - /// Returns the packet code. - pub fn code(&self) -> u32 { - self.code - } - /// Provides the main way to read data out of a binary packet. pub fn read_value(&mut self) -> Result where T: ReadFromPacket @@ -83,13 +73,10 @@ pub struct MutPacket { impl MutPacket { /// Returns an empty packet with the given packet code. - pub fn new(code: u32) -> Self { + pub fn new() -> Self { // Leave space for the eventual size of the packet. - let mut bytes = vec![0; U32_SIZE]; - // Write the code. - bytes.write_u32::(code).unwrap(); MutPacket { - bytes: bytes, + bytes: vec![0; U32_SIZE] } } diff --git a/src/proto/server/request.rs b/src/proto/server/request.rs index 3ed18b7..d7aba45 100644 --- a/src/proto/server/request.rs +++ b/src/proto/server/request.rs @@ -22,43 +22,49 @@ pub enum ServerRequest { UserStatusRequest(UserStatusRequest), } -macro_rules! try_to_packet { - ($code: ident, $request:ident) => { - { - let mut packet = MutPacket::new($code); - try!(packet.write_value($request)); - Ok(packet) - } - } -} - -impl ServerRequest { - pub fn to_packet(&self) -> io::Result { +impl<'a> WriteToPacket for &'a ServerRequest { + fn write_to_packet(self, packet: &mut MutPacket) -> io::Result<()> { match *self { - ServerRequest::LoginRequest(ref request) => - try_to_packet!(CODE_LOGIN, request), - - ServerRequest::PeerAddressRequest(ref request) => - try_to_packet!(CODE_PEER_ADDRESS, request), - - ServerRequest::RoomJoinRequest(ref request) => - try_to_packet!(CODE_ROOM_JOIN, request), - - ServerRequest::RoomLeaveRequest(ref request) => - try_to_packet!(CODE_ROOM_LEAVE, request), - - ServerRequest::RoomListRequest => - Ok(MutPacket::new(CODE_ROOM_LIST)), - - ServerRequest::RoomMessageRequest(ref request) => - try_to_packet!(CODE_ROOM_MESSAGE, request), - - ServerRequest::SetListenPortRequest(ref request) => - try_to_packet!(CODE_SET_LISTEN_PORT, request), - - ServerRequest::UserStatusRequest(ref request) => - try_to_packet!(CODE_USER_STATUS, request), + ServerRequest::LoginRequest(ref request) => { + try!(packet.write_value(CODE_LOGIN)); + try!(packet.write_value(request)); + }, + + ServerRequest::PeerAddressRequest(ref request) => { + try!(packet.write_value(CODE_PEER_ADDRESS)); + try!(packet.write_value(request)); + }, + + ServerRequest::RoomJoinRequest(ref request) => { + try!(packet.write_value(CODE_ROOM_JOIN)); + try!(packet.write_value(request)); + }, + + ServerRequest::RoomLeaveRequest(ref request) => { + try!(packet.write_value(CODE_ROOM_LEAVE)); + try!(packet.write_value(request)); + }, + + ServerRequest::RoomListRequest => { + try!(packet.write_value(CODE_ROOM_LIST)); + }, + + ServerRequest::RoomMessageRequest(ref request) => { + try!(packet.write_value(CODE_ROOM_MESSAGE)); + try!(packet.write_value(request)); + }, + + ServerRequest::SetListenPortRequest(ref request) => { + try!(packet.write_value(CODE_SET_LISTEN_PORT)); + try!(packet.write_value(request)); + }, + + ServerRequest::UserStatusRequest(ref request) => { + try!(packet.write_value(CODE_USER_STATUS)); + try!(packet.write_value(request)); + } } + Ok(()) } } diff --git a/src/proto/server/response.rs b/src/proto/server/response.rs index 9e1cfb6..469b080 100644 --- a/src/proto/server/response.rs +++ b/src/proto/server/response.rs @@ -32,68 +32,91 @@ pub enum ServerResponse { UnknownResponse(u32), } -macro_rules! try_read_from_packet { - ($variant:ident, $packet:ident) => { - ServerResponse::$variant( - try!($packet.read_value()) - ) - } -} - impl ReadFromPacket for ServerResponse { fn read_from_packet(packet: &mut Packet) -> Result { - let resp = match packet.code() { + let code: u32 = try!(packet.read_value()); + let resp = match code { CODE_CONNECT_TO_PEER => - try_read_from_packet!(ConnectToPeerResponse, packet), + ServerResponse::ConnectToPeerResponse( + try!(packet.read_value()) + ), CODE_LOGIN => - try_read_from_packet!(LoginResponse, packet), + ServerResponse::LoginResponse( + try!(packet.read_value()) + ), CODE_PEER_ADDRESS => - try_read_from_packet!(PeerAddressResponse, packet), + ServerResponse::PeerAddressResponse( + try!(packet.read_value()) + ), CODE_PRIVILEGED_USERS => - try_read_from_packet!(PrivilegedUsersResponse, packet), + ServerResponse::PrivilegedUsersResponse( + try!(packet.read_value()) + ), CODE_ROOM_JOIN => - try_read_from_packet!(RoomJoinResponse, packet), + ServerResponse::RoomJoinResponse( + try!(packet.read_value()) + ), CODE_ROOM_LEAVE => - try_read_from_packet!(RoomLeaveResponse, packet), + ServerResponse::RoomLeaveResponse( + try!(packet.read_value()) + ), CODE_ROOM_LIST => - try_read_from_packet!(RoomListResponse, packet), + ServerResponse::RoomListResponse( + try!(packet.read_value()) + ), CODE_ROOM_MESSAGE => - try_read_from_packet!(RoomMessageResponse, packet), + ServerResponse::RoomMessageResponse( + try!(packet.read_value()) + ), CODE_ROOM_TICKERS => - try_read_from_packet!(RoomTickersResponse, packet), + ServerResponse::RoomTickersResponse( + try!(packet.read_value()) + ), CODE_ROOM_USER_JOINED => - try_read_from_packet!(RoomUserJoinedResponse, packet), + ServerResponse::RoomUserJoinedResponse( + try!(packet.read_value()) + ), CODE_ROOM_USER_LEFT => - try_read_from_packet!(RoomUserLeftResponse, packet), + ServerResponse::RoomUserLeftResponse( + try!(packet.read_value()) + ), CODE_USER_STATUS => - try_read_from_packet!(UserStatusResponse, packet), + ServerResponse::UserStatusResponse( + try!(packet.read_value()) + ), CODE_WISHLIST_INTERVAL => - try_read_from_packet!(WishlistIntervalResponse, packet), + ServerResponse::WishlistIntervalResponse( + try!(packet.read_value()) + ), CODE_PARENT_MIN_SPEED => - try_read_from_packet!(ParentMinSpeedResponse, packet), + ServerResponse::ParentMinSpeedResponse( + try!(packet.read_value()) + ), CODE_PARENT_SPEED_RATIO => - try_read_from_packet!(ParentSpeedRatioResponse, packet), + ServerResponse::ParentSpeedRatioResponse( + try!(packet.read_value()) + ), code => ServerResponse::UnknownResponse(code), }; let bytes_remaining = packet.bytes_remaining(); if bytes_remaining > 0 { warn!("Packet with code {} contains {} extra bytes", - packet.code(), bytes_remaining) + code, bytes_remaining) } Ok(resp) } diff --git a/src/proto/stream.rs b/src/proto/stream.rs index d28800d..8579b93 100644 --- a/src/proto/stream.rs +++ b/src/proto/stream.rs @@ -9,7 +9,7 @@ use mio; use mio::TryRead; use super::constants::*; -use super::packet::{Packet, ReadFromPacket}; +use super::packet::{MutPacket, Packet, ReadFromPacket, WriteToPacket}; /*========* * PARSER * @@ -279,22 +279,16 @@ impl Stream } /// The stream has been notified. - pub fn on_notify(&mut self, mut bytes: Vec) -> Intent { - self.queue.push_back(OutBuf::from(bytes)); + pub fn on_notify(&mut self, payload: V) -> Intent + where V: WriteToPacket + { + let mut packet = MutPacket::new(); + let result = packet.write_value(payload); + if let Err(e) = result { + error!("Error writing payload to packet: {}", e); + return Intent::Done + } + self.queue.push_back(OutBuf::from(packet.into_bytes())); Intent::Continue(mio::EventSet::readable() | mio::EventSet::writable()) } } - -impl io::Write for Stream - where T: io::Read + io::Write + mio::Evented, - U: SendPacket -{ - fn write(&mut self, bytes: &[u8]) -> io::Result { - self.stream.write(bytes) - } - - fn flush(&mut self) -> io::Result<()> { - self.stream.flush() - } -} -