Browse Source

Remove executor, exedute jobs single-threaded.

wip
Titouan Rigoudy 4 years ago
parent
commit
794cba3fe4
5 changed files with 14 additions and 178 deletions
  1. +0
    -10
      Cargo.lock
  2. +0
    -1
      client/Cargo.toml
  3. +8
    -3
      client/src/dispatcher.rs
  4. +0
    -155
      client/src/executor.rs
  5. +6
    -9
      client/src/main.rs

+ 0
- 10
Cargo.lock View File

@ -751,7 +751,6 @@ dependencies = [
"slab 0.2.0",
"solstice-proto",
"thiserror",
"threadpool",
"tokio",
"tokio-tungstenite",
]
@ -830,15 +829,6 @@ dependencies = [
"syn",
]
[[package]]
name = "threadpool"
version = "1.8.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d050e60b33d41c19108b32cea32164033a9013fe3b46cbd4457559bfbf77afaa"
dependencies = [
"num_cpus",
]
[[package]]
name = "tinyvec"
version = "1.3.1"


+ 0
- 1
client/Cargo.toml View File

@ -18,6 +18,5 @@ serde_json = "^1.0"
slab = "^0.2"
solstice-proto = { path = "../proto" }
thiserror = "^1.0"
threadpool = "^1.0"
tokio = { version = "1.0", features = ["full"] }
tokio-tungstenite = "0.15"

+ 8
- 3
client/src/dispatcher.rs View File

@ -7,7 +7,6 @@ use solstice_proto::server::ServerResponse;
use crate::context::Context;
use crate::control::Request as ControlRequest;
use crate::executor::Job;
use crate::handlers::{
LoginStatusRequestHandler, PrivilegedUsersResponseHandler,
RoomJoinRequestHandler, RoomJoinResponseHandler, RoomListRequestHandler,
@ -35,8 +34,14 @@ impl From<ControlRequest> for Message {
}
}
/// Represents a synchronous task that can be run against a context.
pub trait Job: Send {
/// Runs this job against the given context.
fn execute(self: Box<Self>, context: &mut Context);
}
/// Pairs together a message and its handler as chosen by the dispatcher.
/// Implements Job so as to be scheduled on an executor.
/// Implements Job so as to erase the exact types involved.
struct DispatchedMessage<H, M> {
message: M,
handler: H,
@ -47,7 +52,7 @@ where
H: MessageHandler + Send,
<H as MessageHandler>::Message: Debug + Send,
{
fn execute(self: Box<Self>, context: &Context) {
fn execute(self: Box<Self>, context: &mut Context) {
if let Err(error) = self.handler.run(context, &self.message) {
error!(
"Error in handler {}: {:?}\nMessage: {:?}",


+ 0
- 155
client/src/executor.rs View File

@ -1,155 +0,0 @@
//! This module provides a facade for an abstract concurrent job executor.
//!
//! Mostly here to insulate the rest of this crate from the exact details of
//! the executor implementation, though it also owns the process-wide context
//! data structure against which handlers are run.
use std::sync::Arc;
use threadpool;
use crate::context::Context;
/// Default number of threads spawned by Executor instances
const NUM_THREADS: usize = 8;
/// The trait of objects that can be run by an Executor.
pub trait Job: Send {
/// Runs this job against the given context.
fn execute(self: Box<Self>, context: &Context);
}
/// A concurrent job execution engine.
pub struct Executor {
/// The context against which jobs are executed.
context: Arc<Context>,
/// Executes the jobs.
pool: threadpool::ThreadPool,
}
impl Executor {
/// Builds a new executor against the given context.
pub fn new(context: Context) -> Self {
Self {
context: Arc::new(context),
pool: threadpool::Builder::new()
.num_threads(NUM_THREADS)
.thread_name("Executor".to_string())
.build(),
}
}
/// Schedules execution of the given job on this executor.
pub fn schedule(&self, job: Box<dyn Job>) {
let context = self.context.clone();
self.pool.execute(move || job.execute(&*context));
}
/// Blocks until all scheduled jobs are executed, then returns the context.
pub fn join(self) -> Context {
self.pool.join();
// The only copies of the Arc are passed to the closures executed on
// the threadpool. Once the pool is join()ed, there cannot exist any
// other copies than ours, so we are safe to unwrap() the Arc.
Arc::try_unwrap(self.context).unwrap()
}
}
#[cfg(test)]
mod tests {
use std::sync::{Arc, Barrier};
use solstice_proto::{User, UserStatus};
use crate::context::{Context, ContextBundle};
use super::{Executor, Job};
#[test]
fn immediate_join_returns_unchanged_context() {
let bundle = ContextBundle::default();
let context = Executor::new(bundle.context).join();
assert_eq!(context.state.lock().users.get_list(), vec![]);
assert_eq!(context.state.lock().rooms.get_room_list(), vec![]);
}
struct Waiter {
barrier: Arc<Barrier>,
}
impl Job for Waiter {
fn execute(self: Box<Self>, _context: &Context) {
self.barrier.wait();
}
}
#[test]
fn join_waits_for_all_jobs() {
let bundle = ContextBundle::default();
let executor = Executor::new(bundle.context);
let barrier = Arc::new(Barrier::new(2));
executor.schedule(Box::new(Waiter {
barrier: barrier.clone(),
}));
executor.schedule(Box::new(Waiter {
barrier: barrier.clone(),
}));
executor.join();
}
struct UserAdder {
pub user: User,
}
impl Job for UserAdder {
fn execute(self: Box<Self>, context: &Context) {
context.state.lock().users.insert(self.user);
}
}
#[test]
fn jobs_access_context() {
let bundle = ContextBundle::default();
let executor = Executor::new(bundle.context);
let user1 = User {
name: "potato".to_string(),
status: UserStatus::Offline,
average_speed: 0,
num_downloads: 0,
unknown: 0,
num_files: 0,
num_folders: 0,
num_free_slots: 0,
country: "YO".to_string(),
};
let mut user2 = user1.clone();
user2.name = "rutabaga".to_string();
executor.schedule(Box::new(UserAdder {
user: user1.clone(),
}));
executor.schedule(Box::new(UserAdder {
user: user2.clone(),
}));
let context = executor.join();
let expected_users =
vec![(user1.name.clone(), user1), (user2.name.clone(), user2)];
let mut users = context.state.lock().users.get_list();
users.sort();
assert_eq!(users, expected_users);
}
}

+ 6
- 9
client/src/main.rs View File

@ -15,7 +15,6 @@ mod config;
mod context;
mod control;
mod dispatcher;
mod executor;
mod handlers;
mod login;
mod message_handler;
@ -27,7 +26,6 @@ mod user;
use config::Config;
use context::{ContextBundle, ContextOptions};
use dispatcher::Dispatcher;
use executor::Executor;
fn old_main() {
let (request_tx, _request_rx) = crossbeam_channel::bounded(100);
@ -105,19 +103,18 @@ async fn async_main() -> anyhow::Result<()> {
));
let dispatcher = Dispatcher::new();
let executor = Executor::new(bundle.context);
let control_task =
control_listener.run(dispatcher_tx, bundle.control_response_rx);
let dispatch_task = async move {
while let Some(message) = dispatcher_rx.recv().await {
let dispatch_task = tokio::task::spawn_blocking(move || {
let mut context = bundle.context;
while let Some(message) = dispatcher_rx.blocking_recv() {
if let Some(job) = dispatcher.dispatch(message) {
executor.schedule(job);
job.execute(&mut context);
}
}
tokio::task::spawn_blocking(move || executor.join()).await
};
context
});
tokio::select! {
result = control_task => match result {


Loading…
Cancel
Save