Browse Source

Heavily refactor peer code to detect failed connections.

wip
Titouan Rigoudy 9 years ago
parent
commit
772e03b2e7
3 changed files with 246 additions and 208 deletions
  1. +92
    -76
      src/client.rs
  2. +98
    -109
      src/proto/handler.rs
  3. +56
    -23
      src/proto/stream.rs

+ 92
- 76
src/client.rs View File

@ -1,4 +1,3 @@
use std::collections::hash_map;
use std::net;
use std::sync::mpsc;
@ -13,6 +12,27 @@ use proto::server;
use room;
use user;
/*===============*
* TOKEN COUNTER *
*===============*/
struct TokenCounter {
next_token: u32
}
impl TokenCounter {
fn new() -> Self {
TokenCounter { next_token: 0 }
}
fn next(&mut self) -> u32 {
let token = self.next_token;
self.next_token += 1;
token
}
}
#[derive(Debug)]
enum IncomingMessage {
Proto(proto::Response),
@ -26,16 +46,20 @@ enum LoginStatus {
Failure(String),
}
#[derive(Clone, Copy, Debug)]
enum ConnectingPeer {
Direct,
Firewalled(u32)
#[derive(Debug)]
enum PeerState {
Opening,
OpeningFirewalled,
Open
}
#[derive(Debug)]
struct Peer {
ip: net::Ipv4Addr,
ip: net::Ipv4Addr,
port: u16,
connection_type: String,
token: u32,
state: PeerState,
}
pub struct Client {
@ -50,9 +74,8 @@ pub struct Client {
rooms: room::RoomMap,
users: user::UserMap,
connecting_peers: hash_map::HashMap<(net::Ipv4Addr, u16), ConnectingPeer>,
peers: slab::Slab<Peer, usize>,
next_peer_token: u32,
peers: slab::Slab<Peer, usize>,
token_counter: TokenCounter,
}
impl Client {
@ -77,9 +100,8 @@ impl Client {
rooms: room::RoomMap::new(),
users: user::UserMap::new(),
connecting_peers: hash_map::HashMap::new(),
peers: slab::Slab::new(config::MAX_PEERS),
next_peer_token: 0,
peers: slab::Slab::new(config::MAX_PEERS),
token_counter: TokenCounter::new()
}
}
@ -292,8 +314,8 @@ impl Client {
proto::Response::ServerResponse(server_response) =>
self.handle_server_response(server_response),
proto::Response::PeerConnectionOpen(ip, port, peer_id) =>
self.handle_peer_connection_open(ip, port, peer_id),
proto::Response::PeerConnectionOpen(peer_id) =>
self.handle_peer_connection_open(peer_id),
proto::Response::PeerConnectionClosed(peer_id) =>
self.handle_peer_connection_closed(peer_id),
@ -306,74 +328,53 @@ impl Client {
fn handle_peer_connection_closed(&mut self, peer_id: usize) {
info!("Connection to peer {} has closed", peer_id);
if let None = self.peers.remove(peer_id) {
error!("Unknown peer {}", peer_id);
match self.peers.remove(peer_id) {
None => error!("Unknown peer {}", peer_id),
Some(peer) => {
// TODO if the peer was not connected, talk to the server
}
}
}
fn handle_peer_connection_open(
&mut self, ip: net::Ipv4Addr, port: u16, peer_id: usize)
fn handle_peer_connection_open(&mut self, peer_id: usize)
{
info!("Connected to peer {}:{} with id {}", ip, port, peer_id);
let connecting_peer = match self.connecting_peers.remove(&(ip, port)) {
None => {
error!("Unknown peer {}:{}", ip, port);
return
},
Some(connecting_peer) => connecting_peer,
};
match self.peers.entry(peer_id) {
let message = match self.peers.get_mut(peer_id) {
None => {
error!(
"Slab entry at {} for {}:{} does not exist",
peer_id, ip, port
);
error!("Unknown peer connection {} is open", peer_id);
return
},
Some(slab::Entry::Occupied(occupied_entry)) => {
Some(peer @ &mut Peer { state: PeerState::Open, .. }) => {
error!(
"Slab entry at {} for {}:{} is occupied by {:?}",
peer_id, ip, port, occupied_entry.get()
"Peer connection {} was already open: {:?}",
peer_id, peer
);
return
},
Some(slab::Entry::Vacant(vacant_entry)) => {
vacant_entry.insert(Peer {
ip: ip,
port: port,
});
Some(peer @ &mut Peer { state: PeerState::Opening, .. }) => {
info!("Peer connection {} is now open: {:?}", peer_id, peer);
// Mark it as open.
peer.state = PeerState::Open;
// Send a PeerInit.
peer::Message::PeerInit(peer::PeerInit {
user_name: config::USERNAME.to_string(),
connection_type: peer.connection_type.clone(),
token: peer.token,
})
},
}
match connecting_peer {
ConnectingPeer::Direct => {
let token = self.next_peer_token;
self.next_peer_token += 1;
self.send_to_peer(peer_id, peer::Message::PeerInit(
peer::PeerInit {
user_name: config::USERNAME.to_string(),
connection_type: "P".to_string(),
token: token,
}
));
Some(peer @ &mut Peer{state: PeerState::OpeningFirewalled, ..}) => {
info!("Peer connection {} is now open: {:?}", peer_id, peer);
// Mark it as open.
peer.state = PeerState::Open;
// Send a PierceFirewall.
peer::Message::PierceFirewall(peer.token)
},
};
ConnectingPeer::Firewalled(token) => {
debug!(
"Piercing firewall for peer {} with token {}",
peer_id, token
);
self.send_to_peer(
peer_id, peer::Message::PierceFirewall(token)
);
}
}
self.send_to_peer(peer_id, message);
}
/*==========================*
@ -428,17 +429,32 @@ impl Client {
fn handle_connect_to_peer_response(
&mut self, response: server::ConnectToPeerResponse)
{
info!(
"Connecting to peer {}:{} with token {} to pierce firewall",
response.ip, response.port, response.token
);
self.connecting_peers.insert(
(response.ip, response.port),
ConnectingPeer::Firewalled(response.token)
);
self.proto_tx.send(proto::Request::PeerConnect(
response.ip, response.port
)).unwrap();
let peer = Peer {
ip: response.ip,
port: response.port,
connection_type: response.connection_type,
token: response.token,
state: PeerState::OpeningFirewalled
};
match self.peers.insert(peer) {
Ok(peer_id) => {
info!(
"Opening peer connection {} to {}:{} to pierce firewall",
peer_id, response.ip, response.port
);
self.proto_tx.send(proto::Request::PeerConnect(
peer_id, response.ip, response.port
)).unwrap();
},
Err(peer) => {
warn!(
"Cannot open peer connection {:?}: too many already open",
peer
);
}
}
}
fn handle_login_response(&mut self, login: server::LoginResponse) {


+ 98
- 109
src/proto/handler.rs View File

@ -1,7 +1,5 @@
use std::fmt;
use std::io;
use std::net;
use std::net::ToSocketAddrs;
use std::sync::mpsc;
use mio;
@ -13,11 +11,14 @@ use super::{Intent, Stream, SendPacket};
use super::server::*;
use super::peer;
const SERVER_TOKEN: usize = 0;
const INIT_PEER_TOKEN: usize = 1;
/*===========*
* CONSTANTS *
*===========*/
type ServerStream = Stream<mio::tcp::TcpStream, ServerResponseSender>;
type PeerStream = Stream<mio::tcp::TcpStream, PeerResponseSender>;
// There are only ever MAX_PEERS peer tokens, from 0 to MAX_PEERS - 1.
// This way we ensure no overlap and eliminate the need for coordination
// between client and handler that would otherwise be needed.
const SERVER_TOKEN: usize = config::MAX_PEERS;
/*====================*
* REQUEST - RESPONSE *
@ -25,7 +26,7 @@ type PeerStream = Stream<mio::tcp::TcpStream, PeerResponseSender>;
#[derive(Debug)]
pub enum Request {
PeerConnect(net::Ipv4Addr, u16),
PeerConnect(usize, net::Ipv4Addr, u16),
PeerMessage(usize, peer::Message),
ServerRequest(ServerRequest)
}
@ -33,9 +34,8 @@ pub enum Request {
#[derive(Debug)]
pub enum Response {
PeerConnectionClosed(usize),
PeerConnectionError(net::Ipv4Addr, u16),
PeerConnectionOpen(net::Ipv4Addr, u16, usize),
PeerMessage(peer::Message),
PeerConnectionOpen(usize),
PeerMessage(usize, peer::Message),
ServerResponse(ServerResponse),
}
@ -52,20 +52,31 @@ impl SendPacket for ServerResponseSender {
fn send_packet(&mut self, value: Self::Value) -> Result<(), Self::Error> {
self.0.send(Response::ServerResponse(value))
}
fn notify_open(&mut self) -> Result<(), Self::Error> {
Ok(())
}
}
/*======================*
* PEER RESPONSE SENDER *
*======================*/
pub struct PeerResponseSender(mpsc::Sender<Response>, usize);
pub struct PeerResponseSender {
sender: mpsc::Sender<Response>,
peer_id: usize,
}
impl SendPacket for PeerResponseSender {
type Value = peer::Message;
type Error = mpsc::SendError<Response>;
fn send_packet(&mut self, value: Self::Value) -> Result<(), Self::Error> {
self.0.send(Response::PeerMessage(value))
self.sender.send(Response::PeerMessage(self.peer_id, value))
}
fn notify_open(&mut self) -> Result<(), Self::Error> {
self.sender.send(Response::PeerConnectionOpen(self.peer_id))
}
}
@ -76,56 +87,84 @@ impl SendPacket for PeerResponseSender {
/// This struct handles all the soulseek connections, to the server and to
/// peers.
struct Handler {
server_stream: ServerStream,
server_stream: Stream<ServerResponseSender>,
peer_streams: slab::Slab<PeerStream, usize>,
peer_streams: slab::Slab<Stream<PeerResponseSender>, usize>,
client_tx: mpsc::Sender<Response>,
}
impl Handler {
fn new(client_tx: mpsc::Sender<Response>) -> io::Result<Self> {
fn new(
client_tx: mpsc::Sender<Response>,
event_loop: &mut mio::EventLoop<Self>)
-> io::Result<Self>
{
let host = config::SERVER_HOST;
let port = config::SERVER_PORT;
let server_stream = Stream::new(
try!(Self::connect((host, port))),
let server_stream = try!(Stream::new(
(host, port),
ServerResponseSender(client_tx.clone())
);
));
info!("Connected to server at {}:{}", host, port);
try!(event_loop.register(
server_stream.evented(),
mio::Token(SERVER_TOKEN),
mio::EventSet::all(),
mio::PollOpt::edge() | mio::PollOpt::oneshot()
));
Ok(Handler {
server_stream: server_stream,
peer_streams: slab::Slab::new_starting_at(
INIT_PEER_TOKEN, config::MAX_PEERS
),
peer_streams: slab::Slab::new(config::MAX_PEERS),
client_tx: client_tx,
})
}
fn register(&self, event_loop: &mut mio::EventLoop<Self>) -> io::Result<()>
fn connect_to_peer(
&mut self,
peer_id: usize,
ip: net::Ipv4Addr,
port: u16,
event_loop: &mut mio::EventLoop<Self>)
-> Result<(), String>
{
let vacant_entry = match self.peer_streams.entry(peer_id) {
None => return Err("id out of range".to_string()),
Some(slab::Entry::Occupied(occupied_entry)) =>
return Err("id already taken".to_string()),
Some(slab::Entry::Vacant(vacant_entry)) => vacant_entry,
};
info!("Opening peer connection {} to {}:{}", peer_id, ip, port);
let sender = PeerResponseSender {
sender: self.client_tx.clone(),
peer_id: peer_id
};
let peer_stream = match Stream::new((ip, port), sender) {
Ok(peer_stream) => peer_stream,
Err(err) => return Err(format!("i/o error: {}", err))
};
event_loop.register(
self.server_stream.evented(),
mio::Token(SERVER_TOKEN),
mio::EventSet::readable(),
peer_stream.evented(),
mio::Token(peer_id),
mio::EventSet::all(),
mio::PollOpt::edge() | mio::PollOpt::oneshot()
)
}
).unwrap();
fn connect<T>(addr_spec: T) -> io::Result<mio::tcp::TcpStream>
where T: ToSocketAddrs + fmt::Debug
{
for sock_addr in try!(addr_spec.to_socket_addrs()) {
if let Ok(stream) = mio::tcp::TcpStream::connect(&sock_addr) {
return Ok(stream)
}
}
Err(io::Error::new(
io::ErrorKind::Other,
format!("Cannot connect to {:?}", addr_spec)
))
vacant_entry.insert(peer_stream);
Ok(())
}
fn process_server_intent(
@ -147,67 +186,6 @@ impl Handler {
}
}
fn connect_to_peer(
&mut self,
ip: net::Ipv4Addr,
port: u16,
event_loop: &mut mio::EventLoop<Self>)
{
let vacant_entry = match self.peer_streams.vacant_entry() {
Some(vacant_entry) => vacant_entry,
None => {
error!(
"Cannot connect to peer {}:{}: too many connections open",
ip, port
);
self.client_tx.send(
Response::PeerConnectionError(ip, port)
).unwrap();
return
},
};
info!("Connecting to peer {}:{}", ip, port);
let mut tcp_stream = match Self::connect((ip, port)) {
Ok(tcp_stream) => tcp_stream,
Err(err) => {
error!("Cannot connect to peer {}:{}: {}", ip, port, err);
self.client_tx.send(
Response::PeerConnectionError(ip, port)
).unwrap();
return
}
};
let peer_id = vacant_entry.index();
event_loop.register(
&mut tcp_stream,
mio::Token(peer_id),
mio::EventSet::readable(),
mio::PollOpt::edge() | mio::PollOpt::oneshot()
).unwrap();
let peer_stream = Stream::new(
tcp_stream, PeerResponseSender(self.client_tx.clone(), peer_id)
);
vacant_entry.insert(peer_stream);
// This is actually false, because the socket might still be connecting
// asynchronously.
// We will know if the connection worked or not when we get an event
// and try to read or write.
// There is nothing too wrong about telling the client it worked though,
// and closing the connection as soon as the client tries to use it,
// at which point the client will forget about the whole thing.
self.client_tx.send(
Response::PeerConnectionOpen(ip, port, peer_id)
).unwrap();
}
fn process_peer_intent(
&mut self,
intent: Intent,
@ -217,8 +195,9 @@ impl Handler {
match intent {
Intent::Done => {
self.peer_streams.remove(token.0);
self.client_tx.send(Response::PeerConnectionClosed(token.0))
.unwrap();
self.client_tx.send(
Response::PeerConnectionClosed(token.0)
).unwrap();
},
Intent::Continue(event_set) => {
@ -248,7 +227,8 @@ impl mio::Handler for Handler {
} else {
let intent = match self.peer_streams.get_mut(token.0) {
Some(peer_stream) => peer_stream.on_ready(event_set),
None => unreachable!("Unknown token is ready"),
None => unreachable!("Unknown token {} is ready", token.0),
};
self.process_peer_intent(intent, token, event_loop);
}
@ -258,8 +238,18 @@ impl mio::Handler for Handler {
request: Request)
{
match request {
Request::PeerConnect(ip, port) =>
self.connect_to_peer(ip, port, event_loop),
Request::PeerConnect(peer_id, ip, port) =>
if let Err(err) =
self.connect_to_peer(peer_id, ip, port, event_loop)
{
error!(
"Cannot open peer connection {} to {}:{}: {}",
peer_id, ip, port, err
);
self.client_tx.send(
Response::PeerConnectionClosed(peer_id)
).unwrap();
},
Request::PeerMessage(peer_id, message) => {
let intent = match self.peer_streams.get_mut(peer_id) {
@ -296,10 +286,9 @@ impl Agent {
pub fn new(client_tx: mpsc::Sender<Response>) -> io::Result<Self> {
// Create the event loop.
let mut event_loop = try!(mio::EventLoop::new());
// Create the handler for the event loop.
let handler = try!(Handler::new(client_tx));
// Register the handler's sockets with the event loop.
try!(handler.register(&mut event_loop));
// Create the handler for the event loop and register the handler's
// sockets with the event loop.
let handler = try!(Handler::new(client_tx, &mut event_loop));
Ok(Agent {
event_loop: event_loop,


+ 56
- 23
src/proto/stream.rs View File

@ -1,6 +1,8 @@
use std::collections::VecDeque;
use std::error;
use std::fmt;
use std::io;
use std::net::ToSocketAddrs;
use mio;
@ -59,6 +61,8 @@ pub trait SendPacket {
type Error: error::Error;
fn send_packet(&mut self, Self::Value) -> Result<(), Self::Error>;
fn notify_open(&mut self) -> Result<(), Self::Error>;
}
/// This enum defines the possible actions the stream wants to take after
@ -72,37 +76,48 @@ pub enum Intent {
Continue(mio::EventSet),
}
/// This struct wraps around an mio byte stream and handles packet reads and
/// This struct wraps around an mio tcp stream and handles packet reads and
/// writes.
#[derive(Debug)]
pub struct Stream<T, U>
where T: io::Read + io::Write + mio::Evented,
U: SendPacket
pub struct Stream<T: SendPacket>
{
parser: Parser,
queue: VecDeque<OutBuf>,
sender: U,
stream: T,
sender: T,
stream: mio::tcp::TcpStream,
is_connected: bool,
}
impl<T, U> Stream<T, U>
where T: io::Read + io::Write + mio::Evented,
U: SendPacket
impl<T: SendPacket> Stream<T>
{
/// Returns a new struct wrapping the provided byte stream, which will
/// forward packets to the provided sink.
pub fn new(stream: T, sender: U) -> Self {
Stream {
parser: Parser::new(),
queue: VecDeque::new(),
sender: sender,
stream: stream,
/// Returns a new stream, asynchronously connected to the given address,
/// which forwards incoming packets to the given sender.
/// If an error occurs when connecting, returns an error.
pub fn new<U>(addr_spec: U, sender: T) -> io::Result<Self>
where U: ToSocketAddrs + fmt::Debug
{
for sock_addr in try!(addr_spec.to_socket_addrs()) {
if let Ok(stream) = mio::tcp::TcpStream::connect(&sock_addr) {
return Ok(Stream {
parser: Parser::new(),
queue: VecDeque::new(),
sender: sender,
stream: stream,
is_connected: false,
})
}
}
Err(io::Error::new(
io::ErrorKind::Other,
format!("Cannot connect to {:?}", addr_spec)
))
}
/// Returns a reference to the underlying byte stream, to allow it to be
/// registered with an event loop.
pub fn evented(&self) -> &T {
pub fn evented(&self) -> &mio::tcp::TcpStream {
&self.stream
}
@ -158,6 +173,9 @@ impl<T, U> Stream<T, U>
/// The stream is ready to read, write, or both.
pub fn on_ready(&mut self, event_set: mio::EventSet) -> Intent {
if event_set.is_hup() || event_set.is_error() {
return Intent::Done
}
if event_set.is_readable() {
let result = self.on_readable();
if let Err(e) = result {
@ -173,14 +191,29 @@ impl<T, U> Stream<T, U>
}
}
// We must have read or written something succesfully if we're here,
// so the stream must be connected.
if !self.is_connected {
// If we weren't already connected, notify the sink.
if let Err(err) = self.sender.notify_open() {
error!("Cannot notify client that stream is open: {}", err);
return Intent::Done
}
// And record the fact that we are now connected.
self.is_connected = true;
}
// We're always interested in reading more.
let mut event_set =
mio::EventSet::readable() |
mio::EventSet::hup() |
mio::EventSet::error();
// If there is still stuff to write in the queue, we're interested in
// the socket becoming writable too.
let event_set = if self.queue.len() > 0 {
mio::EventSet::readable() | mio::EventSet::writable()
} else {
mio::EventSet::readable()
};
if self.queue.len() > 0 {
event_set = event_set | mio::EventSet::writable();
}
Intent::Continue(event_set)
}


Loading…
Cancel
Save