Browse Source

Implement basic asynchronous main.

wip
Titouan Rigoudy 4 years ago
parent
commit
57a8791c85
5 changed files with 133 additions and 26 deletions
  1. +9
    -0
      Cargo.lock
  2. +3
    -0
      client/Cargo.toml
  3. +21
    -14
      client/src/dispatcher.rs
  4. +98
    -10
      client/src/main.rs
  5. +2
    -2
      proto/src/config.rs

+ 9
- 0
Cargo.lock View File

@ -20,6 +20,12 @@ dependencies = [
"winapi 0.3.9",
]
[[package]]
name = "anyhow"
version = "1.0.42"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "595d3cfa7a60d4555cb5067b99f07142a08ea778de5cf993f7b75c7d8fabc486"
[[package]]
name = "async-stream"
version = "0.3.2"
@ -1122,9 +1128,12 @@ checksum = "fe0f37c9e8f3c5a4a66ad655a93c74daac4ad00c441533bf5c6e7990bb42604e"
name = "solstice-client"
version = "0.1.0"
dependencies = [
"anyhow",
"async-stream",
"clap",
"crossbeam-channel",
"env_logger",
"futures 0.3.15",
"log 0.4.14",
"mio 0.6.23",
"parking_lot 0.8.0",


+ 3
- 0
client/Cargo.toml View File

@ -5,9 +5,12 @@ authors = ["letitz"]
edition = "2018"
[dependencies]
anyhow = "1.0"
async-stream = "0.3"
clap = "^2.33"
crossbeam-channel = "^0.3"
env_logger = "^0.8"
futures = "0.3"
log = "^0.4"
mio = "^0.6"
parking_lot = "^0.8"


+ 21
- 14
client/src/dispatcher.rs View File

@ -2,11 +2,13 @@
use std::fmt::Debug;
use log::warn;
use solstice_proto::server::ServerResponse;
use crate::context::Context;
use crate::executor::Job;
use crate::handlers::{LoginHandler, SetPrivilegedUsersHandler};
use crate::message_handler::MessageHandler;
use solstice_proto::server::ServerResponse;
/// The type of messages dispatched by a dispatcher.
#[derive(Debug)]
@ -54,21 +56,24 @@ impl Dispatcher {
}
/// Dispatches the given message by wrapping it with a handler.
pub fn dispatch(&self, message: Message) -> Box<dyn Job> {
pub fn dispatch(&self, message: Message) -> Option<Box<dyn Job>> {
match message {
Message::ServerResponse(ServerResponse::LoginResponse(response)) => {
Box::new(DispatchedMessage {
Some(Box::new(DispatchedMessage {
message: response,
handler: LoginHandler::default(),
})
}))
}
Message::ServerResponse(ServerResponse::PrivilegedUsersResponse(
response,
)) => Box::new(DispatchedMessage {
)) => Some(Box::new(DispatchedMessage {
message: response,
handler: SetPrivilegedUsersHandler::default(),
}),
_ => panic!("Unimplemented"),
})),
Message::ServerResponse(response) => {
warn!("Unhandled server response: {:?}", response);
None
}
}
}
}
@ -85,17 +90,19 @@ mod tests {
#[test]
fn dispatcher_privileged_users_response() {
Dispatcher::new().dispatch(into_message(server::PrivilegedUsersResponse {
users: vec!["foo".to_string(), "bar".to_string(), "baz".to_string()],
}));
assert!(Dispatcher::new()
.dispatch(into_message(server::PrivilegedUsersResponse {
users: vec!["foo".to_string(), "bar".to_string(), "baz".to_string()],
}))
.is_some());
}
#[test]
fn dispatcher_login_response() {
Dispatcher::new().dispatch(into_message(
server::LoginResponse::LoginFail {
assert!(Dispatcher::new()
.dispatch(into_message(server::LoginResponse::LoginFail {
reason: "bleep bloop".to_string(),
},
));
},))
.is_some());
}
}

+ 98
- 10
client/src/main.rs View File

@ -7,24 +7,26 @@ use std::thread;
use clap::{App, Arg};
use crossbeam_channel;
use env_logger;
use futures::stream::{Stream, StreamExt};
use log::info;
use solstice_proto;
use tokio::net::TcpStream;
mod client;
#[cfg(test)]
mod context;
mod control;
#[cfg(test)]
mod dispatcher;
#[cfg(test)]
mod executor;
#[cfg(test)]
mod handlers;
mod login;
#[cfg(test)]
mod message_handler;
mod room;
mod user;
use context::Context;
use dispatcher::Dispatcher;
use executor::Executor;
fn old_main() {
let (proto_to_client_tx, proto_to_client_rx) = crossbeam_channel::unbounded();
@ -51,11 +53,95 @@ fn old_main() {
client.run();
}
fn async_main() {
// TODO
// There is a risk of deadlock if we use two bounded channels here:
//
// - client task is blocked trying to send response to dispatcher
// - all dispatcher threads are blocked trying to send requests to client
//
// This stems from the fact that requests are only read from the channel and
// sent to the server when `inbound` is being polled, which is mutually
// exclusive with `dispatcher_tx.send()` being polled.
//
// This could be fixed in one of two ways, at least:
//
// - write `Client` interface in terms of channels, not streams
// - this would allow both receiving and sending tasks to run concurrently
// inside `Client`
// - in other words, sending the response on the dispatcher channel would
// run concurrently with sending requests to the server
// - use `FrameReader` / `FrameWriter` directly instead, and synchronize their
// behavior manually
//
async fn run_client(
mut request_rx: tokio::sync::mpsc::Receiver<solstice_proto::ServerRequest>,
dispatcher_tx: tokio::sync::mpsc::UnboundedSender<dispatcher::Message>,
) -> anyhow::Result<()> {
let address = format!(
"{}:{}",
solstice_proto::config::SERVER_HOST,
solstice_proto::config::SERVER_PORT
);
info!("Connecting to server at {}.", address);
let stream = TcpStream::connect(address).await?;
let credentials = solstice_proto::server::Credentials::new(
solstice_proto::config::USERNAME.to_string(),
solstice_proto::config::PASSWORD.to_string(),
)
.expect("Invalid credentials");
info!("Logging in to server.");
let client = solstice_proto::server::Client::new(stream)
.login(credentials)
.await?;
let outbound = async_stream::stream! {
while let Some(request) = request_rx.recv().await {
yield request;
}
};
info!("Running client.");
let inbound = client.run(outbound);
tokio::pin!(inbound);
while let Some(result) = inbound.next().await {
let response = result?;
dispatcher_tx.send(response.into())?;
}
info!("Client finished running.");
Ok(())
}
async fn async_main() {
let (_server_request_tx, server_request_rx) = tokio::sync::mpsc::channel(100);
let (dispatcher_tx, mut dispatcher_rx) =
tokio::sync::mpsc::unbounded_channel();
let client_task = tokio::spawn(run_client(server_request_rx, dispatcher_tx));
let dispatcher = Dispatcher::new();
let executor = Executor::new(Context::new());
while let Some(message) = dispatcher_rx.recv().await {
if let Some(job) = dispatcher.dispatch(message) {
executor.schedule(job);
}
}
let _context = tokio::task::spawn_blocking(move || executor.join())
.await
.unwrap();
client_task
.await
.expect("Client task join error")
.expect("Client error");
}
fn main() {
#[tokio::main]
async fn main() {
env_logger::init();
let matches = App::new("solstice-client")
@ -69,8 +155,10 @@ fn main() {
.get_matches();
if matches.is_present("async") {
async_main();
info!("Running in asynchronous mode.");
async_main().await;
} else {
old_main();
info!("Running in synchronous mode.");
tokio::task::spawn_blocking(old_main).await.unwrap();
}
}

+ 2
- 2
proto/src/config.rs View File

@ -1,10 +1,10 @@
pub const VER_MAJOR: u32 = 181;
pub const VER_MINOR: u32 = 100;
pub const USERNAME: &'static str = "abcdefgh";
pub const USERNAME: &'static str = "solstice";
// The password is not used for much, and sent unencrypted over the wire, so
// why not even check it in to git
pub const PASSWORD: &'static str = "ijklmnop";
pub const PASSWORD: &'static str = "topsekrit";
pub const SERVER_HOST: &'static str = "server.slsknet.org";
pub const SERVER_PORT: u16 = 2242;


Loading…
Cancel
Save