diff --git a/Cargo.lock b/Cargo.lock index a0283f2..0e7a19a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -751,7 +751,6 @@ dependencies = [ "slab 0.2.0", "solstice-proto", "thiserror", - "threadpool", "tokio", "tokio-tungstenite", ] @@ -830,15 +829,6 @@ dependencies = [ "syn", ] -[[package]] -name = "threadpool" -version = "1.8.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d050e60b33d41c19108b32cea32164033a9013fe3b46cbd4457559bfbf77afaa" -dependencies = [ - "num_cpus", -] - [[package]] name = "tinyvec" version = "1.3.1" diff --git a/client/Cargo.toml b/client/Cargo.toml index f97bfc8..058c66f 100644 --- a/client/Cargo.toml +++ b/client/Cargo.toml @@ -18,6 +18,5 @@ serde_json = "^1.0" slab = "^0.2" solstice-proto = { path = "../proto" } thiserror = "^1.0" -threadpool = "^1.0" tokio = { version = "1.0", features = ["full"] } tokio-tungstenite = "0.15" diff --git a/client/src/dispatcher.rs b/client/src/dispatcher.rs index 838870b..4ab6494 100644 --- a/client/src/dispatcher.rs +++ b/client/src/dispatcher.rs @@ -7,7 +7,6 @@ use solstice_proto::server::ServerResponse; use crate::context::Context; use crate::control::Request as ControlRequest; -use crate::executor::Job; use crate::handlers::{ LoginStatusRequestHandler, PrivilegedUsersResponseHandler, RoomJoinRequestHandler, RoomJoinResponseHandler, RoomListRequestHandler, @@ -35,8 +34,14 @@ impl From for Message { } } +/// Represents a synchronous task that can be run against a context. +pub trait Job: Send { + /// Runs this job against the given context. + fn execute(self: Box, context: &mut Context); +} + /// Pairs together a message and its handler as chosen by the dispatcher. -/// Implements Job so as to be scheduled on an executor. +/// Implements Job so as to erase the exact types involved. struct DispatchedMessage { message: M, handler: H, @@ -47,7 +52,7 @@ where H: MessageHandler + Send, ::Message: Debug + Send, { - fn execute(self: Box, context: &Context) { + fn execute(self: Box, context: &mut Context) { if let Err(error) = self.handler.run(context, &self.message) { error!( "Error in handler {}: {:?}\nMessage: {:?}", diff --git a/client/src/executor.rs b/client/src/executor.rs deleted file mode 100644 index 042a2d3..0000000 --- a/client/src/executor.rs +++ /dev/null @@ -1,155 +0,0 @@ -//! This module provides a facade for an abstract concurrent job executor. -//! -//! Mostly here to insulate the rest of this crate from the exact details of -//! the executor implementation, though it also owns the process-wide context -//! data structure against which handlers are run. - -use std::sync::Arc; - -use threadpool; - -use crate::context::Context; - -/// Default number of threads spawned by Executor instances -const NUM_THREADS: usize = 8; - -/// The trait of objects that can be run by an Executor. -pub trait Job: Send { - /// Runs this job against the given context. - fn execute(self: Box, context: &Context); -} - -/// A concurrent job execution engine. -pub struct Executor { - /// The context against which jobs are executed. - context: Arc, - - /// Executes the jobs. - pool: threadpool::ThreadPool, -} - -impl Executor { - /// Builds a new executor against the given context. - pub fn new(context: Context) -> Self { - Self { - context: Arc::new(context), - pool: threadpool::Builder::new() - .num_threads(NUM_THREADS) - .thread_name("Executor".to_string()) - .build(), - } - } - - /// Schedules execution of the given job on this executor. - pub fn schedule(&self, job: Box) { - let context = self.context.clone(); - self.pool.execute(move || job.execute(&*context)); - } - - /// Blocks until all scheduled jobs are executed, then returns the context. - pub fn join(self) -> Context { - self.pool.join(); - - // The only copies of the Arc are passed to the closures executed on - // the threadpool. Once the pool is join()ed, there cannot exist any - // other copies than ours, so we are safe to unwrap() the Arc. - Arc::try_unwrap(self.context).unwrap() - } -} - -#[cfg(test)] -mod tests { - use std::sync::{Arc, Barrier}; - - use solstice_proto::{User, UserStatus}; - - use crate::context::{Context, ContextBundle}; - - use super::{Executor, Job}; - - #[test] - fn immediate_join_returns_unchanged_context() { - let bundle = ContextBundle::default(); - - let context = Executor::new(bundle.context).join(); - - assert_eq!(context.state.lock().users.get_list(), vec![]); - assert_eq!(context.state.lock().rooms.get_room_list(), vec![]); - } - - struct Waiter { - barrier: Arc, - } - - impl Job for Waiter { - fn execute(self: Box, _context: &Context) { - self.barrier.wait(); - } - } - - #[test] - fn join_waits_for_all_jobs() { - let bundle = ContextBundle::default(); - let executor = Executor::new(bundle.context); - - let barrier = Arc::new(Barrier::new(2)); - - executor.schedule(Box::new(Waiter { - barrier: barrier.clone(), - })); - executor.schedule(Box::new(Waiter { - barrier: barrier.clone(), - })); - - executor.join(); - } - - struct UserAdder { - pub user: User, - } - - impl Job for UserAdder { - fn execute(self: Box, context: &Context) { - context.state.lock().users.insert(self.user); - } - } - - #[test] - fn jobs_access_context() { - let bundle = ContextBundle::default(); - - let executor = Executor::new(bundle.context); - - let user1 = User { - name: "potato".to_string(), - status: UserStatus::Offline, - average_speed: 0, - num_downloads: 0, - unknown: 0, - num_files: 0, - num_folders: 0, - num_free_slots: 0, - country: "YO".to_string(), - }; - - let mut user2 = user1.clone(); - user2.name = "rutabaga".to_string(); - - executor.schedule(Box::new(UserAdder { - user: user1.clone(), - })); - executor.schedule(Box::new(UserAdder { - user: user2.clone(), - })); - - let context = executor.join(); - - let expected_users = - vec![(user1.name.clone(), user1), (user2.name.clone(), user2)]; - - let mut users = context.state.lock().users.get_list(); - users.sort(); - - assert_eq!(users, expected_users); - } -} diff --git a/client/src/main.rs b/client/src/main.rs index d4080ee..c1bb536 100644 --- a/client/src/main.rs +++ b/client/src/main.rs @@ -15,7 +15,6 @@ mod config; mod context; mod control; mod dispatcher; -mod executor; mod handlers; mod login; mod message_handler; @@ -27,7 +26,6 @@ mod user; use config::Config; use context::{ContextBundle, ContextOptions}; use dispatcher::Dispatcher; -use executor::Executor; fn old_main() { let (request_tx, _request_rx) = crossbeam_channel::bounded(100); @@ -105,19 +103,18 @@ async fn async_main() -> anyhow::Result<()> { )); let dispatcher = Dispatcher::new(); - let executor = Executor::new(bundle.context); let control_task = control_listener.run(dispatcher_tx, bundle.control_response_rx); - let dispatch_task = async move { - while let Some(message) = dispatcher_rx.recv().await { + let dispatch_task = tokio::task::spawn_blocking(move || { + let mut context = bundle.context; + while let Some(message) = dispatcher_rx.blocking_recv() { if let Some(job) = dispatcher.dispatch(message) { - executor.schedule(job); + job.execute(&mut context); } } - - tokio::task::spawn_blocking(move || executor.join()).await - }; + context + }); tokio::select! { result = control_task => match result {