From d93b785c400b72ca307b8f903b4cb534924de959 Mon Sep 17 00:00:00 2001 From: Titouan Rigoudy Date: Sat, 16 Jan 2021 22:20:09 +0100 Subject: [PATCH] Define proto::server::Client, implement rough login. --- src/proto/server/client.rs | 231 ++++++++++++++++++++++++++++++++++++ src/proto/server/mod.rs | 1 + src/proto/server/testing.rs | 40 +++++-- 3 files changed, 264 insertions(+), 8 deletions(-) create mode 100644 src/proto/server/client.rs diff --git a/src/proto/server/client.rs b/src/proto/server/client.rs new file mode 100644 index 0000000..963d0e8 --- /dev/null +++ b/src/proto/server/client.rs @@ -0,0 +1,231 @@ +//! A client interface for remote servers. + +use std::io; + +use thiserror::Error; +use tokio::net; +use tokio::sync::mpsc; + +use crate::proto::core::frame::FrameStream; +use crate::proto::server::{ + LoginRequest, LoginResponse, ServerRequest, ServerResponse, +}; + +struct ClientOptions { + request_channel_capacity: usize, + response_channel_capacity: usize, +} + +impl Default for ClientOptions { + fn default() -> Self { + ClientOptions { + request_channel_capacity: 100, + response_channel_capacity: 100, + } + } +} + +pub struct Client { + frame_stream: FrameStream, + request_receiver: mpsc::Receiver, + response_sender: Option>, +} + +pub struct NewClient { + pub client: Client, + pub request_sender: mpsc::Sender, + pub response_receiver: mpsc::Receiver, +} + +#[derive(Debug, Error)] +pub enum ClientRunError { + #[error("client has run to completion already")] + CompletedError, + + #[error("response channel send error: {0}")] + SendResponseError(#[from] mpsc::error::SendError), + + #[error("i/o error: {0}")] + IOError(#[from] io::Error), +} + +async fn login( + frame_stream: &mut FrameStream, +) -> io::Result<()> { + let request = ServerRequest::LoginRequest( + LoginRequest::new("alice", "sekrit", 181, 100).unwrap(), + ); + frame_stream.write(&request).await?; + + let response = frame_stream.read().await?; + match response { + ServerResponse::LoginResponse(LoginResponse::LoginOk { + motd, + ip, + password_md5_opt, + }) => { + println!("Logged in successfully!"); + println!("Message Of The Day: {}", motd); + println!("Public IP address: {}", ip); + println!("Password MD5: {:?}", password_md5_opt); + } + ServerResponse::LoginResponse(LoginResponse::LoginFail { reason }) => { + return Err(io::Error::new( + io::ErrorKind::InvalidData, + format!("Login failed: {}", reason), + )); + } + _ => { + return Err(io::Error::new( + io::ErrorKind::InvalidData, + format!("expected login response, got: {:?}", response), + )); + } + }; + + Ok(()) +} + +impl Client { + async fn connect( + address: std::net::SocketAddr, + options: ClientOptions, + ) -> io::Result { + let tcp_stream = net::TcpStream::connect(address).await?; + let mut frame_stream = FrameStream::new(tcp_stream); + + login(&mut frame_stream).await?; + + let (request_sender, request_receiver) = + mpsc::channel(options.request_channel_capacity); + let (response_sender, response_receiver) = + mpsc::channel(options.response_channel_capacity); + + let client = Client { + frame_stream, + request_receiver, + response_sender: Some(response_sender), + }; + + Ok(NewClient { + client, + request_sender, + response_receiver, + }) + } + + /// Runs this client's event loop once. + /// + /// Returns `Ok(true)` if `run()` should continue. + /// Returns `Ok(false)` if `run()` should return successfully. + /// Returns an error if `run()` should return that error. + async fn run_once(&mut self) -> Result { + let response_sender = match self.response_sender { + Some(ref response_sender) => response_sender, + None => return Err(ClientRunError::CompletedError), + }; + + let mut done = false; + + tokio::select!( + maybe_request = self.request_receiver.recv() => { + if let Some(request) = maybe_request { + self.frame_stream.write(&request).await?; + } else { + // Sender has been dropped. + done = true + } + } + read_result = self.frame_stream.read() => { + let response = read_result?; + response_sender.send(response).await?; + } + ); + + Ok(!done) + } + + /// Runs this client - sending requests and receiving responses. + pub async fn run(&mut self) -> Result<(), ClientRunError> { + while self.run_once().await? {} + + self.response_sender = None; + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use crate::proto::server::testing::FakeServer; + use crate::proto::server::*; + + use super::{Client, ClientOptions}; + + #[tokio::test] + async fn login() { + // TODO: Check that server does not crash. + let mut server = FakeServer::new().await.unwrap(); + let address = server.address().unwrap(); + let _server_task = tokio::spawn(async move { server.run().await.unwrap() }); + + let new_client = Client::connect(address, ClientOptions::default()) + .await + .unwrap(); + + let mut client = new_client.client; + let mut receiver = new_client.response_receiver; + + let client_task = tokio::spawn(async move { client.run().await.unwrap() }); + + let receive_task = tokio::spawn(async move { + loop { + match receiver.recv().await { + None => break, + Some(response) => { + dbg!(response); + } + } + } + }); + + { + let sender = new_client.request_sender; + let request = ServerRequest::UserStatusRequest(UserStatusRequest { + user_name: "alice".to_string(), + }); + sender.send(request).await.unwrap(); + } + + client_task.await.unwrap(); + receive_task.await.unwrap(); + } + + #[tokio::test] + async fn sender_is_cloneable() { + let mut server = FakeServer::new().await.unwrap(); + let address = server.address().unwrap(); + tokio::spawn(async move { server.run().await.unwrap() }); + + let new_client = Client::connect(address, ClientOptions::default()) + .await + .unwrap(); + + let mut client = new_client.client; + tokio::spawn(async move { client.run().await.unwrap() }); + + let sender = new_client.request_sender; + + let sender2 = sender.clone(); + tokio::spawn(async move { + let request = ServerRequest::UserStatusRequest(UserStatusRequest { + user_name: "alice".to_string(), + }); + sender2.send(request).await.unwrap(); + }); + + let request = ServerRequest::UserStatusRequest(UserStatusRequest { + user_name: "bob".to_string(), + }); + sender.send(request).await.unwrap(); + } +} diff --git a/src/proto/server/mod.rs b/src/proto/server/mod.rs index 99b0d28..74eb3a6 100644 --- a/src/proto/server/mod.rs +++ b/src/proto/server/mod.rs @@ -1,3 +1,4 @@ +mod client; mod constants; mod request; mod response; diff --git a/src/proto/server/testing.rs b/src/proto/server/testing.rs index 1978e62..31d7b90 100644 --- a/src/proto/server/testing.rs +++ b/src/proto/server/testing.rs @@ -1,19 +1,25 @@ //! Provides utilities for testing protocol code. use std::io; -use std::net::SocketAddr; +use std::net::{IpAddr, Ipv4Addr, SocketAddr}; use tokio::net::{TcpListener, TcpStream}; use crate::proto::core::frame::FrameStream; -use crate::proto::{ServerRequest, ServerResponse}; +use crate::proto::server::{LoginResponse, ServerRequest, ServerResponse}; -async fn process(stream: TcpStream) -> io::Result<()> { - let mut connection = +async fn process( + stream: TcpStream, + peer_address: SocketAddr, +) -> io::Result<()> { + let mut frame_stream = FrameStream::::new(stream); - let _request = match connection.read().await? { - ServerRequest::LoginRequest(request) => request, + match frame_stream.read().await? { + ServerRequest::LoginRequest(request) => { + // TODO: Logging. + println!("FakeServer: Received login request: {:?}", request); + } request => { return Err(io::Error::new( io::ErrorKind::InvalidData, @@ -22,6 +28,24 @@ async fn process(stream: TcpStream) -> io::Result<()> { } }; + let ipv4_addr = match peer_address.ip() { + IpAddr::V4(ipv4_addr) => ipv4_addr, + IpAddr::V6(ipv6_addr) => { + println!( + "FakeServer: peer connected from IPv6 address {}, echoing 0.0.0.0", + ipv6_addr + ); + Ipv4Addr::UNSPECIFIED + } + }; + + let response = ServerResponse::LoginResponse(LoginResponse::LoginOk { + motd: "hi there".to_string(), + ip: ipv4_addr, + password_md5_opt: None, + }); + frame_stream.write(&response).await?; + Ok(()) } @@ -46,8 +70,8 @@ impl FakeServer { /// Runs the server: accepts incoming connections and responds to requests. pub async fn run(&mut self) -> io::Result<()> { loop { - let (socket, _peer_address) = self.listener.accept().await?; - tokio::spawn(async move { process(socket).await }); + let (socket, peer_address) = self.listener.accept().await?; + tokio::spawn(async move { process(socket, peer_address).await }); } } }