Browse Source

Split protocol and control channels in Client.

wip
Titouan Rigoudy 9 years ago
parent
commit
acf4a14914
4 changed files with 64 additions and 45 deletions
  1. +38
    -18
      src/client.rs
  2. +17
    -17
      src/control.rs
  3. +3
    -6
      src/handler.rs
  4. +6
    -4
      src/main.rs

+ 38
- 18
src/client.rs View File

@ -7,8 +7,9 @@ use control::{ControlRequest, ControlResponse};
use proto::{Response, Request};
use proto::server::*;
pub enum IncomingMessage {
ProtoResponse(Response),
#[derive(Debug)]
enum IncomingMessage {
ServerResponse(ServerResponse),
ControlRequest(ControlRequest),
}
@ -22,24 +23,27 @@ enum State {
pub struct Client {
state: State,
rx: mpsc::Receiver<IncomingMessage>,
proto_tx: mio::Sender<Request>,
proto_rx: mpsc::Receiver<Response>,
control_tx: mpsc::Sender<ControlResponse>,
control_rx: mpsc::Receiver<ControlRequest>,
}
impl Client {
pub fn new(
rx: mpsc::Receiver<IncomingMessage>,
proto_tx: mio::Sender<Request>,
control_tx: mpsc::Sender<ControlResponse>)
proto_rx: mpsc::Receiver<Response>,
control_tx: mpsc::Sender<ControlResponse>,
control_rx: mpsc::Receiver<ControlRequest>)
-> Self
{
Client {
state: State::NotLoggedIn,
rx: rx,
proto_tx: proto_tx,
proto_rx: proto_rx,
control_tx: control_tx,
control_rx: control_rx,
}
}
@ -55,20 +59,36 @@ impl Client {
self.proto_tx.send(Request::ServerRequest(server_request)).unwrap();
loop {
match self.rx.recv() {
Ok(IncomingMessage::ProtoResponse(
Response::ServerResponse(server_response))) => {
self.handle_server_response(server_response);
},
match self.recv() {
IncomingMessage::ServerResponse(response) =>
self.handle_server_response(response),
Ok(IncomingMessage::ControlRequest(control_request)) => {
warn!("Unhandled control request: {:?}", control_request);
},
IncomingMessage::ControlRequest(request) =>
self.handle_control_request(request),
}
}
}
Err(e) => {
error!("Error receiving response: {}", e);
fn recv(&mut self) -> IncomingMessage {
let proto_rx = &self.proto_rx;
let control_rx = &self.control_rx;
select! {
result = proto_rx.recv() =>
match result.unwrap() {
Response::ServerResponse(server_response) =>
IncomingMessage::ServerResponse(server_response),
},
}
result = control_rx.recv() =>
IncomingMessage::ControlRequest(result.unwrap())
}
}
fn handle_control_request(&mut self, request: ControlRequest) {
match request {
_ => {
error!("Unhandled control request: {:?}", request);
},
}
}


+ 17
- 17
src/control.rs View File

@ -25,7 +25,7 @@ enum Error {
IOError(io::Error),
JSONEncoderError(json::EncoderError),
JSONDecoderError(json::DecoderError),
SendError(mpsc::SendError<client::IncomingMessage>),
SendError(mpsc::SendError<ControlRequest>),
Utf8Error(str::Utf8Error),
WebSocketError(websocket::result::WebSocketError),
}
@ -67,8 +67,8 @@ impl From<json::DecoderError> for Error {
}
}
impl From<mpsc::SendError<client::IncomingMessage>> for Error {
fn from(err: mpsc::SendError<client::IncomingMessage>) -> Self {
impl From<mpsc::SendError<ControlRequest>> for Error {
fn from(err: mpsc::SendError<ControlRequest>) -> Self {
Error::SendError(err)
}
}
@ -86,12 +86,12 @@ impl From<websocket::result::WebSocketError> for Error {
}
pub struct Controller {
client_tx: mpsc::Sender<client::IncomingMessage>,
client_tx: mpsc::Sender<ControlRequest>,
client_rx: mpsc::Receiver<ControlResponse>,
}
impl Controller {
pub fn new(tx: mpsc::Sender<client::IncomingMessage>,
pub fn new(tx: mpsc::Sender<ControlRequest>,
rx: mpsc::Receiver<ControlResponse>)
-> Self
{
@ -108,6 +108,7 @@ impl Controller {
info!("Controller bound to {}:{}", host, port);
loop {
info!("Waiting for controller client");
let client = match Self::try_get_client(&mut server) {
Ok(client) => client,
Err(e) => {
@ -128,6 +129,8 @@ impl Controller {
Self::sender_loop(sender, &mut self.client_rx, sender_rx);
handle.join();
info!("Controller client disconnected");
}
}
@ -144,7 +147,7 @@ impl Controller {
fn receiver_loop(
mut receiver: WebSocketReceiver,
client_tx: mpsc::Sender<client::IncomingMessage>,
client_tx: mpsc::Sender<ControlRequest>,
sender_tx: mpsc::Sender<()>)
{
for message_result in receiver.incoming_messages() {
@ -180,14 +183,12 @@ impl Controller {
fn handle_text_message(
payload_bytes: &[u8],
client_tx: &mpsc::Sender<client::IncomingMessage>)
client_tx: &mpsc::Sender<ControlRequest>)
-> Result<(), Error>
{
let payload = try!(str::from_utf8(payload_bytes));
let control_request = try!(json::decode(payload));
let message = client::IncomingMessage::ControlRequest(control_request);
try!(client_tx.send(message));
try!(client_tx.send(control_request));
Ok(())
}
@ -232,26 +233,25 @@ impl Controller {
#[derive(Debug, RustcDecodable, RustcEncodable)]
pub enum ControlRequest {
LoginRequest(LoginRequest),
LoginStatusRequest(LoginStatusRequest),
}
#[derive(Debug, RustcDecodable, RustcEncodable)]
pub enum ControlResponse {
LoginResponse(LoginResponse),
LoginStatusResponse(LoginStatusResponse),
}
#[derive(Debug, RustcDecodable, RustcEncodable)]
pub struct LoginRequest {
username: String,
password: String,
}
pub struct LoginStatusRequest;
#[derive(Debug, RustcDecodable, RustcEncodable)]
pub enum LoginResponse {
pub enum LoginStatusResponse {
LoginOk {
username: String,
motd: String,
},
LoginFail {
username: String,
reason: String,
}
}

+ 3
- 6
src/handler.rs View File

@ -5,7 +5,6 @@ use std::sync::mpsc::Sender;
use mio::{EventLoop, EventSet, Handler, PollOpt, Token};
use mio::tcp::TcpStream;
use client::IncomingMessage;
use proto::{Packet, PacketStream, Request, Response};
use proto::server::*;
@ -33,12 +32,12 @@ pub struct ConnectionHandler {
server_stream: PacketStream<TcpStream>,
server_queue: VecDeque<Packet>,
client_tx: Sender<IncomingMessage>,
client_tx: Sender<Response>,
}
impl ConnectionHandler {
pub fn new(
server_tcp_stream: TcpStream, client_tx: Sender<IncomingMessage>,
server_tcp_stream: TcpStream, client_tx: Sender<Response>,
event_loop: &mut EventLoop<Self>) -> Self
{
let mut token_counter = TokenCounter::new();
@ -95,9 +94,7 @@ impl ConnectionHandler {
};
let server_response = try!(ServerResponse::from_packet(packet));
let message = IncomingMessage::ProtoResponse(
Response::ServerResponse(server_response));
match self.client_tx.send(message) {
match self.client_tx.send(Response::ServerResponse(server_response)) {
Ok(()) => Ok(true),
Err(e) => Err(io::Error::new(
io::ErrorKind::Other,


+ 6
- 4
src/main.rs View File

@ -47,18 +47,20 @@ fn main() {
let mut event_loop = EventLoop::new().unwrap();
let (client_tx, client_rx) = channel();
let (handler_to_client_tx, handler_to_client_rx) = channel();
let (control_to_client_tx, control_to_client_rx) = channel();
let (client_to_control_tx, client_to_control_rx) = channel();
let client_to_handler_tx = event_loop.channel();
let mut handler =
ConnectionHandler::new(stream, client_tx.clone(), &mut event_loop);
ConnectionHandler::new(stream, handler_to_client_tx, &mut event_loop);
let mut client = Client::new(
client_rx, client_to_handler_tx, client_to_control_tx);
client_to_handler_tx, handler_to_client_rx,
client_to_control_tx, control_to_client_rx);
let mut controller =
Controller::new(client_tx, client_to_control_rx);
Controller::new(control_to_client_tx, client_to_control_rx);
thread::spawn(move || controller.run());
thread::spawn(move || event_loop.run(&mut handler).unwrap());


Loading…
Cancel
Save