Browse Source

Completely refactor architecture to use multiple threads.

wip
Titouan Rigoudy 9 years ago
parent
commit
31408265a1
5 changed files with 223 additions and 94 deletions
  1. +40
    -83
      src/client.rs
  2. +0
    -2
      src/control.rs
  3. +166
    -0
      src/handler.rs
  4. +13
    -5
      src/main.rs
  5. +4
    -4
      src/proto/packet.rs

+ 40
- 83
src/client.rs View File

@ -1,12 +1,20 @@
use std::io;
use std::sync::mpsc::Receiver;
use mio::{EventLoop, EventSet, Handler, PollOpt, Token};
use mio::tcp::TcpStream;
use mio::Sender;
use config;
use proto::{PacketStream};
use proto::server::*;
#[derive(Debug)]
pub enum Request {
ServerRequest(ServerRequest),
}
#[derive(Debug)]
pub enum Response {
ServerResponse(ServerResponse),
}
#[derive(Debug, Clone, Copy)]
enum State {
NotLoggedIn,
@ -14,64 +22,44 @@ enum State {
LoggedIn,
}
#[derive(Debug)]
pub struct Client {
state: State,
token_counter: usize,
server_token: Token,
server_stream: PacketStream<TcpStream>,
server_interest: EventSet,
tx: Sender<Request>,
rx: Receiver<Response>,
}
impl Client {
pub fn new(server_stream: PacketStream<TcpStream>) -> Self {
let token_counter = 0;
pub fn new(tx: Sender<Request>, rx: Receiver<Response>) -> Self {
Client {
state: State::NotLoggedIn,
token_counter: token_counter,
server_token: Token(token_counter),
server_stream: server_stream,
server_interest: EventSet::writable() | EventSet::readable(),
}
}
pub fn server_writable(&mut self) {
match self.state {
State::NotLoggedIn => {
info!("Logging in...");
self.state = State::LoggingIn;
self.server_interest = EventSet::readable();
let request = ServerRequest::LoginRequest(LoginRequest::new(
config::USERNAME,
config::PASSWORD,
config::VER_MAJOR,
config::VER_MINOR,
).unwrap());
self.server_stream.try_write(request.to_packet().unwrap())
.unwrap();
},
_ => ()
tx: tx,
rx: rx,
}
}
pub fn server_readable(&mut self) {
match self.server_stream.try_read() {
Ok(Some(packet)) => {
match ServerResponse::from_packet(packet) {
Ok(response) =>
self.handle_server_response(response),
Err(e) =>
error!("Error while parsing server packet: {}", e),
}
},
Ok(None) => (),
Err(e) => error!("Error while reading server packet: {:?}", e),
pub fn run(&mut self) {
info!("Logging in...");
self.state = State::LoggingIn;
let server_request = ServerRequest::LoginRequest(LoginRequest::new(
config::USERNAME,
config::PASSWORD,
config::VER_MAJOR,
config::VER_MINOR,
).unwrap());
self.tx.send(Request::ServerRequest(server_request)).unwrap();
loop {
let response = match self.rx.recv() {
Ok(response) => response,
Err(e) => {
error!("Error receiving response: {}", e);
break;
},
};
match response {
Response::ServerResponse(server_response) =>
self.handle_server_response(server_response),
}
}
}
@ -94,15 +82,6 @@ impl Client {
}
}
pub fn register_all<T: Handler>(&self, event_loop: &mut EventLoop<T>)
-> io::Result<()>
{
try!(self.server_stream.register(
event_loop, self.server_token, self.server_interest,
PollOpt::edge()));
Ok(())
}
fn handle_login_response(&mut self, login: LoginResponse) {
match self.state {
State::LoggingIn => {
@ -150,25 +129,3 @@ impl Client {
response.users.len());
}
}
impl Handler for Client {
type Timeout = ();
type Message = ();
fn ready(&mut self, event_loop: &mut EventLoop<Self>,
token: Token, event_set: EventSet) {
if token == self.server_token {
if event_set.is_writable() {
self.server_writable();
}
if event_set.is_readable() {
self.server_readable();
}
self.server_stream.reregister(
event_loop, token, self.server_interest,
PollOpt::edge() | PollOpt::oneshot()).unwrap();
} else {
unreachable!("Unknown token!");
}
}
}

+ 0
- 2
src/control.rs View File

@ -5,8 +5,6 @@ use rustc_serialize::json;
use mio::tcp::TcpStream;
use websocket::{Server, Message};
#[derive(RustcDecodable, RustcEncodable)]
pub enum ControlRequest {
LoginRequest(LoginRequest),


+ 166
- 0
src/handler.rs View File

@ -0,0 +1,166 @@
use std::collections::VecDeque;
use std::io;
use std::sync::mpsc::Sender;
use mio::{EventLoop, EventSet, Handler, PollOpt, Token};
use mio::tcp::TcpStream;
use client::{Request, Response};
use proto::{Packet, PacketStream};
use proto::server::*;
struct TokenCounter {
counter: usize,
}
impl TokenCounter {
fn new() -> Self {
TokenCounter {
counter: 0,
}
}
fn next(&mut self) -> Token {
self.counter += 1;
Token(self.counter - 1)
}
}
pub struct ConnectionHandler {
token_counter: TokenCounter,
server_token: Token,
server_stream: PacketStream<TcpStream>,
server_queue: VecDeque<Packet>,
client_tx: Sender<Response>,
}
impl ConnectionHandler {
pub fn new(
server_tcp_stream: TcpStream, client_tx: Sender<Response>,
event_loop: &mut EventLoop<Self>) -> Self
{
let mut token_counter = TokenCounter::new();
let server_token = token_counter.next();
let event_set = EventSet::readable();
let poll_opt = PollOpt::edge() | PollOpt::oneshot();
let server_stream = PacketStream::new(server_tcp_stream);
server_stream.register(event_loop, server_token, event_set, poll_opt)
.unwrap();
ConnectionHandler {
token_counter: token_counter,
server_token: server_token,
server_stream: server_stream,
server_queue: VecDeque::new(),
client_tx: client_tx,
}
}
fn read_server_once(&mut self) -> io::Result<bool> {
let packet = match try!(self.server_stream.try_read()) {
Some(packet) => packet,
None => return Ok(false),
};
let server_response = try!(ServerResponse::from_packet(packet));
match self.client_tx.send(Response::ServerResponse(server_response)) {
Ok(()) => Ok(true),
Err(e) => Err(io::Error::new(
io::ErrorKind::Other,
format!("Send failed on client_tx channel: {}", e))),
}
}
fn write_server_once(&mut self) -> io::Result<bool> {
let mut packet = match self.server_queue.pop_front() {
Some(packet) => packet,
None => return Ok(false),
};
match try!(self.server_stream.try_write(&mut packet)) {
Some(()) => Ok(true),
None => {
self.server_queue.push_front(packet);
Ok(false)
}
}
}
fn notify_server(
&mut self, event_loop: &mut EventLoop<Self>, request: ServerRequest)
-> io::Result<()>
{
let packet = try!(request.to_packet());
self.server_queue.push_back(packet);
Ok(())
}
fn reregister_server(&mut self, event_loop: &mut EventLoop<Self>) {
let event_set = if self.server_queue.len() > 0 {
EventSet::readable() | EventSet::writable()
} else {
EventSet::readable()
};
let poll_opt = PollOpt::edge() | PollOpt::oneshot();
self.server_stream.reregister(
event_loop, self.server_token, event_set, poll_opt).unwrap();
}
}
impl Handler for ConnectionHandler {
type Timeout = ();
type Message = Request;
fn ready(&mut self, event_loop: &mut EventLoop<Self>,
token: Token, event_set: EventSet)
{
if token == self.server_token {
if event_set.is_writable() {
loop {
match self.write_server_once() {
Ok(true) => (),
Ok(false) => break,
Err(e) => {
error!("Error writing server: {}", e);
break;
}
}
}
}
if event_set.is_readable() {
loop {
match self.read_server_once() {
Ok(true) => (),
Ok(false) => break,
Err(e) => {
error!("Error reading server: {}", e);
break;
}
}
}
}
self.reregister_server(event_loop);
} else {
unreachable!("Unknown token!");
}
}
fn notify(&mut self, event_loop: &mut EventLoop<Self>, request: Request) {
match request {
Request::ServerRequest(server_request) => {
match self.notify_server(event_loop, server_request) {
Ok(()) => (),
Err(e) => error!("Error processing server request: {}", e),
}
self.reregister_server(event_loop);
}
}
}
}

+ 13
- 5
src/main.rs View File

@ -1,6 +1,7 @@
mod client;
mod config;
mod control;
mod handler;
mod proto;
extern crate byteorder;
@ -13,12 +14,14 @@ extern crate websocket;
use std::io;
use std::net::ToSocketAddrs;
use std::sync::mpsc::channel;
use std::thread;
use mio::EventLoop;
use mio::tcp::TcpStream;
use proto::PacketStream;
use client::Client;
use handler::ConnectionHandler;
fn connect(hostname: &str, port: u16) -> io::Result<TcpStream> {
for sock_addr in try!((hostname, port).to_socket_addrs()) {
@ -40,9 +43,14 @@ fn main() {
let mut event_loop = EventLoop::new().unwrap();
let packet_stream = PacketStream::new(stream);
let mut server_conn = Client::new(packet_stream);
server_conn.register_all(&mut event_loop).unwrap();
let (tx, rx) = channel();
event_loop.run(&mut server_conn).unwrap();
let mut handler = ConnectionHandler::new(stream, tx, &mut event_loop);
let mut client = Client::new(event_loop.channel(), rx);
thread::spawn(move || {
client.run();
});
event_loop.run(&mut handler).unwrap();
}

+ 4
- 4
src/proto/packet.rs View File

@ -119,13 +119,13 @@ impl Packet {
}
pub fn finalize(mut self) -> Vec<u8> {
pub fn as_slice(&mut self) -> &[u8] {
let bytes_len = (self.bytes.len() - U32_SIZE) as u32;
{
let mut first_word = &mut self.bytes[..U32_SIZE];
first_word.write_u32::<LittleEndian>(bytes_len).unwrap();
}
self.bytes
&self.bytes
}
}
@ -213,8 +213,8 @@ impl<T: Read + Write + Evented> PacketStream<T> {
}
}
pub fn try_write(&mut self, packet: Packet) -> io::Result<Option<()>> {
match try!(self.stream.try_write(&packet.finalize())) {
pub fn try_write(&mut self, packet: &mut Packet) -> io::Result<Option<()>> {
match try!(self.stream.try_write(packet.as_slice())) {
None => Ok(None),
Some(_) => Ok(Some(()))
}


Loading…
Cancel
Save