Browse Source

Handle UserStatusRequest in FakeServer.

wip
Titouan Rigoudy 4 years ago
parent
commit
a400ad595a
4 changed files with 114 additions and 18 deletions
  1. +70
    -16
      src/proto/server/client.rs
  2. +1
    -1
      src/proto/server/response.rs
  3. +41
    -1
      src/proto/server/testing.rs
  4. +2
    -0
      src/proto/user.rs

+ 70
- 16
src/proto/server/client.rs View File

@ -73,12 +73,15 @@ impl Client {
options: ClientOptions,
) -> Result<(), ClientLoginError> {
// TODO: Use into() once ServerRequest implements From<LoginRequest>.
let request = ServerRequest::LoginRequest(
options.credentials.into_login_request(options.version),
);
let login_request = options.credentials.into_login_request(options.version);
debug!("Client: sending login request: {:?}", login_request);
let request = ServerRequest::LoginRequest(login_request);
self.frame_stream.write(&request).await?;
let response = self.frame_stream.read().await?;
debug!("Client: received first response: {:?}", response);
match response {
Some(ServerResponse::LoginResponse(LoginResponse::LoginOk {
motd,
@ -109,6 +112,7 @@ impl Client {
tokio::select!(
maybe_request = request_stream.next() => {
if let Some(request) = maybe_request {
debug!("Client: sending request: {:?}", request);
self.frame_stream.write(&request).await?;
Ok(RunOnceResult::Continue)
} else {
@ -118,7 +122,10 @@ impl Client {
},
read_result = self.frame_stream.read() => {
match read_result? {
Some(response) => Ok(RunOnceResult::Response(response)),
Some(response) => {
debug!("Client: received response: {:?}", response);
Ok(RunOnceResult::Response(response))
}
// TODO: Consider returning error here.
None => Ok(RunOnceResult::Break),
}
@ -148,6 +155,7 @@ impl Client {
// Drain the receiving end of the connection.
while let Some(response) = self.frame_stream.read().await? {
debug!("Client: received response: {:?}", response);
yield response;
}
})
@ -156,11 +164,15 @@ impl Client {
#[cfg(test)]
mod tests {
use futures::stream::StreamExt;
use futures::stream::{empty, StreamExt};
use tokio::net;
use crate::proto::server::testing::fake_server;
use crate::proto::server::{Credentials, ServerRequest, UserStatusRequest};
use crate::proto::server::testing::{fake_server, UserStatusMap};
use crate::proto::server::{
Credentials, ServerRequest, ServerResponse, UserStatusRequest,
UserStatusResponse,
};
use crate::proto::UserStatus;
use super::{Client, ClientOptions, Version};
@ -170,10 +182,49 @@ mod tests {
}
#[tokio::test]
async fn client_like_grpc() {
async fn login() {
init();
let (server, handle) = fake_server().await.unwrap();
let server_task = tokio::spawn(server.serve());
let stream = net::TcpStream::connect(handle.address()).await.unwrap();
let user_name = "alice".to_string();
let password = "sekrit".to_string();
let credentials = Credentials::new(user_name, password).unwrap();
let options = ClientOptions {
credentials,
version: Version::default(),
};
let client = Client::login(stream, options).await.unwrap();
// Send nothing, receive no responses.
let mut inbound = client.run(empty());
assert!(inbound.next().await.is_none());
handle.shutdown();
server_task.await.unwrap().unwrap();
}
#[tokio::test]
async fn simple_exchange() {
init();
let response = UserStatusResponse {
user_name: "alice".to_string(),
status: UserStatus::Online,
is_privileged: false,
};
let mut user_status_map = UserStatusMap::default();
user_status_map
.map
.insert("alice".to_string(), response.clone());
let (server, handle) = fake_server().await.unwrap();
let server = server.with_user_status_map(user_status_map);
let server_task = tokio::spawn(server.serve());
let stream = net::TcpStream::connect(handle.address()).await.unwrap();
@ -189,17 +240,20 @@ mod tests {
let client = Client::login(stream, options).await.unwrap();
let outbound = Box::pin(async_stream::stream! {
for _ in 0..2 {
yield ServerRequest::UserStatusRequest(UserStatusRequest {
user_name: "bob".to_string(),
});
}
yield ServerRequest::UserStatusRequest(UserStatusRequest {
user_name: "bob".to_string(),
});
yield ServerRequest::UserStatusRequest(UserStatusRequest {
user_name: "alice".to_string(),
});
});
let mut inbound = client.run(outbound);
while let Some(result) = inbound.next().await {
let _ = dbg!(result);
}
assert_eq!(
inbound.next().await.unwrap().unwrap(),
ServerResponse::UserStatusResponse(response)
);
assert!(inbound.next().await.is_none());
handle.shutdown();
server_task.await.unwrap().unwrap();


+ 1
- 1
src/proto/server/response.rs View File

@ -1295,7 +1295,7 @@ impl ValueDecode for UserInfoResponse {
* USER STATUS *
*=============*/
#[derive(Debug, Eq, PartialEq)]
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct UserStatusResponse {
pub user_name: String,
pub status: UserStatus,


+ 41
- 1
src/proto/server/testing.rs View File

@ -1,18 +1,33 @@
//! Provides utilities for testing protocol code.
use std::collections::HashMap;
use std::io;
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::sync::Arc;
use log::{info, warn};
use parking_lot::Mutex;
use tokio::net::TcpListener;
use tokio::sync::watch;
use crate::proto::core::frame::FrameStream;
use crate::proto::server::{LoginResponse, ServerRequest, ServerResponse};
use crate::proto::server::{
LoginResponse, ServerRequest, ServerResponse, UserStatusRequest,
UserStatusResponse,
};
#[derive(Debug, Default)]
pub struct UserStatusMap {
pub map: HashMap<String, UserStatusResponse>,
}
// TODO: fn insert(&self, UserStatusResponse), since response contains username.
// TODO: Derive Debug?
struct Handler {
frame_stream: FrameStream<ServerRequest, ServerResponse>,
peer_address: SocketAddr,
user_status_map: Arc<Mutex<UserStatusMap>>,
}
impl Handler {
@ -51,6 +66,23 @@ impl Handler {
while let Some(request) = self.frame_stream.read().await? {
info!("Handler: received request: {:?}", request);
match request {
ServerRequest::UserStatusRequest(UserStatusRequest { user_name }) => {
let entry = self
.user_status_map
.lock()
.map
.get(&user_name)
.map(|response| response.clone());
if let Some(response) = entry {
let response = ServerResponse::UserStatusResponse(response);
self.frame_stream.write(&response).await?;
}
}
_ => {
warn!("Handler: unhandled request: {:?}", request);
}
}
}
info!("Handler: client disconnecting, shutting down");
@ -86,6 +118,7 @@ impl GracefulHandler {
pub struct FakeServer {
listener: TcpListener,
shutdown_rx: watch::Receiver<()>,
user_status_map: Arc<Mutex<UserStatusMap>>,
}
/// Allows interacting with a running `FakeServer`.
@ -118,9 +151,15 @@ impl FakeServer {
Ok(FakeServer {
listener,
shutdown_rx,
user_status_map: Arc::new(Mutex::new(UserStatusMap::default())),
})
}
pub fn with_user_status_map(mut self, map: UserStatusMap) -> Self {
self.user_status_map = Arc::new(Mutex::new(map));
self
}
/// Returns the address to which this server is bound.
/// This is always localhost and a random port chosen by the OS.
pub fn address(&self) -> io::Result<SocketAddr> {
@ -146,6 +185,7 @@ impl FakeServer {
handler: Handler {
frame_stream: FrameStream::new(stream),
peer_address,
user_status_map: self.user_status_map.clone(),
},
shutdown_rx: self.shutdown_rx.clone(),
};


+ 2
- 0
src/proto/user.rs View File

@ -1,3 +1,5 @@
// TODO: Move to src/proto/core/user.rs.
use std::io;
use crate::proto::core::value::{


Loading…
Cancel
Save