| @ -0,0 +1,231 @@ | |||||
| //! A client interface for remote servers. | |||||
| use std::io; | |||||
| use thiserror::Error; | |||||
| use tokio::net; | |||||
| use tokio::sync::mpsc; | |||||
| use crate::proto::core::frame::FrameStream; | |||||
| use crate::proto::server::{ | |||||
| LoginRequest, LoginResponse, ServerRequest, ServerResponse, | |||||
| }; | |||||
| struct ClientOptions { | |||||
| request_channel_capacity: usize, | |||||
| response_channel_capacity: usize, | |||||
| } | |||||
| impl Default for ClientOptions { | |||||
| fn default() -> Self { | |||||
| ClientOptions { | |||||
| request_channel_capacity: 100, | |||||
| response_channel_capacity: 100, | |||||
| } | |||||
| } | |||||
| } | |||||
| pub struct Client { | |||||
| frame_stream: FrameStream<ServerResponse, ServerRequest>, | |||||
| request_receiver: mpsc::Receiver<ServerRequest>, | |||||
| response_sender: Option<mpsc::Sender<ServerResponse>>, | |||||
| } | |||||
| pub struct NewClient { | |||||
| pub client: Client, | |||||
| pub request_sender: mpsc::Sender<ServerRequest>, | |||||
| pub response_receiver: mpsc::Receiver<ServerResponse>, | |||||
| } | |||||
| #[derive(Debug, Error)] | |||||
| pub enum ClientRunError { | |||||
| #[error("client has run to completion already")] | |||||
| CompletedError, | |||||
| #[error("response channel send error: {0}")] | |||||
| SendResponseError(#[from] mpsc::error::SendError<ServerResponse>), | |||||
| #[error("i/o error: {0}")] | |||||
| IOError(#[from] io::Error), | |||||
| } | |||||
| async fn login( | |||||
| frame_stream: &mut FrameStream<ServerResponse, ServerRequest>, | |||||
| ) -> io::Result<()> { | |||||
| let request = ServerRequest::LoginRequest( | |||||
| LoginRequest::new("alice", "sekrit", 181, 100).unwrap(), | |||||
| ); | |||||
| frame_stream.write(&request).await?; | |||||
| let response = frame_stream.read().await?; | |||||
| match response { | |||||
| ServerResponse::LoginResponse(LoginResponse::LoginOk { | |||||
| motd, | |||||
| ip, | |||||
| password_md5_opt, | |||||
| }) => { | |||||
| println!("Logged in successfully!"); | |||||
| println!("Message Of The Day: {}", motd); | |||||
| println!("Public IP address: {}", ip); | |||||
| println!("Password MD5: {:?}", password_md5_opt); | |||||
| } | |||||
| ServerResponse::LoginResponse(LoginResponse::LoginFail { reason }) => { | |||||
| return Err(io::Error::new( | |||||
| io::ErrorKind::InvalidData, | |||||
| format!("Login failed: {}", reason), | |||||
| )); | |||||
| } | |||||
| _ => { | |||||
| return Err(io::Error::new( | |||||
| io::ErrorKind::InvalidData, | |||||
| format!("expected login response, got: {:?}", response), | |||||
| )); | |||||
| } | |||||
| }; | |||||
| Ok(()) | |||||
| } | |||||
| impl Client { | |||||
| async fn connect( | |||||
| address: std::net::SocketAddr, | |||||
| options: ClientOptions, | |||||
| ) -> io::Result<NewClient> { | |||||
| let tcp_stream = net::TcpStream::connect(address).await?; | |||||
| let mut frame_stream = FrameStream::new(tcp_stream); | |||||
| login(&mut frame_stream).await?; | |||||
| let (request_sender, request_receiver) = | |||||
| mpsc::channel(options.request_channel_capacity); | |||||
| let (response_sender, response_receiver) = | |||||
| mpsc::channel(options.response_channel_capacity); | |||||
| let client = Client { | |||||
| frame_stream, | |||||
| request_receiver, | |||||
| response_sender: Some(response_sender), | |||||
| }; | |||||
| Ok(NewClient { | |||||
| client, | |||||
| request_sender, | |||||
| response_receiver, | |||||
| }) | |||||
| } | |||||
| /// Runs this client's event loop once. | |||||
| /// | |||||
| /// Returns `Ok(true)` if `run()` should continue. | |||||
| /// Returns `Ok(false)` if `run()` should return successfully. | |||||
| /// Returns an error if `run()` should return that error. | |||||
| async fn run_once(&mut self) -> Result<bool, ClientRunError> { | |||||
| let response_sender = match self.response_sender { | |||||
| Some(ref response_sender) => response_sender, | |||||
| None => return Err(ClientRunError::CompletedError), | |||||
| }; | |||||
| let mut done = false; | |||||
| tokio::select!( | |||||
| maybe_request = self.request_receiver.recv() => { | |||||
| if let Some(request) = maybe_request { | |||||
| self.frame_stream.write(&request).await?; | |||||
| } else { | |||||
| // Sender has been dropped. | |||||
| done = true | |||||
| } | |||||
| } | |||||
| read_result = self.frame_stream.read() => { | |||||
| let response = read_result?; | |||||
| response_sender.send(response).await?; | |||||
| } | |||||
| ); | |||||
| Ok(!done) | |||||
| } | |||||
| /// Runs this client - sending requests and receiving responses. | |||||
| pub async fn run(&mut self) -> Result<(), ClientRunError> { | |||||
| while self.run_once().await? {} | |||||
| self.response_sender = None; | |||||
| Ok(()) | |||||
| } | |||||
| } | |||||
| #[cfg(test)] | |||||
| mod tests { | |||||
| use crate::proto::server::testing::FakeServer; | |||||
| use crate::proto::server::*; | |||||
| use super::{Client, ClientOptions}; | |||||
| #[tokio::test] | |||||
| async fn login() { | |||||
| // TODO: Check that server does not crash. | |||||
| let mut server = FakeServer::new().await.unwrap(); | |||||
| let address = server.address().unwrap(); | |||||
| let _server_task = tokio::spawn(async move { server.run().await.unwrap() }); | |||||
| let new_client = Client::connect(address, ClientOptions::default()) | |||||
| .await | |||||
| .unwrap(); | |||||
| let mut client = new_client.client; | |||||
| let mut receiver = new_client.response_receiver; | |||||
| let client_task = tokio::spawn(async move { client.run().await.unwrap() }); | |||||
| let receive_task = tokio::spawn(async move { | |||||
| loop { | |||||
| match receiver.recv().await { | |||||
| None => break, | |||||
| Some(response) => { | |||||
| dbg!(response); | |||||
| } | |||||
| } | |||||
| } | |||||
| }); | |||||
| { | |||||
| let sender = new_client.request_sender; | |||||
| let request = ServerRequest::UserStatusRequest(UserStatusRequest { | |||||
| user_name: "alice".to_string(), | |||||
| }); | |||||
| sender.send(request).await.unwrap(); | |||||
| } | |||||
| client_task.await.unwrap(); | |||||
| receive_task.await.unwrap(); | |||||
| } | |||||
| #[tokio::test] | |||||
| async fn sender_is_cloneable() { | |||||
| let mut server = FakeServer::new().await.unwrap(); | |||||
| let address = server.address().unwrap(); | |||||
| tokio::spawn(async move { server.run().await.unwrap() }); | |||||
| let new_client = Client::connect(address, ClientOptions::default()) | |||||
| .await | |||||
| .unwrap(); | |||||
| let mut client = new_client.client; | |||||
| tokio::spawn(async move { client.run().await.unwrap() }); | |||||
| let sender = new_client.request_sender; | |||||
| let sender2 = sender.clone(); | |||||
| tokio::spawn(async move { | |||||
| let request = ServerRequest::UserStatusRequest(UserStatusRequest { | |||||
| user_name: "alice".to_string(), | |||||
| }); | |||||
| sender2.send(request).await.unwrap(); | |||||
| }); | |||||
| let request = ServerRequest::UserStatusRequest(UserStatusRequest { | |||||
| user_name: "bob".to_string(), | |||||
| }); | |||||
| sender.send(request).await.unwrap(); | |||||
| } | |||||
| } | |||||