Solstice client.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 

167 lines
4.3 KiB

// Still no 2018 way of using the log crate without `use log::*` everywhere.
#[macro_use]
extern crate log;
use std::thread;
use clap::{App, Arg};
use crossbeam_channel;
use env_logger;
use futures::stream::StreamExt;
use log::info;
use parking_lot::Mutex;
use solstice_proto;
use tokio::net::TcpStream;
mod client;
mod context;
mod control;
mod dispatcher;
mod executor;
mod handlers;
mod login;
mod message_handler;
mod room;
mod user;
use context::ContextBundle;
use dispatcher::Dispatcher;
use executor::Executor;
/// Entry point for the legacy, thread-based engine.
///
/// Builds the protocol agent and the `client::Client`, connects them with
/// unbounded crossbeam channels, starts the control listener and the protocol
/// agent on their own threads, then runs the client on the calling thread.
///
/// If the protocol agent cannot be created, the error is logged and the
/// function returns without starting anything.
fn old_main() {
    // Channel carrying messages from the protocol agent to the client.
    let (agent_to_client_tx, agent_to_client_rx) = crossbeam_channel::unbounded();

    let mut agent = match solstice_proto::Agent::new(agent_to_client_tx) {
        Err(err) => {
            error!("Error initializing protocol agent: {}", err);
            return;
        }
        Ok(created) => created,
    };

    // Channel carrying messages from the client to the protocol agent.
    let client_to_agent_tx = agent.channel();

    // Channel carrying messages from the control listener to the client.
    let (control_tx, control_rx) = crossbeam_channel::unbounded();

    let mut legacy_client =
        client::Client::new(client_to_agent_tx, agent_to_client_rx, control_rx);

    // Both join handles are dropped, so these threads are detached.
    thread::spawn(move || control::listen(control_tx));
    thread::spawn(move || agent.run().unwrap());

    legacy_client.run();
}
// There is a risk of deadlock if we use two bounded channels here:
//
// - client task is blocked trying to send response to dispatcher
// - all dispatcher threads are blocked trying to send requests to client
//
// This stems from the fact that requests are only read from the channel and
// sent to the server when `inbound` is being polled, which is mutually
// exclusive with `dispatcher_tx.send()` being polled.
//
// This could be fixed in one of two ways, at least:
//
// - write `Client` interface in terms of channels, not streams
//   - this would allow both receiving and sending tasks to run concurrently
//     inside `Client`
//   - in other words, sending the response on the dispatcher channel would
//     run concurrently with sending requests to the server
// - use `FrameReader` / `FrameWriter` directly instead, and synchronize their
//   behavior manually
//
/// Connects to the server, logs in, and relays messages until the stream of
/// server responses ends.
///
/// Requests received on `request_rx` are fed to the server client as its
/// outbound stream. Every response produced by the server client is converted
/// with `into()` and sent on `dispatcher_tx`.
///
/// # Errors
///
/// Returns an error if the TCP connection or the login fails, if the inbound
/// stream yields an error, or if sending on `dispatcher_tx` fails.
///
/// # Panics
///
/// Panics with "Invalid credentials" if `Credentials::new` does not produce
/// credentials from the configured username and password.
async fn run_client(
    mut request_rx: tokio::sync::mpsc::Receiver<solstice_proto::ServerRequest>,
    dispatcher_tx: tokio::sync::mpsc::UnboundedSender<dispatcher::Message>,
) -> anyhow::Result<()> {
    let host = &solstice_proto::config::SERVER_HOST;
    let port = &solstice_proto::config::SERVER_PORT;
    let address = format!("{}:{}", host, port);

    info!("Connecting to server at {}.", address);
    let stream = TcpStream::connect(address).await?;

    let username = solstice_proto::config::USERNAME.to_string();
    let password = solstice_proto::config::PASSWORD.to_string();
    let credentials = solstice_proto::server::Credentials::new(username, password)
        .expect("Invalid credentials");

    info!("Logging in to server.");
    let client = solstice_proto::server::Client::new(stream)
        .login(credentials)
        .await?;

    // Adapt the request receiver into a stream; it ends once the channel is
    // closed and drained.
    let outbound = async_stream::stream! {
        while let Some(outgoing) = request_rx.recv().await {
            yield outgoing;
        }
    };

    info!("Running client.");
    let responses = client.run(outbound);
    tokio::pin!(responses);

    loop {
        let response = match responses.next().await {
            Some(item) => item?,
            None => break,
        };
        dispatcher_tx.send(response.into())?;
    }

    info!("Client finished running.");
    Ok(())
}
/// Entry point for the asynchronous engine.
///
/// Spawns `run_client` as a tokio task, then turns each message it emits into
/// a job via the `Dispatcher` and schedules that job on the `Executor`. Once
/// the message channel closes, the executor is joined on a blocking thread and
/// the client task's result is checked.
///
/// # Panics
///
/// Panics if the blocking join task fails, if the client task cannot be
/// joined ("Client task join error"), or if the client returned an error
/// ("Client error").
async fn async_main() {
    let bundle = ContextBundle::default();

    // Unbounded channel from the client task to the dispatch loop below.
    let (message_tx, mut message_rx) = tokio::sync::mpsc::unbounded_channel();

    let client_task = tokio::spawn(run_client(bundle.server_request_rx, message_tx));

    let dispatcher = Dispatcher::new();
    let executor = Executor::new(bundle.context);

    // Runs until every sender for the channel has been dropped.
    loop {
        let message = match message_rx.recv().await {
            Some(received) => received,
            None => break,
        };
        if let Some(job) = dispatcher.dispatch(message) {
            executor.schedule(job);
        }
    }

    // `Executor::join` is run on the blocking pool rather than awaited inline.
    let join_task = tokio::task::spawn_blocking(move || executor.join());
    let _context = join_task.await.unwrap();

    let client_result = client_task.await.expect("Client task join error");
    client_result.expect("Client error");
}
/// Program entry point.
///
/// Initializes logging, parses the command line, and runs either the
/// asynchronous engine (when `--async` is given) or the legacy engine on a
/// blocking thread.
#[tokio::main]
async fn main() {
    env_logger::init();

    let async_flag = Arg::with_name("async")
        .long("async")
        .help("Use the new async engine.");
    let cli = App::new("solstice-client")
        .version("0.1")
        .author("letitz")
        .arg(async_flag);
    let matches = cli.get_matches();

    if !matches.is_present("async") {
        info!("Running in synchronous mode.");
        // The legacy engine is handed to the blocking pool instead of being
        // called directly from this async function.
        tokio::task::spawn_blocking(old_main).await.unwrap();
        return;
    }

    info!("Running in asynchronous mode.");
    async_main().await;
}