|
|
@ -1,10 +1,9 @@ |
|
|
use std::io;
|
|
|
use std::io;
|
|
|
use std::io::{Read, Write};
|
|
|
|
|
|
use std::sync::mpsc;
|
|
|
use std::sync::mpsc;
|
|
|
|
|
|
use std::str;
|
|
|
use std::thread;
|
|
|
use std::thread;
|
|
|
|
|
|
|
|
|
use rustc_serialize::json;
|
|
|
use rustc_serialize::json;
|
|
|
use mio::tcp::TcpStream;
|
|
|
|
|
|
use websocket;
|
|
|
use websocket;
|
|
|
use websocket::{Receiver, Sender};
|
|
|
use websocket::{Receiver, Sender};
|
|
|
|
|
|
|
|
|
@ -20,6 +19,17 @@ type WebSocketSender = |
|
|
type WebSocketClient =
|
|
|
type WebSocketClient =
|
|
|
websocket::Client<websocket::DataFrame, WebSocketSender, WebSocketReceiver>;
|
|
|
websocket::Client<websocket::DataFrame, WebSocketSender, WebSocketReceiver>;
|
|
|
|
|
|
|
|
|
|
|
|
enum Error {
|
|
|
|
|
|
IOError(io::Error),
|
|
|
|
|
|
WebSocketError(websocket::result::WebSocketError),
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
impl From<io::Error> for Error {
|
|
|
|
|
|
fn from(err: io::Error) -> Self {
|
|
|
|
|
|
Error::IOError(err)
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
pub struct Controller {
|
|
|
pub struct Controller {
|
|
|
client_tx: mpsc::Sender<client::IncomingMessage>,
|
|
|
client_tx: mpsc::Sender<client::IncomingMessage>,
|
|
|
client_rx: mpsc::Receiver<ControlResponse>,
|
|
|
client_rx: mpsc::Receiver<ControlResponse>,
|
|
|
@ -39,27 +49,30 @@ impl Controller { |
|
|
pub fn run(&mut self) {
|
|
|
pub fn run(&mut self) {
|
|
|
let host = config::CONTROL_HOST;
|
|
|
let host = config::CONTROL_HOST;
|
|
|
let port = config::CONTROL_PORT;
|
|
|
let port = config::CONTROL_PORT;
|
|
|
|
|
|
let mut server = websocket::Server::bind((host, port)).unwrap();
|
|
|
|
|
|
info!("Controller bound to {}:{}", host, port);
|
|
|
|
|
|
|
|
|
loop {
|
|
|
loop {
|
|
|
let client = Self::get_client(host, port);
|
|
|
|
|
|
|
|
|
let client = match Self::try_get_client(&mut server) {
|
|
|
|
|
|
Ok(client) => client,
|
|
|
|
|
|
Err(e) => {
|
|
|
|
|
|
error!("Error accepting control connection: {}", e);
|
|
|
|
|
|
continue;
|
|
|
|
|
|
}
|
|
|
|
|
|
};
|
|
|
info!("Controller client connected");
|
|
|
info!("Controller client connected");
|
|
|
let (mut sender, mut receiver) = client.split();
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
let (sender, receiver) = client.split();
|
|
|
|
|
|
let (sender_tx, sender_rx) = mpsc::channel();
|
|
|
|
|
|
|
|
|
let tx = self.client_tx.clone();
|
|
|
let tx = self.client_tx.clone();
|
|
|
thread::spawn(move || {
|
|
|
|
|
|
Self::receiver_loop(receiver, tx);
|
|
|
|
|
|
|
|
|
let handle = thread::spawn(move || {
|
|
|
|
|
|
Self::receiver_loop(receiver, tx, sender_tx);
|
|
|
});
|
|
|
});
|
|
|
Self::sender_loop(sender, &mut self.client_rx);
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
fn get_client(host: &str, port: u16) -> WebSocketClient
|
|
|
|
|
|
{
|
|
|
|
|
|
let mut server = websocket::Server::bind((host, port)).unwrap();
|
|
|
|
|
|
info!("Controller bound to {}:{}", host, port);
|
|
|
|
|
|
loop {
|
|
|
|
|
|
match Self::try_get_client(&mut server) {
|
|
|
|
|
|
Ok(client) => return client,
|
|
|
|
|
|
Err(e) => error!("Error accepting control connection: {}", e),
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
Self::sender_loop(sender, &mut self.client_rx, sender_rx);
|
|
|
|
|
|
|
|
|
|
|
|
handle.join();
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -76,43 +89,101 @@ impl Controller { |
|
|
|
|
|
|
|
|
fn receiver_loop(
|
|
|
fn receiver_loop(
|
|
|
mut receiver: WebSocketReceiver,
|
|
|
mut receiver: WebSocketReceiver,
|
|
|
tx: mpsc::Sender<client::IncomingMessage>)
|
|
|
|
|
|
|
|
|
client_tx: mpsc::Sender<client::IncomingMessage>,
|
|
|
|
|
|
sender_tx: mpsc::Sender<()>)
|
|
|
{
|
|
|
{
|
|
|
for message_result in receiver.incoming_messages() {
|
|
|
for message_result in receiver.incoming_messages() {
|
|
|
let message: websocket::Message = match message_result {
|
|
|
|
|
|
|
|
|
let message : websocket::message::Message = match message_result {
|
|
|
Ok(message) => message,
|
|
|
Ok(message) => message,
|
|
|
Err(e) => {
|
|
|
Err(e) => {
|
|
|
warn!("Error receiving control message {}", e);
|
|
|
|
|
|
|
|
|
warn!("Error receving websocket message: {}", e);
|
|
|
continue;
|
|
|
continue;
|
|
|
},
|
|
|
|
|
|
|
|
|
}
|
|
|
};
|
|
|
};
|
|
|
let payload = match message.opcode {
|
|
|
|
|
|
|
|
|
match message.opcode {
|
|
|
websocket::message::Type::Text =>
|
|
|
websocket::message::Type::Text =>
|
|
|
String::from_utf8(message.payload.into_owned()).unwrap(),
|
|
|
|
|
|
|
|
|
Self::handle_text_message(&message.payload, &client_tx),
|
|
|
|
|
|
|
|
|
code => {
|
|
|
|
|
|
warn!("Unhandled websocket message type: {:?}", code);
|
|
|
|
|
|
continue;
|
|
|
|
|
|
},
|
|
|
|
|
|
};
|
|
|
|
|
|
match json::decode(&payload) {
|
|
|
|
|
|
Ok(control_request) => {
|
|
|
|
|
|
debug!("Received control request: {:?}", control_request);
|
|
|
|
|
|
tx.send(client::IncomingMessage::ControlRequest(
|
|
|
|
|
|
control_request));
|
|
|
|
|
|
},
|
|
|
|
|
|
Err(e) => warn!("Error decoding control request: {}", e),
|
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
websocket::message::Type::Close => break,
|
|
|
|
|
|
|
|
|
|
|
|
code => warn!("Unhandled websocket message with code {:?}",
|
|
|
|
|
|
code),
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
info!("Shutting down websocket receiver");
|
|
|
|
|
|
receiver.shutdown().unwrap();
|
|
|
|
|
|
// Notify sender that the websocket is closed
|
|
|
|
|
|
sender_tx.send(());
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
fn handle_text_message(
|
|
|
|
|
|
payload_bytes: &[u8],
|
|
|
|
|
|
client_tx: &mpsc::Sender<client::IncomingMessage>)
|
|
|
|
|
|
{
|
|
|
|
|
|
let payload = match str::from_utf8(payload_bytes) {
|
|
|
|
|
|
Ok(payload) => payload,
|
|
|
|
|
|
Err(e) => {
|
|
|
|
|
|
warn!("Invalid UTF8 payload: {}", e);
|
|
|
|
|
|
return;
|
|
|
|
|
|
},
|
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
|
|
let control_request = match json::decode(payload) {
|
|
|
|
|
|
Ok(control_request) => control_request,
|
|
|
|
|
|
Err(e) => {
|
|
|
|
|
|
warn!("Invalid JSON payload: {}", e);
|
|
|
|
|
|
return;
|
|
|
|
|
|
}
|
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
|
|
let message = client::IncomingMessage::ControlRequest(control_request);
|
|
|
|
|
|
match client_tx.send(message) {
|
|
|
|
|
|
Ok(()) => (),
|
|
|
|
|
|
Err(e) => {
|
|
|
|
|
|
warn!("Error sending control request to client: {}", e);
|
|
|
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
|
|
|
|
|
|
fn sender_loop(
|
|
|
fn sender_loop(
|
|
|
mut sender: WebSocketSender, rx: &mut mpsc::Receiver<ControlResponse>)
|
|
|
|
|
|
|
|
|
mut sender: WebSocketSender,
|
|
|
|
|
|
client_rx: &mut mpsc::Receiver<ControlResponse>,
|
|
|
|
|
|
sender_rx: mpsc::Receiver<()>)
|
|
|
{
|
|
|
{
|
|
|
for control_response in rx.iter() {
|
|
|
|
|
|
let encoded = json::encode(&control_response).unwrap();
|
|
|
|
|
|
let message = websocket::Message::text(encoded);
|
|
|
|
|
|
sender.send_message(&message).unwrap();
|
|
|
|
|
|
|
|
|
loop {
|
|
|
|
|
|
select! {
|
|
|
|
|
|
_ = sender_rx.recv() => break,
|
|
|
|
|
|
|
|
|
|
|
|
response_result = client_rx.recv() => {
|
|
|
|
|
|
match response_result {
|
|
|
|
|
|
Ok(response) =>
|
|
|
|
|
|
Self::send_response(&mut sender, response),
|
|
|
|
|
|
Err(e) => {
|
|
|
|
|
|
error!("Error receving from client channel: {}", e);
|
|
|
|
|
|
break;
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
info!("Shutting down websocket sender");
|
|
|
|
|
|
sender.shutdown_all().unwrap();
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
fn send_response(sender: &mut WebSocketSender, response: ControlResponse) {
|
|
|
|
|
|
let message = match json::encode(&response) {
|
|
|
|
|
|
Ok(encoded) => websocket::Message::text(encoded),
|
|
|
|
|
|
Err(e) => {
|
|
|
|
|
|
error!("Error encoding control_response to JSON: {}", e);
|
|
|
|
|
|
return;
|
|
|
|
|
|
}
|
|
|
|
|
|
};
|
|
|
|
|
|
match sender.send_message(&message) {
|
|
|
|
|
|
Ok(()) => (),
|
|
|
|
|
|
Err(e) => {
|
|
|
|
|
|
error!("Error sending message to control client: {}", e);
|
|
|
|
|
|
return;
|
|
|
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
|