diff --git a/src/client.rs b/src/client.rs index c226f8a..20e9ea9 100644 --- a/src/client.rs +++ b/src/client.rs @@ -1,12 +1,20 @@ -use std::io; +use std::sync::mpsc::Receiver; -use mio::{EventLoop, EventSet, Handler, PollOpt, Token}; -use mio::tcp::TcpStream; +use mio::Sender; use config; -use proto::{PacketStream}; use proto::server::*; +#[derive(Debug)] +pub enum Request { + ServerRequest(ServerRequest), +} + +#[derive(Debug)] +pub enum Response { + ServerResponse(ServerResponse), +} + #[derive(Debug, Clone, Copy)] enum State { NotLoggedIn, @@ -14,64 +22,44 @@ enum State { LoggedIn, } -#[derive(Debug)] pub struct Client { state: State, - - token_counter: usize, - - server_token: Token, - server_stream: PacketStream, - server_interest: EventSet, + tx: Sender, + rx: Receiver, } impl Client { - pub fn new(server_stream: PacketStream) -> Self { - let token_counter = 0; + pub fn new(tx: Sender, rx: Receiver) -> Self { Client { state: State::NotLoggedIn, - token_counter: token_counter, - server_token: Token(token_counter), - server_stream: server_stream, - server_interest: EventSet::writable() | EventSet::readable(), - } - } - - pub fn server_writable(&mut self) { - match self.state { - State::NotLoggedIn => { - info!("Logging in..."); - self.state = State::LoggingIn; - self.server_interest = EventSet::readable(); - let request = ServerRequest::LoginRequest(LoginRequest::new( - config::USERNAME, - config::PASSWORD, - config::VER_MAJOR, - config::VER_MINOR, - ).unwrap()); - self.server_stream.try_write(request.to_packet().unwrap()) - .unwrap(); - }, - - _ => () + tx: tx, + rx: rx, } } - pub fn server_readable(&mut self) { - match self.server_stream.try_read() { - Ok(Some(packet)) => { - match ServerResponse::from_packet(packet) { - Ok(response) => - self.handle_server_response(response), - - Err(e) => - error!("Error while parsing server packet: {}", e), - } - }, - - Ok(None) => (), - - Err(e) => error!("Error while reading server packet: {:?}", e), + pub fn run(&mut self) { + info!("Logging in..."); + self.state = State::LoggingIn; + let server_request = ServerRequest::LoginRequest(LoginRequest::new( + config::USERNAME, + config::PASSWORD, + config::VER_MAJOR, + config::VER_MINOR, + ).unwrap()); + self.tx.send(Request::ServerRequest(server_request)).unwrap(); + + loop { + let response = match self.rx.recv() { + Ok(response) => response, + Err(e) => { + error!("Error receiving response: {}", e); + break; + }, + }; + match response { + Response::ServerResponse(server_response) => + self.handle_server_response(server_response), + } } } @@ -94,15 +82,6 @@ impl Client { } } - pub fn register_all(&self, event_loop: &mut EventLoop) - -> io::Result<()> - { - try!(self.server_stream.register( - event_loop, self.server_token, self.server_interest, - PollOpt::edge())); - Ok(()) - } - fn handle_login_response(&mut self, login: LoginResponse) { match self.state { State::LoggingIn => { @@ -150,25 +129,3 @@ impl Client { response.users.len()); } } - -impl Handler for Client { - type Timeout = (); - type Message = (); - - fn ready(&mut self, event_loop: &mut EventLoop, - token: Token, event_set: EventSet) { - if token == self.server_token { - if event_set.is_writable() { - self.server_writable(); - } - if event_set.is_readable() { - self.server_readable(); - } - self.server_stream.reregister( - event_loop, token, self.server_interest, - PollOpt::edge() | PollOpt::oneshot()).unwrap(); - } else { - unreachable!("Unknown token!"); - } - } -} diff --git a/src/control.rs b/src/control.rs index 934d919..aed7308 100644 --- a/src/control.rs +++ b/src/control.rs @@ -5,8 +5,6 @@ use rustc_serialize::json; use mio::tcp::TcpStream; -use websocket::{Server, Message}; - #[derive(RustcDecodable, RustcEncodable)] pub enum ControlRequest { LoginRequest(LoginRequest), diff --git a/src/handler.rs b/src/handler.rs new file mode 100644 index 0000000..694c9e2 --- /dev/null +++ b/src/handler.rs @@ -0,0 +1,166 @@ +use std::collections::VecDeque; +use std::io; +use std::sync::mpsc::Sender; + +use mio::{EventLoop, EventSet, Handler, PollOpt, Token}; +use mio::tcp::TcpStream; + +use client::{Request, Response}; +use proto::{Packet, PacketStream}; +use proto::server::*; + +struct TokenCounter { + counter: usize, +} + +impl TokenCounter { + fn new() -> Self { + TokenCounter { + counter: 0, + } + } + + fn next(&mut self) -> Token { + self.counter += 1; + Token(self.counter - 1) + } +} + +pub struct ConnectionHandler { + token_counter: TokenCounter, + + server_token: Token, + server_stream: PacketStream, + server_queue: VecDeque, + + client_tx: Sender, +} + +impl ConnectionHandler { + pub fn new( + server_tcp_stream: TcpStream, client_tx: Sender, + event_loop: &mut EventLoop) -> Self + { + let mut token_counter = TokenCounter::new(); + let server_token = token_counter.next(); + + let event_set = EventSet::readable(); + let poll_opt = PollOpt::edge() | PollOpt::oneshot(); + + let server_stream = PacketStream::new(server_tcp_stream); + server_stream.register(event_loop, server_token, event_set, poll_opt) + .unwrap(); + + ConnectionHandler { + token_counter: token_counter, + + server_token: server_token, + server_stream: server_stream, + server_queue: VecDeque::new(), + + client_tx: client_tx, + } + } + + fn read_server_once(&mut self) -> io::Result { + let packet = match try!(self.server_stream.try_read()) { + Some(packet) => packet, + None => return Ok(false), + }; + + let server_response = try!(ServerResponse::from_packet(packet)); + match self.client_tx.send(Response::ServerResponse(server_response)) { + Ok(()) => Ok(true), + Err(e) => Err(io::Error::new( + io::ErrorKind::Other, + format!("Send failed on client_tx channel: {}", e))), + + } + } + + fn write_server_once(&mut self) -> io::Result { + let mut packet = match self.server_queue.pop_front() { + Some(packet) => packet, + None => return Ok(false), + }; + + match try!(self.server_stream.try_write(&mut packet)) { + Some(()) => Ok(true), + None => { + self.server_queue.push_front(packet); + Ok(false) + } + } + } + + fn notify_server( + &mut self, event_loop: &mut EventLoop, request: ServerRequest) + -> io::Result<()> + { + let packet = try!(request.to_packet()); + self.server_queue.push_back(packet); + Ok(()) + } + + fn reregister_server(&mut self, event_loop: &mut EventLoop) { + let event_set = if self.server_queue.len() > 0 { + EventSet::readable() | EventSet::writable() + } else { + EventSet::readable() + }; + let poll_opt = PollOpt::edge() | PollOpt::oneshot(); + self.server_stream.reregister( + event_loop, self.server_token, event_set, poll_opt).unwrap(); + } +} + +impl Handler for ConnectionHandler { + type Timeout = (); + type Message = Request; + + fn ready(&mut self, event_loop: &mut EventLoop, + token: Token, event_set: EventSet) + { + if token == self.server_token { + if event_set.is_writable() { + loop { + match self.write_server_once() { + Ok(true) => (), + Ok(false) => break, + Err(e) => { + error!("Error writing server: {}", e); + break; + } + } + } + } + if event_set.is_readable() { + loop { + match self.read_server_once() { + Ok(true) => (), + Ok(false) => break, + Err(e) => { + error!("Error reading server: {}", e); + break; + } + } + } + } + self.reregister_server(event_loop); + } else { + unreachable!("Unknown token!"); + } + } + + fn notify(&mut self, event_loop: &mut EventLoop, request: Request) { + match request { + Request::ServerRequest(server_request) => { + match self.notify_server(event_loop, server_request) { + Ok(()) => (), + Err(e) => error!("Error processing server request: {}", e), + } + self.reregister_server(event_loop); + } + } + } +} diff --git a/src/main.rs b/src/main.rs index b17c30f..2f12007 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,6 +1,7 @@ mod client; mod config; mod control; +mod handler; mod proto; extern crate byteorder; @@ -13,12 +14,14 @@ extern crate websocket; use std::io; use std::net::ToSocketAddrs; +use std::sync::mpsc::channel; +use std::thread; use mio::EventLoop; use mio::tcp::TcpStream; -use proto::PacketStream; use client::Client; +use handler::ConnectionHandler; fn connect(hostname: &str, port: u16) -> io::Result { for sock_addr in try!((hostname, port).to_socket_addrs()) { @@ -40,9 +43,14 @@ fn main() { let mut event_loop = EventLoop::new().unwrap(); - let packet_stream = PacketStream::new(stream); - let mut server_conn = Client::new(packet_stream); - server_conn.register_all(&mut event_loop).unwrap(); + let (tx, rx) = channel(); - event_loop.run(&mut server_conn).unwrap(); + let mut handler = ConnectionHandler::new(stream, tx, &mut event_loop); + + let mut client = Client::new(event_loop.channel(), rx); + thread::spawn(move || { + client.run(); + }); + + event_loop.run(&mut handler).unwrap(); } diff --git a/src/proto/packet.rs b/src/proto/packet.rs index da98a28..940c692 100644 --- a/src/proto/packet.rs +++ b/src/proto/packet.rs @@ -119,13 +119,13 @@ impl Packet { } - pub fn finalize(mut self) -> Vec { + pub fn as_slice(&mut self) -> &[u8] { let bytes_len = (self.bytes.len() - U32_SIZE) as u32; { let mut first_word = &mut self.bytes[..U32_SIZE]; first_word.write_u32::(bytes_len).unwrap(); } - self.bytes + &self.bytes } } @@ -213,8 +213,8 @@ impl PacketStream { } } - pub fn try_write(&mut self, packet: Packet) -> io::Result> { - match try!(self.stream.try_write(&packet.finalize())) { + pub fn try_write(&mut self, packet: &mut Packet) -> io::Result> { + match try!(self.stream.try_write(packet.as_slice())) { None => Ok(None), Some(_) => Ok(Some(())) }