Solstice client.
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 

305 lines
8.1 KiB

//! A client interface for remote servers.
use std::io;
use futures::stream::{Stream, StreamExt};
use log::{debug, info};
use thiserror::Error;
use tokio::net;
use crate::proto::core::frame::FrameStream;
use crate::proto::server::{
Credentials, LoginResponse, ServerRequest, ServerResponse, Version,
};
/// Configuration consumed when creating a new `Client`.
pub struct ClientOptions {
    /// Credentials from which the login request is built.
    pub credentials: Credentials,
    /// Version handed to the credentials when building the login request.
    pub version: Version,
}
/// Client side of the client-server protocol.
pub struct Client {
    /// Framed connection to the server: `ServerResponse`s are read from it
    /// and `ServerRequest`s are written to it.
    frame_stream: FrameStream<ServerResponse, ServerRequest>,
}
/// Failure modes of the login exchange with a remote server.
#[derive(Debug, Error)]
pub enum ClientLoginError {
    /// The server answered with a login failure; carries the reason it gave.
    #[error("login failed: {0}")]
    LoginFailed(String),

    /// The first response received was not a login response.
    #[error("unexpected response: {0:?}")]
    UnexpectedResponse(ServerResponse),

    /// The connection yielded no response at all.
    #[error("unexpected end of file")]
    UnexpectedEof,

    /// Reading from or writing to the connection failed.
    #[error("i/o error: {0}")]
    IOError(#[from] io::Error),
}
/// Failure modes of a logged-in client while it is running.
#[derive(Debug, Error)]
pub enum ClientRunError {
    /// The connection ended while the client still expected responses.
    #[error("underlying stream was closed unexpectedly")]
    StreamClosed,

    /// Reading from or writing to the connection failed.
    #[error("i/o error: {0}")]
    IOError(#[from] io::Error),
}
impl ClientRunError {
    /// Returns `true` if and only if this error is
    /// `ClientRunError::StreamClosed`.
    fn is_stream_closed(&self) -> bool {
        matches!(self, ClientRunError::StreamClosed)
    }
}
/// Outcome of a single step of the client's main loop.
enum RunOnceResult {
    /// The request stream is exhausted; the main loop should stop.
    Break,
    /// A request was sent; nothing to hand back to the caller.
    Continue,
    /// A response was received from the server.
    Response(ServerResponse),
}
impl Client {
async fn login(
tcp_stream: net::TcpStream,
options: ClientOptions,
) -> Result<Client, ClientLoginError> {
let mut client = Client {
frame_stream: FrameStream::new(tcp_stream),
};
client.handshake(options).await?;
Ok(client)
}
// Performs the login exchange.
// Called this way because `login` is already taken.
async fn handshake(
&mut self,
options: ClientOptions,
) -> Result<(), ClientLoginError> {
// TODO: Use into() once ServerRequest implements From<LoginRequest>.
let login_request = options.credentials.into_login_request(options.version);
debug!("Client: sending login request: {:?}", login_request);
let request = ServerRequest::LoginRequest(login_request);
self.frame_stream.write(&request).await?;
let response = self.frame_stream.read().await?;
debug!("Client: received first response: {:?}", response);
match response {
Some(ServerResponse::LoginResponse(LoginResponse::LoginOk {
motd,
ip,
password_md5_opt,
})) => {
info!("Client: Logged in successfully!");
info!("Client: Message Of The Day: {}", motd);
info!("Client: Public IP address: {}", ip);
info!("Client: Password MD5: {:?}", password_md5_opt);
Ok(())
}
Some(ServerResponse::LoginResponse(LoginResponse::LoginFail {
reason,
})) => Err(ClientLoginError::LoginFailed(reason)),
Some(response) => Err(ClientLoginError::UnexpectedResponse(response)),
None => Err(ClientLoginError::UnexpectedEof),
}
}
async fn run_once<S>(
&mut self,
request_stream: &mut S,
) -> Result<RunOnceResult, ClientRunError>
where
S: Stream<Item = ServerRequest> + Unpin,
{
tokio::select!(
maybe_request = request_stream.next() => {
if let Some(request) = maybe_request {
debug!("Client: sending request: {:?}", request);
self.frame_stream.write(&request).await?;
Ok(RunOnceResult::Continue)
} else {
// Sender has been dropped.
Ok(RunOnceResult::Break)
}
},
read_result = self.frame_stream.read() => {
match read_result? {
Some(response) => {
debug!("Client: received response: {:?}", response);
Ok(RunOnceResult::Response(response))
}
None => Err(ClientRunError::StreamClosed),
}
},
)
}
fn run<S>(
mut self,
mut request_stream: S,
) -> impl Stream<Item = Result<ServerResponse, ClientRunError>> + Unpin
where
S: Stream<Item = ServerRequest> + Unpin,
{
Box::pin(async_stream::try_stream! {
// Drive the main loop: send requests and receive responses.
loop {
match self.run_once(&mut request_stream).await? {
RunOnceResult::Break => break,
RunOnceResult::Continue => continue,
RunOnceResult::Response(response) => yield response,
}
}
debug!("Client: shutting down outbound stream");
self.frame_stream.shutdown().await?;
// Drain the receiving end of the connection.
while let Some(response) = self.frame_stream.read().await? {
debug!("Client: received response: {:?}", response);
yield response;
}
})
}
}
#[cfg(test)]
mod tests {
    use futures::stream::{empty, StreamExt};
    use tokio::net;
    use tokio::sync::mpsc;

    use crate::proto::server::testing::{ServerBuilder, UserStatusMap};
    use crate::proto::server::{
        Credentials, ServerRequest, ServerResponse, UserStatusRequest,
        UserStatusResponse,
    };
    use crate::proto::UserStatus;

    use super::{Client, ClientOptions, Version};

    // Sets up log capturing for tests. The result of the initialization is
    // deliberately ignored.
    fn init() {
        let _ = env_logger::builder().is_test(true).try_init();
    }

    // Builds the ClientOptions shared by every test in this module.
    fn client_options() -> ClientOptions {
        ClientOptions {
            credentials: Credentials::new(
                "alice".to_string(),
                "sekrit".to_string(),
            )
            .unwrap(),
            version: Version::default(),
        }
    }

    // Logging in and then sending nothing yields no responses.
    #[tokio::test]
    async fn login() {
        init();

        let (server, handle) = ServerBuilder::default().bind().await.unwrap();
        let server_task = tokio::spawn(server.serve());

        let tcp_stream =
            net::TcpStream::connect(handle.address()).await.unwrap();
        let client =
            Client::login(tcp_stream, client_options()).await.unwrap();

        // No requests go out, so no responses are expected to come back.
        let mut responses = client.run(empty());
        let first = responses.next().await;
        assert!(first.is_none());

        handle.shutdown();
        server_task.await.unwrap().unwrap();
    }

    // Two status requests are sent; exactly one response is expected back,
    // for the user registered in the server's status map.
    #[tokio::test]
    async fn simple_exchange() {
        init();

        let alice_status = UserStatusResponse {
            user_name: "alice".to_string(),
            status: UserStatus::Online,
            is_privileged: false,
        };

        let mut known_statuses = UserStatusMap::default();
        known_statuses.insert(alice_status.clone());

        let (server, handle) = ServerBuilder::default()
            .with_user_status_map(known_statuses)
            .bind()
            .await
            .unwrap();
        let server_task = tokio::spawn(server.serve());

        let tcp_stream =
            net::TcpStream::connect(handle.address()).await.unwrap();
        let client =
            Client::login(tcp_stream, client_options()).await.unwrap();

        // Ask about "bob" first, then about "alice".
        let requests = Box::pin(async_stream::stream! {
            for user_name in ["bob", "alice"] {
                yield ServerRequest::UserStatusRequest(UserStatusRequest {
                    user_name: user_name.to_string(),
                });
            }
        });

        let mut responses = client.run(requests);

        let first = responses.next().await.unwrap().unwrap();
        assert_eq!(first, ServerResponse::UserStatusResponse(alice_status));

        let second = responses.next().await;
        assert!(second.is_none());

        handle.shutdown();
        server_task.await.unwrap().unwrap();
    }

    // The server going away while the client still has an open request
    // stream must surface as a "stream closed" error.
    #[tokio::test]
    async fn stream_closed() {
        init();

        let (server, handle) = ServerBuilder::default().bind().await.unwrap();
        let server_task = tokio::spawn(server.serve());

        let tcp_stream =
            net::TcpStream::connect(handle.address()).await.unwrap();
        let client =
            Client::login(tcp_stream, client_options()).await.unwrap();

        // The sender is kept alive (but never used) so that the request
        // stream stays open for the whole test.
        let (_keep_sender_alive, mut receiver) = mpsc::channel(1);
        let requests = Box::pin(async_stream::stream! {
            while let Some(request) = receiver.recv().await {
                yield request;
            }
        });

        let mut responses = client.run(requests);

        // Server shuts down, closing its connection before the client has had
        // a chance to send all of its requests.
        handle.shutdown();

        // Check that the client returns the correct error.
        let first = responses.next().await;
        let error = first.unwrap().unwrap_err();
        assert!(error.is_stream_closed());

        server_task.await.unwrap().unwrap();
    }
}