Browse Source

Add ability to open connections to peers.

wip
Titouan Rigoudy 9 years ago
parent
commit
3955246870
6 changed files with 129 additions and 47 deletions
  1. +6
    -0
      Cargo.lock
  2. +1
    -0
      Cargo.toml
  3. +30
    -7
      src/client.rs
  4. +2
    -0
      src/config.rs
  5. +1
    -0
      src/main.rs
  6. +89
    -40
      src/proto/handler.rs

+ 6
- 0
Cargo.lock View File

@ -9,6 +9,7 @@ dependencies = [
"mio 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)",
"rust-crypto 0.2.34 (registry+https://github.com/rust-lang/crates.io-index)",
"rustc-serialize 0.3.17 (registry+https://github.com/rust-lang/crates.io-index)",
"slab 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)",
"ws 0.4.5 (registry+https://github.com/rust-lang/crates.io-index)",
]
@ -279,6 +280,11 @@ name = "slab"
version = "0.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "slab"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "time"
version = "0.1.34"


+ 1
- 0
Cargo.toml View File

@ -11,4 +11,5 @@ log = "^0.3.5"
mio = "^0.5"
rust-crypto = "^0.2.34"
rustc-serialize = "^0.3.17"
slab = "^0.2"
ws = "^0.4"

+ 30
- 7
src/client.rs View File

@ -11,7 +11,7 @@ use user;
#[derive(Debug)]
enum IncomingMessage {
ServerResponse(server::ServerResponse),
Proto(proto::Response),
ControlNotification(control::Notification),
}
@ -73,8 +73,8 @@ impl Client {
loop {
match self.recv() {
IncomingMessage::ServerResponse(response) =>
self.handle_server_response(response),
IncomingMessage::Proto(response) =>
self.handle_proto_response(response),
IncomingMessage::ControlNotification(notif) =>
self.handle_control_notification(notif),
@ -89,10 +89,7 @@ impl Client {
let control_rx = &self.control_rx;
select! {
result = proto_rx.recv() =>
match result.unwrap() {
proto::Response::ServerResponse(server_response) =>
IncomingMessage::ServerResponse(server_response),
},
IncomingMessage::Proto(result.unwrap()),
result = control_rx.recv() =>
IncomingMessage::ControlNotification(result.unwrap())
@ -256,12 +253,30 @@ impl Client {
));
}
/*=========================*
* PROTO RESPONSE HANDLING *
*=========================*/
fn handle_proto_response(&mut self, response: proto::Response) {
match response {
proto::Response::ServerResponse(server_response) =>
self.handle_server_response(server_response),
_ => {
warn!("Unhandled proto response: {:?}", response);
}
}
}
/*==========================*
* SERVER RESPONSE HANDLING *
*==========================*/
fn handle_server_response(&mut self, response: server::ServerResponse) {
match response {
server::ServerResponse::ConnectToPeerResponse(response) =>
self.handle_connect_to_peer_response(response),
server::ServerResponse::LoginResponse(response) =>
self.handle_login_response(response),
@ -302,6 +317,14 @@ impl Client {
}
}
fn handle_connect_to_peer_response(
&mut self, response: server::ConnectToPeerResponse)
{
self.proto_tx.send(proto::Request::ConnectToPeer(
response.ip, response.port
));
}
fn handle_login_response(&mut self, login: server::LoginResponse) {
if let LoginStatus::Pending = self.login_status {
match login {


+ 2
- 0
src/config.rs View File

@ -11,3 +11,5 @@ pub const SERVER_PORT : u16 = 2242;
pub const CONTROL_HOST: &'static str = "localhost";
pub const CONTROL_PORT: u16 = 2244;
pub const MAX_PEERS: usize = 1000;

+ 1
- 0
src/main.rs View File

@ -15,6 +15,7 @@ extern crate encoding;
extern crate env_logger;
extern crate mio;
extern crate rustc_serialize;
extern crate slab;
extern crate ws;
use std::sync::mpsc;


+ 89
- 40
src/proto/handler.rs View File

@ -1,46 +1,35 @@
use std::fmt;
use std::io;
use std::net;
use std::net::ToSocketAddrs;
use std::sync::mpsc;
use mio;
use slab;
use config;
use super::{Intent, Stream, SendPacket};
use super::server::*;
const SERVER_TOKEN: usize = 0;
const INIT_PEER_TOKEN: usize = 1;
/*====================*
* REQUEST - RESPONSE *
*====================*/
#[derive(Debug)]
pub enum Request {
ConnectToPeer(net::Ipv4Addr, u16),
ServerRequest(ServerRequest)
}
#[derive(Debug)]
pub enum Response {
ServerResponse(ServerResponse)
}
/*===============*
* TOKEN COUNTER *
*===============*/
/// This struct provides a simple way to generate different tokens.
struct TokenCounter {
counter: usize,
}
impl TokenCounter {
fn new() -> Self {
TokenCounter {
counter: 0,
}
}
fn next(&mut self) -> mio::Token {
self.counter += 1;
mio::Token(self.counter - 1)
}
ConnectToPeerError(net::Ipv4Addr, u16),
ConnectToPeerSuccess(net::Ipv4Addr, u16, usize),
ServerResponse(ServerResponse),
}
/*========================*
@ -58,6 +47,21 @@ impl SendPacket for ServerResponseSender {
}
}
/*======================*
* PEER RESPONSE SENDER *
*======================*/
pub struct PeerResponseSender(mpsc::Sender<Response>, usize);
impl SendPacket for PeerResponseSender {
type Value = u32;
type Error = mpsc::SendError<Response>;
fn send_packet(&mut self, value: Self::Value) -> Result<(), Self::Error> {
Ok(())
}
}
/*=========*
* HANDLER *
*=========*/
@ -65,11 +69,11 @@ impl SendPacket for ServerResponseSender {
/// This struct handles all the soulseek connections, to the server and to
/// peers.
struct Handler {
token_counter: TokenCounter,
server_token: mio::Token,
server_stream: Stream<mio::tcp::TcpStream, ServerResponseSender>,
peer_streams:
slab::Slab<Stream<mio::tcp::TcpStream, PeerResponseSender>, usize>,
client_tx: mpsc::Sender<Response>,
}
@ -78,20 +82,18 @@ impl Handler {
let host = config::SERVER_HOST;
let port = config::SERVER_PORT;
let server_stream = Stream::new(
try!(Self::connect(host, port)),
try!(Self::connect((host, port))),
ServerResponseSender(client_tx.clone())
);
info!("Connected to server at {}:{}", host, port);
let mut token_counter = TokenCounter::new();
let server_token = token_counter.next();
Ok(Handler {
token_counter: token_counter,
server_token: server_token,
server_stream: server_stream,
peer_streams: slab::Slab::new_starting_at(
INIT_PEER_TOKEN, config::MAX_PEERS
),
client_tx: client_tx,
})
}
@ -100,21 +102,23 @@ impl Handler {
{
event_loop.register(
self.server_stream.evented(),
self.server_token,
mio::Token(SERVER_TOKEN),
mio::EventSet::readable(),
mio::PollOpt::edge() | mio::PollOpt::oneshot()
)
}
fn connect(hostname: &str, port: u16) -> io::Result<mio::tcp::TcpStream> {
for sock_addr in try!((hostname, port).to_socket_addrs()) {
fn connect<T>(addr_spec: T) -> io::Result<mio::tcp::TcpStream>
where T: ToSocketAddrs + fmt::Debug
{
for sock_addr in try!(addr_spec.to_socket_addrs()) {
if let Ok(stream) = mio::tcp::TcpStream::connect(&sock_addr) {
return Ok(stream)
}
}
Err(io::Error::new(
io::ErrorKind::Other,
format!("Cannot connect to {}:{}", hostname, port)
format!("Cannot connect to {:?}", addr_spec)
))
}
@ -129,13 +133,55 @@ impl Handler {
Intent::Continue(event_set) => {
event_loop.reregister(
self.server_stream.evented(),
self.server_token,
mio::Token(SERVER_TOKEN),
event_set,
mio::PollOpt::edge() | mio::PollOpt::oneshot()
).unwrap();
}
}
}
fn connect_to_peer(&mut self, ip: net::Ipv4Addr, port: u16) {
let vacant_entry = match self.peer_streams.vacant_entry() {
Some(vacant_entry) => vacant_entry,
None => {
error!(
"Cannot connect to peer {}:{}: too many connections open",
ip, port
);
self.client_tx.send(
Response::ConnectToPeerError(ip, port)
).unwrap();
return
},
};
info!("Connecting to peer {}:{}", ip, port);
let tcp_stream = match Self::connect((ip, port)) {
Ok(tcp_stream) => tcp_stream,
Err(err) => {
error!("Cannot connect to peer {}:{}: {}", ip, port, err);
self.client_tx.send(
Response::ConnectToPeerError(ip, port)
).unwrap();
return
}
};
let token = vacant_entry.index();
let peer_stream = Stream::new(
tcp_stream, PeerResponseSender(self.client_tx.clone(), token)
);
vacant_entry.insert(peer_stream);
self.client_tx.send(
Response::ConnectToPeerSuccess(ip, port, token)
).unwrap();
}
}
impl mio::Handler for Handler {
@ -145,7 +191,7 @@ impl mio::Handler for Handler {
fn ready(&mut self, event_loop: &mut mio::EventLoop<Self>,
token: mio::Token, event_set: mio::EventSet)
{
if token == self.server_token {
if token.0 == SERVER_TOKEN {
let intent = self.server_stream.on_ready(event_set);
self.process_server_intent(intent, event_loop);
} else {
@ -157,10 +203,13 @@ impl mio::Handler for Handler {
request: Request)
{
match request {
Request::ConnectToPeer(ip, port) =>
self.connect_to_peer(ip, port),
Request::ServerRequest(server_request) => {
let intent = self.server_stream.on_notify(&server_request);
self.process_server_intent(intent, event_loop);
}
},
}
}
}


Loading…
Cancel
Save