diff --git a/proto/src/core/worker.rs b/proto/src/core/worker.rs index 27cb8e4..0e01ab0 100644 --- a/proto/src/core/worker.rs +++ b/proto/src/core/worker.rs @@ -67,6 +67,7 @@ where ReadFrame: ValueDecode + Debug, WriteFrame: ValueEncode + Debug, { + /// Wraps the given `stream`. pub fn new(stream: TcpStream) -> Self { let (read_half, write_half) = stream.into_split(); let reader = FrameReader::new(read_half); @@ -74,6 +75,24 @@ where Self { reader, writer } } + /// Assembles a worker from a `reader` and a `writer`. + /// + /// Useful when some data has already been exchanged on a TCP stream, and + /// possibly buffered by either half. + pub fn from_parts( + reader: FrameReader, + writer: FrameWriter, + ) -> Self { + Self { reader, writer } + } + + /// Runs this worker until `outgoing_rx` is closed. + /// + /// Forwards incoming frames from the underlying stream into `incoming_tx`. + /// Forwards outgoing frames from `outgoing_rx` onto the underlying stream. + /// + /// Returns an error if reading or writing frames to and from the underlying + /// stream fails, or if `incoming_tx` is closed. pub async fn run( &mut self, incoming_tx: mpsc::Sender, @@ -87,6 +106,7 @@ where Ok(()) } + /// Returns the underlying stream. Discards any buffered data. pub fn into_inner(self) -> TcpStream { let read_half = self.reader.into_inner(); let write_half = self.writer.into_inner(); diff --git a/proto/src/server/client.rs b/proto/src/server/client.rs index 86dcded..a372392 100644 --- a/proto/src/server/client.rs +++ b/proto/src/server/client.rs @@ -1,22 +1,22 @@ //! A client interface for remote servers. +use std::io; + use log::{debug, info}; use thiserror::Error; +use tokio::net::tcp::{OwnedReadHalf, OwnedWriteHalf}; use tokio::net::TcpStream; -use tokio::sync::mpsc; -use crate::core::{Worker, WorkerError}; +use crate::core::{FrameReader, FrameWriter, Worker}; use crate::server::{ Credentials, LoginResponse, ServerRequest, ServerResponse, Version, }; -/// A `Worker` that sends `ServerRequest`s and receives `ServerResponse`s. -pub type ClientWorker = Worker; - /// A client for the client-server protocol. #[derive(Debug)] pub struct Client { - stream: TcpStream, + reader: FrameReader, + writer: FrameWriter, version: Version, } @@ -24,26 +24,31 @@ pub struct Client { #[derive(Debug, Error)] pub enum ClientLoginError { #[error("login failed: {0}")] - LoginFailed(String, ClientWorker), + LoginFailed(String, Client), #[error("unexpected response: {0:?}")] UnexpectedResponse(ServerResponse), - #[error("send error: {0}")] - SendError(#[from] mpsc::error::SendError), + #[error("read error: {0}")] + ReadError(#[source] io::Error), - #[error("worker error: {0}")] - WorkerError(#[from] WorkerError), + #[error("write error: {0}")] + WriteError(#[source] io::Error), #[error("stream closed unexpectedly")] StreamClosed, } +/// A `Worker` that sends `ServerRequest`s and receives `ServerResponse`s. +pub type ClientWorker = Worker; + impl Client { /// Instantiates a new client pub fn new(stream: TcpStream) -> Self { + let (read_half, write_half) = stream.into_split(); Client { - stream, + reader: FrameReader::new(read_half), + writer: FrameWriter::new(write_half), version: Version::default(), } } @@ -56,32 +61,23 @@ impl Client { /// Performs the login exchange, presenting `credentials` to the server. pub async fn login( - self, + mut self, credentials: Credentials, ) -> Result { - let mut worker = ClientWorker::new(self.stream); - let (request_tx, request_rx) = mpsc::channel(1); - let (response_tx, mut response_rx) = mpsc::channel(1); - - let worker_task = tokio::spawn(async move { - worker - .run(response_tx, request_rx) - .await - .map(move |()| worker) - }); - let login_request = credentials.into_login_request(self.version); debug!("Sending login request: {:?}", login_request); + self + .writer + .write(&login_request.into()) + .await + .map_err(ClientLoginError::WriteError)?; - request_tx.send(login_request.into()).await?; - let optional_response = response_rx.recv().await; - - // Join the worker even if we received `None`, in case it failed. - // Panic in case of join error, as if we had run the worker itself. - drop(request_tx); - let worker = worker_task.await.expect("joining worker")?; - - let response = match optional_response { + let response = match self + .reader + .read() + .await + .map_err(ClientLoginError::ReadError)? + { None => return Err(ClientLoginError::StreamClosed), Some(response) => response, }; @@ -98,10 +94,10 @@ impl Client { info!("Login: Public IP address: {}", ip); info!("Login: Password MD5: {:?}", password_md5_opt); - Ok(worker) + Ok(Worker::from_parts(self.reader, self.writer)) } ServerResponse::LoginResponse(LoginResponse::LoginFail { reason }) => { - Err(ClientLoginError::LoginFailed(reason, worker)) + Err(ClientLoginError::LoginFailed(reason, self)) } response => Err(ClientLoginError::UnexpectedResponse(response)), }