From 57a8791c8508b9ef6695c53275d9fac26074a630 Mon Sep 17 00:00:00 2001 From: Titouan Rigoudy Date: Mon, 19 Jul 2021 07:51:17 -0400 Subject: [PATCH] Implement basic asynchronous main. --- Cargo.lock | 9 ++++ client/Cargo.toml | 3 ++ client/src/dispatcher.rs | 35 ++++++++----- client/src/main.rs | 108 +++++++++++++++++++++++++++++++++++---- proto/src/config.rs | 4 +- 5 files changed, 133 insertions(+), 26 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 5dbe65d..e357405 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -20,6 +20,12 @@ dependencies = [ "winapi 0.3.9", ] +[[package]] +name = "anyhow" +version = "1.0.42" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "595d3cfa7a60d4555cb5067b99f07142a08ea778de5cf993f7b75c7d8fabc486" + [[package]] name = "async-stream" version = "0.3.2" @@ -1122,9 +1128,12 @@ checksum = "fe0f37c9e8f3c5a4a66ad655a93c74daac4ad00c441533bf5c6e7990bb42604e" name = "solstice-client" version = "0.1.0" dependencies = [ + "anyhow", + "async-stream", "clap", "crossbeam-channel", "env_logger", + "futures 0.3.15", "log 0.4.14", "mio 0.6.23", "parking_lot 0.8.0", diff --git a/client/Cargo.toml b/client/Cargo.toml index 98c390c..b50b252 100644 --- a/client/Cargo.toml +++ b/client/Cargo.toml @@ -5,9 +5,12 @@ authors = ["letitz"] edition = "2018" [dependencies] +anyhow = "1.0" +async-stream = "0.3" clap = "^2.33" crossbeam-channel = "^0.3" env_logger = "^0.8" +futures = "0.3" log = "^0.4" mio = "^0.6" parking_lot = "^0.8" diff --git a/client/src/dispatcher.rs b/client/src/dispatcher.rs index 2908623..b8ad9de 100644 --- a/client/src/dispatcher.rs +++ b/client/src/dispatcher.rs @@ -2,11 +2,13 @@ use std::fmt::Debug; +use log::warn; +use solstice_proto::server::ServerResponse; + use crate::context::Context; use crate::executor::Job; use crate::handlers::{LoginHandler, SetPrivilegedUsersHandler}; use crate::message_handler::MessageHandler; -use solstice_proto::server::ServerResponse; /// The type of messages dispatched by a dispatcher. #[derive(Debug)] @@ -54,21 +56,24 @@ impl Dispatcher { } /// Dispatches the given message by wrapping it with a handler. - pub fn dispatch(&self, message: Message) -> Box { + pub fn dispatch(&self, message: Message) -> Option> { match message { Message::ServerResponse(ServerResponse::LoginResponse(response)) => { - Box::new(DispatchedMessage { + Some(Box::new(DispatchedMessage { message: response, handler: LoginHandler::default(), - }) + })) } Message::ServerResponse(ServerResponse::PrivilegedUsersResponse( response, - )) => Box::new(DispatchedMessage { + )) => Some(Box::new(DispatchedMessage { message: response, handler: SetPrivilegedUsersHandler::default(), - }), - _ => panic!("Unimplemented"), + })), + Message::ServerResponse(response) => { + warn!("Unhandled server response: {:?}", response); + None + } } } } @@ -85,17 +90,19 @@ mod tests { #[test] fn dispatcher_privileged_users_response() { - Dispatcher::new().dispatch(into_message(server::PrivilegedUsersResponse { - users: vec!["foo".to_string(), "bar".to_string(), "baz".to_string()], - })); + assert!(Dispatcher::new() + .dispatch(into_message(server::PrivilegedUsersResponse { + users: vec!["foo".to_string(), "bar".to_string(), "baz".to_string()], + })) + .is_some()); } #[test] fn dispatcher_login_response() { - Dispatcher::new().dispatch(into_message( - server::LoginResponse::LoginFail { + assert!(Dispatcher::new() + .dispatch(into_message(server::LoginResponse::LoginFail { reason: "bleep bloop".to_string(), - }, - )); + },)) + .is_some()); } } diff --git a/client/src/main.rs b/client/src/main.rs index 75d41ad..a3266ca 100644 --- a/client/src/main.rs +++ b/client/src/main.rs @@ -7,24 +7,26 @@ use std::thread; use clap::{App, Arg}; use crossbeam_channel; use env_logger; +use futures::stream::{Stream, StreamExt}; +use log::info; use solstice_proto; +use tokio::net::TcpStream; mod client; -#[cfg(test)] mod context; mod control; -#[cfg(test)] mod dispatcher; -#[cfg(test)] mod executor; -#[cfg(test)] mod handlers; mod login; -#[cfg(test)] mod message_handler; mod room; mod user; +use context::Context; +use dispatcher::Dispatcher; +use executor::Executor; + fn old_main() { let (proto_to_client_tx, proto_to_client_rx) = crossbeam_channel::unbounded(); @@ -51,11 +53,95 @@ fn old_main() { client.run(); } -fn async_main() { - // TODO +// There is a risk of deadlock if we use two bounded channels here: +// +// - client task is blocked trying to send response to dispatcher +// - all dispatcher threads are blocked trying to send requests to client +// +// This stems from the fact that requests are only read from the channel and +// sent to the server when `inbound` is being polled, which is mutually +// exclusive with `dispatcher_tx.send()` being polled. +// +// This could be fixed in one of two ways, at least: +// +// - write `Client` interface in terms of channels, not streams +// - this would allow both receiving and sending tasks to run concurrently +// inside `Client` +// - in other words, sending the response on the dispatcher channel would +// run concurrently with sending requests to the server +// - use `FrameReader` / `FrameWriter` directly instead, and synchronize their +// behavior manually +// +async fn run_client( + mut request_rx: tokio::sync::mpsc::Receiver, + dispatcher_tx: tokio::sync::mpsc::UnboundedSender, +) -> anyhow::Result<()> { + let address = format!( + "{}:{}", + solstice_proto::config::SERVER_HOST, + solstice_proto::config::SERVER_PORT + ); + info!("Connecting to server at {}.", address); + let stream = TcpStream::connect(address).await?; + + let credentials = solstice_proto::server::Credentials::new( + solstice_proto::config::USERNAME.to_string(), + solstice_proto::config::PASSWORD.to_string(), + ) + .expect("Invalid credentials"); + + info!("Logging in to server."); + let client = solstice_proto::server::Client::new(stream) + .login(credentials) + .await?; + + let outbound = async_stream::stream! { + while let Some(request) = request_rx.recv().await { + yield request; + } + }; + + info!("Running client."); + let inbound = client.run(outbound); + tokio::pin!(inbound); + + while let Some(result) = inbound.next().await { + let response = result?; + dispatcher_tx.send(response.into())?; + } + + info!("Client finished running."); + Ok(()) +} + +async fn async_main() { + let (_server_request_tx, server_request_rx) = tokio::sync::mpsc::channel(100); + let (dispatcher_tx, mut dispatcher_rx) = + tokio::sync::mpsc::unbounded_channel(); + + let client_task = tokio::spawn(run_client(server_request_rx, dispatcher_tx)); + + let dispatcher = Dispatcher::new(); + let executor = Executor::new(Context::new()); + + while let Some(message) = dispatcher_rx.recv().await { + if let Some(job) = dispatcher.dispatch(message) { + executor.schedule(job); + } + } + + let _context = tokio::task::spawn_blocking(move || executor.join()) + .await + .unwrap(); + + client_task + .await + .expect("Client task join error") + .expect("Client error"); } -fn main() { +#[tokio::main] +async fn main() { env_logger::init(); let matches = App::new("solstice-client") @@ -69,8 +155,10 @@ fn main() { .get_matches(); if matches.is_present("async") { - async_main(); + info!("Running in asynchronous mode."); + async_main().await; } else { - old_main(); + info!("Running in synchronous mode."); + tokio::task::spawn_blocking(old_main).await.unwrap(); } } diff --git a/proto/src/config.rs b/proto/src/config.rs index d870e8d..f1ded51 100644 --- a/proto/src/config.rs +++ b/proto/src/config.rs @@ -1,10 +1,10 @@ pub const VER_MAJOR: u32 = 181; pub const VER_MINOR: u32 = 100; -pub const USERNAME: &'static str = "abcdefgh"; +pub const USERNAME: &'static str = "solstice"; // The password is not used for much, and sent unencrypted over the wire, so // why not even check it in to git -pub const PASSWORD: &'static str = "ijklmnop"; +pub const PASSWORD: &'static str = "topsekrit"; pub const SERVER_HOST: &'static str = "server.slsknet.org"; pub const SERVER_PORT: u16 = 2242;