Browse Source

Rework Client API to something inspired by tonic bidi streaming RPCs.

wip
Titouan Rigoudy 4 years ago
parent
commit
d465c9f216
4 changed files with 756 additions and 638 deletions
  1. +613
    -480
      Cargo.lock
  2. +2
    -1
      Cargo.toml
  3. +137
    -156
      src/proto/server/client.rs
  4. +4
    -1
      src/proto/server/testing.rs

+ 613
- 480
Cargo.lock
File diff suppressed because it is too large
View File


+ 2
- 1
Cargo.toml View File

@ -5,13 +5,14 @@ authors = ["letitz"]
edition = "2018"
[dependencies]
async-stream = "^0.3"
byteorder = "^0.5.1"
bytes = "^1.0"
crossbeam-channel = "^0.3"
encoding = "^0.2"
encoding_rs = "^0.8"
env_logger = "^0.3.2"
futures = "^0.1"
futures = "^0.3"
log = "^0.3.5"
mio = "^0.6"
parking_lot = "^0.8"


+ 137
- 156
src/proto/server/client.rs View File

@ -2,230 +2,211 @@
use std::io;
use futures::stream::{Stream, StreamExt};
use thiserror::Error;
use tokio::net;
use tokio::sync::mpsc;
use crate::proto::core::frame::FrameStream;
use crate::proto::server::{
LoginRequest, LoginResponse, ServerRequest, ServerResponse,
};
struct ClientOptions {
request_channel_capacity: usize,
response_channel_capacity: usize,
/// Specifies a protocol version.
pub struct Version {
/// The major version number.
pub major: u32,
/// The minor version number.
pub minor: u32,
}
impl Default for ClientOptions {
impl Default for Version {
fn default() -> Self {
ClientOptions {
request_channel_capacity: 100,
response_channel_capacity: 100,
Self {
major: 181,
minor: 100,
}
}
}
pub struct Client {
frame_stream: FrameStream<ServerResponse, ServerRequest>,
request_receiver: mpsc::Receiver<ServerRequest>,
response_sender: Option<mpsc::Sender<ServerResponse>>,
/// Specifies options for a new `Client`.
#[derive(Default)]
struct ClientOptions {
user_name: String,
password: String,
version: Version,
}
pub struct NewClient {
pub client: Client,
pub request_sender: mpsc::Sender<ServerRequest>,
pub response_receiver: mpsc::Receiver<ServerResponse>,
/// A client for the client-server protocol.
pub struct Client {
frame_stream: FrameStream<ServerResponse, ServerRequest>,
}
/// An error that arose while logging in to a remote server.
#[derive(Debug, Error)]
pub enum ClientRunError {
#[error("client has run to completion already")]
CompletedError,
pub enum ClientLoginError {
#[error("invalid credentials: {0}")]
InvalidCredentials(String),
#[error("login failed: {0}")]
LoginFailed(String),
#[error("response channel send error: {0}")]
SendResponseError(#[from] mpsc::error::SendError<ServerResponse>),
#[error("unexpected response: {0:?}")]
UnexpectedResponse(ServerResponse),
#[error("i/o error: {0}")]
IOError(#[from] io::Error),
}
async fn login(
frame_stream: &mut FrameStream<ServerResponse, ServerRequest>,
) -> io::Result<()> {
let request = ServerRequest::LoginRequest(
LoginRequest::new("alice", "sekrit", 181, 100).unwrap(),
);
frame_stream.write(&request).await?;
let response = frame_stream.read().await?;
match response {
ServerResponse::LoginResponse(LoginResponse::LoginOk {
motd,
ip,
password_md5_opt,
}) => {
println!("Logged in successfully!");
println!("Message Of The Day: {}", motd);
println!("Public IP address: {}", ip);
println!("Password MD5: {:?}", password_md5_opt);
}
ServerResponse::LoginResponse(LoginResponse::LoginFail { reason }) => {
return Err(io::Error::new(
io::ErrorKind::InvalidData,
format!("Login failed: {}", reason),
));
}
_ => {
return Err(io::Error::new(
io::ErrorKind::InvalidData,
format!("expected login response, got: {:?}", response),
));
}
};
/// An error that arose while running the client.
#[derive(Debug, Error)]
pub enum ClientRunError {
#[error("i/o error: {0}")]
IOError(#[from] io::Error),
}
Ok(())
enum RunOnceResult {
Break,
Continue,
Response(ServerResponse),
}
impl Client {
async fn connect(
address: std::net::SocketAddr,
async fn login(
tcp_stream: net::TcpStream,
options: ClientOptions,
) -> io::Result<NewClient> {
let tcp_stream = net::TcpStream::connect(address).await?;
let mut frame_stream = FrameStream::new(tcp_stream);
login(&mut frame_stream).await?;
let (request_sender, request_receiver) =
mpsc::channel(options.request_channel_capacity);
let (response_sender, response_receiver) =
mpsc::channel(options.response_channel_capacity);
let client = Client {
frame_stream,
request_receiver,
response_sender: Some(response_sender),
) -> Result<Client, ClientLoginError> {
let mut client = Client {
frame_stream: FrameStream::new(tcp_stream),
};
Ok(NewClient {
client,
request_sender,
response_receiver,
})
}
client.handshake(&options).await?;
/// Runs this client's event loop once.
///
/// Returns `Ok(true)` if `run()` should continue.
/// Returns `Ok(false)` if `run()` should return successfully.
/// Returns an error if `run()` should return that error.
async fn run_once(&mut self) -> Result<bool, ClientRunError> {
let response_sender = match self.response_sender {
Some(ref response_sender) => response_sender,
None => return Err(ClientRunError::CompletedError),
};
Ok(client)
}
let mut done = false;
// Performs the login exchange.
// Called this way because `public` is already taken.
async fn handshake(
&mut self,
options: &ClientOptions,
) -> Result<(), ClientLoginError> {
let login_request = LoginRequest::new(
&options.user_name,
&options.password,
options.version.major,
options.version.minor,
)
.map_err(|err| ClientLoginError::InvalidCredentials(err.to_string()))?;
let request = ServerRequest::LoginRequest(login_request);
self.frame_stream.write(&request).await?;
let response = self.frame_stream.read().await?;
match response {
ServerResponse::LoginResponse(LoginResponse::LoginOk {
motd,
ip,
password_md5_opt,
}) => {
println!("Logged in successfully!");
println!("Message Of The Day: {}", motd);
println!("Public IP address: {}", ip);
println!("Password MD5: {:?}", password_md5_opt);
Ok(())
}
ServerResponse::LoginResponse(LoginResponse::LoginFail { reason }) => {
Err(ClientLoginError::LoginFailed(reason))
}
response @ _ => Err(ClientLoginError::UnexpectedResponse(response)),
}
}
async fn run_once<S>(
&mut self,
request_stream: &mut S,
) -> Result<RunOnceResult, ClientRunError>
where
S: Stream<Item = ServerRequest> + Unpin,
{
tokio::select!(
maybe_request = self.request_receiver.recv() => {
maybe_request = request_stream.next() => {
if let Some(request) = maybe_request {
self.frame_stream.write(&request).await?;
Ok(RunOnceResult::Continue)
} else {
// Sender has been dropped.
done = true
Ok(RunOnceResult::Break)
}
}
},
read_result = self.frame_stream.read() => {
let response = read_result?;
response_sender.send(response).await?;
}
);
Ok(!done)
Ok(RunOnceResult::Response(response))
},
)
}
/// Runs this client - sending requests and receiving responses.
pub async fn run(&mut self) -> Result<(), ClientRunError> {
while self.run_once().await? {}
self.response_sender = None;
Ok(())
fn run<S>(
mut self,
mut request_stream: S,
) -> impl Stream<Item = Result<ServerResponse, ClientRunError>> + Unpin
where
S: Stream<Item = ServerRequest> + Unpin,
{
Box::pin(async_stream::try_stream! {
loop {
match self.run_once(&mut request_stream).await? {
// TODO: Close frame stream connection when we break, so that the
// other side notices we've gone away.
RunOnceResult::Break => break,
RunOnceResult::Continue => continue,
RunOnceResult::Response(response) => yield response,
}
}
})
}
}
#[cfg(test)]
mod tests {
use futures::stream::StreamExt;
use tokio::net;
use crate::proto::server::testing::FakeServer;
use crate::proto::server::*;
use super::{Client, ClientOptions};
use super::{Client, ClientOptions, Version};
#[tokio::test]
async fn login() {
async fn client_like_grpc() {
// TODO: Check that server does not crash.
let mut server = FakeServer::new().await.unwrap();
let address = server.address().unwrap();
let _server_task = tokio::spawn(async move { server.run().await.unwrap() });
let new_client = Client::connect(address, ClientOptions::default())
.await
.unwrap();
let mut client = new_client.client;
let mut receiver = new_client.response_receiver;
let stream = net::TcpStream::connect(address).await.unwrap();
let client_task = tokio::spawn(async move { client.run().await.unwrap() });
let options = ClientOptions {
user_name: "alice".to_string(),
password: "sekrit".to_string(),
version: Version::default(),
};
let client = Client::login(stream, options).await.unwrap();
let receive_task = tokio::spawn(async move {
let outbound = Box::pin(async_stream::stream! {
loop {
match receiver.recv().await {
None => break,
Some(response) => {
dbg!(response);
}
}
yield ServerRequest::UserStatusRequest(UserStatusRequest {
user_name: "bob".to_string(),
});
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
}
});
{
let sender = new_client.request_sender;
let request = ServerRequest::UserStatusRequest(UserStatusRequest {
user_name: "alice".to_string(),
});
sender.send(request).await.unwrap();
let mut inbound = client.run(outbound);
while let Some(result) = inbound.next().await {
let _ = dbg!(result);
}
client_task.await.unwrap();
receive_task.await.unwrap();
}
#[tokio::test]
async fn sender_is_cloneable() {
let mut server = FakeServer::new().await.unwrap();
let address = server.address().unwrap();
tokio::spawn(async move { server.run().await.unwrap() });
let new_client = Client::connect(address, ClientOptions::default())
.await
.unwrap();
let mut client = new_client.client;
tokio::spawn(async move { client.run().await.unwrap() });
let sender = new_client.request_sender;
let sender2 = sender.clone();
tokio::spawn(async move {
let request = ServerRequest::UserStatusRequest(UserStatusRequest {
user_name: "alice".to_string(),
});
sender2.send(request).await.unwrap();
});
let request = ServerRequest::UserStatusRequest(UserStatusRequest {
user_name: "bob".to_string(),
});
sender.send(request).await.unwrap();
// TODO: Way to run FakeServer and stop it at will?
}
}

+ 4
- 1
src/proto/server/testing.rs View File

@ -46,7 +46,10 @@ async fn process(
});
frame_stream.write(&response).await?;
Ok(())
loop {
let request = frame_stream.read().await?;
println!("FakeServer: received request: {:?}", request);
}
}
/// A fake server for connecting to in tests.


Loading…
Cancel
Save