Browse Source

Implement Executor.

wip
Titouan Rigoudy 6 years ago
parent
commit
78d10224f8
6 changed files with 62 additions and 10 deletions
  1. +10
    -0
      Cargo.lock
  2. +1
    -0
      Cargo.toml
  3. +1
    -0
      src/context.rs
  4. +4
    -4
      src/dispatcher.rs
  5. +43
    -6
      src/executor.rs
  6. +3
    -0
      src/message_handler.rs

+ 10
- 0
Cargo.lock View File

@ -708,6 +708,7 @@ dependencies = [
"rust-crypto 0.2.36 (registry+https://github.com/rust-lang/crates.io-index)",
"rustc-serialize 0.3.24 (registry+https://github.com/rust-lang/crates.io-index)",
"slab 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)",
"threadpool 1.7.1 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio-codec 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio-core 0.1.17 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio-io 0.1.12 (registry+https://github.com/rust-lang/crates.io-index)",
@ -736,6 +737,14 @@ dependencies = [
"thread-id 2.0.0 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "threadpool"
version = "1.7.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"num_cpus 1.10.0 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "time"
version = "0.1.42"
@ -1115,6 +1124,7 @@ dependencies = [
"checksum stable_deref_trait 1.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "dba1a27d3efae4351c8051072d619e3ade2820635c3958d826bfea39d59b54c8"
"checksum thread-id 2.0.0 (registry+https://github.com/rust-lang/crates.io-index)" = "a9539db560102d1cef46b8b78ce737ff0bb64e7e18d35b2a5688f7d097d0ff03"
"checksum thread_local 0.2.7 (registry+https://github.com/rust-lang/crates.io-index)" = "8576dbbfcaef9641452d5cf0df9b0e7eeab7694956dd33bb61515fb8f18cfdd5"
"checksum threadpool 1.7.1 (registry+https://github.com/rust-lang/crates.io-index)" = "e2f0c90a5f3459330ac8bc0d2f879c693bb7a2f59689c1083fc4ef83834da865"
"checksum time 0.1.42 (registry+https://github.com/rust-lang/crates.io-index)" = "db8dcfca086c1143c9270ac42a2bbd8a7ee477b78ac8e45b19abfb0cbede4b6f"
"checksum tokio 0.1.20 (registry+https://github.com/rust-lang/crates.io-index)" = "94a1f9396aec29d31bb16c24d155cfa144d1af91c40740125db3131bdaf76da8"
"checksum tokio-codec 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "5c501eceaf96f0e1793cf26beb63da3d11c738c4a943fdf3746d81d64684c39f"


+ 1
- 0
Cargo.toml View File

@ -17,6 +17,7 @@ parking_lot = "^0.8"
rust-crypto = "^0.2.34"
rustc-serialize = "^0.3.17"
slab = "^0.2"
threadpool = "^1.0"
tokio-core = "^0.1"
tokio-io = "^0.1"
tokio-codec = "^0.1"


+ 1
- 0
src/context.rs View File

@ -13,6 +13,7 @@ pub struct Context {
}
impl Context {
/// Creates a new empty context.
pub fn new() -> Self {
Self {
rooms: Mutex::new(RoomMap::new()),


+ 4
- 4
src/dispatcher.rs View File

@ -3,7 +3,7 @@
use std::io;
use crate::context::Context;
use crate::executor::Execute;
use crate::executor::Job;
use crate::message_handler::MessageHandler;
use crate::proto::server::ServerResponse;
@ -25,8 +25,8 @@ impl<M, H> DispatchedMessage<M, H> {
}
}
impl<M, H: MessageHandler<M>> Execute for DispatchedMessage<M, H> {
fn execute(self, context: &Context) -> io::Result<()> {
impl<M: Send, H: MessageHandler<M> + Send> Job for DispatchedMessage<M, H> {
fn execute(self: Box<Self>, context: &Context) -> io::Result<()> {
self.handler.run(context, self.message)
}
}
@ -38,7 +38,7 @@ impl Dispatcher {
Self {}
}
fn dispatch(message: Message) -> Box<dyn Execute> {
fn dispatch(message: Message) -> Box<dyn Job> {
panic!("Unimplemented")
}
}

+ 43
- 6
src/executor.rs View File

@ -1,20 +1,57 @@
use std::io;
use std::sync::Arc;
use threadpool;
use crate::context::Context;
/// Default number of threads spawned by Executor instances
const NUM_THREADS: usize = 8;
/// The trait of objects that can be run by an Executor.
pub trait Execute {
fn execute(self, context: &Context) -> io::Result<()>;
///
/// NOTE: Intended to be used by boxed objects, so that self's contents can be
/// moved by `execute` without running into "unknown size at compiled time"
/// E0161 errors.
pub trait Job: Send {
/// Executes self in the given context.
/// Errors do not crash the process, but are error-logged.
fn execute(self: Box<Self>, context: &Context) -> io::Result<()>;
}
/// The central executor object that drives the client process.
pub struct Executor {
// TODO
/// The context against which jobs are executed.
context: Arc<Context>,
/// Executes the jobs.
pool: threadpool::ThreadPool,
}
impl Executor {
pub fn new() -> Self { Self {} }
/// Builds a new executor with an empty context a default number of threads.
pub fn new() -> Self {
Self {
context: Arc::new(Context::new()),
pool: threadpool::Builder::new()
.num_threads(NUM_THREADS)
.thread_name("Executor".to_string())
.build(),
}
}
/// Schedules execution of the given job on this executor.
pub fn schedule(&self, job: Box<dyn Job>) {
let context = self.context.clone();
self.pool.execute(move || {
if let Err(error) = job.execute(&*context) {
error!("Executable returned error: {:?}", error)
}
})
}
pub fn enqueue(work: Box<dyn Execute>) {
// TODO
/// Blocks until all scheduled jobs are executed.
pub fn join(self) {
self.pool.join()
}
}

+ 3
- 0
src/message_handler.rs View File

@ -3,6 +3,9 @@ use std::io;
use crate::context::Context;
/// A trait for types that can handle reception of a message.
///
/// Message types are mapped to handler types by Dispatcher.
/// This trait is intended to allow composing handler logic.
pub trait MessageHandler<Message> {
fn run(self, context: &Context, message: Message) -> io::Result<()>;
}

Loading…
Cancel
Save