Browse Source

Add basic communication with peers.

wip
Titouan Rigoudy 9 years ago
parent
commit
2ad5f19850
6 changed files with 304 additions and 14 deletions
  1. +118
    -2
      src/client.rs
  2. +83
    -12
      src/proto/handler.rs
  3. +1
    -0
      src/proto/mod.rs
  4. +2
    -0
      src/proto/peer/constants.rs
  5. +96
    -0
      src/proto/peer/message.rs
  6. +4
    -0
      src/proto/peer/mod.rs

+ 118
- 2
src/client.rs View File

@ -1,10 +1,14 @@
use std::collections::hash_map;
use std::net;
use std::sync::mpsc;
use mio;
use slab;
use config;
use control;
use proto;
use proto::peer;
use proto::server;
use room;
use user;
@ -15,13 +19,25 @@ enum IncomingMessage {
ControlNotification(control::Notification),
}
#[derive(Debug, Clone)]
#[derive(Clone, Debug)]
enum LoginStatus {
Pending,
Success(String),
Failure(String),
}
#[derive(Clone, Copy, Debug)]
enum ConnectingPeer {
Direct,
Firewalled(u32)
}
#[derive(Debug)]
struct Peer {
ip: net::Ipv4Addr,
port: u16,
}
pub struct Client {
proto_tx: mio::Sender<proto::Request>,
proto_rx: mpsc::Receiver<proto::Response>,
@ -33,6 +49,10 @@ pub struct Client {
rooms: room::RoomMap,
users: user::UserMap,
connecting_peers: hash_map::HashMap<(net::Ipv4Addr, u16), ConnectingPeer>,
peers: slab::Slab<Peer, usize>,
next_peer_token: u32,
}
impl Client {
@ -56,6 +76,10 @@ impl Client {
rooms: room::RoomMap::new(),
users: user::UserMap::new(),
connecting_peers: hash_map::HashMap::new(),
peers: slab::Slab::new(config::MAX_PEERS),
next_peer_token: 0,
}
}
@ -101,6 +125,12 @@ impl Client {
self.proto_tx.send(proto::Request::ServerRequest(request)).unwrap();
}
/// Send a message to a peer.
fn send_to_peer(&self, peer_id: usize, message: peer::Message) {
self.proto_tx.send(proto::Request::PeerMessage(peer_id, message))
.unwrap();
}
/// Send a response to the controller client.
fn send_to_controller(&mut self, response: control::Response) {
let result = match self.control_tx {
@ -262,12 +292,90 @@ impl Client {
proto::Response::ServerResponse(server_response) =>
self.handle_server_response(server_response),
proto::Response::ConnectToPeerSuccess(ip, port, peer_id) =>
self.handle_connect_to_peer_success(ip, port, peer_id),
proto::Response::PeerConnectionClosed(peer_id) =>
self.handle_peer_connection_closed(peer_id),
_ => {
warn!("Unhandled proto response: {:?}", response);
}
}
}
fn handle_peer_connection_closed(&mut self, peer_id: usize) {
info!("Connection to peer {} has closed", peer_id);
if let None = self.peers.remove(peer_id) {
error!("Unknown peer {}", peer_id);
}
}
fn handle_connect_to_peer_success(
&mut self, ip: net::Ipv4Addr, port: u16, peer_id: usize)
{
info!("Connected to peer {}:{} with id {}", ip, port, peer_id);
let connecting_peer = match self.connecting_peers.remove(&(ip, port)) {
None => {
error!("Unknown peer {}:{}", ip, port);
return
},
Some(connecting_peer) => connecting_peer,
};
match self.peers.entry(peer_id) {
None => {
error!(
"Slab entry at {} for {}:{} does not exist",
peer_id, ip, port
);
return
},
Some(slab::Entry::Occupied(occupied_entry)) => {
error!(
"Slab entry at {} for {}:{} is occupied by {:?}",
peer_id, ip, port, occupied_entry.get()
);
return
},
Some(slab::Entry::Vacant(vacant_entry)) => {
vacant_entry.insert(Peer {
ip: ip,
port: port,
});
},
}
match connecting_peer {
ConnectingPeer::Direct => {
let token = self.next_peer_token;
self.next_peer_token += 1;
self.send_to_peer(peer_id, peer::Message::PeerInit(
peer::PeerInit {
user_name: config::USERNAME.to_string(),
connection_type: "P".to_string(),
token: token,
}
));
},
ConnectingPeer::Firewalled(token) => {
debug!(
"Piercing firewall for peer {} with token {}",
peer_id, token
);
self.send_to_peer(
peer_id, peer::Message::PierceFirewall(token)
);
}
}
}
/*==========================*
* SERVER RESPONSE HANDLING *
*==========================*/
@ -320,9 +428,17 @@ impl Client {
fn handle_connect_to_peer_response(
&mut self, response: server::ConnectToPeerResponse)
{
info!(
"Connecting to peer {}:{} with token {} to pierce firewall",
response.ip, response.port, response.token
);
self.connecting_peers.insert(
(response.ip, response.port),
ConnectingPeer::Firewalled(response.token)
);
self.proto_tx.send(proto::Request::ConnectToPeer(
response.ip, response.port
));
)).unwrap();
}
fn handle_login_response(&mut self, login: server::LoginResponse) {


+ 83
- 12
src/proto/handler.rs View File

@ -11,10 +11,14 @@ use config;
use super::{Intent, Stream, SendPacket};
use super::server::*;
use super::peer;
const SERVER_TOKEN: usize = 0;
const INIT_PEER_TOKEN: usize = 1;
type ServerStream = Stream<mio::tcp::TcpStream, ServerResponseSender>;
type PeerStream = Stream<mio::tcp::TcpStream, PeerResponseSender>;
/*====================*
* REQUEST - RESPONSE *
*====================*/
@ -22,6 +26,7 @@ const INIT_PEER_TOKEN: usize = 1;
#[derive(Debug)]
pub enum Request {
ConnectToPeer(net::Ipv4Addr, u16),
PeerMessage(usize, peer::Message),
ServerRequest(ServerRequest)
}
@ -29,6 +34,8 @@ pub enum Request {
pub enum Response {
ConnectToPeerError(net::Ipv4Addr, u16),
ConnectToPeerSuccess(net::Ipv4Addr, u16, usize),
PeerConnectionClosed(usize),
PeerMessage(peer::Message),
ServerResponse(ServerResponse),
}
@ -54,11 +61,11 @@ impl SendPacket for ServerResponseSender {
pub struct PeerResponseSender(mpsc::Sender<Response>, usize);
impl SendPacket for PeerResponseSender {
type Value = u32;
type Value = peer::Message;
type Error = mpsc::SendError<Response>;
fn send_packet(&mut self, value: Self::Value) -> Result<(), Self::Error> {
Ok(())
self.0.send(Response::PeerMessage(value))
}
}
@ -69,10 +76,9 @@ impl SendPacket for PeerResponseSender {
/// This struct handles all the soulseek connections, to the server and to
/// peers.
struct Handler {
server_stream: Stream<mio::tcp::TcpStream, ServerResponseSender>,
server_stream: ServerStream,
peer_streams:
slab::Slab<Stream<mio::tcp::TcpStream, PeerResponseSender>, usize>,
peer_streams: slab::Slab<PeerStream, usize>,
client_tx: mpsc::Sender<Response>,
}
@ -141,7 +147,12 @@ impl Handler {
}
}
fn connect_to_peer(&mut self, ip: net::Ipv4Addr, port: u16) {
fn connect_to_peer(
&mut self,
ip: net::Ipv4Addr,
port: u16,
event_loop: &mut mio::EventLoop<Self>)
{
let vacant_entry = match self.peer_streams.vacant_entry() {
Some(vacant_entry) => vacant_entry,
None => {
@ -158,7 +169,7 @@ impl Handler {
info!("Connecting to peer {}:{}", ip, port);
let tcp_stream = match Self::connect((ip, port)) {
let mut tcp_stream = match Self::connect((ip, port)) {
Ok(tcp_stream) => tcp_stream,
Err(err) => {
error!("Cannot connect to peer {}:{}: {}", ip, port, err);
@ -170,18 +181,58 @@ impl Handler {
}
};
let token = vacant_entry.index();
let peer_id = vacant_entry.index();
event_loop.register(
&mut tcp_stream,
mio::Token(peer_id),
mio::EventSet::readable(),
mio::PollOpt::edge() | mio::PollOpt::oneshot()
).unwrap();
let peer_stream = Stream::new(
tcp_stream, PeerResponseSender(self.client_tx.clone(), token)
tcp_stream, PeerResponseSender(self.client_tx.clone(), peer_id)
);
vacant_entry.insert(peer_stream);
// This is actually false, because the socket might still be connecting
// asynchronously.
// We will know if the connection worked or not when we get an event
// and try to read or write.
// There is nothing too wrong about telling the client it worked though,
// and closing the connection as soon as the client tries to use it,
// at which point the client will forget about the whole thing.
self.client_tx.send(
Response::ConnectToPeerSuccess(ip, port, token)
Response::ConnectToPeerSuccess(ip, port, peer_id)
).unwrap();
}
fn process_peer_intent(
&mut self,
intent: Intent,
token: mio::Token,
event_loop: &mut mio::EventLoop<Self>)
{
match intent {
Intent::Done => {
self.peer_streams.remove(token.0);
self.client_tx.send(Response::PeerConnectionClosed(token.0))
.unwrap();
},
Intent::Continue(event_set) => {
if let Some(peer_stream) = self.peer_streams.get_mut(token.0) {
event_loop.reregister(
peer_stream.evented(),
token,
event_set,
mio::PollOpt::edge() | mio::PollOpt::oneshot()
).unwrap();
}
},
}
}
}
impl mio::Handler for Handler {
@ -195,7 +246,11 @@ impl mio::Handler for Handler {
let intent = self.server_stream.on_ready(event_set);
self.process_server_intent(intent, event_loop);
} else {
unreachable!("Unknown token!");
let intent = match self.peer_streams.get_mut(token.0) {
Some(peer_stream) => peer_stream.on_ready(event_set),
None => unreachable!("Unknown token is ready"),
};
self.process_peer_intent(intent, token, event_loop);
}
}
@ -204,7 +259,23 @@ impl mio::Handler for Handler {
{
match request {
Request::ConnectToPeer(ip, port) =>
self.connect_to_peer(ip, port),
self.connect_to_peer(ip, port, event_loop),
Request::PeerMessage(peer_id, message) => {
let intent = match self.peer_streams.get_mut(peer_id) {
Some(peer_stream) => peer_stream.on_notify(&message),
None => {
error!(
"Cannot send peer message {:?}: unknown id {}",
message, peer_id
);
return
}
};
self.process_peer_intent(
intent, mio::Token(peer_id), event_loop
);
},
Request::ServerRequest(server_request) => {
let intent = self.server_stream.on_notify(&server_request);


+ 1
- 0
src/proto/mod.rs View File

@ -1,6 +1,7 @@
mod constants;
mod handler;
mod packet;
pub mod peer;
pub mod server;
mod stream;


+ 2
- 0
src/proto/peer/constants.rs View File

@ -0,0 +1,2 @@
pub const CODE_PIERCE_FIREWALL: u32 = 0;
pub const CODE_PEER_INIT: u32 = 1;

+ 96
- 0
src/proto/peer/message.rs View File

@ -0,0 +1,96 @@
use std::io;
use super::super::{
MutPacket, Packet, PacketReadError, ReadFromPacket, WriteToPacket
};
use super::constants::*;
/*=========*
* MESSAGE *
*=========*/
/// This enum contains all the possible messages peers can exchange.
#[derive(Clone, Debug)]
pub enum Message {
PierceFirewall(u32),
PeerInit(PeerInit),
Unknown(u32),
}
impl ReadFromPacket for Message {
fn read_from_packet(packet: &mut Packet) -> Result<Self, PacketReadError> {
let code: u32 = try!(packet.read_value());
let message = match code {
CODE_PIERCE_FIREWALL =>
Message::PierceFirewall(
try!(packet.read_value())
),
CODE_PEER_INIT =>
Message::PeerInit(
try!(packet.read_value())
),
code => Message::Unknown(code)
};
let bytes_remaining = packet.bytes_remaining();
if bytes_remaining > 0 {
warn!(
"Peer message with code {} contains {} extra bytes",
code, bytes_remaining
)
}
Ok(message)
}
}
impl WriteToPacket for Message {
fn write_to_packet(&self, packet: &mut MutPacket) -> io::Result<()> {
match *self {
Message::PierceFirewall(ref token) => {
try!(packet.write_value(&CODE_PIERCE_FIREWALL));
try!(packet.write_value(token));
},
Message::PeerInit(ref request) => {
try!(packet.write_value(&CODE_PEER_INIT));
try!(packet.write_value(request));
},
Message::Unknown(_) => unreachable!(),
}
Ok(())
}
}
#[derive(Clone, Debug)]
pub struct PeerInit {
pub user_name: String,
pub connection_type: String,
pub token: u32,
}
impl ReadFromPacket for PeerInit {
fn read_from_packet(packet: &mut Packet) -> Result<Self, PacketReadError> {
let user_name = try!(packet.read_value());
let connection_type = try!(packet.read_value());
let token = try!(packet.read_value());
Ok(PeerInit {
user_name: user_name,
connection_type: connection_type,
token: token,
})
}
}
impl WriteToPacket for PeerInit {
fn write_to_packet(&self, packet: &mut MutPacket) -> io::Result<()> {
try!(packet.write_value(&self.user_name));
try!(packet.write_value(&self.connection_type));
try!(packet.write_value(&self.token));
Ok(())
}
}

+ 4
- 0
src/proto/peer/mod.rs View File

@ -0,0 +1,4 @@
mod constants;
mod message;
pub use self::message::*;

Loading…
Cancel
Save