Browse Source

Replace ws dependency with tokio-tungstenite.

wip
Titouan Rigoudy 4 years ago
parent
commit
0f081a7a80
5 changed files with 299 additions and 245 deletions
  1. +227
    -105
      Cargo.lock
  2. +1
    -1
      client/Cargo.toml
  3. +50
    -121
      client/src/control/ws.rs
  4. +0
    -2
      client/src/executor.rs
  5. +21
    -16
      client/src/main.rs

+ 227
- 105
Cargo.lock View File

@ -71,10 +71,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cdb031dd78e28731d87d56cc8ffef4a8f36ca26c38fe2de700543e627f8a464a"
[[package]]
name = "bitflags"
version = "0.4.0"
name = "base64"
version = "0.13.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8dead7461c1127cf637931a1e50934eb6eee8bff2f74433ac7909e9afcee04a3"
checksum = "904dfeac50f3cdaba28fc6f57fdcddb75f49ed61346676a78c4ffe55877802fd"
[[package]]
name = "bitflags"
@ -82,6 +82,15 @@ version = "1.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cf1de2fe8c75bc145a2f577add951f8134889b4795d47466a54a5c846d691693"
[[package]]
name = "block-buffer"
version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4152116fd6e9dadb291ae18fc1ec3575ed6d84c29642d97890f4b4a3417297e4"
dependencies = [
"generic-array",
]
[[package]]
name = "byteorder"
version = "0.5.3"
@ -94,12 +103,6 @@ version = "1.4.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "14c189c53d098945499cdfa7ecc63567cf3886b3332b312a5b4585d8d3a6a610"
[[package]]
name = "bytes"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c129aff112dcc562970abb69e2508b40850dd24c274761bb50fb8a0067ba6c27"
[[package]]
name = "bytes"
version = "0.4.12"
@ -136,7 +139,7 @@ checksum = "37e58ac78573c40708d45522f0d80fa2f01cc4f9b4e2bf749807255454312002"
dependencies = [
"ansi_term",
"atty",
"bitflags 1.2.1",
"bitflags",
"strsim",
"textwrap",
"unicode-width",
@ -149,7 +152,16 @@ version = "0.0.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ddfc5b9aa5d4507acaf872de71051dfd0e309860e88966e1051e462a077aac4f"
dependencies = [
"bitflags 1.2.1",
"bitflags",
]
[[package]]
name = "cpufeatures"
version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "66c99696f6c9dd7f35d486b9d04d7e6e202aa3e8c40d553f2fdf5e7e0c6a71ef"
dependencies = [
"libc",
]
[[package]]
@ -219,6 +231,15 @@ dependencies = [
"lazy_static",
]
[[package]]
name = "digest"
version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d3dd60d1080a57a05ab032377049e0591415d2b31afd7028356dbf3cc6dcb066"
dependencies = [
"generic-array",
]
[[package]]
name = "encoding"
version = "0.2.33"
@ -300,7 +321,7 @@ checksum = "a19187fea3ac7e84da7dacf48de0c45d63c6a76f9490dae389aead16c243fce3"
dependencies = [
"atty",
"humantime",
"log 0.4.14",
"log",
"regex",
"termcolor",
]
@ -311,6 +332,16 @@ version = "1.0.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1"
[[package]]
name = "form_urlencoded"
version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5fc25a87fa4fd2094bffb06925852034d90a17f0d1e05197d4956d3555752191"
dependencies = [
"matches",
"percent-encoding",
]
[[package]]
name = "fuchsia-cprng"
version = "0.1.1"
@ -323,7 +354,7 @@ version = "0.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2e9763c69ebaae630ba35f74888db465e49e259ba1bc0eda7d06f4a067615d82"
dependencies = [
"bitflags 1.2.1",
"bitflags",
"fuchsia-zircon-sys",
]
@ -439,6 +470,27 @@ version = "0.3.55"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8f5f3913fa0bfe7ee1fd8248b6b9f42a5af4b9d65ec2dd2c3c26132b950ecfc2"
[[package]]
name = "generic-array"
version = "0.14.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "501466ecc8a30d1d3b7fc9229b122b2ce8ed6e9d9223f1138d4babb253e51817"
dependencies = [
"typenum",
"version_check",
]
[[package]]
name = "getrandom"
version = "0.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7fcd999463524c52659517fe2cea98493cfe485d10565e7b0fb07dbba7ad2753"
dependencies = [
"cfg-if 1.0.0",
"libc",
"wasi",
]
[[package]]
name = "hermit-abi"
version = "0.1.19"
@ -448,6 +500,17 @@ dependencies = [
"libc",
]
[[package]]
name = "http"
version = "0.2.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "527e8c9ac747e28542699a951517aa9a6945af506cd1f2e1b53a576c17b6cc11"
dependencies = [
"bytes 1.0.1",
"fnv",
"itoa",
]
[[package]]
name = "httparse"
version = "1.4.1"
@ -462,9 +525,9 @@ checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4"
[[package]]
name = "idna"
version = "0.1.5"
version = "0.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "38f09e0f0b1fb55fdee1f17470ad800da77af5186a1a76c026b679358b7e844e"
checksum = "418a0a6fab821475f634efe3ccc45c013f742efe03d853e8d3355d5cb850ecf8"
dependencies = [
"matches",
"unicode-bidi",
@ -544,15 +607,6 @@ dependencies = [
"scopeguard",
]
[[package]]
name = "log"
version = "0.3.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e19e8d5c34a3e0e2223db8e060f9e8264aeeb5c5fc64a4ee9965c062211c024b"
dependencies = [
"log 0.4.14",
]
[[package]]
name = "log"
version = "0.4.14"
@ -589,23 +643,6 @@ dependencies = [
"autocfg 1.0.1",
]
[[package]]
name = "mio"
version = "0.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a637d1ca14eacae06296a008fa7ad955347e34efcb5891cfd8ba05491a37907e"
dependencies = [
"bytes 0.3.0",
"libc",
"log 0.3.9",
"miow 0.1.5",
"net2",
"nix",
"slab 0.1.3",
"time",
"winapi 0.2.8",
]
[[package]]
name = "mio"
version = "0.6.23"
@ -618,7 +655,7 @@ dependencies = [
"iovec",
"kernel32-sys",
"libc",
"log 0.4.14",
"log",
"miow 0.2.2",
"net2",
"slab 0.4.3",
@ -632,7 +669,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8c2bdb6314ec10835cd3293dd268473a835c02b7b352e788be788b3c6ca6bb16"
dependencies = [
"libc",
"log 0.4.14",
"log",
"miow 0.3.7",
"ntapi",
"winapi 0.3.9",
@ -649,18 +686,6 @@ dependencies = [
"mio 0.6.23",
]
[[package]]
name = "miow"
version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3e690c5df6b2f60acd45d56378981e827ff8295562fc8d34f573deb267a59cd1"
dependencies = [
"kernel32-sys",
"net2",
"winapi 0.2.8",
"ws2_32-sys",
]
[[package]]
name = "miow"
version = "0.2.2"
@ -693,16 +718,6 @@ dependencies = [
"winapi 0.3.9",
]
[[package]]
name = "nix"
version = "0.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bfb3ddedaa14746434a02041940495bf11325c22f6d36125d3bdd56090d50a79"
dependencies = [
"bitflags 0.4.0",
"libc",
]
[[package]]
name = "ntapi"
version = "0.3.6"
@ -728,6 +743,12 @@ version = "1.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "692fcb63b64b1758029e0a96ee63e049ce8c5948587f2f7208df04625e5f6b56"
[[package]]
name = "opaque-debug"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "624a8340c38c1b80fd549087862da4ba43e08858af025b236e509b6649fc13d5"
[[package]]
name = "parking_lot"
version = "0.8.0"
@ -808,9 +829,29 @@ dependencies = [
[[package]]
name = "percent-encoding"
version = "1.0.1"
version = "2.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d4fd5641d01c8f18a23da7b6fe29298ff4b55afcccdf78973b24cf3175fee32e"
[[package]]
name = "pin-project"
version = "1.0.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "31010dd2e1ac33d5b46a5b413495239882813e0369f8ed8a5e266f173602f831"
checksum = "576bc800220cc65dac09e99e97b08b358cfab6e17078de8dc5fee223bd2d0c08"
dependencies = [
"pin-project-internal",
]
[[package]]
name = "pin-project-internal"
version = "1.0.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6e8fe8163d14ce7f0cdac2e040116f22eac817edabff0be91e8aff7e9accf389"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "pin-project-lite"
@ -824,6 +865,12 @@ version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184"
[[package]]
name = "ppv-lite86"
version = "0.2.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ac74c624d6b2d21f425f752262f42188365d7b8ff1aff74c82e45136510a4857"
[[package]]
name = "proc-macro-hack"
version = "0.5.19"
@ -885,9 +932,9 @@ checksum = "6d71dacdc3c88c1fde3885a3be3fbab9f35724e6ce99467f7d9c5026132184ca"
dependencies = [
"autocfg 0.1.7",
"libc",
"rand_chacha",
"rand_chacha 0.1.1",
"rand_core 0.4.2",
"rand_hc",
"rand_hc 0.1.0",
"rand_isaac",
"rand_jitter",
"rand_os",
@ -896,6 +943,18 @@ dependencies = [
"winapi 0.3.9",
]
[[package]]
name = "rand"
version = "0.8.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2e7573632e6454cf6b99d7aac4ccca54be06da05aca2ef7423d22d27d4d4bcd8"
dependencies = [
"libc",
"rand_chacha 0.3.1",
"rand_core 0.6.3",
"rand_hc 0.3.1",
]
[[package]]
name = "rand_chacha"
version = "0.1.1"
@ -906,6 +965,16 @@ dependencies = [
"rand_core 0.3.1",
]
[[package]]
name = "rand_chacha"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88"
dependencies = [
"ppv-lite86",
"rand_core 0.6.3",
]
[[package]]
name = "rand_core"
version = "0.3.1"
@ -921,6 +990,15 @@ version = "0.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9c33a3c44ca05fa6f1807d8e6743f3824e8509beca625669633be0acbdf509dc"
[[package]]
name = "rand_core"
version = "0.6.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d34f1408f55294453790c48b2f1ebbb1c5b4b7563eb1f418bcfcfdbb06ebb4e7"
dependencies = [
"getrandom",
]
[[package]]
name = "rand_hc"
version = "0.1.0"
@ -930,6 +1008,15 @@ dependencies = [
"rand_core 0.3.1",
]
[[package]]
name = "rand_hc"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d51e9f596de227fda2ea6c84607f5558e196eeaf43c986b724ba4fb8fdf497e7"
dependencies = [
"rand_core 0.6.3",
]
[[package]]
name = "rand_isaac"
version = "0.1.1"
@ -1004,7 +1091,7 @@ version = "0.2.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5ab49abadf3f9e1c4bc499e8845e152ad87d2ad2d30371841171169e9d75feee"
dependencies = [
"bitflags 1.2.1",
"bitflags",
]
[[package]]
@ -1117,12 +1204,16 @@ dependencies = [
]
[[package]]
name = "sha1"
version = "0.1.1"
name = "sha-1"
version = "0.9.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a307a40d5834140e4213a6952483b84e9ad53bdcab918b7335a6e305e505a53c"
checksum = "1a0c8611594e2ab4ebbf06ec7cbbf0a99450b8570e96cbf5188b5d5f6ef18d81"
dependencies = [
"byteorder 1.4.3",
"block-buffer",
"cfg-if 1.0.0",
"cpufeatures",
"digest",
"opaque-debug",
]
[[package]]
@ -1134,12 +1225,6 @@ dependencies = [
"libc",
]
[[package]]
name = "slab"
version = "0.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d807fd58c4181bbabed77cb3b891ba9748241a552bcc5be698faaebefc54f46e"
[[package]]
name = "slab"
version = "0.2.0"
@ -1177,7 +1262,7 @@ dependencies = [
"crossbeam-channel",
"env_logger",
"futures 0.3.15",
"log 0.4.14",
"log",
"mio 0.6.23",
"parking_lot 0.8.0",
"serde",
@ -1190,7 +1275,7 @@ dependencies = [
"tokio-codec",
"tokio-core",
"tokio-io",
"ws",
"tokio-tungstenite",
]
[[package]]
@ -1205,7 +1290,7 @@ dependencies = [
"encoding_rs",
"env_logger",
"futures 0.3.15",
"log 0.4.14",
"log",
"mio 0.6.23",
"parking_lot 0.8.0",
"rust-crypto",
@ -1373,7 +1458,7 @@ dependencies = [
"bytes 0.4.12",
"futures 0.1.31",
"iovec",
"log 0.4.14",
"log",
"mio 0.6.23",
"scoped-tls",
"tokio 0.1.22",
@ -1422,7 +1507,7 @@ checksum = "57fc868aae093479e3131e3d165c93b1c7474109d13c90ec0dda2a1bbfff0674"
dependencies = [
"bytes 0.4.12",
"futures 0.1.31",
"log 0.4.14",
"log",
]
[[package]]
@ -1445,7 +1530,7 @@ dependencies = [
"crossbeam-utils 0.7.2",
"futures 0.1.31",
"lazy_static",
"log 0.4.14",
"log",
"mio 0.6.23",
"num_cpus",
"parking_lot 0.9.0",
@ -1490,7 +1575,7 @@ dependencies = [
"crossbeam-utils 0.7.2",
"futures 0.1.31",
"lazy_static",
"log 0.4.14",
"log",
"num_cpus",
"slab 0.4.3",
"tokio-executor",
@ -1508,6 +1593,19 @@ dependencies = [
"tokio-executor",
]
[[package]]
name = "tokio-tungstenite"
version = "0.15.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "511de3f85caf1c98983545490c3d09685fa8eb634e57eec22bb4db271f46cbd8"
dependencies = [
"futures-util",
"log",
"pin-project",
"tokio 1.8.1",
"tungstenite",
]
[[package]]
name = "tokio-udp"
version = "0.1.6"
@ -1516,7 +1614,7 @@ checksum = "e2a0b10e610b39c38b031a2fcab08e4b82f16ece36504988dcbd81dbba650d82"
dependencies = [
"bytes 0.4.12",
"futures 0.1.31",
"log 0.4.14",
"log",
"mio 0.6.23",
"tokio-codec",
"tokio-io",
@ -1533,7 +1631,7 @@ dependencies = [
"futures 0.1.31",
"iovec",
"libc",
"log 0.4.14",
"log",
"mio 0.6.23",
"mio-uds",
"tokio-codec",
@ -1541,6 +1639,31 @@ dependencies = [
"tokio-reactor",
]
[[package]]
name = "tungstenite"
version = "0.14.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a0b2d8558abd2e276b0a8df5c05a2ec762609344191e5fd23e292c910e9165b5"
dependencies = [
"base64",
"byteorder 1.4.3",
"bytes 1.0.1",
"http",
"httparse",
"log",
"rand 0.8.4",
"sha-1",
"thiserror",
"url",
"utf-8",
]
[[package]]
name = "typenum"
version = "1.13.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "879f6906492a7cd215bfa4cf595b600146ccfac0c79bcbd1f3000162af5e8b06"
[[package]]
name = "unicode-bidi"
version = "0.3.5"
@ -1573,21 +1696,34 @@ checksum = "8ccb82d61f80a663efe1f787a51b16b5a51e3314d6ac365b08639f52387b33f3"
[[package]]
name = "url"
version = "1.7.2"
version = "2.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dd4e7c0d531266369519a4aa4f399d748bd37043b00bde1e4ff1f60a120b355a"
checksum = "a507c383b2d33b5fc35d1861e77e6b383d158b2da5e14fe51b83dfedf6fd578c"
dependencies = [
"form_urlencoded",
"idna",
"matches",
"percent-encoding",
]
[[package]]
name = "utf-8"
version = "0.7.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "09cc8ee72d2a9becf2f2febe0205bbed8fc6615b7cb429ad062dc7b7ddd036a9"
[[package]]
name = "vec_map"
version = "0.8.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f1bddf1187be692e79c5ffeab891132dfb0f236ed36a43c7ed39f1165ee20191"
[[package]]
name = "version_check"
version = "0.9.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5fecdca9a5291cc2b8dcf7dc02453fee791a280f3743cb0905f8822ae463b3fe"
[[package]]
name = "wasi"
version = "0.10.0+wasi-snapshot-preview1"
@ -1637,20 +1773,6 @@ version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f"
[[package]]
name = "ws"
version = "0.4.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "66d6b1eb1f4f17a260f294bbd4e00426c101785d5e03d9527032aba063f040d5"
dependencies = [
"httparse",
"log 0.3.9",
"mio 0.5.1",
"rand 0.3.23",
"sha1",
"url",
]
[[package]]
name = "ws2_32-sys"
version = "0.2.1"


+ 1
- 1
client/Cargo.toml View File

@ -24,4 +24,4 @@ tokio = { version = "1", features = ["full"] }
tokio-core = "^0.1"
tokio-io = "^0.1"
tokio-codec = "^0.1"
ws = "^0.4"
tokio-tungstenite = "0.15"

+ 50
- 121
client/src/control/ws.rs View File

@ -1,133 +1,62 @@
use futures::StreamExt;
use solstice_proto::config;
use thiserror::Error;
use tokio::net::TcpListener;
use tokio::sync::mpsc;
use ws;
use crate::control::request::*;
use crate::control::response::*;
use crate::dispatcher::Message;
/// This error is returned when a `Sender` fails to send a control request.
#[derive(Debug, Error)]
pub enum SendError {
#[error("error encoding response to JSON: {0}")]
JsonError(#[from] serde_json::Error),
#[error("error sending response on socket: {0}")]
WebSocketError(#[from] ws::Error),
}
/// This struct is used to send control responses to the controller.
/// It encapsulates the websocket connection so as to isolate clients from
/// the underlying implementation.
#[derive(Clone, Debug)]
pub struct Sender {
sender: ws::Sender,
}
impl Sender {
/// Queues up a control response to be sent to the controller.
pub fn send(&mut self, response: Response) -> Result<(), SendError> {
let encoded = serde_json::to_string(&response)?;
self.sender.send(encoded)?;
Ok(())
}
}
/// This struct handles a single websocket connection.
#[derive(Debug)]
struct Handler {
/// The channel on which to send requests to the client.
client_tx: mpsc::UnboundedSender<Message>,
/// The channel on which to send messages to the controller.
socket_tx: ws::Sender,
}
impl Handler {
fn send_to_client(&self, request: Request) -> ws::Result<()> {
self
.client_tx
.send(Message::ControlRequest(request))
.map_err(|err| {
error!("Error sending notification to client: {}", err);
ws::Error::new(ws::ErrorKind::Internal, "")
})
}
}
impl ws::Handler for Handler {
fn on_open(&mut self, _: ws::Handshake) -> ws::Result<()> {
info!("Websocket open");
Ok(())
}
fn on_close(&mut self, code: ws::CloseCode, reason: &str) {
info!("Websocket closed: code: {:?}, reason: {:?}", code, reason);
}
fn on_message(&mut self, msg: ws::Message) -> ws::Result<()> {
// Get the payload string.
let payload = match msg {
ws::Message::Text(payload) => payload,
ws::Message::Binary(_) => {
error!("Received binary websocket message from controller");
return Err(ws::Error::new(
ws::ErrorKind::Protocol,
"Binary message not supported",
));
}
};
// Decode the json control request.
let control_request = match serde_json::from_str(&payload) {
Ok(control_request) => control_request,
Err(e) => {
error!("Received invalid JSON message from controller: {}", e);
return Err(ws::Error::new(ws::ErrorKind::Protocol, "Invalid JSON"));
}
};
debug!("Received control request: {:?}", control_request);
// Send the control request to the client.
self.send_to_client(control_request)
}
}
/// Start listening on the socket address stored in configuration, and send
/// control notifications to the client through the given channel.
pub fn listen(
pub async fn listen(
client_tx: mpsc::UnboundedSender<Message>,
socket_rx: mpsc::Receiver<Response>,
) {
let websocket_result = ws::Builder::new()
.with_settings(ws::Settings {
max_connections: 1,
..ws::Settings::default()
})
.build(|socket_tx| Handler {
client_tx: client_tx.clone(),
socket_tx: socket_tx,
});
let websocket = match websocket_result {
Ok(websocket) => websocket,
Err(e) => {
error!("Unable to build websocket: {}", e);
return;
}
};
// TODO: Read responses off `socket_rx` and send them on `client_tx`. When
// the channel is closed, we should stop listening. In the meantime, we can
// at least spawn a task to pass along the responses.
let listen_result =
websocket.listen((config::CONTROL_HOST, config::CONTROL_PORT));
match listen_result {
Ok(_) => (),
Err(e) => {
error!("Unable to listen on websocket: {}", e);
}
_socket_rx: mpsc::Receiver<Response>,
) -> anyhow::Result<()> {
let address = format!("{}:{}", config::CONTROL_HOST, config::CONTROL_PORT);
let listener = TcpListener::bind(&address).await?;
info!("Listening for control connections on {}", address);
while let Ok((raw_stream, addr)) = listener.accept().await {
info!("Accepted control connection from {}", addr);
let ws_stream = tokio_tungstenite::accept_async(raw_stream).await?;
info!("WebSocket connection established: {}", addr);
let (_outgoing, mut incoming) = ws_stream.split();
while let Some(result) = incoming.next().await {
let message = match result {
Ok(message) => message,
Err(err) => {
warn!("Error reading control message: {}", err);
continue
},
};
let text = match message.to_text() {
Ok(text) => text,
Err(err) => {
warn!("Received non-text control message: {}", err);
continue
},
};
debug!("Received a text message from {}: {}", addr, text);
// Decode the json control request.
let control_request: Request = match serde_json::from_str(&text) {
Ok(control_request) => control_request,
Err(e) => {
warn!("Received invalid JSON message from controller: {}", e);
continue
}
};
debug!("Received control request: {:?}", control_request);
client_tx.send(Message::ControlRequest(control_request))?
}
}
Ok(())
}

+ 0
- 2
client/src/executor.rs View File

@ -61,9 +61,7 @@ impl Executor {
mod tests {
use std::sync::{Arc, Barrier};
use parking_lot::Mutex;
use solstice_proto::{User, UserStatus};
use tokio::sync::mpsc::channel;
use crate::context::{Context, ContextBundle};


+ 21
- 16
client/src/main.rs View File

@ -9,7 +9,6 @@ use crossbeam_channel;
use env_logger;
use futures::stream::StreamExt;
use log::info;
use parking_lot::Mutex;
use solstice_proto;
use tokio::net::TcpStream;
@ -115,7 +114,7 @@ async fn async_main() {
let (dispatcher_tx, mut dispatcher_rx) =
tokio::sync::mpsc::unbounded_channel();
let (control_tx, control_rx) = tokio::sync::mpsc::channel(100);
let (_control_tx, control_rx) = tokio::sync::mpsc::channel(100);
let client_task =
tokio::spawn(run_client(bundle.server_request_rx, dispatcher_tx.clone()));
@ -123,27 +122,33 @@ async fn async_main() {
let dispatcher = Dispatcher::new();
let executor = Executor::new(bundle.context);
let control_task = tokio::task::spawn_blocking(move || {
control::listen(dispatcher_tx, control_rx);
});
let control_task = control::listen(dispatcher_tx, control_rx);
while let Some(message) = dispatcher_rx.recv().await {
if let Some(job) = dispatcher.dispatch(message) {
executor.schedule(job);
}
}
let dispatch_task = async move {
while let Some(message) = dispatcher_rx.recv().await {
if let Some(job) = dispatcher.dispatch(message) {
executor.schedule(job);
}
}
let _context = tokio::task::spawn_blocking(move || executor.join())
.await
.unwrap();
tokio::task::spawn_blocking(move || executor.join()).await
};
tokio::select! {
result = control_task => match result {
Ok(()) => (),
Err(err) => error!("Control task failed: {}", err),
},
result = dispatch_task => match result {
Ok(context) => info!("Dispatch task ended with context: {:?}", context),
Err(err) => error!("Dispatch task failed: {}", err),
},
};
client_task
.await
.expect("Client task join error")
.expect("Client error");
// TODO: Send some signal that we want to wrap up to the controller.
control_task.await.unwrap();
}
#[tokio::main]


Loading…
Cancel
Save