Browse Source

Move some Executor functionality into MessageHandler.

wip
Titouan Rigoudy 6 years ago
parent
commit
e7fc922bb3
4 changed files with 42 additions and 61 deletions
  1. +0
    -3
      src/context.rs
  2. +15
    -5
      src/dispatcher.rs
  3. +20
    -49
      src/executor.rs
  4. +7
    -4
      src/message_handler.rs

+ 0
- 3
src/context.rs View File

@ -27,9 +27,6 @@ impl Context {
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use std::sync::Arc;
use std::thread;
use super::Context; use super::Context;
#[test] #[test]


+ 15
- 5
src/dispatcher.rs View File

@ -1,8 +1,7 @@
//! This module defines the central message dispatcher to the client process. //! This module defines the central message dispatcher to the client process.
use std::io;
use std::fmt::Debug;
use crate::context::Context;
use crate::executor::Job; use crate::executor::Job;
use crate::message_handler::MessageHandler; use crate::message_handler::MessageHandler;
use crate::proto::server::ServerResponse; use crate::proto::server::ServerResponse;
@ -25,9 +24,20 @@ impl<M, H> DispatchedMessage<M, H> {
} }
} }
impl<M: Send, H: MessageHandler<M> + Send> Job for DispatchedMessage<M, H> {
fn execute(self: Box<Self>, context: &Context) -> io::Result<()> {
self.handler.run(context, self.message)
impl<M, H> Job for DispatchedMessage<M, H>
where
M: Debug + Send,
H: Debug + Send + MessageHandler<M>,
{
fn execute(self: Box<Self>) {
if let Err(error) = self.handler.run(&self.message) {
error!(
"Error in handler {}: {:?}\nMessage: {:?}",
H::name(),
error,
&self.message
);
}
} }
} }


+ 20
- 49
src/executor.rs View File

@ -1,38 +1,28 @@
use std::io;
use std::sync::Arc;
//! This module provides a facade for an abstract concurrent job executor.
//!
//! Mostly here to insulate the rest of this crate from the exact details of
//! the executor implementation.
use threadpool; use threadpool;
use crate::context::Context;
/// Default number of threads spawned by Executor instances /// Default number of threads spawned by Executor instances
const NUM_THREADS: usize = 8; const NUM_THREADS: usize = 8;
/// The trait of objects that can be run by an Executor. /// The trait of objects that can be run by an Executor.
///
/// NOTE: Intended to be used by boxed objects, so that self's contents can be
/// moved by `execute` without running into "unknown size at compiled time"
/// E0161 errors.
pub trait Job: Send { pub trait Job: Send {
/// Executes self in the given context.
/// Errors do not crash the process, but are error-logged.
fn execute(self: Box<Self>, context: &Context) -> io::Result<()>;
fn execute(self: Box<Self>);
} }
/// The central executor object that drives the client process.
/// A concurrent job execution engine.
pub struct Executor { pub struct Executor {
/// The context against which jobs are executed.
context: Arc<Context>,
/// Executes the jobs. /// Executes the jobs.
pool: threadpool::ThreadPool, pool: threadpool::ThreadPool,
} }
impl Executor { impl Executor {
/// Builds a new executor with an empty context a default number of threads.
/// Builds a new executor with a default number of threads.
pub fn new() -> Self { pub fn new() -> Self {
Self { Self {
context: Arc::new(Context::new()),
pool: threadpool::Builder::new() pool: threadpool::Builder::new()
.num_threads(NUM_THREADS) .num_threads(NUM_THREADS)
.thread_name("Executor".to_string()) .thread_name("Executor".to_string())
@ -42,36 +32,25 @@ impl Executor {
/// Schedules execution of the given job on this executor. /// Schedules execution of the given job on this executor.
pub fn schedule(&self, job: Box<dyn Job>) { pub fn schedule(&self, job: Box<dyn Job>) {
let context = self.context.clone();
self.pool.execute(move || {
if let Err(error) = job.execute(&*context) {
error!("Executable returned error: {:?}", error)
}
})
self.pool.execute(move || job.execute());
} }
/// Blocks until all scheduled jobs are executed, then returns the context.
pub fn join(self) -> Context {
/// Blocks until all scheduled jobs are executed.
pub fn join(self) {
self.pool.join(); self.pool.join();
// Once the pool is joined, no-one should be holding on to copies of
// `self.context` anymore, so we unwrap() here.
Arc::try_unwrap(self.context).unwrap()
} }
} }
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use std::io; use std::io;
use std::sync::{Arc, Barrier, Mutex};
use std::sync::{Arc, Barrier};
use super::{Context, Executor, Job};
use super::{Executor, Job};
#[test] #[test]
fn immediate_join_returns_empty_context() {
let context = Executor::new().join();
assert_eq!(context.users.lock().get_list(), vec![]);
assert_eq!(context.rooms.lock().get_room_list(), vec![]);
fn immediate_join() {
Executor::new().join()
} }
struct Waiter { struct Waiter {
@ -79,9 +58,8 @@ mod tests {
} }
impl Job for Waiter { impl Job for Waiter {
fn execute(self: Box<Self>, context: &Context) -> io::Result<()> {
fn execute(self: Box<Self>) {
self.barrier.wait(); self.barrier.wait();
Ok(())
} }
} }
@ -91,20 +69,13 @@ mod tests {
let barrier = Arc::new(Barrier::new(2)); let barrier = Arc::new(Barrier::new(2));
let waiter1 = Box::new(Waiter {
executor.schedule(Box::new(Waiter {
barrier: barrier.clone(), barrier: barrier.clone(),
});
let waiter2 = Box::new(Waiter {
}));
executor.schedule(Box::new(Waiter {
barrier: barrier.clone(), barrier: barrier.clone(),
});
}));
executor.schedule(waiter1);
executor.schedule(waiter2);
let context = executor.join();
assert_eq!(context.users.lock().get_list(), vec![]);
assert_eq!(context.rooms.lock().get_room_list(), vec![]);
executor.join();
} }
// TODO: Add a test that exercises modifying Context.
} }

+ 7
- 4
src/message_handler.rs View File

@ -1,11 +1,14 @@
use std::fmt::Debug;
use std::io; use std::io;
use crate::context::Context;
/// A trait for types that can handle reception of a message. /// A trait for types that can handle reception of a message.
/// ///
/// Message types are mapped to handler types by Dispatcher. /// Message types are mapped to handler types by Dispatcher.
/// This trait is intended to allow composing handler logic. /// This trait is intended to allow composing handler logic.
pub trait MessageHandler<Message> {
fn run(self, context: &Context, message: Message) -> io::Result<()>;
pub trait MessageHandler<Message>: Debug {
/// Attempts to handle the given message.
fn run(self, message: &Message) -> io::Result<()>;
/// Returns the name of this handler type.
fn name() -> String;
} }

Loading…
Cancel
Save