Browse Source

Fix buffering issue when logging in.

Previously, a race condition meant some incoming messages could
have been lost during the handshake if they arrived fast enough.
wip
Titouan Rigoudy 4 years ago
parent
commit
e77b260e61
2 changed files with 51 additions and 35 deletions
  1. +20
    -0
      proto/src/core/worker.rs
  2. +31
    -35
      proto/src/server/client.rs

+ 20
- 0
proto/src/core/worker.rs View File

@ -67,6 +67,7 @@ where
ReadFrame: ValueDecode + Debug,
WriteFrame: ValueEncode + Debug,
{
/// Wraps the given `stream`.
pub fn new(stream: TcpStream) -> Self {
let (read_half, write_half) = stream.into_split();
let reader = FrameReader::new(read_half);
@ -74,6 +75,24 @@ where
Self { reader, writer }
}
/// Assembles a worker from a `reader` and a `writer`.
///
/// Useful when some data has already been exchanged on a TCP stream, and
/// possibly buffered by either half.
pub fn from_parts(
reader: FrameReader<ReadFrame, OwnedReadHalf>,
writer: FrameWriter<WriteFrame, OwnedWriteHalf>,
) -> Self {
Self { reader, writer }
}
/// Runs this worker until `outgoing_rx` is closed.
///
/// Forwards incoming frames from the underlying stream into `incoming_tx`.
/// Forwards outgoing frames from `outgoing_rx` onto the underlying stream.
///
/// Returns an error if reading or writing frames to and from the underlying
/// stream fails, or if `incoming_tx` is closed.
pub async fn run(
&mut self,
incoming_tx: mpsc::Sender<ReadFrame>,
@ -87,6 +106,7 @@ where
Ok(())
}
/// Returns the underlying stream. Discards any buffered data.
pub fn into_inner(self) -> TcpStream {
let read_half = self.reader.into_inner();
let write_half = self.writer.into_inner();


+ 31
- 35
proto/src/server/client.rs View File

@ -1,22 +1,22 @@
//! A client interface for remote servers.
use std::io;
use log::{debug, info};
use thiserror::Error;
use tokio::net::tcp::{OwnedReadHalf, OwnedWriteHalf};
use tokio::net::TcpStream;
use tokio::sync::mpsc;
use crate::core::{Worker, WorkerError};
use crate::core::{FrameReader, FrameWriter, Worker};
use crate::server::{
Credentials, LoginResponse, ServerRequest, ServerResponse, Version,
};
/// A `Worker` that sends `ServerRequest`s and receives `ServerResponse`s.
pub type ClientWorker = Worker<ServerResponse, ServerRequest>;
/// A client for the client-server protocol.
#[derive(Debug)]
pub struct Client {
stream: TcpStream,
reader: FrameReader<ServerResponse, OwnedReadHalf>,
writer: FrameWriter<ServerRequest, OwnedWriteHalf>,
version: Version,
}
@ -24,26 +24,31 @@ pub struct Client {
#[derive(Debug, Error)]
pub enum ClientLoginError {
#[error("login failed: {0}")]
LoginFailed(String, ClientWorker),
LoginFailed(String, Client),
#[error("unexpected response: {0:?}")]
UnexpectedResponse(ServerResponse),
#[error("send error: {0}")]
SendError(#[from] mpsc::error::SendError<ServerRequest>),
#[error("read error: {0}")]
ReadError(#[source] io::Error),
#[error("worker error: {0}")]
WorkerError(#[from] WorkerError),
#[error("write error: {0}")]
WriteError(#[source] io::Error),
#[error("stream closed unexpectedly")]
StreamClosed,
}
/// A `Worker` that sends `ServerRequest`s and receives `ServerResponse`s.
pub type ClientWorker = Worker<ServerResponse, ServerRequest>;
impl Client {
/// Instantiates a new client
pub fn new(stream: TcpStream) -> Self {
let (read_half, write_half) = stream.into_split();
Client {
stream,
reader: FrameReader::new(read_half),
writer: FrameWriter::new(write_half),
version: Version::default(),
}
}
@ -56,32 +61,23 @@ impl Client {
/// Performs the login exchange, presenting `credentials` to the server.
pub async fn login(
self,
mut self,
credentials: Credentials,
) -> Result<ClientWorker, ClientLoginError> {
let mut worker = ClientWorker::new(self.stream);
let (request_tx, request_rx) = mpsc::channel(1);
let (response_tx, mut response_rx) = mpsc::channel(1);
let worker_task = tokio::spawn(async move {
worker
.run(response_tx, request_rx)
.await
.map(move |()| worker)
});
let login_request = credentials.into_login_request(self.version);
debug!("Sending login request: {:?}", login_request);
self
.writer
.write(&login_request.into())
.await
.map_err(ClientLoginError::WriteError)?;
request_tx.send(login_request.into()).await?;
let optional_response = response_rx.recv().await;
// Join the worker even if we received `None`, in case it failed.
// Panic in case of join error, as if we had run the worker itself.
drop(request_tx);
let worker = worker_task.await.expect("joining worker")?;
let response = match optional_response {
let response = match self
.reader
.read()
.await
.map_err(ClientLoginError::ReadError)?
{
None => return Err(ClientLoginError::StreamClosed),
Some(response) => response,
};
@ -98,10 +94,10 @@ impl Client {
info!("Login: Public IP address: {}", ip);
info!("Login: Password MD5: {:?}", password_md5_opt);
Ok(worker)
Ok(Worker::from_parts(self.reader, self.writer))
}
ServerResponse::LoginResponse(LoginResponse::LoginFail { reason }) => {
Err(ClientLoginError::LoginFailed(reason, worker))
Err(ClientLoginError::LoginFailed(reason, self))
}
response => Err(ClientLoginError::UnexpectedResponse(response)),
}


Loading…
Cancel
Save