//! Defines a worker model for peer connections. use std::io; use std::net::SocketAddr; use thiserror::Error; use tokio::net::{TcpListener, TcpStream}; use crate::core::{FrameReader, FrameWriter, Worker}; use crate::peer::{Message, PeerConnectionType, PeerInit}; // Peer states: // // - closed // - open // - waiting for pierce firewall // - cannot connect // // Transitions: // // - closed: // -> open: // connect // send peerinit // -> waiting for pierce firewall: // connect failed, or send peerinit failed // send connecttopeer to server // -> open: // accept connection // receive peerinit // - open: // -> closed: // connection closed // - waiting for pierce firewall: // -> open: // accept connection // receive pierce firewall // ???send peer init??? // -> cannot connect: // receive cannot connect // /// A peer to connect to. #[derive(Debug)] pub struct PeerConnection { /// The address of the peer. pub address: SocketAddr, /// The type of connection to establish. pub connection_type: PeerConnectionType, /// The user name as which to identify ourselves to the peer. pub our_user_name: String, } /// An error that arose while establishing a connection to a peer. #[derive(Debug, Error)] pub enum PeerConnectionError { #[error("error establishing network connection: {0}")] ConnectError(#[source] io::Error), #[error("error sending initial message: {0}")] WriteError(#[source] io::Error), } /// A `Worker` that handles an open connection to a peer. type PeerWorker = Worker; impl PeerConnection { pub async fn connect(self) -> Result { let stream = TcpStream::connect(self.address) .await .map_err(PeerConnectionError::ConnectError)?; let (read_half, write_half) = stream.into_split(); let reader = FrameReader::new(read_half); let mut writer = FrameWriter::new(write_half); writer .write(&Message::PeerInit(PeerInit { user_name: self.our_user_name, connection_type: self.connection_type, token: 0, })) .await .map_err(PeerConnectionError::WriteError)?; Ok(Worker::from_parts(reader, writer)) } } #[derive(Debug)] pub struct IncomingPeer { pub user_name: String, pub address: SocketAddr, pub connection_type: PeerConnectionType, pub token: u32, pub worker: PeerWorker, } #[derive(Debug)] pub struct IncomingConnection { address: SocketAddr, stream: TcpStream, } #[derive(Debug, Error)] pub enum IncomingHandshakeError { #[error("error reading message: {0}")] ReadError(#[source] io::Error), #[error("stream closed unexpectedly")] StreamClosed, #[error("unexpected message: {0:?}")] UnexpectedMessage(Message), } impl IncomingConnection { pub async fn handshake(self) -> Result { let (read_half, write_half) = self.stream.into_split(); let mut reader = FrameReader::new(read_half); let writer = FrameWriter::new(write_half); let optional_message = reader .read() .await .map_err(IncomingHandshakeError::ReadError)?; match optional_message { Some(Message::PeerInit(peer_init)) => Ok(IncomingPeer { user_name: peer_init.user_name, address: self.address, connection_type: peer_init.connection_type, token: peer_init.token, worker: Worker::from_parts(reader, writer), }), Some(message) => Err(IncomingHandshakeError::UnexpectedMessage(message)), None => Err(IncomingHandshakeError::StreamClosed), } } } struct PeerListener { inner: TcpListener, } impl PeerListener { pub fn new(listener: TcpListener) -> Self { Self { inner: listener } } pub async fn accept(&mut self) -> io::Result { let (stream, address) = self.inner.accept().await?; Ok(IncomingConnection { stream, address }) } } #[cfg(test)] mod tests { use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4}; use tokio::net::{TcpListener, TcpStream}; use crate::core::{FrameReader, FrameWriter}; use crate::peer::{Message, PeerConnectionType, PeerInit}; use super::{ IncomingHandshakeError, PeerConnection, PeerConnectionError, PeerListener, }; #[tokio::test] async fn peer_connection_connect_error() { let connection = PeerConnection { address: SocketAddr::V4(SocketAddrV4::new( // TODO: use example IP instead, ensuring this fails. Ipv4Addr::new(0, 0, 0, 1), 42, )), connection_type: PeerConnectionType::File, our_user_name: "me".to_string(), }; let err = connection.connect().await.unwrap_err(); match err { PeerConnectionError::ConnectError(_) => (), _ => panic!("Wrong error: {:?}", err), } } #[tokio::test] async fn peer_connection_success() { let listener = TcpListener::bind("localhost:0") .await .expect("binding listener"); let address = listener.local_addr().expect("getting local address"); let connection = PeerConnection { address, connection_type: PeerConnectionType::File, our_user_name: "me".to_string(), }; let (connect_result, accept_result) = tokio::join!(connection.connect(), listener.accept()); let _worker = connect_result.expect("connecting"); let (stream, _remote_address) = accept_result.expect("accepting"); let message = FrameReader::new(stream).read().await.expect("reading"); assert_eq!( message, Some(Message::PeerInit(PeerInit { user_name: "me".to_string(), connection_type: PeerConnectionType::File, token: 0, })) ); } #[tokio::test] async fn peer_accept_stream_closed() { let tcp_listener = TcpListener::bind("localhost:0").await.expect("binding"); let listener_address = tcp_listener.local_addr().expect("getting local address"); let mut peer_listener = PeerListener::new(tcp_listener); let connect_task = async { // Open connection and immediately drop/close it. TcpStream::connect(listener_address) .await .expect("connecting"); }; let ((), accept_result) = tokio::join!(connect_task, peer_listener.accept()); let incoming_connection = accept_result.expect("accepting"); let err = incoming_connection .handshake() .await .expect_err("performing handshake"); match err { IncomingHandshakeError::StreamClosed => (), _ => panic!("Wrong error: {:?}", err), } } #[tokio::test] async fn peer_accept_unexpected_message() { let tcp_listener = TcpListener::bind("localhost:0").await.expect("binding"); let listener_address = tcp_listener.local_addr().expect("getting local address"); let mut peer_listener = PeerListener::new(tcp_listener); let connect_task = async { let stream = TcpStream::connect(listener_address) .await .expect("connecting"); let mut writer = FrameWriter::new(stream); writer.write("garbage").await.expect("writing"); // Return writer so that the connection is kept alive until we notice the // error on the receiving side. writer }; let (_writer, accept_result) = tokio::join!(connect_task, peer_listener.accept()); let incoming_connection = accept_result.expect("accepting"); let err = incoming_connection .handshake() .await .expect_err("performing handshake"); match err { IncomingHandshakeError::ReadError(_) => (), _ => panic!("Wrong error: {:?}", err), } } #[tokio::test] async fn peer_accept_success() { let tcp_listener = TcpListener::bind("localhost:0").await.expect("binding"); let listener_address = tcp_listener.local_addr().expect("getting local address"); let mut peer_listener = PeerListener::new(tcp_listener); let outgoing_connection = PeerConnection { address: listener_address, connection_type: PeerConnectionType::File, our_user_name: "olabode".to_string(), }; let accept_task = async { let incoming_connection = peer_listener.accept().await.expect("accepting"); incoming_connection .handshake() .await .expect("performing handshake") }; let (connect_result, incoming_peer) = tokio::join!(outgoing_connection.connect(), accept_task); let _worker = connect_result.expect("connecting"); assert_eq!(incoming_peer.user_name, "olabode"); assert_eq!(incoming_peer.connection_type, PeerConnectionType::File); assert_eq!(incoming_peer.token, 0); } }