diff --git a/src/main.rs b/src/main.rs index 6b29e84..418c260 100644 --- a/src/main.rs +++ b/src/main.rs @@ -17,54 +17,36 @@ extern crate mio; extern crate rustc_serialize; extern crate ws; -use std::sync::mpsc::channel; +use std::sync::mpsc; use std::thread; -use mio::EventLoop; - -use client::Client; -use proto::ConnectionHandler; - fn main() { match env_logger::init() { Ok(()) => (), Err(err) => { - error!("Failed to initialize logger: {}", err); + error!("Error initializing logger: {}", err); return; } }; - let mut event_loop = match EventLoop::new() { - Ok(event_loop) => event_loop, + let (proto_to_client_tx, proto_to_client_rx) = mpsc::channel(); + + let mut proto_agent = match proto::Agent::new(proto_to_client_tx) { + Ok(agent) => agent, Err(err) => { - error!("Failed to create EventLoop: {}", err); + error!("Error initializing protocol agent: {}", err); return; } }; - let (handler_to_client_tx, handler_to_client_rx) = channel(); - let (control_to_client_tx, control_to_client_rx) = channel(); - let client_to_handler_tx = event_loop.channel(); - - let mut handler = { - let handler_result = ConnectionHandler::new( - config::SERVER_HOST, config::SERVER_PORT, - handler_to_client_tx, &mut event_loop); - - match handler_result { - Ok(handler) => handler, - Err(err) => { - error!("Failed to create ConnectionHandler: {}", err); - return; - } - } - }; + let client_to_proto_tx = proto_agent.channel(); + let (control_to_client_tx, control_to_client_rx) = mpsc::channel(); - let mut client = Client::new( - client_to_handler_tx, handler_to_client_rx, control_to_client_rx + let mut client = client::Client::new( + client_to_proto_tx, proto_to_client_rx, control_to_client_rx ); thread::spawn(move || control::listen(control_to_client_tx)); - thread::spawn(move || event_loop.run(&mut handler).unwrap()); + thread::spawn(move || proto_agent.run().unwrap()); client.run(); } diff --git a/src/proto/handler.rs b/src/proto/handler.rs index 1d34045..17f5d57 100644 --- a/src/proto/handler.rs +++ b/src/proto/handler.rs @@ -1,14 +1,16 @@ use std::collections::VecDeque; use std::io; use std::net::ToSocketAddrs; -use std::sync::mpsc::Sender; +use std::sync::mpsc; -use mio::{EventLoop, EventSet, Handler, PollOpt, Token}; -use mio::tcp::TcpStream; +use mio; + +use config; use super::{Packet, PacketStream, Request, Response}; use super::server::*; +/// This struct provides a simple way to generate different tokens. struct TokenCounter { counter: usize, } @@ -20,44 +22,37 @@ impl TokenCounter { } } - fn next(&mut self) -> Token { + fn next(&mut self) -> mio::Token { self.counter += 1; - Token(self.counter - 1) + mio::Token(self.counter - 1) } } -pub struct ConnectionHandler { +/// This struct handles all the soulseek connections, to the server and to +/// peers. +struct Handler { token_counter: TokenCounter, - server_token: Token, - server_stream: PacketStream, + server_token: mio::Token, + server_stream: PacketStream, server_queue: VecDeque, - client_tx: Sender, + client_tx: mpsc::Sender, } -impl ConnectionHandler { - pub fn new( - server_host: &str, - server_port: u16, - client_tx: Sender, - event_loop: &mut EventLoop) - -> io::Result - { - let server_tcp_stream = try!(Self::connect(server_host, server_port)); - let server_stream = PacketStream::new(server_tcp_stream); - info!("Connected to server at {}:{}", server_host, server_port); +impl Handler { + fn new(client_tx: mpsc::Sender) -> io::Result { + let host = config::SERVER_HOST; + let port = config::SERVER_PORT; + let server_stream = PacketStream::new( + try!(Self::connect(host, port)) + ); + info!("Connected to server at {}:{}", host, port); let mut token_counter = TokenCounter::new(); let server_token = token_counter.next(); - let event_set = EventSet::readable(); - let poll_opt = PollOpt::edge() | PollOpt::oneshot(); - - try!(server_stream.register( - event_loop, server_token, event_set, poll_opt)); - - Ok(ConnectionHandler { + Ok(Handler { token_counter: token_counter, server_token: server_token, @@ -68,14 +63,26 @@ impl ConnectionHandler { }) } - fn connect(hostname: &str, port: u16) -> io::Result { + fn register(&self, event_loop: &mut mio::EventLoop) -> io::Result<()> + { + self.server_stream.register( + event_loop, + self.server_token, + mio::EventSet::readable(), + mio::PollOpt::edge() | mio::PollOpt::oneshot() + ) + } + + fn connect(hostname: &str, port: u16) -> io::Result { for sock_addr in try!((hostname, port).to_socket_addrs()) { - if let Ok(stream) = TcpStream::connect(&sock_addr) { + if let Ok(stream) = mio::tcp::TcpStream::connect(&sock_addr) { return Ok(stream) } } - Err(io::Error::new(io::ErrorKind::Other, - format!("Cannot connect to {}:{}", hostname, port))) + Err(io::Error::new( + io::ErrorKind::Other, + format!("Cannot connect to {}:{}", hostname, port) + )) } fn read_server(&mut self) { @@ -138,27 +145,28 @@ impl ConnectionHandler { } /// Re-register the server socket with the event loop. - fn reregister_server(&mut self, event_loop: &mut EventLoop) { + fn reregister_server(&mut self, event_loop: &mut mio::EventLoop) { let event_set = if self.server_queue.len() > 0 { - EventSet::readable() | EventSet::writable() + mio::EventSet::readable() | mio::EventSet::writable() } else { - EventSet::readable() + mio::EventSet::readable() }; - let poll_opt = PollOpt::edge() | PollOpt::oneshot(); - self.server_stream.reregister( - event_loop, self.server_token, event_set, poll_opt + event_loop, + self.server_token, + event_set, + mio::PollOpt::edge() | mio::PollOpt::oneshot() ).unwrap(); } } -impl Handler for ConnectionHandler { +impl mio::Handler for Handler { type Timeout = (); type Message = Request; - fn ready(&mut self, event_loop: &mut EventLoop, - token: Token, event_set: EventSet) + fn ready(&mut self, event_loop: &mut mio::EventLoop, + token: mio::Token, event_set: mio::EventSet) { if token == self.server_token { if event_set.is_writable() { @@ -173,7 +181,9 @@ impl Handler for ConnectionHandler { } } - fn notify(&mut self, event_loop: &mut EventLoop, request: Request) { + fn notify( + &mut self, event_loop: &mut mio::EventLoop, request: Request) + { match request { Request::ServerRequest(server_request) => { match self.notify_server(server_request) { @@ -185,3 +195,34 @@ impl Handler for ConnectionHandler { } } } + +pub type Sender = mio::Sender; + +pub struct Agent { + event_loop: mio::EventLoop, + handler: Handler, +} + +impl Agent { + pub fn new(client_tx: mpsc::Sender) -> io::Result { + // Create the event loop. + let mut event_loop = try!(mio::EventLoop::new()); + // Create the handler for the event loop. + let handler = try!(Handler::new(client_tx)); + // Register the handler's sockets with the event loop. + try!(handler.register(&mut event_loop)); + + Ok(Agent { + event_loop: event_loop, + handler: handler, + }) + } + + pub fn channel(&self) -> Sender { + self.event_loop.channel() + } + + pub fn run(&mut self) -> io::Result<()> { + self.event_loop.run(&mut self.handler) + } +} diff --git a/src/proto/mod.rs b/src/proto/mod.rs index 4a4d8a3..41e2c04 100644 --- a/src/proto/mod.rs +++ b/src/proto/mod.rs @@ -2,7 +2,7 @@ mod handler; mod packet; pub mod server; -pub use self::handler::ConnectionHandler; +pub use self::handler::*; pub use self::packet::{ Packet,