Browse Source

Simplify MessageHandler for single-threaded use.

wip
Titouan Rigoudy 4 years ago
parent
commit
b62022b864
12 changed files with 68 additions and 69 deletions
  1. +0
    -1
      Cargo.lock
  2. +0
    -1
      client/Cargo.toml
  3. +5
    -6
      client/src/context.rs
  4. +7
    -3
      client/src/handlers/login_status_request_handler.rs
  5. +5
    -5
      client/src/handlers/privileged_users_response_handler.rs
  6. +13
    -11
      client/src/handlers/room_join_request_handler.rs
  7. +5
    -5
      client/src/handlers/room_join_response_handler.rs
  8. +4
    -5
      client/src/handlers/room_list_request_handler.rs
  9. +15
    -21
      client/src/handlers/room_list_response_handler.rs
  10. +2
    -2
      client/src/handlers/room_message_request_handler.rs
  11. +11
    -8
      client/src/handlers/room_message_response_handler.rs
  12. +1
    -1
      client/src/message_handler.rs

+ 0
- 1
Cargo.lock View File

@ -745,7 +745,6 @@ dependencies = [
"env_logger 0.8.4", "env_logger 0.8.4",
"futures", "futures",
"log", "log",
"parking_lot",
"serde", "serde",
"serde_json", "serde_json",
"slab 0.2.0", "slab 0.2.0",


+ 0
- 1
client/Cargo.toml View File

@ -12,7 +12,6 @@ crossbeam-channel = "^0.5"
env_logger = "^0.8" env_logger = "^0.8"
futures = "^0.3" futures = "^0.3"
log = "^0.4" log = "^0.4"
parking_lot = "^0.11"
serde = { version = "^1.0", features = ["derive"] } serde = { version = "^1.0", features = ["derive"] }
serde_json = "^1.0" serde_json = "^1.0"
slab = "^0.2" slab = "^0.2"


+ 5
- 6
client/src/context.rs View File

@ -1,7 +1,6 @@
//! This module provides a central `Context` type that ties together all the //! This module provides a central `Context` type that ties together all the
//! different bits of client state. //! different bits of client state.
use parking_lot::Mutex;
use solstice_proto::ServerRequest; use solstice_proto::ServerRequest;
use tokio::sync::mpsc::{channel, Receiver, Sender}; use tokio::sync::mpsc::{channel, Receiver, Sender};
@ -32,7 +31,7 @@ impl Default for State {
#[derive(Debug)] #[derive(Debug)]
pub struct Context { pub struct Context {
/// Mutable state. /// Mutable state.
pub state: Mutex<State>,
pub state: State,
/// Sender half of a channel used to send requests to the server. /// Sender half of a channel used to send requests to the server.
pub server_request_tx: Sender<ServerRequest>, pub server_request_tx: Sender<ServerRequest>,
@ -86,7 +85,7 @@ impl ContextBundle {
channel(options.control_response_buffer); channel(options.control_response_buffer);
Self { Self {
context: Context { context: Context {
state: Mutex::new(options.initial_state),
state: options.initial_state,
server_request_tx, server_request_tx,
control_response_tx, control_response_tx,
}, },
@ -124,8 +123,8 @@ mod tests {
#[test] #[test]
fn default_bundle_state_is_empty() { fn default_bundle_state_is_empty() {
let bundle = ContextBundle::default(); let bundle = ContextBundle::default();
let guard = bundle.context.state.lock();
assert_eq!(guard.rooms.get_room_list(), vec![]);
assert_eq!(guard.users.get_list(), vec![]);
let state = bundle.context.state;
assert_eq!(state.rooms.get_room_list(), vec![]);
assert_eq!(state.users.get_list(), vec![]);
} }
} }

+ 7
- 3
client/src/handlers/login_status_request_handler.rs View File

@ -10,8 +10,12 @@ pub struct LoginStatusRequestHandler;
impl MessageHandler for LoginStatusRequestHandler { impl MessageHandler for LoginStatusRequestHandler {
type Message = (); type Message = ();
fn run(self, context: &Context, &(): &Self::Message) -> anyhow::Result<()> {
let user_name = context.state.lock().user_name.clone();
fn run(
self,
context: &mut Context,
&(): &Self::Message,
) -> anyhow::Result<()> {
let user_name = context.state.user_name.clone();
context context
.control_response_tx .control_response_tx
.blocking_send(control::Response::LoginStatusResponse( .blocking_send(control::Response::LoginStatusResponse(
@ -46,7 +50,7 @@ mod tests {
let mut bundle = ContextBundle::new(options); let mut bundle = ContextBundle::new(options);
LoginStatusRequestHandler::default() LoginStatusRequestHandler::default()
.run(&bundle.context, &())
.run(&mut bundle.context, &())
.context("running handler")?; .context("running handler")?;
let response = bundle.control_response_rx.blocking_recv().unwrap(); let response = bundle.control_response_rx.blocking_recv().unwrap();


+ 5
- 5
client/src/handlers/privileged_users_response_handler.rs View File

@ -11,11 +11,11 @@ impl MessageHandler for PrivilegedUsersResponseHandler {
fn run( fn run(
self, self,
context: &Context,
context: &mut Context,
message: &PrivilegedUsersResponse, message: &PrivilegedUsersResponse,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
let users = message.users.clone(); let users = message.users.clone();
context.state.lock().users.set_all_privileged(users);
context.state.users.set_all_privileged(users);
Ok(()) Ok(())
} }
@ -35,7 +35,7 @@ mod tests {
#[test] #[test]
fn run_sets_privileged_users() { fn run_sets_privileged_users() {
let bundle = ContextBundle::default();
let mut bundle = ContextBundle::default();
let response = PrivilegedUsersResponse { let response = PrivilegedUsersResponse {
users: vec![ users: vec![
@ -46,10 +46,10 @@ mod tests {
}; };
PrivilegedUsersResponseHandler::default() PrivilegedUsersResponseHandler::default()
.run(&bundle.context, &response)
.run(&mut bundle.context, &response)
.unwrap(); .unwrap();
let mut privileged = bundle.context.state.lock().users.get_all_privileged();
let mut privileged = bundle.context.state.users.get_all_privileged();
privileged.sort(); privileged.sort();
assert_eq!(privileged, response.users); assert_eq!(privileged, response.users);


+ 13
- 11
client/src/handlers/room_join_request_handler.rs View File

@ -10,14 +10,17 @@ use crate::room::RoomError;
#[derive(Debug, Default)] #[derive(Debug, Default)]
pub struct RoomJoinRequestHandler; pub struct RoomJoinRequestHandler;
fn start_joining(context: &Context, room_name: &str) -> Result<(), RoomError> {
let mut guard = context.state.lock();
let result = guard.rooms.start_joining(room_name);
fn start_joining(
context: &mut Context,
room_name: &str,
) -> Result<(), RoomError> {
let result = context.state.rooms.start_joining(room_name);
if let Err(RoomError::MembershipChangeInvalid(_, _)) = result { if let Err(RoomError::MembershipChangeInvalid(_, _)) = result {
// This `expect()` should never fail since the error was not // This `expect()` should never fail since the error was not
// `RoomNotFound` but `MembershipChangeInvalid`. // `RoomNotFound` but `MembershipChangeInvalid`.
let room = guard
let room = context
.state
.rooms .rooms
.get_strict(room_name) .get_strict(room_name)
.expect("querying room state"); .expect("querying room state");
@ -44,7 +47,7 @@ impl MessageHandler for RoomJoinRequestHandler {
fn run( fn run(
self, self,
context: &Context,
context: &mut Context,
room_name: &Self::Message, room_name: &Self::Message,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
start_joining(context, room_name).context("joining room")?; start_joining(context, room_name).context("joining room")?;
@ -80,11 +83,11 @@ mod tests {
let mut bundle = ContextBundle::default(); let mut bundle = ContextBundle::default();
RoomJoinRequestHandler::default() RoomJoinRequestHandler::default()
.run(&bundle.context, &"bleep".to_string())
.run(&mut bundle.context, &"bleep".to_string())
.unwrap_err(); .unwrap_err();
// Room state has not changed. // Room state has not changed.
assert_eq!(bundle.context.state.lock().rooms.get_room_list(), vec![]);
assert_eq!(bundle.context.state.rooms.get_room_list(), vec![]);
// Close the channel, so we can observe it was empty without hanging. // Close the channel, so we can observe it was empty without hanging.
drop(bundle.context.server_request_tx); drop(bundle.context.server_request_tx);
@ -106,12 +109,12 @@ mod tests {
let mut bundle = ContextBundle::new(options); let mut bundle = ContextBundle::new(options);
RoomJoinRequestHandler::default() RoomJoinRequestHandler::default()
.run(&bundle.context, &"bleep".to_string())
.run(&mut bundle.context, &"bleep".to_string())
.unwrap_err(); .unwrap_err();
// Room state has not changed. // Room state has not changed.
assert_eq!( assert_eq!(
bundle.context.state.lock().rooms.get_room_list(),
bundle.context.state.rooms.get_room_list(),
vec![("bleep".to_string(), room.clone())] vec![("bleep".to_string(), room.clone())]
); );
@ -138,7 +141,7 @@ mod tests {
let mut bundle = ContextBundle::new(options); let mut bundle = ContextBundle::new(options);
RoomJoinRequestHandler::default() RoomJoinRequestHandler::default()
.run(&bundle.context, &"bleep".to_string())
.run(&mut bundle.context, &"bleep".to_string())
.context("running handler")?; .context("running handler")?;
let request = bundle.server_request_rx.blocking_recv().unwrap(); let request = bundle.server_request_rx.blocking_recv().unwrap();
@ -148,7 +151,6 @@ mod tests {
bundle bundle
.context .context
.state .state
.lock()
.rooms .rooms
.get_strict("bleep") .get_strict("bleep")
.context("getting room")? .context("getting room")?


+ 5
- 5
client/src/handlers/room_join_response_handler.rs View File

@ -13,12 +13,12 @@ impl MessageHandler for RoomJoinResponseHandler {
fn run( fn run(
self, self,
context: &Context,
context: &mut Context,
response: &RoomJoinResponse, response: &RoomJoinResponse,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
{ {
let mut guard = context.state.lock();
let room = guard
let room = context
.state
.rooms .rooms
.join( .join(
&response.room_name, &response.room_name,
@ -78,7 +78,7 @@ mod tests {
}; };
RoomJoinResponseHandler::default() RoomJoinResponseHandler::default()
.run(&bundle.context, &response)
.run(&mut bundle.context, &response)
.unwrap(); .unwrap();
room.membership = Membership::Member; room.membership = Membership::Member;
@ -86,7 +86,7 @@ mod tests {
room.operators = ["shruti"].iter().map(|s| s.to_string()).collect(); room.operators = ["shruti"].iter().map(|s| s.to_string()).collect();
room.owner = Some("kim".to_string()); room.owner = Some("kim".to_string());
let rooms = bundle.context.state.lock().rooms.get_room_list();
let rooms = bundle.context.state.rooms.get_room_list();
assert_eq!(rooms, vec![("apple".to_string(), room.clone())]); assert_eq!(rooms, vec![("apple".to_string(), room.clone())]);


+ 4
- 5
client/src/handlers/room_list_request_handler.rs View File

@ -11,10 +11,9 @@ pub struct RoomListRequestHandler;
impl MessageHandler for RoomListRequestHandler { impl MessageHandler for RoomListRequestHandler {
type Message = (); type Message = ();
fn run(self, context: &Context, _message: &()) -> anyhow::Result<()> {
fn run(self, context: &mut Context, _message: &()) -> anyhow::Result<()> {
{ {
let guard = context.state.lock();
let rooms = guard.rooms.get_room_list();
let rooms = context.state.rooms.get_room_list();
let control_response = let control_response =
control::Response::RoomListResponse(control::RoomListResponse { control::Response::RoomListResponse(control::RoomListResponse {
rooms, rooms,
@ -60,7 +59,7 @@ mod tests {
let mut bundle = ContextBundle::default(); let mut bundle = ContextBundle::default();
RoomListRequestHandler::default() RoomListRequestHandler::default()
.run(&bundle.context, &())
.run(&mut bundle.context, &())
.unwrap(); .unwrap();
let request = bundle.server_request_rx.blocking_recv().unwrap(); let request = bundle.server_request_rx.blocking_recv().unwrap();
@ -81,7 +80,7 @@ mod tests {
let mut bundle = ContextBundle::new(options); let mut bundle = ContextBundle::new(options);
RoomListRequestHandler::default() RoomListRequestHandler::default()
.run(&bundle.context, &())
.run(&mut bundle.context, &())
.unwrap(); .unwrap();
let response = bundle.control_response_rx.blocking_recv().unwrap(); let response = bundle.control_response_rx.blocking_recv().unwrap();


+ 15
- 21
client/src/handlers/room_list_response_handler.rs View File

@ -13,26 +13,20 @@ impl MessageHandler for RoomListResponseHandler {
fn run( fn run(
self, self,
context: &Context,
context: &mut Context,
message: &RoomListResponse, message: &RoomListResponse,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
let response = (*message).clone();
{
let mut guard = context.state.lock();
guard.rooms.set_room_list(response);
// Send under lock to avoid out-of-order sends.
let rooms = guard.rooms.get_room_list();
let control_response =
control::Response::RoomListResponse(control::RoomListResponse {
rooms,
});
context
.control_response_tx
.blocking_send(control_response)
.context("sending control response")?;
}
let response = message.clone();
context.state.rooms.set_room_list(response);
let rooms = context.state.rooms.get_room_list();
let control_response =
control::Response::RoomListResponse(control::RoomListResponse { rooms });
context
.control_response_tx
.blocking_send(control_response)
.context("sending control response")?;
Ok(()) Ok(())
} }
@ -58,7 +52,7 @@ mod tests {
#[test] #[test]
fn run_sets_room_list() { fn run_sets_room_list() {
let bundle = ContextBundle::default();
let mut bundle = ContextBundle::default();
let response = RoomListResponse { let response = RoomListResponse {
rooms: vec![("potato".to_string(), 123), ("apple".to_string(), 42)], rooms: vec![("potato".to_string(), 123), ("apple".to_string(), 42)],
@ -68,10 +62,10 @@ mod tests {
}; };
RoomListResponseHandler::default() RoomListResponseHandler::default()
.run(&bundle.context, &response)
.run(&mut bundle.context, &response)
.unwrap(); .unwrap();
let mut rooms = bundle.context.state.lock().rooms.get_room_list();
let mut rooms = bundle.context.state.rooms.get_room_list();
rooms.sort_by_key(room_name); rooms.sort_by_key(room_name);
assert_eq!( assert_eq!(


+ 2
- 2
client/src/handlers/room_message_request_handler.rs View File

@ -14,7 +14,7 @@ impl MessageHandler for RoomMessageRequestHandler {
fn run( fn run(
self, self,
context: &Context,
context: &mut Context,
message: &control::RoomMessageRequest, message: &control::RoomMessageRequest,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
context context
@ -51,7 +51,7 @@ mod tests {
RoomMessageRequestHandler::default() RoomMessageRequestHandler::default()
.run( .run(
&bundle.context,
&mut bundle.context,
&control::RoomMessageRequest { &control::RoomMessageRequest {
room_name: "bleep".to_string(), room_name: "bleep".to_string(),
message: "yo!".to_string(), message: "yo!".to_string(),


+ 11
- 8
client/src/handlers/room_message_response_handler.rs View File

@ -29,11 +29,10 @@ impl MessageHandler for RoomMessageResponseHandler {
fn run( fn run(
self, self,
context: &Context,
context: &mut Context,
message: &server::RoomMessageResponse, message: &server::RoomMessageResponse,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
let mut state = context.state.lock();
if let Err(err) = add_message(&mut state.rooms, message) {
if let Err(err) = add_message(&mut context.state.rooms, message) {
warn!("Error storing room message: {:#}", err); warn!("Error storing room message: {:#}", err);
} }
@ -72,7 +71,7 @@ mod tests {
RoomMessageResponseHandler::default() RoomMessageResponseHandler::default()
.run( .run(
&bundle.context,
&mut bundle.context,
&server::RoomMessageResponse { &server::RoomMessageResponse {
room_name: "bleep".to_string(), room_name: "bleep".to_string(),
user_name: "shruti".to_string(), user_name: "shruti".to_string(),
@ -110,11 +109,11 @@ mod tests {
.rooms .rooms
.insert("apple".to_string(), room.clone()); .insert("apple".to_string(), room.clone());
let bundle = ContextBundle::new(options);
let mut bundle = ContextBundle::new(options);
RoomMessageResponseHandler::default() RoomMessageResponseHandler::default()
.run( .run(
&bundle.context,
&mut bundle.context,
&server::RoomMessageResponse { &server::RoomMessageResponse {
room_name: "apple".to_string(), room_name: "apple".to_string(),
user_name: "shruti".to_string(), user_name: "shruti".to_string(),
@ -123,8 +122,12 @@ mod tests {
) )
.expect("running handler"); .expect("running handler");
let state = bundle.context.state.lock();
let room = state.rooms.get_strict("apple").expect("looking up room");
let room = bundle
.context
.state
.rooms
.get_strict("apple")
.expect("looking up room");
assert_eq!( assert_eq!(
room.messages, room.messages,


+ 1
- 1
client/src/message_handler.rs View File

@ -12,7 +12,7 @@ pub trait MessageHandler: Debug {
/// Attempts to handle the given message against the given context. /// Attempts to handle the given message against the given context.
fn run( fn run(
self, self,
context: &Context,
context: &mut Context,
message: &Self::Message, message: &Self::Message,
) -> anyhow::Result<()>; ) -> anyhow::Result<()>;


Loading…
Cancel
Save