From a400ad595a99a91fea105aa37830bad046c5295c Mon Sep 17 00:00:00 2001 From: Titouan Rigoudy Date: Fri, 22 Jan 2021 21:59:32 +0100 Subject: [PATCH] Handle UserStatusRequest in FakeServer. --- src/proto/server/client.rs | 86 +++++++++++++++++++++++++++++------- src/proto/server/response.rs | 2 +- src/proto/server/testing.rs | 42 +++++++++++++++++- src/proto/user.rs | 2 + 4 files changed, 114 insertions(+), 18 deletions(-) diff --git a/src/proto/server/client.rs b/src/proto/server/client.rs index 64aebc9..9733772 100644 --- a/src/proto/server/client.rs +++ b/src/proto/server/client.rs @@ -73,12 +73,15 @@ impl Client { options: ClientOptions, ) -> Result<(), ClientLoginError> { // TODO: Use into() once ServerRequest implements From. - let request = ServerRequest::LoginRequest( - options.credentials.into_login_request(options.version), - ); + let login_request = options.credentials.into_login_request(options.version); + debug!("Client: sending login request: {:?}", login_request); + + let request = ServerRequest::LoginRequest(login_request); self.frame_stream.write(&request).await?; let response = self.frame_stream.read().await?; + debug!("Client: received first response: {:?}", response); + match response { Some(ServerResponse::LoginResponse(LoginResponse::LoginOk { motd, @@ -109,6 +112,7 @@ impl Client { tokio::select!( maybe_request = request_stream.next() => { if let Some(request) = maybe_request { + debug!("Client: sending request: {:?}", request); self.frame_stream.write(&request).await?; Ok(RunOnceResult::Continue) } else { @@ -118,7 +122,10 @@ impl Client { }, read_result = self.frame_stream.read() => { match read_result? { - Some(response) => Ok(RunOnceResult::Response(response)), + Some(response) => { + debug!("Client: received response: {:?}", response); + Ok(RunOnceResult::Response(response)) + } // TODO: Consider returning error here. None => Ok(RunOnceResult::Break), } @@ -148,6 +155,7 @@ impl Client { // Drain the receiving end of the connection. while let Some(response) = self.frame_stream.read().await? { + debug!("Client: received response: {:?}", response); yield response; } }) @@ -156,11 +164,15 @@ impl Client { #[cfg(test)] mod tests { - use futures::stream::StreamExt; + use futures::stream::{empty, StreamExt}; use tokio::net; - use crate::proto::server::testing::fake_server; - use crate::proto::server::{Credentials, ServerRequest, UserStatusRequest}; + use crate::proto::server::testing::{fake_server, UserStatusMap}; + use crate::proto::server::{ + Credentials, ServerRequest, ServerResponse, UserStatusRequest, + UserStatusResponse, + }; + use crate::proto::UserStatus; use super::{Client, ClientOptions, Version}; @@ -170,10 +182,49 @@ mod tests { } #[tokio::test] - async fn client_like_grpc() { + async fn login() { + init(); + + let (server, handle) = fake_server().await.unwrap(); + let server_task = tokio::spawn(server.serve()); + + let stream = net::TcpStream::connect(handle.address()).await.unwrap(); + + let user_name = "alice".to_string(); + let password = "sekrit".to_string(); + let credentials = Credentials::new(user_name, password).unwrap(); + + let options = ClientOptions { + credentials, + version: Version::default(), + }; + let client = Client::login(stream, options).await.unwrap(); + + // Send nothing, receive no responses. + let mut inbound = client.run(empty()); + assert!(inbound.next().await.is_none()); + + handle.shutdown(); + server_task.await.unwrap().unwrap(); + } + + #[tokio::test] + async fn simple_exchange() { init(); + let response = UserStatusResponse { + user_name: "alice".to_string(), + status: UserStatus::Online, + is_privileged: false, + }; + + let mut user_status_map = UserStatusMap::default(); + user_status_map + .map + .insert("alice".to_string(), response.clone()); + let (server, handle) = fake_server().await.unwrap(); + let server = server.with_user_status_map(user_status_map); let server_task = tokio::spawn(server.serve()); let stream = net::TcpStream::connect(handle.address()).await.unwrap(); @@ -189,17 +240,20 @@ mod tests { let client = Client::login(stream, options).await.unwrap(); let outbound = Box::pin(async_stream::stream! { - for _ in 0..2 { - yield ServerRequest::UserStatusRequest(UserStatusRequest { - user_name: "bob".to_string(), - }); - } + yield ServerRequest::UserStatusRequest(UserStatusRequest { + user_name: "bob".to_string(), + }); + yield ServerRequest::UserStatusRequest(UserStatusRequest { + user_name: "alice".to_string(), + }); }); let mut inbound = client.run(outbound); - while let Some(result) = inbound.next().await { - let _ = dbg!(result); - } + assert_eq!( + inbound.next().await.unwrap().unwrap(), + ServerResponse::UserStatusResponse(response) + ); + assert!(inbound.next().await.is_none()); handle.shutdown(); server_task.await.unwrap().unwrap(); diff --git a/src/proto/server/response.rs b/src/proto/server/response.rs index d3066ef..9a7d7b7 100644 --- a/src/proto/server/response.rs +++ b/src/proto/server/response.rs @@ -1295,7 +1295,7 @@ impl ValueDecode for UserInfoResponse { * USER STATUS * *=============*/ -#[derive(Debug, Eq, PartialEq)] +#[derive(Clone, Debug, Eq, PartialEq)] pub struct UserStatusResponse { pub user_name: String, pub status: UserStatus, diff --git a/src/proto/server/testing.rs b/src/proto/server/testing.rs index 6cf7c51..e1a9618 100644 --- a/src/proto/server/testing.rs +++ b/src/proto/server/testing.rs @@ -1,18 +1,33 @@ //! Provides utilities for testing protocol code. +use std::collections::HashMap; use std::io; use std::net::{IpAddr, Ipv4Addr, SocketAddr}; +use std::sync::Arc; use log::{info, warn}; +use parking_lot::Mutex; use tokio::net::TcpListener; use tokio::sync::watch; use crate::proto::core::frame::FrameStream; -use crate::proto::server::{LoginResponse, ServerRequest, ServerResponse}; +use crate::proto::server::{ + LoginResponse, ServerRequest, ServerResponse, UserStatusRequest, + UserStatusResponse, +}; + +#[derive(Debug, Default)] +pub struct UserStatusMap { + pub map: HashMap, +} + +// TODO: fn insert(&self, UserStatusResponse), since response contains username. +// TODO: Derive Debug? struct Handler { frame_stream: FrameStream, peer_address: SocketAddr, + user_status_map: Arc>, } impl Handler { @@ -51,6 +66,23 @@ impl Handler { while let Some(request) = self.frame_stream.read().await? { info!("Handler: received request: {:?}", request); + match request { + ServerRequest::UserStatusRequest(UserStatusRequest { user_name }) => { + let entry = self + .user_status_map + .lock() + .map + .get(&user_name) + .map(|response| response.clone()); + if let Some(response) = entry { + let response = ServerResponse::UserStatusResponse(response); + self.frame_stream.write(&response).await?; + } + } + _ => { + warn!("Handler: unhandled request: {:?}", request); + } + } } info!("Handler: client disconnecting, shutting down"); @@ -86,6 +118,7 @@ impl GracefulHandler { pub struct FakeServer { listener: TcpListener, shutdown_rx: watch::Receiver<()>, + user_status_map: Arc>, } /// Allows interacting with a running `FakeServer`. @@ -118,9 +151,15 @@ impl FakeServer { Ok(FakeServer { listener, shutdown_rx, + user_status_map: Arc::new(Mutex::new(UserStatusMap::default())), }) } + pub fn with_user_status_map(mut self, map: UserStatusMap) -> Self { + self.user_status_map = Arc::new(Mutex::new(map)); + self + } + /// Returns the address to which this server is bound. /// This is always localhost and a random port chosen by the OS. pub fn address(&self) -> io::Result { @@ -146,6 +185,7 @@ impl FakeServer { handler: Handler { frame_stream: FrameStream::new(stream), peer_address, + user_status_map: self.user_status_map.clone(), }, shutdown_rx: self.shutdown_rx.clone(), }; diff --git a/src/proto/user.rs b/src/proto/user.rs index e0409d6..9e9720f 100644 --- a/src/proto/user.rs +++ b/src/proto/user.rs @@ -1,3 +1,5 @@ +// TODO: Move to src/proto/core/user.rs. + use std::io; use crate::proto::core::value::{