From 772e03b2e753fc823616e7a9286a7b10dda62fe6 Mon Sep 17 00:00:00 2001 From: Titouan Rigoudy Date: Wed, 11 May 2016 18:17:10 +0200 Subject: [PATCH] Heavily refactor peer code to detect failed connections. --- src/client.rs | 168 +++++++++++++++++++---------------- src/proto/handler.rs | 207 ++++++++++++++++++++----------------------- src/proto/stream.rs | 79 ++++++++++++----- 3 files changed, 246 insertions(+), 208 deletions(-) diff --git a/src/client.rs b/src/client.rs index da51100..90d6594 100644 --- a/src/client.rs +++ b/src/client.rs @@ -1,4 +1,3 @@ -use std::collections::hash_map; use std::net; use std::sync::mpsc; @@ -13,6 +12,27 @@ use proto::server; use room; use user; +/*===============* + * TOKEN COUNTER * + *===============*/ + +struct TokenCounter { + next_token: u32 +} + +impl TokenCounter { + fn new() -> Self { + TokenCounter { next_token: 0 } + } + + fn next(&mut self) -> u32 { + let token = self.next_token; + self.next_token += 1; + token + } +} + + #[derive(Debug)] enum IncomingMessage { Proto(proto::Response), @@ -26,16 +46,20 @@ enum LoginStatus { Failure(String), } -#[derive(Clone, Copy, Debug)] -enum ConnectingPeer { - Direct, - Firewalled(u32) +#[derive(Debug)] +enum PeerState { + Opening, + OpeningFirewalled, + Open } #[derive(Debug)] struct Peer { - ip: net::Ipv4Addr, + ip: net::Ipv4Addr, port: u16, + connection_type: String, + token: u32, + state: PeerState, } pub struct Client { @@ -50,9 +74,8 @@ pub struct Client { rooms: room::RoomMap, users: user::UserMap, - connecting_peers: hash_map::HashMap<(net::Ipv4Addr, u16), ConnectingPeer>, - peers: slab::Slab, - next_peer_token: u32, + peers: slab::Slab, + token_counter: TokenCounter, } impl Client { @@ -77,9 +100,8 @@ impl Client { rooms: room::RoomMap::new(), users: user::UserMap::new(), - connecting_peers: hash_map::HashMap::new(), - peers: slab::Slab::new(config::MAX_PEERS), - next_peer_token: 0, + peers: slab::Slab::new(config::MAX_PEERS), + token_counter: TokenCounter::new() } } @@ -292,8 +314,8 @@ impl Client { proto::Response::ServerResponse(server_response) => self.handle_server_response(server_response), - proto::Response::PeerConnectionOpen(ip, port, peer_id) => - self.handle_peer_connection_open(ip, port, peer_id), + proto::Response::PeerConnectionOpen(peer_id) => + self.handle_peer_connection_open(peer_id), proto::Response::PeerConnectionClosed(peer_id) => self.handle_peer_connection_closed(peer_id), @@ -306,74 +328,53 @@ impl Client { fn handle_peer_connection_closed(&mut self, peer_id: usize) { info!("Connection to peer {} has closed", peer_id); - if let None = self.peers.remove(peer_id) { - error!("Unknown peer {}", peer_id); + match self.peers.remove(peer_id) { + None => error!("Unknown peer {}", peer_id), + + Some(peer) => { + // TODO if the peer was not connected, talk to the server + } } } - fn handle_peer_connection_open( - &mut self, ip: net::Ipv4Addr, port: u16, peer_id: usize) + fn handle_peer_connection_open(&mut self, peer_id: usize) { - info!("Connected to peer {}:{} with id {}", ip, port, peer_id); - - let connecting_peer = match self.connecting_peers.remove(&(ip, port)) { - None => { - error!("Unknown peer {}:{}", ip, port); - return - }, - - Some(connecting_peer) => connecting_peer, - }; - - match self.peers.entry(peer_id) { + let message = match self.peers.get_mut(peer_id) { None => { - error!( - "Slab entry at {} for {}:{} does not exist", - peer_id, ip, port - ); + error!("Unknown peer connection {} is open", peer_id); return }, - Some(slab::Entry::Occupied(occupied_entry)) => { + Some(peer @ &mut Peer { state: PeerState::Open, .. }) => { error!( - "Slab entry at {} for {}:{} is occupied by {:?}", - peer_id, ip, port, occupied_entry.get() + "Peer connection {} was already open: {:?}", + peer_id, peer ); return }, - Some(slab::Entry::Vacant(vacant_entry)) => { - vacant_entry.insert(Peer { - ip: ip, - port: port, - }); + Some(peer @ &mut Peer { state: PeerState::Opening, .. }) => { + info!("Peer connection {} is now open: {:?}", peer_id, peer); + // Mark it as open. + peer.state = PeerState::Open; + // Send a PeerInit. + peer::Message::PeerInit(peer::PeerInit { + user_name: config::USERNAME.to_string(), + connection_type: peer.connection_type.clone(), + token: peer.token, + }) }, - } - - match connecting_peer { - ConnectingPeer::Direct => { - let token = self.next_peer_token; - self.next_peer_token += 1; - self.send_to_peer(peer_id, peer::Message::PeerInit( - peer::PeerInit { - user_name: config::USERNAME.to_string(), - connection_type: "P".to_string(), - token: token, - } - )); + Some(peer @ &mut Peer{state: PeerState::OpeningFirewalled, ..}) => { + info!("Peer connection {} is now open: {:?}", peer_id, peer); + // Mark it as open. + peer.state = PeerState::Open; + // Send a PierceFirewall. + peer::Message::PierceFirewall(peer.token) }, + }; - ConnectingPeer::Firewalled(token) => { - debug!( - "Piercing firewall for peer {} with token {}", - peer_id, token - ); - self.send_to_peer( - peer_id, peer::Message::PierceFirewall(token) - ); - } - } + self.send_to_peer(peer_id, message); } /*==========================* @@ -428,17 +429,32 @@ impl Client { fn handle_connect_to_peer_response( &mut self, response: server::ConnectToPeerResponse) { - info!( - "Connecting to peer {}:{} with token {} to pierce firewall", - response.ip, response.port, response.token - ); - self.connecting_peers.insert( - (response.ip, response.port), - ConnectingPeer::Firewalled(response.token) - ); - self.proto_tx.send(proto::Request::PeerConnect( - response.ip, response.port - )).unwrap(); + let peer = Peer { + ip: response.ip, + port: response.port, + connection_type: response.connection_type, + token: response.token, + state: PeerState::OpeningFirewalled + }; + + match self.peers.insert(peer) { + Ok(peer_id) => { + info!( + "Opening peer connection {} to {}:{} to pierce firewall", + peer_id, response.ip, response.port + ); + self.proto_tx.send(proto::Request::PeerConnect( + peer_id, response.ip, response.port + )).unwrap(); + }, + + Err(peer) => { + warn!( + "Cannot open peer connection {:?}: too many already open", + peer + ); + } + } } fn handle_login_response(&mut self, login: server::LoginResponse) { diff --git a/src/proto/handler.rs b/src/proto/handler.rs index 8aa3cdb..04f29f3 100644 --- a/src/proto/handler.rs +++ b/src/proto/handler.rs @@ -1,7 +1,5 @@ -use std::fmt; use std::io; use std::net; -use std::net::ToSocketAddrs; use std::sync::mpsc; use mio; @@ -13,11 +11,14 @@ use super::{Intent, Stream, SendPacket}; use super::server::*; use super::peer; -const SERVER_TOKEN: usize = 0; -const INIT_PEER_TOKEN: usize = 1; +/*===========* + * CONSTANTS * + *===========*/ -type ServerStream = Stream; -type PeerStream = Stream; +// There are only ever MAX_PEERS peer tokens, from 0 to MAX_PEERS - 1. +// This way we ensure no overlap and eliminate the need for coordination +// between client and handler that would otherwise be needed. +const SERVER_TOKEN: usize = config::MAX_PEERS; /*====================* * REQUEST - RESPONSE * @@ -25,7 +26,7 @@ type PeerStream = Stream; #[derive(Debug)] pub enum Request { - PeerConnect(net::Ipv4Addr, u16), + PeerConnect(usize, net::Ipv4Addr, u16), PeerMessage(usize, peer::Message), ServerRequest(ServerRequest) } @@ -33,9 +34,8 @@ pub enum Request { #[derive(Debug)] pub enum Response { PeerConnectionClosed(usize), - PeerConnectionError(net::Ipv4Addr, u16), - PeerConnectionOpen(net::Ipv4Addr, u16, usize), - PeerMessage(peer::Message), + PeerConnectionOpen(usize), + PeerMessage(usize, peer::Message), ServerResponse(ServerResponse), } @@ -52,20 +52,31 @@ impl SendPacket for ServerResponseSender { fn send_packet(&mut self, value: Self::Value) -> Result<(), Self::Error> { self.0.send(Response::ServerResponse(value)) } + + fn notify_open(&mut self) -> Result<(), Self::Error> { + Ok(()) + } } /*======================* * PEER RESPONSE SENDER * *======================*/ -pub struct PeerResponseSender(mpsc::Sender, usize); +pub struct PeerResponseSender { + sender: mpsc::Sender, + peer_id: usize, +} impl SendPacket for PeerResponseSender { type Value = peer::Message; type Error = mpsc::SendError; fn send_packet(&mut self, value: Self::Value) -> Result<(), Self::Error> { - self.0.send(Response::PeerMessage(value)) + self.sender.send(Response::PeerMessage(self.peer_id, value)) + } + + fn notify_open(&mut self) -> Result<(), Self::Error> { + self.sender.send(Response::PeerConnectionOpen(self.peer_id)) } } @@ -76,56 +87,84 @@ impl SendPacket for PeerResponseSender { /// This struct handles all the soulseek connections, to the server and to /// peers. struct Handler { - server_stream: ServerStream, + server_stream: Stream, - peer_streams: slab::Slab, + peer_streams: slab::Slab, usize>, client_tx: mpsc::Sender, } impl Handler { - fn new(client_tx: mpsc::Sender) -> io::Result { + fn new( + client_tx: mpsc::Sender, + event_loop: &mut mio::EventLoop) + -> io::Result + { let host = config::SERVER_HOST; let port = config::SERVER_PORT; - let server_stream = Stream::new( - try!(Self::connect((host, port))), + let server_stream = try!(Stream::new( + (host, port), ServerResponseSender(client_tx.clone()) - ); + )); + info!("Connected to server at {}:{}", host, port); + try!(event_loop.register( + server_stream.evented(), + mio::Token(SERVER_TOKEN), + mio::EventSet::all(), + mio::PollOpt::edge() | mio::PollOpt::oneshot() + )); + Ok(Handler { server_stream: server_stream, - peer_streams: slab::Slab::new_starting_at( - INIT_PEER_TOKEN, config::MAX_PEERS - ), + peer_streams: slab::Slab::new(config::MAX_PEERS), client_tx: client_tx, }) } - fn register(&self, event_loop: &mut mio::EventLoop) -> io::Result<()> + fn connect_to_peer( + &mut self, + peer_id: usize, + ip: net::Ipv4Addr, + port: u16, + event_loop: &mut mio::EventLoop) + -> Result<(), String> { + let vacant_entry = match self.peer_streams.entry(peer_id) { + None => return Err("id out of range".to_string()), + + Some(slab::Entry::Occupied(occupied_entry)) => + return Err("id already taken".to_string()), + + Some(slab::Entry::Vacant(vacant_entry)) => vacant_entry, + }; + + info!("Opening peer connection {} to {}:{}", peer_id, ip, port); + + let sender = PeerResponseSender { + sender: self.client_tx.clone(), + peer_id: peer_id + }; + + let peer_stream = match Stream::new((ip, port), sender) { + Ok(peer_stream) => peer_stream, + + Err(err) => return Err(format!("i/o error: {}", err)) + }; + event_loop.register( - self.server_stream.evented(), - mio::Token(SERVER_TOKEN), - mio::EventSet::readable(), + peer_stream.evented(), + mio::Token(peer_id), + mio::EventSet::all(), mio::PollOpt::edge() | mio::PollOpt::oneshot() - ) - } + ).unwrap(); - fn connect(addr_spec: T) -> io::Result - where T: ToSocketAddrs + fmt::Debug - { - for sock_addr in try!(addr_spec.to_socket_addrs()) { - if let Ok(stream) = mio::tcp::TcpStream::connect(&sock_addr) { - return Ok(stream) - } - } - Err(io::Error::new( - io::ErrorKind::Other, - format!("Cannot connect to {:?}", addr_spec) - )) + vacant_entry.insert(peer_stream); + + Ok(()) } fn process_server_intent( @@ -147,67 +186,6 @@ impl Handler { } } - fn connect_to_peer( - &mut self, - ip: net::Ipv4Addr, - port: u16, - event_loop: &mut mio::EventLoop) - { - let vacant_entry = match self.peer_streams.vacant_entry() { - Some(vacant_entry) => vacant_entry, - None => { - error!( - "Cannot connect to peer {}:{}: too many connections open", - ip, port - ); - self.client_tx.send( - Response::PeerConnectionError(ip, port) - ).unwrap(); - return - }, - }; - - info!("Connecting to peer {}:{}", ip, port); - - let mut tcp_stream = match Self::connect((ip, port)) { - Ok(tcp_stream) => tcp_stream, - Err(err) => { - error!("Cannot connect to peer {}:{}: {}", ip, port, err); - - self.client_tx.send( - Response::PeerConnectionError(ip, port) - ).unwrap(); - return - } - }; - - let peer_id = vacant_entry.index(); - - event_loop.register( - &mut tcp_stream, - mio::Token(peer_id), - mio::EventSet::readable(), - mio::PollOpt::edge() | mio::PollOpt::oneshot() - ).unwrap(); - - let peer_stream = Stream::new( - tcp_stream, PeerResponseSender(self.client_tx.clone(), peer_id) - ); - - vacant_entry.insert(peer_stream); - - // This is actually false, because the socket might still be connecting - // asynchronously. - // We will know if the connection worked or not when we get an event - // and try to read or write. - // There is nothing too wrong about telling the client it worked though, - // and closing the connection as soon as the client tries to use it, - // at which point the client will forget about the whole thing. - self.client_tx.send( - Response::PeerConnectionOpen(ip, port, peer_id) - ).unwrap(); - } - fn process_peer_intent( &mut self, intent: Intent, @@ -217,8 +195,9 @@ impl Handler { match intent { Intent::Done => { self.peer_streams.remove(token.0); - self.client_tx.send(Response::PeerConnectionClosed(token.0)) - .unwrap(); + self.client_tx.send( + Response::PeerConnectionClosed(token.0) + ).unwrap(); }, Intent::Continue(event_set) => { @@ -248,7 +227,8 @@ impl mio::Handler for Handler { } else { let intent = match self.peer_streams.get_mut(token.0) { Some(peer_stream) => peer_stream.on_ready(event_set), - None => unreachable!("Unknown token is ready"), + + None => unreachable!("Unknown token {} is ready", token.0), }; self.process_peer_intent(intent, token, event_loop); } @@ -258,8 +238,18 @@ impl mio::Handler for Handler { request: Request) { match request { - Request::PeerConnect(ip, port) => - self.connect_to_peer(ip, port, event_loop), + Request::PeerConnect(peer_id, ip, port) => + if let Err(err) = + self.connect_to_peer(peer_id, ip, port, event_loop) + { + error!( + "Cannot open peer connection {} to {}:{}: {}", + peer_id, ip, port, err + ); + self.client_tx.send( + Response::PeerConnectionClosed(peer_id) + ).unwrap(); + }, Request::PeerMessage(peer_id, message) => { let intent = match self.peer_streams.get_mut(peer_id) { @@ -296,10 +286,9 @@ impl Agent { pub fn new(client_tx: mpsc::Sender) -> io::Result { // Create the event loop. let mut event_loop = try!(mio::EventLoop::new()); - // Create the handler for the event loop. - let handler = try!(Handler::new(client_tx)); - // Register the handler's sockets with the event loop. - try!(handler.register(&mut event_loop)); + // Create the handler for the event loop and register the handler's + // sockets with the event loop. + let handler = try!(Handler::new(client_tx, &mut event_loop)); Ok(Agent { event_loop: event_loop, diff --git a/src/proto/stream.rs b/src/proto/stream.rs index c928736..a7144dc 100644 --- a/src/proto/stream.rs +++ b/src/proto/stream.rs @@ -1,6 +1,8 @@ use std::collections::VecDeque; use std::error; +use std::fmt; use std::io; +use std::net::ToSocketAddrs; use mio; @@ -59,6 +61,8 @@ pub trait SendPacket { type Error: error::Error; fn send_packet(&mut self, Self::Value) -> Result<(), Self::Error>; + + fn notify_open(&mut self) -> Result<(), Self::Error>; } /// This enum defines the possible actions the stream wants to take after @@ -72,37 +76,48 @@ pub enum Intent { Continue(mio::EventSet), } -/// This struct wraps around an mio byte stream and handles packet reads and +/// This struct wraps around an mio tcp stream and handles packet reads and /// writes. #[derive(Debug)] -pub struct Stream - where T: io::Read + io::Write + mio::Evented, - U: SendPacket +pub struct Stream { parser: Parser, queue: VecDeque, - sender: U, - stream: T, + sender: T, + stream: mio::tcp::TcpStream, + + is_connected: bool, } -impl Stream - where T: io::Read + io::Write + mio::Evented, - U: SendPacket +impl Stream { - /// Returns a new struct wrapping the provided byte stream, which will - /// forward packets to the provided sink. - pub fn new(stream: T, sender: U) -> Self { - Stream { - parser: Parser::new(), - queue: VecDeque::new(), - sender: sender, - stream: stream, + /// Returns a new stream, asynchronously connected to the given address, + /// which forwards incoming packets to the given sender. + /// If an error occurs when connecting, returns an error. + pub fn new(addr_spec: U, sender: T) -> io::Result + where U: ToSocketAddrs + fmt::Debug + { + for sock_addr in try!(addr_spec.to_socket_addrs()) { + if let Ok(stream) = mio::tcp::TcpStream::connect(&sock_addr) { + return Ok(Stream { + parser: Parser::new(), + queue: VecDeque::new(), + sender: sender, + stream: stream, + + is_connected: false, + }) + } } + Err(io::Error::new( + io::ErrorKind::Other, + format!("Cannot connect to {:?}", addr_spec) + )) } /// Returns a reference to the underlying byte stream, to allow it to be /// registered with an event loop. - pub fn evented(&self) -> &T { + pub fn evented(&self) -> &mio::tcp::TcpStream { &self.stream } @@ -158,6 +173,9 @@ impl Stream /// The stream is ready to read, write, or both. pub fn on_ready(&mut self, event_set: mio::EventSet) -> Intent { + if event_set.is_hup() || event_set.is_error() { + return Intent::Done + } if event_set.is_readable() { let result = self.on_readable(); if let Err(e) = result { @@ -173,14 +191,29 @@ impl Stream } } + // We must have read or written something succesfully if we're here, + // so the stream must be connected. + if !self.is_connected { + // If we weren't already connected, notify the sink. + if let Err(err) = self.sender.notify_open() { + error!("Cannot notify client that stream is open: {}", err); + return Intent::Done + } + // And record the fact that we are now connected. + self.is_connected = true; + } + // We're always interested in reading more. + let mut event_set = + mio::EventSet::readable() | + mio::EventSet::hup() | + mio::EventSet::error(); // If there is still stuff to write in the queue, we're interested in // the socket becoming writable too. - let event_set = if self.queue.len() > 0 { - mio::EventSet::readable() | mio::EventSet::writable() - } else { - mio::EventSet::readable() - }; + if self.queue.len() > 0 { + event_set = event_set | mio::EventSet::writable(); + } + Intent::Continue(event_set) }