diff --git a/Cargo.lock b/Cargo.lock index 9034004..f4976aa 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -71,10 +71,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cdb031dd78e28731d87d56cc8ffef4a8f36ca26c38fe2de700543e627f8a464a" [[package]] -name = "bitflags" -version = "0.4.0" +name = "base64" +version = "0.13.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8dead7461c1127cf637931a1e50934eb6eee8bff2f74433ac7909e9afcee04a3" +checksum = "904dfeac50f3cdaba28fc6f57fdcddb75f49ed61346676a78c4ffe55877802fd" [[package]] name = "bitflags" @@ -82,6 +82,15 @@ version = "1.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cf1de2fe8c75bc145a2f577add951f8134889b4795d47466a54a5c846d691693" +[[package]] +name = "block-buffer" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4152116fd6e9dadb291ae18fc1ec3575ed6d84c29642d97890f4b4a3417297e4" +dependencies = [ + "generic-array", +] + [[package]] name = "byteorder" version = "0.5.3" @@ -94,12 +103,6 @@ version = "1.4.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "14c189c53d098945499cdfa7ecc63567cf3886b3332b312a5b4585d8d3a6a610" -[[package]] -name = "bytes" -version = "0.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c129aff112dcc562970abb69e2508b40850dd24c274761bb50fb8a0067ba6c27" - [[package]] name = "bytes" version = "0.4.12" @@ -136,7 +139,7 @@ checksum = "37e58ac78573c40708d45522f0d80fa2f01cc4f9b4e2bf749807255454312002" dependencies = [ "ansi_term", "atty", - "bitflags 1.2.1", + "bitflags", "strsim", "textwrap", "unicode-width", @@ -149,7 +152,16 @@ version = "0.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ddfc5b9aa5d4507acaf872de71051dfd0e309860e88966e1051e462a077aac4f" dependencies = [ - "bitflags 1.2.1", + "bitflags", +] + +[[package]] +name = "cpufeatures" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "66c99696f6c9dd7f35d486b9d04d7e6e202aa3e8c40d553f2fdf5e7e0c6a71ef" +dependencies = [ + "libc", ] [[package]] @@ -219,6 +231,15 @@ dependencies = [ "lazy_static", ] +[[package]] +name = "digest" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d3dd60d1080a57a05ab032377049e0591415d2b31afd7028356dbf3cc6dcb066" +dependencies = [ + "generic-array", +] + [[package]] name = "encoding" version = "0.2.33" @@ -300,7 +321,7 @@ checksum = "a19187fea3ac7e84da7dacf48de0c45d63c6a76f9490dae389aead16c243fce3" dependencies = [ "atty", "humantime", - "log 0.4.14", + "log", "regex", "termcolor", ] @@ -311,6 +332,16 @@ version = "1.0.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" +[[package]] +name = "form_urlencoded" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5fc25a87fa4fd2094bffb06925852034d90a17f0d1e05197d4956d3555752191" +dependencies = [ + "matches", + "percent-encoding", +] + [[package]] name = "fuchsia-cprng" version = "0.1.1" @@ -323,7 +354,7 @@ version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2e9763c69ebaae630ba35f74888db465e49e259ba1bc0eda7d06f4a067615d82" dependencies = [ - "bitflags 1.2.1", + "bitflags", "fuchsia-zircon-sys", ] @@ -439,6 +470,27 @@ version = "0.3.55" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8f5f3913fa0bfe7ee1fd8248b6b9f42a5af4b9d65ec2dd2c3c26132b950ecfc2" +[[package]] +name = "generic-array" +version = "0.14.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "501466ecc8a30d1d3b7fc9229b122b2ce8ed6e9d9223f1138d4babb253e51817" +dependencies = [ + "typenum", + "version_check", +] + +[[package]] +name = "getrandom" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7fcd999463524c52659517fe2cea98493cfe485d10565e7b0fb07dbba7ad2753" +dependencies = [ + "cfg-if 1.0.0", + "libc", + "wasi", +] + [[package]] name = "hermit-abi" version = "0.1.19" @@ -448,6 +500,17 @@ dependencies = [ "libc", ] +[[package]] +name = "http" +version = "0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "527e8c9ac747e28542699a951517aa9a6945af506cd1f2e1b53a576c17b6cc11" +dependencies = [ + "bytes 1.0.1", + "fnv", + "itoa", +] + [[package]] name = "httparse" version = "1.4.1" @@ -462,9 +525,9 @@ checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4" [[package]] name = "idna" -version = "0.1.5" +version = "0.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "38f09e0f0b1fb55fdee1f17470ad800da77af5186a1a76c026b679358b7e844e" +checksum = "418a0a6fab821475f634efe3ccc45c013f742efe03d853e8d3355d5cb850ecf8" dependencies = [ "matches", "unicode-bidi", @@ -544,15 +607,6 @@ dependencies = [ "scopeguard", ] -[[package]] -name = "log" -version = "0.3.9" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e19e8d5c34a3e0e2223db8e060f9e8264aeeb5c5fc64a4ee9965c062211c024b" -dependencies = [ - "log 0.4.14", -] - [[package]] name = "log" version = "0.4.14" @@ -589,23 +643,6 @@ dependencies = [ "autocfg 1.0.1", ] -[[package]] -name = "mio" -version = "0.5.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a637d1ca14eacae06296a008fa7ad955347e34efcb5891cfd8ba05491a37907e" -dependencies = [ - "bytes 0.3.0", - "libc", - "log 0.3.9", - "miow 0.1.5", - "net2", - "nix", - "slab 0.1.3", - "time", - "winapi 0.2.8", -] - [[package]] name = "mio" version = "0.6.23" @@ -618,7 +655,7 @@ dependencies = [ "iovec", "kernel32-sys", "libc", - "log 0.4.14", + "log", "miow 0.2.2", "net2", "slab 0.4.3", @@ -632,7 +669,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8c2bdb6314ec10835cd3293dd268473a835c02b7b352e788be788b3c6ca6bb16" dependencies = [ "libc", - "log 0.4.14", + "log", "miow 0.3.7", "ntapi", "winapi 0.3.9", @@ -649,18 +686,6 @@ dependencies = [ "mio 0.6.23", ] -[[package]] -name = "miow" -version = "0.1.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3e690c5df6b2f60acd45d56378981e827ff8295562fc8d34f573deb267a59cd1" -dependencies = [ - "kernel32-sys", - "net2", - "winapi 0.2.8", - "ws2_32-sys", -] - [[package]] name = "miow" version = "0.2.2" @@ -693,16 +718,6 @@ dependencies = [ "winapi 0.3.9", ] -[[package]] -name = "nix" -version = "0.5.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bfb3ddedaa14746434a02041940495bf11325c22f6d36125d3bdd56090d50a79" -dependencies = [ - "bitflags 0.4.0", - "libc", -] - [[package]] name = "ntapi" version = "0.3.6" @@ -728,6 +743,12 @@ version = "1.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "692fcb63b64b1758029e0a96ee63e049ce8c5948587f2f7208df04625e5f6b56" +[[package]] +name = "opaque-debug" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "624a8340c38c1b80fd549087862da4ba43e08858af025b236e509b6649fc13d5" + [[package]] name = "parking_lot" version = "0.8.0" @@ -808,9 +829,29 @@ dependencies = [ [[package]] name = "percent-encoding" -version = "1.0.1" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d4fd5641d01c8f18a23da7b6fe29298ff4b55afcccdf78973b24cf3175fee32e" + +[[package]] +name = "pin-project" +version = "1.0.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "31010dd2e1ac33d5b46a5b413495239882813e0369f8ed8a5e266f173602f831" +checksum = "576bc800220cc65dac09e99e97b08b358cfab6e17078de8dc5fee223bd2d0c08" +dependencies = [ + "pin-project-internal", +] + +[[package]] +name = "pin-project-internal" +version = "1.0.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6e8fe8163d14ce7f0cdac2e040116f22eac817edabff0be91e8aff7e9accf389" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] [[package]] name = "pin-project-lite" @@ -824,6 +865,12 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" +[[package]] +name = "ppv-lite86" +version = "0.2.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ac74c624d6b2d21f425f752262f42188365d7b8ff1aff74c82e45136510a4857" + [[package]] name = "proc-macro-hack" version = "0.5.19" @@ -885,9 +932,9 @@ checksum = "6d71dacdc3c88c1fde3885a3be3fbab9f35724e6ce99467f7d9c5026132184ca" dependencies = [ "autocfg 0.1.7", "libc", - "rand_chacha", + "rand_chacha 0.1.1", "rand_core 0.4.2", - "rand_hc", + "rand_hc 0.1.0", "rand_isaac", "rand_jitter", "rand_os", @@ -896,6 +943,18 @@ dependencies = [ "winapi 0.3.9", ] +[[package]] +name = "rand" +version = "0.8.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2e7573632e6454cf6b99d7aac4ccca54be06da05aca2ef7423d22d27d4d4bcd8" +dependencies = [ + "libc", + "rand_chacha 0.3.1", + "rand_core 0.6.3", + "rand_hc 0.3.1", +] + [[package]] name = "rand_chacha" version = "0.1.1" @@ -906,6 +965,16 @@ dependencies = [ "rand_core 0.3.1", ] +[[package]] +name = "rand_chacha" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88" +dependencies = [ + "ppv-lite86", + "rand_core 0.6.3", +] + [[package]] name = "rand_core" version = "0.3.1" @@ -921,6 +990,15 @@ version = "0.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9c33a3c44ca05fa6f1807d8e6743f3824e8509beca625669633be0acbdf509dc" +[[package]] +name = "rand_core" +version = "0.6.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d34f1408f55294453790c48b2f1ebbb1c5b4b7563eb1f418bcfcfdbb06ebb4e7" +dependencies = [ + "getrandom", +] + [[package]] name = "rand_hc" version = "0.1.0" @@ -930,6 +1008,15 @@ dependencies = [ "rand_core 0.3.1", ] +[[package]] +name = "rand_hc" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d51e9f596de227fda2ea6c84607f5558e196eeaf43c986b724ba4fb8fdf497e7" +dependencies = [ + "rand_core 0.6.3", +] + [[package]] name = "rand_isaac" version = "0.1.1" @@ -1004,7 +1091,7 @@ version = "0.2.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5ab49abadf3f9e1c4bc499e8845e152ad87d2ad2d30371841171169e9d75feee" dependencies = [ - "bitflags 1.2.1", + "bitflags", ] [[package]] @@ -1117,12 +1204,16 @@ dependencies = [ ] [[package]] -name = "sha1" -version = "0.1.1" +name = "sha-1" +version = "0.9.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a307a40d5834140e4213a6952483b84e9ad53bdcab918b7335a6e305e505a53c" +checksum = "1a0c8611594e2ab4ebbf06ec7cbbf0a99450b8570e96cbf5188b5d5f6ef18d81" dependencies = [ - "byteorder 1.4.3", + "block-buffer", + "cfg-if 1.0.0", + "cpufeatures", + "digest", + "opaque-debug", ] [[package]] @@ -1134,12 +1225,6 @@ dependencies = [ "libc", ] -[[package]] -name = "slab" -version = "0.1.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d807fd58c4181bbabed77cb3b891ba9748241a552bcc5be698faaebefc54f46e" - [[package]] name = "slab" version = "0.2.0" @@ -1177,7 +1262,7 @@ dependencies = [ "crossbeam-channel", "env_logger", "futures 0.3.15", - "log 0.4.14", + "log", "mio 0.6.23", "parking_lot 0.8.0", "serde", @@ -1190,7 +1275,7 @@ dependencies = [ "tokio-codec", "tokio-core", "tokio-io", - "ws", + "tokio-tungstenite", ] [[package]] @@ -1205,7 +1290,7 @@ dependencies = [ "encoding_rs", "env_logger", "futures 0.3.15", - "log 0.4.14", + "log", "mio 0.6.23", "parking_lot 0.8.0", "rust-crypto", @@ -1373,7 +1458,7 @@ dependencies = [ "bytes 0.4.12", "futures 0.1.31", "iovec", - "log 0.4.14", + "log", "mio 0.6.23", "scoped-tls", "tokio 0.1.22", @@ -1422,7 +1507,7 @@ checksum = "57fc868aae093479e3131e3d165c93b1c7474109d13c90ec0dda2a1bbfff0674" dependencies = [ "bytes 0.4.12", "futures 0.1.31", - "log 0.4.14", + "log", ] [[package]] @@ -1445,7 +1530,7 @@ dependencies = [ "crossbeam-utils 0.7.2", "futures 0.1.31", "lazy_static", - "log 0.4.14", + "log", "mio 0.6.23", "num_cpus", "parking_lot 0.9.0", @@ -1490,7 +1575,7 @@ dependencies = [ "crossbeam-utils 0.7.2", "futures 0.1.31", "lazy_static", - "log 0.4.14", + "log", "num_cpus", "slab 0.4.3", "tokio-executor", @@ -1508,6 +1593,19 @@ dependencies = [ "tokio-executor", ] +[[package]] +name = "tokio-tungstenite" +version = "0.15.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "511de3f85caf1c98983545490c3d09685fa8eb634e57eec22bb4db271f46cbd8" +dependencies = [ + "futures-util", + "log", + "pin-project", + "tokio 1.8.1", + "tungstenite", +] + [[package]] name = "tokio-udp" version = "0.1.6" @@ -1516,7 +1614,7 @@ checksum = "e2a0b10e610b39c38b031a2fcab08e4b82f16ece36504988dcbd81dbba650d82" dependencies = [ "bytes 0.4.12", "futures 0.1.31", - "log 0.4.14", + "log", "mio 0.6.23", "tokio-codec", "tokio-io", @@ -1533,7 +1631,7 @@ dependencies = [ "futures 0.1.31", "iovec", "libc", - "log 0.4.14", + "log", "mio 0.6.23", "mio-uds", "tokio-codec", @@ -1541,6 +1639,31 @@ dependencies = [ "tokio-reactor", ] +[[package]] +name = "tungstenite" +version = "0.14.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a0b2d8558abd2e276b0a8df5c05a2ec762609344191e5fd23e292c910e9165b5" +dependencies = [ + "base64", + "byteorder 1.4.3", + "bytes 1.0.1", + "http", + "httparse", + "log", + "rand 0.8.4", + "sha-1", + "thiserror", + "url", + "utf-8", +] + +[[package]] +name = "typenum" +version = "1.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "879f6906492a7cd215bfa4cf595b600146ccfac0c79bcbd1f3000162af5e8b06" + [[package]] name = "unicode-bidi" version = "0.3.5" @@ -1573,21 +1696,34 @@ checksum = "8ccb82d61f80a663efe1f787a51b16b5a51e3314d6ac365b08639f52387b33f3" [[package]] name = "url" -version = "1.7.2" +version = "2.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dd4e7c0d531266369519a4aa4f399d748bd37043b00bde1e4ff1f60a120b355a" +checksum = "a507c383b2d33b5fc35d1861e77e6b383d158b2da5e14fe51b83dfedf6fd578c" dependencies = [ + "form_urlencoded", "idna", "matches", "percent-encoding", ] +[[package]] +name = "utf-8" +version = "0.7.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09cc8ee72d2a9becf2f2febe0205bbed8fc6615b7cb429ad062dc7b7ddd036a9" + [[package]] name = "vec_map" version = "0.8.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f1bddf1187be692e79c5ffeab891132dfb0f236ed36a43c7ed39f1165ee20191" +[[package]] +name = "version_check" +version = "0.9.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5fecdca9a5291cc2b8dcf7dc02453fee791a280f3743cb0905f8822ae463b3fe" + [[package]] name = "wasi" version = "0.10.0+wasi-snapshot-preview1" @@ -1637,20 +1773,6 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" -[[package]] -name = "ws" -version = "0.4.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "66d6b1eb1f4f17a260f294bbd4e00426c101785d5e03d9527032aba063f040d5" -dependencies = [ - "httparse", - "log 0.3.9", - "mio 0.5.1", - "rand 0.3.23", - "sha1", - "url", -] - [[package]] name = "ws2_32-sys" version = "0.2.1" diff --git a/client/Cargo.toml b/client/Cargo.toml index a0b610c..4399350 100644 --- a/client/Cargo.toml +++ b/client/Cargo.toml @@ -24,4 +24,4 @@ tokio = { version = "1", features = ["full"] } tokio-core = "^0.1" tokio-io = "^0.1" tokio-codec = "^0.1" -ws = "^0.4" +tokio-tungstenite = "0.15" diff --git a/client/src/control/ws.rs b/client/src/control/ws.rs index 393b83a..23e0f63 100644 --- a/client/src/control/ws.rs +++ b/client/src/control/ws.rs @@ -1,133 +1,62 @@ +use futures::StreamExt; use solstice_proto::config; -use thiserror::Error; +use tokio::net::TcpListener; use tokio::sync::mpsc; -use ws; use crate::control::request::*; use crate::control::response::*; use crate::dispatcher::Message; -/// This error is returned when a `Sender` fails to send a control request. -#[derive(Debug, Error)] -pub enum SendError { - #[error("error encoding response to JSON: {0}")] - JsonError(#[from] serde_json::Error), - #[error("error sending response on socket: {0}")] - WebSocketError(#[from] ws::Error), -} - -/// This struct is used to send control responses to the controller. -/// It encapsulates the websocket connection so as to isolate clients from -/// the underlying implementation. -#[derive(Clone, Debug)] -pub struct Sender { - sender: ws::Sender, -} - -impl Sender { - /// Queues up a control response to be sent to the controller. - pub fn send(&mut self, response: Response) -> Result<(), SendError> { - let encoded = serde_json::to_string(&response)?; - self.sender.send(encoded)?; - Ok(()) - } -} - -/// This struct handles a single websocket connection. -#[derive(Debug)] -struct Handler { - /// The channel on which to send requests to the client. - client_tx: mpsc::UnboundedSender, - /// The channel on which to send messages to the controller. - socket_tx: ws::Sender, -} - -impl Handler { - fn send_to_client(&self, request: Request) -> ws::Result<()> { - self - .client_tx - .send(Message::ControlRequest(request)) - .map_err(|err| { - error!("Error sending notification to client: {}", err); - ws::Error::new(ws::ErrorKind::Internal, "") - }) - } -} - -impl ws::Handler for Handler { - fn on_open(&mut self, _: ws::Handshake) -> ws::Result<()> { - info!("Websocket open"); - Ok(()) - } - - fn on_close(&mut self, code: ws::CloseCode, reason: &str) { - info!("Websocket closed: code: {:?}, reason: {:?}", code, reason); - } - - fn on_message(&mut self, msg: ws::Message) -> ws::Result<()> { - // Get the payload string. - let payload = match msg { - ws::Message::Text(payload) => payload, - ws::Message::Binary(_) => { - error!("Received binary websocket message from controller"); - return Err(ws::Error::new( - ws::ErrorKind::Protocol, - "Binary message not supported", - )); - } - }; - - // Decode the json control request. - let control_request = match serde_json::from_str(&payload) { - Ok(control_request) => control_request, - Err(e) => { - error!("Received invalid JSON message from controller: {}", e); - return Err(ws::Error::new(ws::ErrorKind::Protocol, "Invalid JSON")); - } - }; - - debug!("Received control request: {:?}", control_request); - - // Send the control request to the client. - self.send_to_client(control_request) - } -} - /// Start listening on the socket address stored in configuration, and send /// control notifications to the client through the given channel. -pub fn listen( +pub async fn listen( client_tx: mpsc::UnboundedSender, - socket_rx: mpsc::Receiver, -) { - let websocket_result = ws::Builder::new() - .with_settings(ws::Settings { - max_connections: 1, - ..ws::Settings::default() - }) - .build(|socket_tx| Handler { - client_tx: client_tx.clone(), - socket_tx: socket_tx, - }); - - let websocket = match websocket_result { - Ok(websocket) => websocket, - Err(e) => { - error!("Unable to build websocket: {}", e); - return; - } - }; - - // TODO: Read responses off `socket_rx` and send them on `client_tx`. When - // the channel is closed, we should stop listening. In the meantime, we can - // at least spawn a task to pass along the responses. - - let listen_result = - websocket.listen((config::CONTROL_HOST, config::CONTROL_PORT)); - - match listen_result { - Ok(_) => (), - Err(e) => { - error!("Unable to listen on websocket: {}", e); - } + _socket_rx: mpsc::Receiver, +) -> anyhow::Result<()> { + let address = format!("{}:{}", config::CONTROL_HOST, config::CONTROL_PORT); + let listener = TcpListener::bind(&address).await?; + + info!("Listening for control connections on {}", address); + + while let Ok((raw_stream, addr)) = listener.accept().await { + info!("Accepted control connection from {}", addr); + + let ws_stream = tokio_tungstenite::accept_async(raw_stream).await?; + info!("WebSocket connection established: {}", addr); + + let (_outgoing, mut incoming) = ws_stream.split(); + while let Some(result) = incoming.next().await { + let message = match result { + Ok(message) => message, + Err(err) => { + warn!("Error reading control message: {}", err); + continue + }, + }; + + let text = match message.to_text() { + Ok(text) => text, + Err(err) => { + warn!("Received non-text control message: {}", err); + continue + }, + }; + + debug!("Received a text message from {}: {}", addr, text); + + // Decode the json control request. + let control_request: Request = match serde_json::from_str(&text) { + Ok(control_request) => control_request, + Err(e) => { + warn!("Received invalid JSON message from controller: {}", e); + continue + } + }; + + debug!("Received control request: {:?}", control_request); + client_tx.send(Message::ControlRequest(control_request))? + } } + + Ok(()) } diff --git a/client/src/executor.rs b/client/src/executor.rs index e5e1c32..042a2d3 100644 --- a/client/src/executor.rs +++ b/client/src/executor.rs @@ -61,9 +61,7 @@ impl Executor { mod tests { use std::sync::{Arc, Barrier}; - use parking_lot::Mutex; use solstice_proto::{User, UserStatus}; - use tokio::sync::mpsc::channel; use crate::context::{Context, ContextBundle}; diff --git a/client/src/main.rs b/client/src/main.rs index a83a73d..520d244 100644 --- a/client/src/main.rs +++ b/client/src/main.rs @@ -9,7 +9,6 @@ use crossbeam_channel; use env_logger; use futures::stream::StreamExt; use log::info; -use parking_lot::Mutex; use solstice_proto; use tokio::net::TcpStream; @@ -115,7 +114,7 @@ async fn async_main() { let (dispatcher_tx, mut dispatcher_rx) = tokio::sync::mpsc::unbounded_channel(); - let (control_tx, control_rx) = tokio::sync::mpsc::channel(100); + let (_control_tx, control_rx) = tokio::sync::mpsc::channel(100); let client_task = tokio::spawn(run_client(bundle.server_request_rx, dispatcher_tx.clone())); @@ -123,27 +122,33 @@ async fn async_main() { let dispatcher = Dispatcher::new(); let executor = Executor::new(bundle.context); - let control_task = tokio::task::spawn_blocking(move || { - control::listen(dispatcher_tx, control_rx); - }); + let control_task = control::listen(dispatcher_tx, control_rx); - while let Some(message) = dispatcher_rx.recv().await { - if let Some(job) = dispatcher.dispatch(message) { - executor.schedule(job); - } - } + let dispatch_task = async move { + while let Some(message) = dispatcher_rx.recv().await { + if let Some(job) = dispatcher.dispatch(message) { + executor.schedule(job); + } + } - let _context = tokio::task::spawn_blocking(move || executor.join()) - .await - .unwrap(); + tokio::task::spawn_blocking(move || executor.join()).await + }; + + tokio::select! { + result = control_task => match result { + Ok(()) => (), + Err(err) => error!("Control task failed: {}", err), + }, + result = dispatch_task => match result { + Ok(context) => info!("Dispatch task ended with context: {:?}", context), + Err(err) => error!("Dispatch task failed: {}", err), + }, + }; client_task .await .expect("Client task join error") .expect("Client error"); - - // TODO: Send some signal that we want to wrap up to the controller. - control_task.await.unwrap(); } #[tokio::main]