Browse Source

Client now processes protocol and control messages.

wip
Titouan Rigoudy 9 years ago
parent
commit
7bd7b824b7
4 changed files with 36 additions and 37 deletions
  1. +17
    -16
      src/client.rs
  2. +6
    -4
      src/control.rs
  3. +6
    -4
      src/handler.rs
  4. +7
    -13
      src/main.rs

+ 17
- 16
src/client.rs View File

@ -7,6 +7,11 @@ use control::{ControlRequest, ControlResponse};
use proto::{Request, Response}; use proto::{Request, Response};
use proto::server::*; use proto::server::*;
pub enum IncomingMessage {
ServerResponse(ServerResponse),
ControlRequest(ControlRequest),
}
#[derive(Debug, Clone, Copy)] #[derive(Debug, Clone, Copy)]
enum State { enum State {
NotLoggedIn, NotLoggedIn,
@ -17,27 +22,24 @@ enum State {
pub struct Client { pub struct Client {
state: State, state: State,
proto_tx: mio::Sender<Request>,
proto_rx: mpsc::Receiver<Response>,
rx: mpsc::Receiver<IncomingMessage>,
proto_tx: mio::Sender<Request>,
control_tx: mpsc::Sender<ControlResponse>, control_tx: mpsc::Sender<ControlResponse>,
control_rx: mpsc::Receiver<ControlRequest>,
} }
impl Client { impl Client {
pub fn new( pub fn new(
rx: mpsc::Receiver<IncomingMessage>,
proto_tx: mio::Sender<Request>, proto_tx: mio::Sender<Request>,
proto_rx: mpsc::Receiver<Response>,
control_tx: mpsc::Sender<ControlResponse>,
control_rx: mpsc::Receiver<ControlRequest>)
control_tx: mpsc::Sender<ControlResponse>)
-> Self -> Self
{ {
Client { Client {
state: State::NotLoggedIn, state: State::NotLoggedIn,
rx: rx,
proto_tx: proto_tx, proto_tx: proto_tx,
proto_rx: proto_rx,
control_tx: control_tx, control_tx: control_tx,
control_rx: control_rx,
} }
} }
@ -53,16 +55,15 @@ impl Client {
self.proto_tx.send(Request::ServerRequest(server_request)).unwrap(); self.proto_tx.send(Request::ServerRequest(server_request)).unwrap();
loop { loop {
let response = match self.proto_rx.recv() {
Ok(response) => response,
match self.rx.recv() {
Ok(IncomingMessage::ServerResponse(server_response)) => {
self.handle_server_response(server_response);
},
Ok(IncomingMessage::ControlRequest(control_request)) => {
warn!("Unhandled control request: {:?}", control_request);
},
Err(e) => { Err(e) => {
error!("Error receiving response: {}", e); error!("Error receiving response: {}", e);
break;
},
};
match response {
Response::ServerResponse(server_response) => {
self.handle_server_response(server_response);
}, },
} }
} }


+ 6
- 4
src/control.rs View File

@ -21,12 +21,12 @@ type WebSocketClient =
websocket::Client<websocket::DataFrame, WebSocketSender, WebSocketReceiver>; websocket::Client<websocket::DataFrame, WebSocketSender, WebSocketReceiver>;
pub struct Controller { pub struct Controller {
client_tx: mpsc::Sender<ControlRequest>,
client_tx: mpsc::Sender<client::IncomingMessage>,
client_rx: mpsc::Receiver<ControlResponse>, client_rx: mpsc::Receiver<ControlResponse>,
} }
impl Controller { impl Controller {
pub fn new(tx: mpsc::Sender<ControlRequest>,
pub fn new(tx: mpsc::Sender<client::IncomingMessage>,
rx: mpsc::Receiver<ControlResponse>) rx: mpsc::Receiver<ControlResponse>)
-> Self -> Self
{ {
@ -75,7 +75,8 @@ impl Controller {
} }
fn receiver_loop( fn receiver_loop(
mut receiver: WebSocketReceiver, tx: mpsc::Sender<ControlRequest>)
mut receiver: WebSocketReceiver,
tx: mpsc::Sender<client::IncomingMessage>)
{ {
for message_result in receiver.incoming_messages() { for message_result in receiver.incoming_messages() {
let message: websocket::Message = match message_result { let message: websocket::Message = match message_result {
@ -97,7 +98,8 @@ impl Controller {
match json::decode(&payload) { match json::decode(&payload) {
Ok(control_request) => { Ok(control_request) => {
debug!("Received control request: {:?}", control_request); debug!("Received control request: {:?}", control_request);
tx.send(control_request);
tx.send(client::IncomingMessage::ControlRequest(
control_request));
}, },
Err(e) => warn!("Error decoding control request: {}", e), Err(e) => warn!("Error decoding control request: {}", e),
}; };


+ 6
- 4
src/handler.rs View File

@ -5,7 +5,8 @@ use std::sync::mpsc::Sender;
use mio::{EventLoop, EventSet, Handler, PollOpt, Token}; use mio::{EventLoop, EventSet, Handler, PollOpt, Token};
use mio::tcp::TcpStream; use mio::tcp::TcpStream;
use proto::{Packet, PacketStream, Request, Response};
use client::IncomingMessage;
use proto::{Packet, PacketStream, Request};
use proto::server::*; use proto::server::*;
struct TokenCounter { struct TokenCounter {
@ -32,12 +33,12 @@ pub struct ConnectionHandler {
server_stream: PacketStream<TcpStream>, server_stream: PacketStream<TcpStream>,
server_queue: VecDeque<Packet>, server_queue: VecDeque<Packet>,
client_tx: Sender<Response>,
client_tx: Sender<IncomingMessage>,
} }
impl ConnectionHandler { impl ConnectionHandler {
pub fn new( pub fn new(
server_tcp_stream: TcpStream, client_tx: Sender<Response>,
server_tcp_stream: TcpStream, client_tx: Sender<IncomingMessage>,
event_loop: &mut EventLoop<Self>) -> Self event_loop: &mut EventLoop<Self>) -> Self
{ {
let mut token_counter = TokenCounter::new(); let mut token_counter = TokenCounter::new();
@ -94,7 +95,8 @@ impl ConnectionHandler {
}; };
let server_response = try!(ServerResponse::from_packet(packet)); let server_response = try!(ServerResponse::from_packet(packet));
match self.client_tx.send(Response::ServerResponse(server_response)) {
let message = IncomingMessage::ServerResponse(server_response);
match self.client_tx.send(message) {
Ok(()) => Ok(true), Ok(()) => Ok(true),
Err(e) => Err(io::Error::new( Err(e) => Err(io::Error::new(
io::ErrorKind::Other, io::ErrorKind::Other,


+ 7
- 13
src/main.rs View File

@ -44,28 +44,22 @@ fn main() {
let mut event_loop = EventLoop::new().unwrap(); let mut event_loop = EventLoop::new().unwrap();
let (handler_to_client_tx, handler_to_client_rx) = channel();
let mut handler =
ConnectionHandler::new(stream, handler_to_client_tx, &mut event_loop);
let (control_to_client_tx, control_to_client_rx) = channel();
let (client_tx, client_rx) = channel();
let (client_to_control_tx, client_to_control_rx) = channel(); let (client_to_control_tx, client_to_control_rx) = channel();
let client_to_handler_tx = event_loop.channel(); let client_to_handler_tx = event_loop.channel();
let mut handler =
ConnectionHandler::new(stream, client_tx.clone(), &mut event_loop);
let mut client = Client::new( let mut client = Client::new(
client_to_handler_tx, handler_to_client_rx,
client_to_control_tx, control_to_client_rx);
thread::spawn(move || {
client.run();
});
client_rx, client_to_handler_tx, client_to_control_tx);
thread::spawn(move || client.run());
let mut controller = let mut controller =
Controller::new(control_to_client_tx, client_to_control_rx);
Controller::new(client_tx, client_to_control_rx);
thread::spawn(move || { thread::spawn(move || {
controller.run(); controller.run();
}); });
event_loop.run(&mut handler).unwrap(); event_loop.run(&mut handler).unwrap();
} }

Loading…
Cancel
Save