From e542c102b2ebc02fc296961b3ee64aa975f95ccd Mon Sep 17 00:00:00 2001 From: Titouan Rigoudy Date: Mon, 27 May 2019 20:53:11 +0000 Subject: [PATCH] Move Context back into Executor, add test. --- src/dispatcher.rs | 19 +++++---- src/executor.rs | 93 +++++++++++++++++++++++++++++++++++------- src/message_handler.rs | 6 ++- src/proto/user.rs | 4 +- 4 files changed, 97 insertions(+), 25 deletions(-) diff --git a/src/dispatcher.rs b/src/dispatcher.rs index 5b720f6..fc7269c 100644 --- a/src/dispatcher.rs +++ b/src/dispatcher.rs @@ -2,17 +2,19 @@ use std::fmt::Debug; +use crate::context::Context; use crate::executor::Job; use crate::message_handler::MessageHandler; use crate::proto::server::ServerResponse; /// The type of messages dispatched by a dispatcher. -enum Message { +#[derive(Debug)] +pub enum Message { ServerResponse(ServerResponse), } /// Pairs together a message and its handler as chosen by the dispatcher. -/// Implements Execute so as to be scheduled on an executor. +/// Implements Job so as to be scheduled on an executor. struct DispatchedMessage { message: M, handler: H, @@ -29,8 +31,8 @@ where M: Debug + Send, H: Debug + Send + MessageHandler, { - fn execute(self: Box) { - if let Err(error) = self.handler.run(&self.message) { + fn execute(self: Box, context: &Context) { + if let Err(error) = self.handler.run(context, &self.message) { error!( "Error in handler {}: {:?}\nMessage: {:?}", H::name(), @@ -41,14 +43,17 @@ where } } -struct Dispatcher; +/// The Dispatcher is in charge of mapping messages to their handlers. +pub struct Dispatcher; impl Dispatcher { - fn new() -> Self { + /// Returns a new dispatcher. + pub fn new() -> Self { Self {} } - fn dispatch(message: Message) -> Box { + /// Dispatches the given message by wrapping it with a handler. + pub fn dispatch(message: Message) -> Box { panic!("Unimplemented") } } diff --git a/src/executor.rs b/src/executor.rs index 8cbf0a4..6559090 100644 --- a/src/executor.rs +++ b/src/executor.rs @@ -1,28 +1,38 @@ //! This module provides a facade for an abstract concurrent job executor. //! //! Mostly here to insulate the rest of this crate from the exact details of -//! the executor implementation. +//! the executor implementation, though it also owns the process-wide context +//! data structure against which handlers are run. + +use std::sync::Arc; use threadpool; +use crate::context::Context; + /// Default number of threads spawned by Executor instances const NUM_THREADS: usize = 8; /// The trait of objects that can be run by an Executor. pub trait Job: Send { - fn execute(self: Box); + /// Runs this job against the given context. + fn execute(self: Box, context: &Context); } /// A concurrent job execution engine. pub struct Executor { + /// The context against which jobs are executed. + context: Arc, + /// Executes the jobs. pool: threadpool::ThreadPool, } impl Executor { - /// Builds a new executor with a default number of threads. - pub fn new() -> Self { + /// Builds a new executor against the given context. + pub fn new(context: Context) -> Self { Self { + context: Arc::new(context), pool: threadpool::Builder::new() .num_threads(NUM_THREADS) .thread_name("Executor".to_string()) @@ -32,40 +42,49 @@ impl Executor { /// Schedules execution of the given job on this executor. pub fn schedule(&self, job: Box) { - self.pool.execute(move || job.execute()); + let context = self.context.clone(); + self.pool.execute(move || job.execute(&*context)); } - /// Blocks until all scheduled jobs are executed. - pub fn join(self) { + /// Blocks until all scheduled jobs are executed, then returns the context. + pub fn join(self) -> Context { self.pool.join(); + + // The only copies of the Arc are passed to the closures executed on + // the threadpool. Once the pool is join()ed, there cannot exist any + // other copies than ours, so we are safe to unwrap() the Arc. + Arc::try_unwrap(self.context).unwrap() } } #[cfg(test)] mod tests { - use std::io; use std::sync::{Arc, Barrier}; - use super::{Executor, Job}; + use crate::proto::{User, UserStatus}; + + use super::{Context, Executor, Job}; #[test] - fn immediate_join() { - Executor::new().join() + fn immediate_join_returns_empty_context() { + let context = Executor::new(Context::new()).join(); + assert_eq!(context.users.lock().get_list(), vec![]); + assert_eq!(context.rooms.lock().get_room_list(), vec![]); } struct Waiter { - pub barrier: Arc, + barrier: Arc, } impl Job for Waiter { - fn execute(self: Box) { + fn execute(self: Box, context: &Context) { self.barrier.wait(); } } #[test] fn join_waits_for_all_jobs() { - let executor = Executor::new(); + let executor = Executor::new(Context::new()); let barrier = Arc::new(Barrier::new(2)); @@ -78,4 +97,50 @@ mod tests { executor.join(); } + + struct UserAdder { + pub user: User, + } + + impl Job for UserAdder { + fn execute(self: Box, context: &Context) { + context.users.lock().insert(self.user); + } + } + + #[test] + fn jobs_access_context() { + let executor = Executor::new(Context::new()); + + let user1 = User { + name: "potato".to_string(), + status: UserStatus::Offline, + average_speed: 0, + num_downloads: 0, + unknown: 0, + num_files: 0, + num_folders: 0, + num_free_slots: 0, + country: "YO".to_string(), + }; + + let mut user2 = user1.clone(); + user2.name = "rutabaga".to_string(); + + executor.schedule(Box::new(UserAdder { + user: user1.clone(), + })); + executor.schedule(Box::new(UserAdder { + user: user2.clone(), + })); + + let context = executor.join(); + + let expected_users = vec![(user1.name.clone(), user1), (user2.name.clone(), user2)]; + + let mut users = context.users.lock().get_list(); + users.sort(); + + assert_eq!(users, expected_users); + } } diff --git a/src/message_handler.rs b/src/message_handler.rs index bec2d57..36e3b2b 100644 --- a/src/message_handler.rs +++ b/src/message_handler.rs @@ -1,13 +1,15 @@ use std::fmt::Debug; use std::io; +use crate::context::Context; + /// A trait for types that can handle reception of a message. /// /// Message types are mapped to handler types by Dispatcher. /// This trait is intended to allow composing handler logic. pub trait MessageHandler: Debug { - /// Attempts to handle the given message. - fn run(self, message: &Message) -> io::Result<()>; + /// Attempts to handle the given message against the given context. + fn run(self, context: &Context, message: &Message) -> io::Result<()>; /// Returns the name of this handler type. fn name() -> String; diff --git a/src/proto/user.rs b/src/proto/user.rs index 3009130..19bb0ed 100644 --- a/src/proto/user.rs +++ b/src/proto/user.rs @@ -10,7 +10,7 @@ const STATUS_AWAY: u32 = 2; const STATUS_ONLINE: u32 = 3; /// This enumeration is the list of possible user statuses. -#[derive(Clone, Copy, Debug, Eq, PartialEq, RustcDecodable, RustcEncodable)] +#[derive(Clone, Copy, Debug, Eq, Ord, PartialEq, PartialOrd, RustcDecodable, RustcEncodable)] pub enum UserStatus { /// The user if offline. Offline, @@ -71,7 +71,7 @@ impl ProtoDecode for UserStatus { } /// This structure contains the last known information about a fellow user. -#[derive(Clone, Debug, Eq, PartialEq, RustcDecodable, RustcEncodable)] +#[derive(Clone, Debug, Eq, Ord, PartialEq, PartialOrd, RustcDecodable, RustcEncodable)] pub struct User { /// The name of the user. pub name: String,