Browse Source

Refactor proto::handler, add Agent.

wip
Titouan Rigoudy 9 years ago
parent
commit
648b99a3ca
3 changed files with 95 additions and 72 deletions
  1. +12
    -30
      src/main.rs
  2. +82
    -41
      src/proto/handler.rs
  3. +1
    -1
      src/proto/mod.rs

+ 12
- 30
src/main.rs View File

@ -17,54 +17,36 @@ extern crate mio;
extern crate rustc_serialize; extern crate rustc_serialize;
extern crate ws; extern crate ws;
use std::sync::mpsc::channel;
use std::sync::mpsc;
use std::thread; use std::thread;
use mio::EventLoop;
use client::Client;
use proto::ConnectionHandler;
fn main() { fn main() {
match env_logger::init() { match env_logger::init() {
Ok(()) => (), Ok(()) => (),
Err(err) => { Err(err) => {
error!("Failed to initialize logger: {}", err);
error!("Error initializing logger: {}", err);
return; return;
} }
}; };
let mut event_loop = match EventLoop::new() {
Ok(event_loop) => event_loop,
let (proto_to_client_tx, proto_to_client_rx) = mpsc::channel();
let mut proto_agent = match proto::Agent::new(proto_to_client_tx) {
Ok(agent) => agent,
Err(err) => { Err(err) => {
error!("Failed to create EventLoop: {}", err);
error!("Error initializing protocol agent: {}", err);
return; return;
} }
}; };
let (handler_to_client_tx, handler_to_client_rx) = channel();
let (control_to_client_tx, control_to_client_rx) = channel();
let client_to_handler_tx = event_loop.channel();
let mut handler = {
let handler_result = ConnectionHandler::new(
config::SERVER_HOST, config::SERVER_PORT,
handler_to_client_tx, &mut event_loop);
match handler_result {
Ok(handler) => handler,
Err(err) => {
error!("Failed to create ConnectionHandler: {}", err);
return;
}
}
};
let client_to_proto_tx = proto_agent.channel();
let (control_to_client_tx, control_to_client_rx) = mpsc::channel();
let mut client = Client::new(
client_to_handler_tx, handler_to_client_rx, control_to_client_rx
let mut client = client::Client::new(
client_to_proto_tx, proto_to_client_rx, control_to_client_rx
); );
thread::spawn(move || control::listen(control_to_client_tx)); thread::spawn(move || control::listen(control_to_client_tx));
thread::spawn(move || event_loop.run(&mut handler).unwrap());
thread::spawn(move || proto_agent.run().unwrap());
client.run(); client.run();
} }

+ 82
- 41
src/proto/handler.rs View File

@ -1,14 +1,16 @@
use std::collections::VecDeque; use std::collections::VecDeque;
use std::io; use std::io;
use std::net::ToSocketAddrs; use std::net::ToSocketAddrs;
use std::sync::mpsc::Sender;
use std::sync::mpsc;
use mio::{EventLoop, EventSet, Handler, PollOpt, Token};
use mio::tcp::TcpStream;
use mio;
use config;
use super::{Packet, PacketStream, Request, Response}; use super::{Packet, PacketStream, Request, Response};
use super::server::*; use super::server::*;
/// This struct provides a simple way to generate different tokens.
struct TokenCounter { struct TokenCounter {
counter: usize, counter: usize,
} }
@ -20,44 +22,37 @@ impl TokenCounter {
} }
} }
fn next(&mut self) -> Token {
fn next(&mut self) -> mio::Token {
self.counter += 1; self.counter += 1;
Token(self.counter - 1)
mio::Token(self.counter - 1)
} }
} }
pub struct ConnectionHandler {
/// This struct handles all the soulseek connections, to the server and to
/// peers.
struct Handler {
token_counter: TokenCounter, token_counter: TokenCounter,
server_token: Token,
server_stream: PacketStream<TcpStream>,
server_token: mio::Token,
server_stream: PacketStream<mio::tcp::TcpStream>,
server_queue: VecDeque<Packet>, server_queue: VecDeque<Packet>,
client_tx: Sender<Response>,
client_tx: mpsc::Sender<Response>,
} }
impl ConnectionHandler {
pub fn new(
server_host: &str,
server_port: u16,
client_tx: Sender<Response>,
event_loop: &mut EventLoop<Self>)
-> io::Result<Self>
{
let server_tcp_stream = try!(Self::connect(server_host, server_port));
let server_stream = PacketStream::new(server_tcp_stream);
info!("Connected to server at {}:{}", server_host, server_port);
impl Handler {
fn new(client_tx: mpsc::Sender<Response>) -> io::Result<Self> {
let host = config::SERVER_HOST;
let port = config::SERVER_PORT;
let server_stream = PacketStream::new(
try!(Self::connect(host, port))
);
info!("Connected to server at {}:{}", host, port);
let mut token_counter = TokenCounter::new(); let mut token_counter = TokenCounter::new();
let server_token = token_counter.next(); let server_token = token_counter.next();
let event_set = EventSet::readable();
let poll_opt = PollOpt::edge() | PollOpt::oneshot();
try!(server_stream.register(
event_loop, server_token, event_set, poll_opt));
Ok(ConnectionHandler {
Ok(Handler {
token_counter: token_counter, token_counter: token_counter,
server_token: server_token, server_token: server_token,
@ -68,14 +63,26 @@ impl ConnectionHandler {
}) })
} }
fn connect(hostname: &str, port: u16) -> io::Result<TcpStream> {
fn register(&self, event_loop: &mut mio::EventLoop<Self>) -> io::Result<()>
{
self.server_stream.register(
event_loop,
self.server_token,
mio::EventSet::readable(),
mio::PollOpt::edge() | mio::PollOpt::oneshot()
)
}
fn connect(hostname: &str, port: u16) -> io::Result<mio::tcp::TcpStream> {
for sock_addr in try!((hostname, port).to_socket_addrs()) { for sock_addr in try!((hostname, port).to_socket_addrs()) {
if let Ok(stream) = TcpStream::connect(&sock_addr) {
if let Ok(stream) = mio::tcp::TcpStream::connect(&sock_addr) {
return Ok(stream) return Ok(stream)
} }
} }
Err(io::Error::new(io::ErrorKind::Other,
format!("Cannot connect to {}:{}", hostname, port)))
Err(io::Error::new(
io::ErrorKind::Other,
format!("Cannot connect to {}:{}", hostname, port)
))
} }
fn read_server(&mut self) { fn read_server(&mut self) {
@ -138,27 +145,28 @@ impl ConnectionHandler {
} }
/// Re-register the server socket with the event loop. /// Re-register the server socket with the event loop.
fn reregister_server(&mut self, event_loop: &mut EventLoop<Self>) {
fn reregister_server(&mut self, event_loop: &mut mio::EventLoop<Self>) {
let event_set = if self.server_queue.len() > 0 { let event_set = if self.server_queue.len() > 0 {
EventSet::readable() | EventSet::writable()
mio::EventSet::readable() | mio::EventSet::writable()
} else { } else {
EventSet::readable()
mio::EventSet::readable()
}; };
let poll_opt = PollOpt::edge() | PollOpt::oneshot();
self.server_stream.reregister( self.server_stream.reregister(
event_loop, self.server_token, event_set, poll_opt
event_loop,
self.server_token,
event_set,
mio::PollOpt::edge() | mio::PollOpt::oneshot()
).unwrap(); ).unwrap();
} }
} }
impl Handler for ConnectionHandler {
impl mio::Handler for Handler {
type Timeout = (); type Timeout = ();
type Message = Request; type Message = Request;
fn ready(&mut self, event_loop: &mut EventLoop<Self>,
token: Token, event_set: EventSet)
fn ready(&mut self, event_loop: &mut mio::EventLoop<Self>,
token: mio::Token, event_set: mio::EventSet)
{ {
if token == self.server_token { if token == self.server_token {
if event_set.is_writable() { if event_set.is_writable() {
@ -173,7 +181,9 @@ impl Handler for ConnectionHandler {
} }
} }
fn notify(&mut self, event_loop: &mut EventLoop<Self>, request: Request) {
fn notify(
&mut self, event_loop: &mut mio::EventLoop<Self>, request: Request)
{
match request { match request {
Request::ServerRequest(server_request) => { Request::ServerRequest(server_request) => {
match self.notify_server(server_request) { match self.notify_server(server_request) {
@ -185,3 +195,34 @@ impl Handler for ConnectionHandler {
} }
} }
} }
pub type Sender = mio::Sender<Request>;
pub struct Agent {
event_loop: mio::EventLoop<Handler>,
handler: Handler,
}
impl Agent {
pub fn new(client_tx: mpsc::Sender<Response>) -> io::Result<Self> {
// Create the event loop.
let mut event_loop = try!(mio::EventLoop::new());
// Create the handler for the event loop.
let handler = try!(Handler::new(client_tx));
// Register the handler's sockets with the event loop.
try!(handler.register(&mut event_loop));
Ok(Agent {
event_loop: event_loop,
handler: handler,
})
}
pub fn channel(&self) -> Sender {
self.event_loop.channel()
}
pub fn run(&mut self) -> io::Result<()> {
self.event_loop.run(&mut self.handler)
}
}

+ 1
- 1
src/proto/mod.rs View File

@ -2,7 +2,7 @@ mod handler;
mod packet; mod packet;
pub mod server; pub mod server;
pub use self::handler::ConnectionHandler;
pub use self::handler::*;
pub use self::packet::{ pub use self::packet::{
Packet, Packet,


Loading…
Cancel
Save