Browse Source

Add timestamp to room messages.

Also introduce an injection seam for the system clock.
wip
Titouan Rigoudy 4 years ago
parent
commit
236ccc56a1
10 changed files with 214 additions and 44 deletions
  1. +1
    -0
      Cargo.lock
  2. +3
    -0
      client/Cargo.toml
  3. +13
    -11
      client/src/client.rs
  4. +54
    -0
      client/src/clock.rs
  5. +19
    -0
      client/src/context.rs
  6. +3
    -3
      client/src/control/response.rs
  7. +1
    -0
      client/src/handlers/room_message_request_handler.rs
  8. +22
    -7
      client/src/handlers/room_message_response_handler.rs
  9. +1
    -0
      client/src/main.rs
  10. +97
    -23
      client/src/room.rs

+ 1
- 0
Cargo.lock View File

@ -745,6 +745,7 @@ dependencies = [
"env_logger 0.8.4",
"futures",
"log",
"parking_lot",
"serde",
"serde_json",
"slab 0.2.0",


+ 3
- 0
client/Cargo.toml View File

@ -19,3 +19,6 @@ solstice-proto = { path = "../proto" }
thiserror = "^1.0"
tokio = { version = "1.0", features = ["full"] }
tokio-tungstenite = "0.15"
[dev-dependencies]
parking_lot = "^0.11.0"

+ 13
- 11
client/src/client.rs View File

@ -611,17 +611,19 @@ impl Client {
&mut self,
response: server::RoomMessageResponse,
) {
let result = self.rooms.add_message(
&response.room_name,
room::Message {
user_name: response.user_name.clone(),
message: response.message.clone(),
},
);
if let Err(err) = result {
error!("RoomMessageResponse: {}", err);
return;
}
let room = match self.rooms.get_mut_strict(&response.room_name) {
Ok(room) => room,
Err(err) => {
error!("RoomMessageResponse: {}", err);
return;
}
};
room.messages.insert(room::Message {
received_at: std::time::SystemTime::now(),
user_name: response.user_name.clone(),
message: response.message.clone(),
});
self.send_to_controller(control::Response::RoomMessageResponse(
control::RoomMessageResponse {


+ 54
- 0
client/src/clock.rs View File

@ -0,0 +1,54 @@
//! Defines an injectable clock, for testing time-sensitive code.
use std::time::SystemTime;
#[cfg(test)]
mod simulated {
use std::sync::Arc;
use std::time::SystemTime;
use parking_lot::Mutex;
#[derive(Debug)]
pub struct Clock {
now: Arc<Mutex<SystemTime>>,
}
impl Clock {
pub fn new(now: SystemTime) -> Self {
Self {
now: Arc::new(Mutex::new(now)),
}
}
pub fn now(&self) -> SystemTime {
*self.now.lock()
}
}
}
#[cfg(test)]
pub use simulated::Clock as SimulatedSystemClock;
#[derive(Debug, Default)]
pub struct SystemClock {
#[cfg(test)]
simulated_clock: Option<SimulatedSystemClock>,
}
impl SystemClock {
#[cfg(test)]
pub fn set_simulated_clock(&mut self, clock: Option<SimulatedSystemClock>) {
self.simulated_clock = clock
}
pub fn now(&self) -> SystemTime {
#[cfg(test)]
if let Some(ref clock) = self.simulated_clock {
return clock.now();
}
SystemTime::now()
}
}

+ 19
- 0
client/src/context.rs View File

@ -4,6 +4,9 @@
use solstice_proto::ServerRequest;
use tokio::sync::mpsc::{channel, Receiver, Sender};
#[cfg(test)]
use crate::clock::SimulatedSystemClock;
use crate::clock::SystemClock;
use crate::control::Response as ControlResponse;
use crate::room::RoomMap;
use crate::user::UserMap;
@ -38,6 +41,9 @@ pub struct Context {
/// Sender half of a channel used to send responses to the controller.
pub control_response_tx: Sender<ControlResponse>,
/// Clock used to get system time.
pub system_clock: SystemClock,
}
/// Convenience bundle for creating new `Context` structs.
@ -64,6 +70,10 @@ pub struct ContextOptions {
/// The buffer size of the control response channel.
pub control_response_buffer: usize,
#[cfg(test)]
/// A simulated system clock to use in tests.
pub simulated_clock: Option<SimulatedSystemClock>,
}
impl Default for ContextOptions {
@ -72,6 +82,8 @@ impl Default for ContextOptions {
initial_state: State::default(),
server_request_buffer: 100,
control_response_buffer: 100,
#[cfg(test)]
simulated_clock: None,
}
}
}
@ -83,11 +95,18 @@ impl ContextBundle {
channel(options.server_request_buffer);
let (control_response_tx, control_response_rx) =
channel(options.control_response_buffer);
#[allow(unused_mut)]
let mut system_clock = SystemClock::default();
#[cfg(test)]
system_clock.set_simulated_clock(options.simulated_clock);
Self {
context: Context {
state: options.initial_state,
server_request_tx,
control_response_tx,
system_clock,
},
server_request_rx,
control_response_rx,


+ 3
- 3
client/src/control/response.rs View File

@ -109,7 +109,7 @@ mod tests {
use solstice_proto::{User, UserStatus};
use crate::room::{Membership, Room, Visibility};
use crate::room::{Membership, MessageHistory, Room, Visibility};
use super::{
LoginStatusResponse, RoomJoinResponse, RoomLeaveResponse, RoomListResponse,
@ -200,7 +200,7 @@ mod tests {
owner: None,
operators: HashSet::new(),
members: HashSet::new(),
messages: vec![],
messages: MessageHistory::default(),
tickers: vec![],
}
}
@ -269,7 +269,7 @@ mod tests {
owner: None,
operators: HashSet::new(),
members: HashSet::new(),
messages: vec![],
messages: MessageHistory::default(),
tickers: vec![],
}
)],


+ 1
- 0
client/src/handlers/room_message_request_handler.rs View File

@ -17,6 +17,7 @@ impl MessageHandler for RoomMessageRequestHandler {
context: &mut Context,
message: &control::RoomMessageRequest,
) -> anyhow::Result<()> {
// TODO: Add message to room.
context
.server_request_tx
.blocking_send(ServerRequest::RoomMessageRequest(


+ 22
- 7
client/src/handlers/room_message_response_handler.rs View File

@ -5,19 +5,22 @@ use solstice_proto::server;
use crate::context::Context;
use crate::control;
use crate::message_handler::MessageHandler;
use crate::room::{Message as RoomMessage, RoomMap};
use crate::room::Message as RoomMessage;
#[derive(Debug, Default)]
pub struct RoomMessageResponseHandler;
fn add_message(
rooms: &mut RoomMap,
context: &mut Context,
response: &server::RoomMessageResponse,
) -> anyhow::Result<()> {
let room = rooms
let room = context
.state
.rooms
.get_mut_strict(&response.room_name)
.context("looking up room")?;
room.messages.push(RoomMessage {
room.messages.insert(RoomMessage {
received_at: context.system_clock.now(),
user_name: response.user_name.clone(),
message: response.message.clone(),
});
@ -32,7 +35,7 @@ impl MessageHandler for RoomMessageResponseHandler {
context: &mut Context,
message: &server::RoomMessageResponse,
) -> anyhow::Result<()> {
if let Err(err) = add_message(&mut context.state.rooms, message) {
if let Err(err) = add_message(context, message) {
warn!("Error storing room message: {:#}", err);
}
@ -56,8 +59,11 @@ impl MessageHandler for RoomMessageResponseHandler {
#[cfg(test)]
mod tests {
use std::time::{Duration, SystemTime};
use solstice_proto::server;
use crate::clock::SimulatedSystemClock;
use crate::context::{ContextBundle, ContextOptions};
use crate::control;
use crate::message_handler::MessageHandler;
@ -65,6 +71,10 @@ mod tests {
use super::RoomMessageResponseHandler;
fn system_time_from_secs(seconds: u64) -> SystemTime {
SystemTime::UNIX_EPOCH + Duration::from_secs(seconds)
}
#[test]
fn run_forwards_response() {
let mut bundle = ContextBundle::default();
@ -98,7 +108,8 @@ mod tests {
#[test]
fn run_stores_message() {
let mut room = Room::new(Visibility::Public, 42);
room.messages.push(RoomMessage {
room.messages.insert(RoomMessage {
received_at: system_time_from_secs(42),
user_name: "karandeep".to_string(),
message: "namaste!".to_string(),
});
@ -108,6 +119,8 @@ mod tests {
.initial_state
.rooms
.insert("apple".to_string(), room.clone());
options.simulated_clock =
Some(SimulatedSystemClock::new(system_time_from_secs(43)));
let mut bundle = ContextBundle::new(options);
@ -130,13 +143,15 @@ mod tests {
.expect("looking up room");
assert_eq!(
room.messages,
room.messages.to_vec(),
vec![
RoomMessage {
received_at: system_time_from_secs(42),
user_name: "karandeep".to_string(),
message: "namaste!".to_string(),
},
RoomMessage {
received_at: system_time_from_secs(43),
user_name: "shruti".to_string(),
message: "yo!".to_string(),
},


+ 1
- 0
client/src/main.rs View File

@ -11,6 +11,7 @@ use tokio::net::TcpStream;
use tokio::sync::mpsc;
mod client;
mod clock;
mod config;
mod context;
mod control;


+ 97
- 23
client/src/room.rs View File

@ -1,7 +1,8 @@
use std::collections::{HashMap, HashSet};
use std::mem;
use std::time::SystemTime;
use serde::{Deserialize, Serialize};
use serde::{Deserialize, Deserializer, Serialize, Serializer};
use solstice_proto::{server, User};
use thiserror::Error;
@ -32,13 +33,78 @@ pub enum Visibility {
PrivateOther,
}
/// This structure contains a chat room message.
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
/// A message sent to a chat room.
#[derive(
Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Serialize, Deserialize,
)]
pub struct Message {
/// Time at which the message was received by this client.
///
/// We use `SystemTime` instead of `Instant` because this is serialized and
/// sent to API clients.
///
/// Defined first in order for the `Ord` and `PartialOrd` derived traits to
/// sort by this key first.
pub received_at: SystemTime,
/// The user name of the message sender.
pub user_name: String,
/// The contents of the message.
pub message: String,
}
/// The history of messages sent for a single chat room.
#[derive(Clone, Debug, Default, Eq, PartialEq)]
pub struct MessageHistory {
/// Messages, sorted in increasing order.
messages: Vec<Message>,
}
// MessageHistory should be transparent for serialization purposes.
impl<'de> Deserialize<'de> for MessageHistory {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: Deserializer<'de>,
{
let messages = Vec::<Message>::deserialize(deserializer)?;
Ok(Self::new(messages))
}
}
// MessageHistory should be transparent for serialization purposes.
impl Serialize for MessageHistory {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
self.messages.serialize(serializer)
}
}
impl MessageHistory {
pub fn new(mut messages: Vec<Message>) -> Self {
messages.sort();
Self { messages }
}
/// Inserts a `message` into this history.
pub fn insert(&mut self, message: Message) {
self.messages.push(message);
// This could be terrible for performance in the general case, but we know
// that messages should be coming in almost-always sorted since
// `received_at` is usually set to `now()`.
self.messages.sort();
}
#[cfg(test)]
/// Returns the list of messages sorted in increasing chronological order.
pub fn to_vec(&self) -> Vec<Message> {
return self.messages.clone();
}
}
/// This structure contains the last known information about a chat room.
/// It does not store the name, as that is stored implicitly as the key in the
/// room hash table.
@ -59,8 +125,8 @@ pub struct Room {
pub operators: HashSet<String>,
/// The names of the room's members.
pub members: HashSet<String>,
/// The messages sent to this chat room, in chronological order.
pub messages: Vec<Message>,
/// The messages sent to this chat room.
pub messages: MessageHistory,
/// The tickers displayed in this room.
pub tickers: Vec<(String, String)>,
}
@ -76,7 +142,7 @@ impl Room {
owner: None,
operators: HashSet::new(),
members: HashSet::new(),
messages: Vec::new(),
messages: MessageHistory::default(),
tickers: Vec::new(),
}
}
@ -290,17 +356,6 @@ impl RoomMap {
Ok(())
}
/// Saves the given message as the last one in the given room.
pub fn add_message(
&mut self,
room_name: &str,
message: Message,
) -> Result<(), RoomError> {
let room = self.get_mut_strict(room_name)?;
room.messages.push(message);
Ok(())
}
/// Inserts the given user in the given room's set of members.
/// Returns an error if the room is not found.
pub fn insert_member(
@ -342,9 +397,11 @@ impl RoomMap {
#[cfg(test)]
mod tests {
use std::time::{Duration, SystemTime};
use solstice_proto::server::RoomListResponse;
use super::{Membership, Message, Room, RoomMap, Visibility};
use super::{Membership, Message, MessageHistory, Room, RoomMap, Visibility};
#[test]
fn deserialize_membership() {
@ -366,10 +423,17 @@ mod tests {
fn deserialize_message() {
assert_eq!(
serde_json::from_str::<Message>(
r#"{ "user_name":"karandeep", "message":"namaste" }"#
r#"{
"received_at": { "secs_since_epoch": 42, "nanos_since_epoch": 1337 },
"user_name":"karandeep",
"message":"namaste"
}"#
)
.unwrap(),
Message {
received_at: SystemTime::UNIX_EPOCH
+ Duration::from_secs(42)
+ Duration::from_nanos(1337),
user_name: "karandeep".to_string(),
message: "namaste".to_string()
}
@ -389,8 +453,16 @@ mod tests {
"operators": ["op1", "op2"],
"members": ["m1", "m2"],
"messages": [
{ "user_name": "u1", "message": "msg1" },
{ "user_name": "u2", "message": "msg2" }
{
"received_at": { "secs_since_epoch": 43, "nanos_since_epoch": 0 },
"user_name": "u2",
"message": "msg2"
},
{
"received_at": { "secs_since_epoch": 42, "nanos_since_epoch": 0 },
"user_name": "u1",
"message": "msg1"
}
],
"tickers": [["t11", "t12"], ["t21", "t22"]]
}"#
@ -410,16 +482,18 @@ mod tests {
.iter()
.cloned()
.collect(),
messages: vec![
messages: MessageHistory::new(vec![
Message {
received_at: SystemTime::UNIX_EPOCH + Duration::from_secs(42),
user_name: "u1".to_string(),
message: "msg1".to_string(),
},
Message {
received_at: SystemTime::UNIX_EPOCH + Duration::from_secs(43),
user_name: "u2".to_string(),
message: "msg2".to_string(),
}
],
]),
tickers: vec![
("t11".to_string(), "t12".to_string()),
("t21".to_string(), "t22".to_string()),


Loading…
Cancel
Save