Browse Source

Change Packets to not special-case the code header.

wip
Titouan Rigoudy 9 years ago
parent
commit
2c7b0bb2bc
5 changed files with 107 additions and 104 deletions
  1. +3
    -10
      src/proto/handler.rs
  2. +4
    -17
      src/proto/packet.rs
  3. +41
    -35
      src/proto/server/request.rs
  4. +48
    -25
      src/proto/server/response.rs
  5. +11
    -17
      src/proto/stream.rs

+ 3
- 10
src/proto/handler.rs View File

@ -141,19 +141,12 @@ impl mio::Handler for Handler {
} }
} }
fn notify(
&mut self, event_loop: &mut mio::EventLoop<Self>, request: Request)
fn notify(&mut self, event_loop: &mut mio::EventLoop<Self>,
request: Request)
{ {
match request { match request {
Request::ServerRequest(server_request) => { Request::ServerRequest(server_request) => {
let packet = match server_request.to_packet() {
Ok(packet) => packet,
Err(e) => {
error!("Error writing server request to packet: {}", e);
return
}
};
let intent = self.server_stream.on_notify(packet.into_bytes());
let intent = self.server_stream.on_notify(&server_request);
self.process_server_intent(intent, event_loop); self.process_server_intent(intent, event_loop);
} }
} }


+ 4
- 17
src/proto/packet.rs View File

@ -16,8 +16,6 @@ use super::constants::*;
#[derive(Debug)] #[derive(Debug)]
pub struct Packet { pub struct Packet {
/// The packet code.
code: u32,
/// The current read position in the byte buffer. /// The current read position in the byte buffer.
cursor: usize, cursor: usize,
/// The underlying bytes. /// The underlying bytes.
@ -40,25 +38,17 @@ impl Packet {
/// packet. /// packet.
pub fn from_bytes(bytes: Vec<u8>) -> Self { pub fn from_bytes(bytes: Vec<u8>) -> Self {
// Check that the packet is long enough to contain at least a code. // Check that the packet is long enough to contain at least a code.
assert!(bytes.len() >= 2*U32_SIZE);
assert!(bytes.len() >= U32_SIZE);
// Read the purported length of the packet. // Read the purported length of the packet.
let size = LittleEndian::read_u32(&bytes[0..U32_SIZE]) as usize; let size = LittleEndian::read_u32(&bytes[0..U32_SIZE]) as usize;
// Check that the packet has the right length. // Check that the packet has the right length.
assert!(size + U32_SIZE == bytes.len()); assert!(size + U32_SIZE == bytes.len());
// Read the packet code.
let code = LittleEndian::read_u32(&bytes[U32_SIZE..2*U32_SIZE]);
Packet { Packet {
code: code,
cursor: 2*U32_SIZE,
cursor: U32_SIZE,
bytes: bytes, bytes: bytes,
} }
} }
/// Returns the packet code.
pub fn code(&self) -> u32 {
self.code
}
/// Provides the main way to read data out of a binary packet. /// Provides the main way to read data out of a binary packet.
pub fn read_value<T>(&mut self) -> Result<T, PacketReadError> pub fn read_value<T>(&mut self) -> Result<T, PacketReadError>
where T: ReadFromPacket where T: ReadFromPacket
@ -83,13 +73,10 @@ pub struct MutPacket {
impl MutPacket { impl MutPacket {
/// Returns an empty packet with the given packet code. /// Returns an empty packet with the given packet code.
pub fn new(code: u32) -> Self {
pub fn new() -> Self {
// Leave space for the eventual size of the packet. // Leave space for the eventual size of the packet.
let mut bytes = vec![0; U32_SIZE];
// Write the code.
bytes.write_u32::<LittleEndian>(code).unwrap();
MutPacket { MutPacket {
bytes: bytes,
bytes: vec![0; U32_SIZE]
} }
} }


+ 41
- 35
src/proto/server/request.rs View File

@ -22,43 +22,49 @@ pub enum ServerRequest {
UserStatusRequest(UserStatusRequest), UserStatusRequest(UserStatusRequest),
} }
macro_rules! try_to_packet {
($code: ident, $request:ident) => {
{
let mut packet = MutPacket::new($code);
try!(packet.write_value($request));
Ok(packet)
}
}
}
impl ServerRequest {
pub fn to_packet(&self) -> io::Result<MutPacket> {
impl<'a> WriteToPacket for &'a ServerRequest {
fn write_to_packet(self, packet: &mut MutPacket) -> io::Result<()> {
match *self { match *self {
ServerRequest::LoginRequest(ref request) =>
try_to_packet!(CODE_LOGIN, request),
ServerRequest::PeerAddressRequest(ref request) =>
try_to_packet!(CODE_PEER_ADDRESS, request),
ServerRequest::RoomJoinRequest(ref request) =>
try_to_packet!(CODE_ROOM_JOIN, request),
ServerRequest::RoomLeaveRequest(ref request) =>
try_to_packet!(CODE_ROOM_LEAVE, request),
ServerRequest::RoomListRequest =>
Ok(MutPacket::new(CODE_ROOM_LIST)),
ServerRequest::RoomMessageRequest(ref request) =>
try_to_packet!(CODE_ROOM_MESSAGE, request),
ServerRequest::SetListenPortRequest(ref request) =>
try_to_packet!(CODE_SET_LISTEN_PORT, request),
ServerRequest::UserStatusRequest(ref request) =>
try_to_packet!(CODE_USER_STATUS, request),
ServerRequest::LoginRequest(ref request) => {
try!(packet.write_value(CODE_LOGIN));
try!(packet.write_value(request));
},
ServerRequest::PeerAddressRequest(ref request) => {
try!(packet.write_value(CODE_PEER_ADDRESS));
try!(packet.write_value(request));
},
ServerRequest::RoomJoinRequest(ref request) => {
try!(packet.write_value(CODE_ROOM_JOIN));
try!(packet.write_value(request));
},
ServerRequest::RoomLeaveRequest(ref request) => {
try!(packet.write_value(CODE_ROOM_LEAVE));
try!(packet.write_value(request));
},
ServerRequest::RoomListRequest => {
try!(packet.write_value(CODE_ROOM_LIST));
},
ServerRequest::RoomMessageRequest(ref request) => {
try!(packet.write_value(CODE_ROOM_MESSAGE));
try!(packet.write_value(request));
},
ServerRequest::SetListenPortRequest(ref request) => {
try!(packet.write_value(CODE_SET_LISTEN_PORT));
try!(packet.write_value(request));
},
ServerRequest::UserStatusRequest(ref request) => {
try!(packet.write_value(CODE_USER_STATUS));
try!(packet.write_value(request));
}
} }
Ok(())
} }
} }


+ 48
- 25
src/proto/server/response.rs View File

@ -32,68 +32,91 @@ pub enum ServerResponse {
UnknownResponse(u32), UnknownResponse(u32),
} }
macro_rules! try_read_from_packet {
($variant:ident, $packet:ident) => {
ServerResponse::$variant(
try!($packet.read_value())
)
}
}
impl ReadFromPacket for ServerResponse { impl ReadFromPacket for ServerResponse {
fn read_from_packet(packet: &mut Packet) -> Result<Self, PacketReadError> { fn read_from_packet(packet: &mut Packet) -> Result<Self, PacketReadError> {
let resp = match packet.code() {
let code: u32 = try!(packet.read_value());
let resp = match code {
CODE_CONNECT_TO_PEER => CODE_CONNECT_TO_PEER =>
try_read_from_packet!(ConnectToPeerResponse, packet),
ServerResponse::ConnectToPeerResponse(
try!(packet.read_value())
),
CODE_LOGIN => CODE_LOGIN =>
try_read_from_packet!(LoginResponse, packet),
ServerResponse::LoginResponse(
try!(packet.read_value())
),
CODE_PEER_ADDRESS => CODE_PEER_ADDRESS =>
try_read_from_packet!(PeerAddressResponse, packet),
ServerResponse::PeerAddressResponse(
try!(packet.read_value())
),
CODE_PRIVILEGED_USERS => CODE_PRIVILEGED_USERS =>
try_read_from_packet!(PrivilegedUsersResponse, packet),
ServerResponse::PrivilegedUsersResponse(
try!(packet.read_value())
),
CODE_ROOM_JOIN => CODE_ROOM_JOIN =>
try_read_from_packet!(RoomJoinResponse, packet),
ServerResponse::RoomJoinResponse(
try!(packet.read_value())
),
CODE_ROOM_LEAVE => CODE_ROOM_LEAVE =>
try_read_from_packet!(RoomLeaveResponse, packet),
ServerResponse::RoomLeaveResponse(
try!(packet.read_value())
),
CODE_ROOM_LIST => CODE_ROOM_LIST =>
try_read_from_packet!(RoomListResponse, packet),
ServerResponse::RoomListResponse(
try!(packet.read_value())
),
CODE_ROOM_MESSAGE => CODE_ROOM_MESSAGE =>
try_read_from_packet!(RoomMessageResponse, packet),
ServerResponse::RoomMessageResponse(
try!(packet.read_value())
),
CODE_ROOM_TICKERS => CODE_ROOM_TICKERS =>
try_read_from_packet!(RoomTickersResponse, packet),
ServerResponse::RoomTickersResponse(
try!(packet.read_value())
),
CODE_ROOM_USER_JOINED => CODE_ROOM_USER_JOINED =>
try_read_from_packet!(RoomUserJoinedResponse, packet),
ServerResponse::RoomUserJoinedResponse(
try!(packet.read_value())
),
CODE_ROOM_USER_LEFT => CODE_ROOM_USER_LEFT =>
try_read_from_packet!(RoomUserLeftResponse, packet),
ServerResponse::RoomUserLeftResponse(
try!(packet.read_value())
),
CODE_USER_STATUS => CODE_USER_STATUS =>
try_read_from_packet!(UserStatusResponse, packet),
ServerResponse::UserStatusResponse(
try!(packet.read_value())
),
CODE_WISHLIST_INTERVAL => CODE_WISHLIST_INTERVAL =>
try_read_from_packet!(WishlistIntervalResponse, packet),
ServerResponse::WishlistIntervalResponse(
try!(packet.read_value())
),
CODE_PARENT_MIN_SPEED => CODE_PARENT_MIN_SPEED =>
try_read_from_packet!(ParentMinSpeedResponse, packet),
ServerResponse::ParentMinSpeedResponse(
try!(packet.read_value())
),
CODE_PARENT_SPEED_RATIO => CODE_PARENT_SPEED_RATIO =>
try_read_from_packet!(ParentSpeedRatioResponse, packet),
ServerResponse::ParentSpeedRatioResponse(
try!(packet.read_value())
),
code => ServerResponse::UnknownResponse(code), code => ServerResponse::UnknownResponse(code),
}; };
let bytes_remaining = packet.bytes_remaining(); let bytes_remaining = packet.bytes_remaining();
if bytes_remaining > 0 { if bytes_remaining > 0 {
warn!("Packet with code {} contains {} extra bytes", warn!("Packet with code {} contains {} extra bytes",
packet.code(), bytes_remaining)
code, bytes_remaining)
} }
Ok(resp) Ok(resp)
} }


+ 11
- 17
src/proto/stream.rs View File

@ -9,7 +9,7 @@ use mio;
use mio::TryRead; use mio::TryRead;
use super::constants::*; use super::constants::*;
use super::packet::{Packet, ReadFromPacket};
use super::packet::{MutPacket, Packet, ReadFromPacket, WriteToPacket};
/*========* /*========*
* PARSER * * PARSER *
@ -279,22 +279,16 @@ impl<T, U> Stream<T, U>
} }
/// The stream has been notified. /// The stream has been notified.
pub fn on_notify(&mut self, mut bytes: Vec<u8>) -> Intent {
self.queue.push_back(OutBuf::from(bytes));
pub fn on_notify<V>(&mut self, payload: V) -> Intent
where V: WriteToPacket
{
let mut packet = MutPacket::new();
let result = packet.write_value(payload);
if let Err(e) = result {
error!("Error writing payload to packet: {}", e);
return Intent::Done
}
self.queue.push_back(OutBuf::from(packet.into_bytes()));
Intent::Continue(mio::EventSet::readable() | mio::EventSet::writable()) Intent::Continue(mio::EventSet::readable() | mio::EventSet::writable())
} }
} }
impl<T, U> io::Write for Stream<T, U>
where T: io::Read + io::Write + mio::Evented,
U: SendPacket
{
fn write(&mut self, bytes: &[u8]) -> io::Result<usize> {
self.stream.write(bytes)
}
fn flush(&mut self) -> io::Result<()> {
self.stream.flush()
}
}

Loading…
Cancel
Save