Browse Source

Move Context back into Executor, add test.

wip
Titouan Rigoudy 6 years ago
parent
commit
e542c102b2
4 changed files with 97 additions and 25 deletions
  1. +12
    -7
      src/dispatcher.rs
  2. +79
    -14
      src/executor.rs
  3. +4
    -2
      src/message_handler.rs
  4. +2
    -2
      src/proto/user.rs

+ 12
- 7
src/dispatcher.rs View File

@ -2,17 +2,19 @@
use std::fmt::Debug; use std::fmt::Debug;
use crate::context::Context;
use crate::executor::Job; use crate::executor::Job;
use crate::message_handler::MessageHandler; use crate::message_handler::MessageHandler;
use crate::proto::server::ServerResponse; use crate::proto::server::ServerResponse;
/// The type of messages dispatched by a dispatcher. /// The type of messages dispatched by a dispatcher.
enum Message {
#[derive(Debug)]
pub enum Message {
ServerResponse(ServerResponse), ServerResponse(ServerResponse),
} }
/// Pairs together a message and its handler as chosen by the dispatcher. /// Pairs together a message and its handler as chosen by the dispatcher.
/// Implements Execute so as to be scheduled on an executor.
/// Implements Job so as to be scheduled on an executor.
struct DispatchedMessage<M, H> { struct DispatchedMessage<M, H> {
message: M, message: M,
handler: H, handler: H,
@ -29,8 +31,8 @@ where
M: Debug + Send, M: Debug + Send,
H: Debug + Send + MessageHandler<M>, H: Debug + Send + MessageHandler<M>,
{ {
fn execute(self: Box<Self>) {
if let Err(error) = self.handler.run(&self.message) {
fn execute(self: Box<Self>, context: &Context) {
if let Err(error) = self.handler.run(context, &self.message) {
error!( error!(
"Error in handler {}: {:?}\nMessage: {:?}", "Error in handler {}: {:?}\nMessage: {:?}",
H::name(), H::name(),
@ -41,14 +43,17 @@ where
} }
} }
struct Dispatcher;
/// The Dispatcher is in charge of mapping messages to their handlers.
pub struct Dispatcher;
impl Dispatcher { impl Dispatcher {
fn new() -> Self {
/// Returns a new dispatcher.
pub fn new() -> Self {
Self {} Self {}
} }
fn dispatch(message: Message) -> Box<dyn Job> {
/// Dispatches the given message by wrapping it with a handler.
pub fn dispatch(message: Message) -> Box<dyn Job> {
panic!("Unimplemented") panic!("Unimplemented")
} }
} }

+ 79
- 14
src/executor.rs View File

@ -1,28 +1,38 @@
//! This module provides a facade for an abstract concurrent job executor. //! This module provides a facade for an abstract concurrent job executor.
//! //!
//! Mostly here to insulate the rest of this crate from the exact details of //! Mostly here to insulate the rest of this crate from the exact details of
//! the executor implementation.
//! the executor implementation, though it also owns the process-wide context
//! data structure against which handlers are run.
use std::sync::Arc;
use threadpool; use threadpool;
use crate::context::Context;
/// Default number of threads spawned by Executor instances /// Default number of threads spawned by Executor instances
const NUM_THREADS: usize = 8; const NUM_THREADS: usize = 8;
/// The trait of objects that can be run by an Executor. /// The trait of objects that can be run by an Executor.
pub trait Job: Send { pub trait Job: Send {
fn execute(self: Box<Self>);
/// Runs this job against the given context.
fn execute(self: Box<Self>, context: &Context);
} }
/// A concurrent job execution engine. /// A concurrent job execution engine.
pub struct Executor { pub struct Executor {
/// The context against which jobs are executed.
context: Arc<Context>,
/// Executes the jobs. /// Executes the jobs.
pool: threadpool::ThreadPool, pool: threadpool::ThreadPool,
} }
impl Executor { impl Executor {
/// Builds a new executor with a default number of threads.
pub fn new() -> Self {
/// Builds a new executor against the given context.
pub fn new(context: Context) -> Self {
Self { Self {
context: Arc::new(context),
pool: threadpool::Builder::new() pool: threadpool::Builder::new()
.num_threads(NUM_THREADS) .num_threads(NUM_THREADS)
.thread_name("Executor".to_string()) .thread_name("Executor".to_string())
@ -32,40 +42,49 @@ impl Executor {
/// Schedules execution of the given job on this executor. /// Schedules execution of the given job on this executor.
pub fn schedule(&self, job: Box<dyn Job>) { pub fn schedule(&self, job: Box<dyn Job>) {
self.pool.execute(move || job.execute());
let context = self.context.clone();
self.pool.execute(move || job.execute(&*context));
} }
/// Blocks until all scheduled jobs are executed.
pub fn join(self) {
/// Blocks until all scheduled jobs are executed, then returns the context.
pub fn join(self) -> Context {
self.pool.join(); self.pool.join();
// The only copies of the Arc are passed to the closures executed on
// the threadpool. Once the pool is join()ed, there cannot exist any
// other copies than ours, so we are safe to unwrap() the Arc.
Arc::try_unwrap(self.context).unwrap()
} }
} }
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use std::io;
use std::sync::{Arc, Barrier}; use std::sync::{Arc, Barrier};
use super::{Executor, Job};
use crate::proto::{User, UserStatus};
use super::{Context, Executor, Job};
#[test] #[test]
fn immediate_join() {
Executor::new().join()
fn immediate_join_returns_empty_context() {
let context = Executor::new(Context::new()).join();
assert_eq!(context.users.lock().get_list(), vec![]);
assert_eq!(context.rooms.lock().get_room_list(), vec![]);
} }
struct Waiter { struct Waiter {
pub barrier: Arc<Barrier>,
barrier: Arc<Barrier>,
} }
impl Job for Waiter { impl Job for Waiter {
fn execute(self: Box<Self>) {
fn execute(self: Box<Self>, context: &Context) {
self.barrier.wait(); self.barrier.wait();
} }
} }
#[test] #[test]
fn join_waits_for_all_jobs() { fn join_waits_for_all_jobs() {
let executor = Executor::new();
let executor = Executor::new(Context::new());
let barrier = Arc::new(Barrier::new(2)); let barrier = Arc::new(Barrier::new(2));
@ -78,4 +97,50 @@ mod tests {
executor.join(); executor.join();
} }
struct UserAdder {
pub user: User,
}
impl Job for UserAdder {
fn execute(self: Box<Self>, context: &Context) {
context.users.lock().insert(self.user);
}
}
#[test]
fn jobs_access_context() {
let executor = Executor::new(Context::new());
let user1 = User {
name: "potato".to_string(),
status: UserStatus::Offline,
average_speed: 0,
num_downloads: 0,
unknown: 0,
num_files: 0,
num_folders: 0,
num_free_slots: 0,
country: "YO".to_string(),
};
let mut user2 = user1.clone();
user2.name = "rutabaga".to_string();
executor.schedule(Box::new(UserAdder {
user: user1.clone(),
}));
executor.schedule(Box::new(UserAdder {
user: user2.clone(),
}));
let context = executor.join();
let expected_users = vec![(user1.name.clone(), user1), (user2.name.clone(), user2)];
let mut users = context.users.lock().get_list();
users.sort();
assert_eq!(users, expected_users);
}
} }

+ 4
- 2
src/message_handler.rs View File

@ -1,13 +1,15 @@
use std::fmt::Debug; use std::fmt::Debug;
use std::io; use std::io;
use crate::context::Context;
/// A trait for types that can handle reception of a message. /// A trait for types that can handle reception of a message.
/// ///
/// Message types are mapped to handler types by Dispatcher. /// Message types are mapped to handler types by Dispatcher.
/// This trait is intended to allow composing handler logic. /// This trait is intended to allow composing handler logic.
pub trait MessageHandler<Message>: Debug { pub trait MessageHandler<Message>: Debug {
/// Attempts to handle the given message.
fn run(self, message: &Message) -> io::Result<()>;
/// Attempts to handle the given message against the given context.
fn run(self, context: &Context, message: &Message) -> io::Result<()>;
/// Returns the name of this handler type. /// Returns the name of this handler type.
fn name() -> String; fn name() -> String;


+ 2
- 2
src/proto/user.rs View File

@ -10,7 +10,7 @@ const STATUS_AWAY: u32 = 2;
const STATUS_ONLINE: u32 = 3; const STATUS_ONLINE: u32 = 3;
/// This enumeration is the list of possible user statuses. /// This enumeration is the list of possible user statuses.
#[derive(Clone, Copy, Debug, Eq, PartialEq, RustcDecodable, RustcEncodable)]
#[derive(Clone, Copy, Debug, Eq, Ord, PartialEq, PartialOrd, RustcDecodable, RustcEncodable)]
pub enum UserStatus { pub enum UserStatus {
/// The user if offline. /// The user if offline.
Offline, Offline,
@ -71,7 +71,7 @@ impl ProtoDecode for UserStatus {
} }
/// This structure contains the last known information about a fellow user. /// This structure contains the last known information about a fellow user.
#[derive(Clone, Debug, Eq, PartialEq, RustcDecodable, RustcEncodable)]
#[derive(Clone, Debug, Eq, Ord, PartialEq, PartialOrd, RustcDecodable, RustcEncodable)]
pub struct User { pub struct User {
/// The name of the user. /// The name of the user.
pub name: String, pub name: String,


Loading…
Cancel
Save