From 1177802dbeb7956fb026e3530717f7717e012029 Mon Sep 17 00:00:00 2001 From: Titouan Rigoudy Date: Thu, 18 Feb 2016 16:09:58 +0100 Subject: [PATCH] Refactor ServerConnection to own stream. --- src/main.rs | 28 ++++++------ src/proto/mod.rs | 4 +- src/proto/{connection.rs => packet.rs} | 47 ++++++++----------- src/server.rs | 63 +++++++++++--------------- 4 files changed, 61 insertions(+), 81 deletions(-) rename src/proto/{connection.rs => packet.rs} (84%) diff --git a/src/main.rs b/src/main.rs index 9c669a0..0f9b9cc 100644 --- a/src/main.rs +++ b/src/main.rs @@ -13,23 +13,20 @@ use std::net::ToSocketAddrs; use mio::{EventLoop, EventSet, Handler, PollOpt, Token}; use mio::tcp::TcpStream; -use proto::Connection; +use proto::PacketStream; use server::ServerConnection; const SERVER_TOKEN: Token = Token(0); #[derive(Debug)] struct ConnectionHandler { - server_conn: Connection, - server_stream: TcpStream, + server_conn: ServerConnection, } impl ConnectionHandler { - fn new(server_conn: Connection, server_stream: TcpStream) - -> Self { + fn new(server_conn: ServerConnection) -> Self { ConnectionHandler{ server_conn: server_conn, - server_stream: server_stream, } } } @@ -42,12 +39,14 @@ impl Handler for ConnectionHandler { token: Token, event_set: EventSet) { match token { - SERVER_TOKEN => + SERVER_TOKEN => { if event_set.is_readable() { - self.server_conn.ready_to_read(&mut self.server_stream) - } else { - self.server_conn.ready_to_write(&mut self.server_stream) - }, + self.server_conn.server_readable(); + } + if event_set.is_writable() { + self.server_conn.server_writable(); + } + }, _ => unreachable!("Unknown token"), } @@ -76,10 +75,11 @@ fn main() { &stream, SERVER_TOKEN, EventSet::readable() | EventSet::writable(), - PollOpt::edge()).unwrap(); + PollOpt::level()).unwrap(); - let server_conn = Connection::new(ServerConnection::new()); - let mut handler = ConnectionHandler::new(server_conn, stream); + let packet_stream = PacketStream::new(stream); + let server_conn = ServerConnection::new(packet_stream); + let mut handler = ConnectionHandler::new(server_conn); event_loop.run(&mut handler).unwrap(); } diff --git a/src/proto/mod.rs b/src/proto/mod.rs index de4f585..8700ca3 100644 --- a/src/proto/mod.rs +++ b/src/proto/mod.rs @@ -1,4 +1,4 @@ -mod connection; +mod packet; pub mod server; -pub use self::connection::{Connection, Packet, Peer}; +pub use self::packet::{PacketStream, Packet}; diff --git a/src/proto/connection.rs b/src/proto/packet.rs similarity index 84% rename from src/proto/connection.rs rename to src/proto/packet.rs index 100353e..9e9b8f5 100644 --- a/src/proto/connection.rs +++ b/src/proto/packet.rs @@ -4,7 +4,7 @@ use std::io::{Cursor, Read, Write}; use std::mem; use byteorder::{ByteOrder, LittleEndian, ReadBytesExt, WriteBytesExt}; -use mio::TryRead; +use mio::{TryRead, TryWrite}; use mio::tcp::TcpStream; const MAX_PACKET_SIZE: usize = 1 << 20; // 1 MiB @@ -119,13 +119,6 @@ impl io::Read for Packet { } } - - -pub trait Peer { - fn read_packet(&mut self) -> Option; - fn write_packet(&mut self, packet: Packet); -} - #[derive(Debug, Clone, Copy)] enum State { ReadingLength, @@ -133,39 +126,37 @@ enum State { } #[derive(Debug)] -pub struct Connection { +pub struct PacketStream { + stream: T, state: State, num_bytes_left: usize, buffer: Vec, - peer: T, } -impl Connection { +impl PacketStream { - pub fn new(peer: T) -> Self { - Connection { + pub fn new(stream: T) -> Self { + PacketStream { + stream: stream, state: State::ReadingLength, num_bytes_left: U32_SIZE, buffer: vec![0; U32_SIZE], - peer: peer, } } - pub fn ready_to_read(&mut self, stream: &mut TcpStream) { + pub fn try_read(&mut self) -> io::Result> { let offset = self.buffer.len() - self.num_bytes_left; - match stream.try_read(&mut self.buffer[offset..]) { - Ok(None) => (), + match try!(self.stream.try_read(&mut self.buffer[offset..])) { + None => (), - Ok(Some(num_bytes_read)) => { + Some(num_bytes_read) => { assert!(num_bytes_read <= self.num_bytes_left); self.num_bytes_left -= num_bytes_read; }, - - Err(e) => error!("Could not read stream: {:?}", e), } if self.num_bytes_left > 0 { - return; + return Ok(None); } match self.state { @@ -178,6 +169,7 @@ impl Connection { self.state = State::ReadingPacket; self.num_bytes_left = message_len; self.buffer.extend(repeat(0).take(message_len)); + self.try_read() }, State::ReadingPacket => { @@ -185,18 +177,15 @@ impl Connection { self.num_bytes_left = U32_SIZE; let new_buffer = vec![0;U32_SIZE]; let old_buffer = mem::replace(&mut self.buffer, new_buffer); - self.peer.write_packet(Packet::from_raw_parts(old_buffer)); + Ok(Some(Packet::from_raw_parts(old_buffer))) } } } - pub fn ready_to_write(&mut self, stream: &mut TcpStream) { - match self.peer.read_packet() { - Some(packet) => { - stream.write(&packet.finalize()).unwrap(); - () - }, - None => (), + pub fn try_write(&mut self, packet: Packet) -> io::Result> { + match try!(self.stream.try_write(&packet.finalize())) { + None => Ok(None), + Some(_) => Ok(Some(())) } } } diff --git a/src/server.rs b/src/server.rs index dd1d4e5..c68e4d5 100644 --- a/src/server.rs +++ b/src/server.rs @@ -1,9 +1,10 @@ use std::io; +use std::io::{Read, Write}; use std::net::Ipv4Addr; use config; -use proto::{Peer, Packet}; +use proto::{Packet, PacketStream}; use proto::server::{ LoginRequest, LoginResponse, @@ -19,42 +20,53 @@ enum State { } #[derive(Debug)] -pub struct ServerConnection { +pub struct ServerConnection { state: State, + server_stream: PacketStream, } -impl ServerConnection { - pub fn new() -> Self { +impl ServerConnection { + pub fn new(server_stream: PacketStream) -> Self { ServerConnection { state: State::NotLoggedIn, + server_stream: server_stream, } } - fn read_request(&mut self) -> Option { + pub fn server_writable(&mut self) { match self.state { State::NotLoggedIn => { println!("Logging in..."); self.state = State::LoggingIn; - Some(ServerRequest::LoginRequest(LoginRequest::new( + let request = ServerRequest::LoginRequest(LoginRequest::new( config::USERNAME, config::PASSWORD, config::VER_MAJOR, config::VER_MINOR, - ).unwrap())) + ).unwrap()); + self.server_stream.try_write(request.to_packet().unwrap()); }, - _ => None + _ => () } } - fn write_response(&mut self, response: ServerResponse) { - match response { - ServerResponse::LoginResponse(login) => { - self.handle_login(login); - }, - ServerResponse::UnknownResponse(code, packet) => { - println!("Unknown packet code {}", code); + pub fn server_readable(&mut self) { + match self.server_stream.try_read() { + Ok(Some(packet)) => { + match ServerResponse::from_packet(packet).unwrap() { + ServerResponse::LoginResponse(login) => { + self.handle_login(login); + }, + ServerResponse::UnknownResponse(code, packet) => { + println!("Unknown packet code {}", code); + }, + } }, + + Ok(None) => (), + + Err(e) => error!("Could not read packet from server: {:?}", e), } } @@ -95,24 +107,3 @@ impl ServerConnection { } } } - -impl Peer for ServerConnection { - fn read_packet(&mut self) -> Option { - match self.read_request() { - Some(request) => { - match request.to_packet() { - Ok(packet) => Some(packet), - Err(e) => unimplemented!(), - } - }, - None => None - } - } - - fn write_packet(&mut self, mut packet: Packet) { - match ServerResponse::from_packet(packet) { - Ok(response) => self.write_response(response), - Err(e) => unimplemented!(), - } - } -}