Browse Source

Connect control requests to executor in async mode.

wip
Titouan Rigoudy 4 years ago
parent
commit
ab70428930
5 changed files with 41 additions and 115 deletions
  1. +3
    -57
      client/src/client.rs
  2. +1
    -0
      client/src/context.rs
  3. +1
    -1
      client/src/control/mod.rs
  4. +24
    -48
      client/src/control/ws.rs
  5. +12
    -9
      client/src/main.rs

+ 3
- 57
client/src/client.rs View File

@ -16,9 +16,6 @@ use crate::user;
#[derive(Debug)]
enum IncomingMessage {
Proto(solstice_proto::Response),
#[allow(dead_code)]
ControlNotification(control::Notification),
}
#[derive(Debug)]
@ -52,11 +49,6 @@ pub struct Client {
proto_tx: mio::deprecated::Sender<solstice_proto::Request>,
proto_rx: crossbeam_channel::Receiver<solstice_proto::Response>,
control_tx: Option<control::Sender>,
#[allow(dead_code)]
control_rx: crossbeam_channel::Receiver<control::Notification>,
login_status: LoginStatus,
rooms: room::RoomMap,
@ -73,15 +65,11 @@ impl Client {
pub fn new(
proto_tx: mio::deprecated::Sender<solstice_proto::Request>,
proto_rx: crossbeam_channel::Receiver<solstice_proto::Response>,
control_rx: crossbeam_channel::Receiver<control::Notification>,
) -> Self {
Client {
proto_tx: proto_tx,
proto_rx: proto_rx,
control_tx: None,
control_rx: control_rx,
login_status: LoginStatus::Todo,
rooms: room::RoomMap::new(),
@ -122,10 +110,6 @@ impl Client {
IncomingMessage::Proto(response) => {
self.handle_proto_response(response)
}
IncomingMessage::ControlNotification(notif) => {
self.handle_control_notification(notif)
}
}
}
}
@ -156,53 +140,14 @@ impl Client {
/// Send a response to the controller client.
fn send_to_controller(&mut self, response: control::Response) {
#[allow(deprecated)]
let result = match self.control_tx {
None => {
// Silently drop control requests when controller is
// disconnected.
return;
}
Some(ref mut control_tx) => control_tx.send(response),
};
// If we failed to send, we assume it means that the other end of the
// channel has been dropped, i.e. the controller has disconnected.
// It may be that mio has died on us, in which case we will never see
// a controller again. If that happens, there would have probably been
// a panic anyway, so we might never hit this corner case.
if let Err(_) = result {
info!("Controller has disconnected.");
self.control_tx = None;
}
}
/*===============================*
* CONTROL NOTIFICATION HANDLING *
*===============================*/
fn handle_control_notification(&mut self, notif: control::Notification) {
match notif {
control::Notification::Connected(tx) => {
self.control_tx = Some(tx);
}
control::Notification::Disconnected => {
self.control_tx = None;
}
control::Notification::Error(e) => {
debug!("Control loop error: {}", e);
self.control_tx = None;
}
control::Notification::Request(req) => self.handle_control_request(req),
}
warn!("Cannot send control response: {:?}", response);
}
/*==========================*
* CONTROL REQUEST HANDLING *
*==========================*/
/*
fn handle_control_request(&mut self, request: control::Request) {
match request {
control::Request::LoginStatusRequest => {
@ -322,6 +267,7 @@ impl Client {
},
));
}
*/
/*=========================*
* PROTO RESPONSE HANDLING *


+ 1
- 0
client/src/context.rs View File

@ -23,6 +23,7 @@ pub struct Context {
/// Sender half of a channel used to send requests to the server.
pub server_request_tx: Sender<ServerRequest>,
// TODO: Add control response sender.
}
/// Convenience bundle for creating new `Context` structs.


+ 1
- 1
client/src/control/mod.rs View File

@ -4,4 +4,4 @@ mod ws;
pub use self::request::*;
pub use self::response::*;
pub use self::ws::{listen, Notification, SendError, Sender};
pub use self::ws::listen;

+ 24
- 48
client/src/control/ws.rs View File

@ -1,25 +1,11 @@
use crossbeam_channel;
use solstice_proto::config;
use thiserror::Error;
use tokio::sync::mpsc;
use ws;
use super::request::*;
use super::response::*;
/// This enum contains the possible notifications that the control loop will
/// send to the client.
#[derive(Debug)]
pub enum Notification {
/// A new controller has connected: control messages can now be sent on the
/// given channel.
Connected(Sender),
/// The controller has disconnected.
Disconnected,
/// An irretrievable error has arisen.
Error(String),
/// The controller has sent a request.
Request(Request),
}
use crate::control::request::*;
use crate::control::response::*;
use crate::dispatcher::Message;
/// This error is returned when a `Sender` fails to send a control request.
#[derive(Debug, Error)]
@ -50,37 +36,32 @@ impl Sender {
/// This struct handles a single websocket connection.
#[derive(Debug)]
struct Handler {
/// The channel on which to send notifications to the client.
client_tx: crossbeam_channel::Sender<Notification>,
/// The channel on which to send requests to the client.
client_tx: mpsc::UnboundedSender<Message>,
/// The channel on which to send messages to the controller.
socket_tx: ws::Sender,
}
impl Handler {
fn send_to_client(&self, notification: Notification) -> ws::Result<()> {
match self.client_tx.send(notification) {
Ok(()) => Ok(()),
Err(e) => {
error!("Error sending notification to client: {}", e);
Err(ws::Error::new(ws::ErrorKind::Internal, ""))
}
}
fn send_to_client(&self, request: Request) -> ws::Result<()> {
self
.client_tx
.send(Message::ControlRequest(request))
.map_err(|err| {
error!("Error sending notification to client: {}", err);
ws::Error::new(ws::ErrorKind::Internal, "")
})
}
}
impl ws::Handler for Handler {
fn on_open(&mut self, _: ws::Handshake) -> ws::Result<()> {
info!("Websocket open");
self.send_to_client(Notification::Connected(Sender {
sender: self.socket_tx.clone(),
}))
Ok(())
}
fn on_close(&mut self, code: ws::CloseCode, reason: &str) {
info!("Websocket closed: code: {:?}, reason: {:?}", code, reason);
self
.send_to_client(Notification::Disconnected)
.unwrap_or(())
}
fn on_message(&mut self, msg: ws::Message) -> ws::Result<()> {
@ -108,13 +89,16 @@ impl ws::Handler for Handler {
debug!("Received control request: {:?}", control_request);
// Send the control request to the client.
self.send_to_client(Notification::Request(control_request))
self.send_to_client(control_request)
}
}
/// Start listening on the socket address stored in configuration, and send
/// control notifications to the client through the given channel.
pub fn listen(client_tx: crossbeam_channel::Sender<Notification>) {
pub fn listen(
client_tx: mpsc::UnboundedSender<Message>,
socket_rx: mpsc::Receiver<Response>,
) {
let websocket_result = ws::Builder::new()
.with_settings(ws::Settings {
max_connections: 1,
@ -129,16 +113,14 @@ pub fn listen(client_tx: crossbeam_channel::Sender<Notification>) {
Ok(websocket) => websocket,
Err(e) => {
error!("Unable to build websocket: {}", e);
client_tx
.send(Notification::Error(format!(
"Unable to build websocket: {}",
e
)))
.unwrap();
return;
}
};
// TODO: Read responses off `socket_rx` and send them on `client_tx`. When
// the channel is closed, we should stop listening. In the meantime, we can
// at least spawn a task to pass along the responses.
let listen_result =
websocket.listen((config::CONTROL_HOST, config::CONTROL_PORT));
@ -146,12 +128,6 @@ pub fn listen(client_tx: crossbeam_channel::Sender<Notification>) {
Ok(_) => (),
Err(e) => {
error!("Unable to listen on websocket: {}", e);
client_tx
.send(Notification::Error(format!(
"Unable to listen on websocket: {}",
e
)))
.unwrap();
}
}
}

+ 12
- 9
client/src/main.rs View File

@ -40,16 +40,10 @@ fn old_main() {
};
let client_to_proto_tx = proto_agent.channel();
let (control_to_client_tx, control_to_client_rx) =
crossbeam_channel::unbounded();
let mut client = client::Client::new(
client_to_proto_tx,
proto_to_client_rx,
control_to_client_rx,
);
let mut client = client::Client::new(client_to_proto_tx, proto_to_client_rx);
thread::spawn(move || control::listen(control_to_client_tx));
// Run ws server.
thread::spawn(move || proto_agent.run().unwrap());
client.run();
}
@ -121,12 +115,18 @@ async fn async_main() {
let (dispatcher_tx, mut dispatcher_rx) =
tokio::sync::mpsc::unbounded_channel();
let (control_tx, control_rx) = tokio::sync::mpsc::channel(100);
let client_task =
tokio::spawn(run_client(bundle.server_request_rx, dispatcher_tx));
tokio::spawn(run_client(bundle.server_request_rx, dispatcher_tx.clone()));
let dispatcher = Dispatcher::new();
let executor = Executor::new(bundle.context);
let control_task = tokio::task::spawn_blocking(move || {
control::listen(dispatcher_tx, control_rx);
});
while let Some(message) = dispatcher_rx.recv().await {
if let Some(job) = dispatcher.dispatch(message) {
executor.schedule(job);
@ -141,6 +141,9 @@ async fn async_main() {
.await
.expect("Client task join error")
.expect("Client error");
// TODO: Send some signal that we want to wrap up to the controller.
control_task.await.unwrap();
}
#[tokio::main]


Loading…
Cancel
Save