From 2ad5f198504c7e7eb9efcef9eed35a59b50bf342 Mon Sep 17 00:00:00 2001 From: Titouan Rigoudy Date: Tue, 10 May 2016 17:42:09 +0200 Subject: [PATCH] Add basic communication with peers. --- src/client.rs | 120 +++++++++++++++++++++++++++++++++++- src/proto/handler.rs | 95 ++++++++++++++++++++++++---- src/proto/mod.rs | 1 + src/proto/peer/constants.rs | 2 + src/proto/peer/message.rs | 96 +++++++++++++++++++++++++++++ src/proto/peer/mod.rs | 4 ++ 6 files changed, 304 insertions(+), 14 deletions(-) create mode 100644 src/proto/peer/constants.rs create mode 100644 src/proto/peer/message.rs create mode 100644 src/proto/peer/mod.rs diff --git a/src/client.rs b/src/client.rs index ea5015e..1433bf7 100644 --- a/src/client.rs +++ b/src/client.rs @@ -1,10 +1,14 @@ +use std::collections::hash_map; +use std::net; use std::sync::mpsc; use mio; +use slab; use config; use control; use proto; +use proto::peer; use proto::server; use room; use user; @@ -15,13 +19,25 @@ enum IncomingMessage { ControlNotification(control::Notification), } -#[derive(Debug, Clone)] +#[derive(Clone, Debug)] enum LoginStatus { Pending, Success(String), Failure(String), } +#[derive(Clone, Copy, Debug)] +enum ConnectingPeer { + Direct, + Firewalled(u32) +} + +#[derive(Debug)] +struct Peer { + ip: net::Ipv4Addr, + port: u16, +} + pub struct Client { proto_tx: mio::Sender, proto_rx: mpsc::Receiver, @@ -33,6 +49,10 @@ pub struct Client { rooms: room::RoomMap, users: user::UserMap, + + connecting_peers: hash_map::HashMap<(net::Ipv4Addr, u16), ConnectingPeer>, + peers: slab::Slab, + next_peer_token: u32, } impl Client { @@ -56,6 +76,10 @@ impl Client { rooms: room::RoomMap::new(), users: user::UserMap::new(), + + connecting_peers: hash_map::HashMap::new(), + peers: slab::Slab::new(config::MAX_PEERS), + next_peer_token: 0, } } @@ -101,6 +125,12 @@ impl Client { self.proto_tx.send(proto::Request::ServerRequest(request)).unwrap(); } + /// Send a message to a peer. + fn send_to_peer(&self, peer_id: usize, message: peer::Message) { + self.proto_tx.send(proto::Request::PeerMessage(peer_id, message)) + .unwrap(); + } + /// Send a response to the controller client. fn send_to_controller(&mut self, response: control::Response) { let result = match self.control_tx { @@ -262,12 +292,90 @@ impl Client { proto::Response::ServerResponse(server_response) => self.handle_server_response(server_response), + proto::Response::ConnectToPeerSuccess(ip, port, peer_id) => + self.handle_connect_to_peer_success(ip, port, peer_id), + + proto::Response::PeerConnectionClosed(peer_id) => + self.handle_peer_connection_closed(peer_id), + _ => { warn!("Unhandled proto response: {:?}", response); } } } + fn handle_peer_connection_closed(&mut self, peer_id: usize) { + info!("Connection to peer {} has closed", peer_id); + if let None = self.peers.remove(peer_id) { + error!("Unknown peer {}", peer_id); + } + } + + fn handle_connect_to_peer_success( + &mut self, ip: net::Ipv4Addr, port: u16, peer_id: usize) + { + info!("Connected to peer {}:{} with id {}", ip, port, peer_id); + + let connecting_peer = match self.connecting_peers.remove(&(ip, port)) { + None => { + error!("Unknown peer {}:{}", ip, port); + return + }, + + Some(connecting_peer) => connecting_peer, + }; + + match self.peers.entry(peer_id) { + None => { + error!( + "Slab entry at {} for {}:{} does not exist", + peer_id, ip, port + ); + return + }, + + Some(slab::Entry::Occupied(occupied_entry)) => { + error!( + "Slab entry at {} for {}:{} is occupied by {:?}", + peer_id, ip, port, occupied_entry.get() + ); + return + }, + + Some(slab::Entry::Vacant(vacant_entry)) => { + vacant_entry.insert(Peer { + ip: ip, + port: port, + }); + }, + } + + match connecting_peer { + ConnectingPeer::Direct => { + let token = self.next_peer_token; + self.next_peer_token += 1; + + self.send_to_peer(peer_id, peer::Message::PeerInit( + peer::PeerInit { + user_name: config::USERNAME.to_string(), + connection_type: "P".to_string(), + token: token, + } + )); + }, + + ConnectingPeer::Firewalled(token) => { + debug!( + "Piercing firewall for peer {} with token {}", + peer_id, token + ); + self.send_to_peer( + peer_id, peer::Message::PierceFirewall(token) + ); + } + } + } + /*==========================* * SERVER RESPONSE HANDLING * *==========================*/ @@ -320,9 +428,17 @@ impl Client { fn handle_connect_to_peer_response( &mut self, response: server::ConnectToPeerResponse) { + info!( + "Connecting to peer {}:{} with token {} to pierce firewall", + response.ip, response.port, response.token + ); + self.connecting_peers.insert( + (response.ip, response.port), + ConnectingPeer::Firewalled(response.token) + ); self.proto_tx.send(proto::Request::ConnectToPeer( response.ip, response.port - )); + )).unwrap(); } fn handle_login_response(&mut self, login: server::LoginResponse) { diff --git a/src/proto/handler.rs b/src/proto/handler.rs index 980d712..1bbd68f 100644 --- a/src/proto/handler.rs +++ b/src/proto/handler.rs @@ -11,10 +11,14 @@ use config; use super::{Intent, Stream, SendPacket}; use super::server::*; +use super::peer; const SERVER_TOKEN: usize = 0; const INIT_PEER_TOKEN: usize = 1; +type ServerStream = Stream; +type PeerStream = Stream; + /*====================* * REQUEST - RESPONSE * *====================*/ @@ -22,6 +26,7 @@ const INIT_PEER_TOKEN: usize = 1; #[derive(Debug)] pub enum Request { ConnectToPeer(net::Ipv4Addr, u16), + PeerMessage(usize, peer::Message), ServerRequest(ServerRequest) } @@ -29,6 +34,8 @@ pub enum Request { pub enum Response { ConnectToPeerError(net::Ipv4Addr, u16), ConnectToPeerSuccess(net::Ipv4Addr, u16, usize), + PeerConnectionClosed(usize), + PeerMessage(peer::Message), ServerResponse(ServerResponse), } @@ -54,11 +61,11 @@ impl SendPacket for ServerResponseSender { pub struct PeerResponseSender(mpsc::Sender, usize); impl SendPacket for PeerResponseSender { - type Value = u32; + type Value = peer::Message; type Error = mpsc::SendError; fn send_packet(&mut self, value: Self::Value) -> Result<(), Self::Error> { - Ok(()) + self.0.send(Response::PeerMessage(value)) } } @@ -69,10 +76,9 @@ impl SendPacket for PeerResponseSender { /// This struct handles all the soulseek connections, to the server and to /// peers. struct Handler { - server_stream: Stream, + server_stream: ServerStream, - peer_streams: - slab::Slab, usize>, + peer_streams: slab::Slab, client_tx: mpsc::Sender, } @@ -141,7 +147,12 @@ impl Handler { } } - fn connect_to_peer(&mut self, ip: net::Ipv4Addr, port: u16) { + fn connect_to_peer( + &mut self, + ip: net::Ipv4Addr, + port: u16, + event_loop: &mut mio::EventLoop) + { let vacant_entry = match self.peer_streams.vacant_entry() { Some(vacant_entry) => vacant_entry, None => { @@ -158,7 +169,7 @@ impl Handler { info!("Connecting to peer {}:{}", ip, port); - let tcp_stream = match Self::connect((ip, port)) { + let mut tcp_stream = match Self::connect((ip, port)) { Ok(tcp_stream) => tcp_stream, Err(err) => { error!("Cannot connect to peer {}:{}: {}", ip, port, err); @@ -170,18 +181,58 @@ impl Handler { } }; - let token = vacant_entry.index(); + let peer_id = vacant_entry.index(); + + event_loop.register( + &mut tcp_stream, + mio::Token(peer_id), + mio::EventSet::readable(), + mio::PollOpt::edge() | mio::PollOpt::oneshot() + ).unwrap(); let peer_stream = Stream::new( - tcp_stream, PeerResponseSender(self.client_tx.clone(), token) + tcp_stream, PeerResponseSender(self.client_tx.clone(), peer_id) ); vacant_entry.insert(peer_stream); + // This is actually false, because the socket might still be connecting + // asynchronously. + // We will know if the connection worked or not when we get an event + // and try to read or write. + // There is nothing too wrong about telling the client it worked though, + // and closing the connection as soon as the client tries to use it, + // at which point the client will forget about the whole thing. self.client_tx.send( - Response::ConnectToPeerSuccess(ip, port, token) + Response::ConnectToPeerSuccess(ip, port, peer_id) ).unwrap(); } + + fn process_peer_intent( + &mut self, + intent: Intent, + token: mio::Token, + event_loop: &mut mio::EventLoop) + { + match intent { + Intent::Done => { + self.peer_streams.remove(token.0); + self.client_tx.send(Response::PeerConnectionClosed(token.0)) + .unwrap(); + }, + + Intent::Continue(event_set) => { + if let Some(peer_stream) = self.peer_streams.get_mut(token.0) { + event_loop.reregister( + peer_stream.evented(), + token, + event_set, + mio::PollOpt::edge() | mio::PollOpt::oneshot() + ).unwrap(); + } + }, + } + } } impl mio::Handler for Handler { @@ -195,7 +246,11 @@ impl mio::Handler for Handler { let intent = self.server_stream.on_ready(event_set); self.process_server_intent(intent, event_loop); } else { - unreachable!("Unknown token!"); + let intent = match self.peer_streams.get_mut(token.0) { + Some(peer_stream) => peer_stream.on_ready(event_set), + None => unreachable!("Unknown token is ready"), + }; + self.process_peer_intent(intent, token, event_loop); } } @@ -204,7 +259,23 @@ impl mio::Handler for Handler { { match request { Request::ConnectToPeer(ip, port) => - self.connect_to_peer(ip, port), + self.connect_to_peer(ip, port, event_loop), + + Request::PeerMessage(peer_id, message) => { + let intent = match self.peer_streams.get_mut(peer_id) { + Some(peer_stream) => peer_stream.on_notify(&message), + None => { + error!( + "Cannot send peer message {:?}: unknown id {}", + message, peer_id + ); + return + } + }; + self.process_peer_intent( + intent, mio::Token(peer_id), event_loop + ); + }, Request::ServerRequest(server_request) => { let intent = self.server_stream.on_notify(&server_request); diff --git a/src/proto/mod.rs b/src/proto/mod.rs index d05b0f2..51286fe 100644 --- a/src/proto/mod.rs +++ b/src/proto/mod.rs @@ -1,6 +1,7 @@ mod constants; mod handler; mod packet; +pub mod peer; pub mod server; mod stream; diff --git a/src/proto/peer/constants.rs b/src/proto/peer/constants.rs new file mode 100644 index 0000000..1555d98 --- /dev/null +++ b/src/proto/peer/constants.rs @@ -0,0 +1,2 @@ +pub const CODE_PIERCE_FIREWALL: u32 = 0; +pub const CODE_PEER_INIT: u32 = 1; diff --git a/src/proto/peer/message.rs b/src/proto/peer/message.rs new file mode 100644 index 0000000..ad9cb54 --- /dev/null +++ b/src/proto/peer/message.rs @@ -0,0 +1,96 @@ +use std::io; + +use super::super::{ + MutPacket, Packet, PacketReadError, ReadFromPacket, WriteToPacket +}; +use super::constants::*; + +/*=========* + * MESSAGE * + *=========*/ + +/// This enum contains all the possible messages peers can exchange. +#[derive(Clone, Debug)] +pub enum Message { + PierceFirewall(u32), + PeerInit(PeerInit), + Unknown(u32), +} + +impl ReadFromPacket for Message { + fn read_from_packet(packet: &mut Packet) -> Result { + let code: u32 = try!(packet.read_value()); + let message = match code { + CODE_PIERCE_FIREWALL => + Message::PierceFirewall( + try!(packet.read_value()) + ), + + CODE_PEER_INIT => + Message::PeerInit( + try!(packet.read_value()) + ), + + code => Message::Unknown(code) + }; + + let bytes_remaining = packet.bytes_remaining(); + if bytes_remaining > 0 { + warn!( + "Peer message with code {} contains {} extra bytes", + code, bytes_remaining + ) + } + + Ok(message) + } +} + +impl WriteToPacket for Message { + fn write_to_packet(&self, packet: &mut MutPacket) -> io::Result<()> { + match *self { + Message::PierceFirewall(ref token) => { + try!(packet.write_value(&CODE_PIERCE_FIREWALL)); + try!(packet.write_value(token)); + }, + + Message::PeerInit(ref request) => { + try!(packet.write_value(&CODE_PEER_INIT)); + try!(packet.write_value(request)); + }, + + Message::Unknown(_) => unreachable!(), + } + Ok(()) + } +} + +#[derive(Clone, Debug)] +pub struct PeerInit { + pub user_name: String, + pub connection_type: String, + pub token: u32, +} + +impl ReadFromPacket for PeerInit { + fn read_from_packet(packet: &mut Packet) -> Result { + let user_name = try!(packet.read_value()); + let connection_type = try!(packet.read_value()); + let token = try!(packet.read_value()); + Ok(PeerInit { + user_name: user_name, + connection_type: connection_type, + token: token, + }) + } +} + +impl WriteToPacket for PeerInit { + fn write_to_packet(&self, packet: &mut MutPacket) -> io::Result<()> { + try!(packet.write_value(&self.user_name)); + try!(packet.write_value(&self.connection_type)); + try!(packet.write_value(&self.token)); + Ok(()) + } +} + diff --git a/src/proto/peer/mod.rs b/src/proto/peer/mod.rs new file mode 100644 index 0000000..f92fbb0 --- /dev/null +++ b/src/proto/peer/mod.rs @@ -0,0 +1,4 @@ +mod constants; +mod message; + +pub use self::message::*;