Browse Source

Add ServerTransport.

wip
Titouan Rigoudy 7 years ago
parent
commit
1f94a399fc
4 changed files with 73 additions and 37 deletions
  1. +3
    -0
      .gitignore
  2. +4
    -37
      src/proto/codec.rs
  3. +3
    -0
      src/proto/mod.rs
  4. +63
    -0
      src/proto/transport.rs

+ 3
- 0
.gitignore View File

@ -4,6 +4,9 @@
*.rlib
*.dll
# Rustfmt backups
*.bk
# Executables
*.exe


+ 4
- 37
src/proto/codec.rs View File

@ -5,15 +5,8 @@ use std::net;
use std::u16;
use bytes::{Buf, BufMut, BytesMut, LittleEndian};
use bytes::buf::IntoBuf;
use encoding::{Encoding, EncoderTrap, DecoderTrap};
use encoding::all::WINDOWS_1252;
use tokio_core::io::EasyBuf;
use tokio_io::{AsyncRead, AsyncWrite};
use tokio_io::codec::{Decoder, Encoder};
use tokio_io::codec::length_delimited;
use proto::server::ServerResponse;
/// Length of an encoded 32-bit integer in bytes.
const U32_BYTE_LEN: usize = 4;
@ -87,9 +80,9 @@ fn unexpected_eof_error(value_type: &str) -> DecodeError {
DecodeError::from(io::Error::new(io::ErrorKind::UnexpectedEof, value_type))
}
/*=================*
* DECODE / ENCODE *
*=================*/
/*===================================*
* BASIC TYPES ENCODING AND DECODING *
*===================================*/
// The protocol is pretty basic, though quirky. Base types are serialized in
// the following way:
@ -105,7 +98,7 @@ fn unexpected_eof_error(value_type: &str) -> DecodeError {
/// This trait is implemented by types that can be decoded from messages with
/// a `ProtoDecoder`.
/// Only here to enable ProtoDecoder::decode_vec.
/// Only here to enable `ProtoDecoder::decode_vec`.
pub trait ProtoDecode: Sized {
/// Attempts to decode an instance of `Self` using the given decoder.
fn decode(decoder: &mut ProtoDecoder) -> Result<Self, DecodeError>;
@ -333,32 +326,6 @@ impl<T: ProtoEncode> ProtoEncode for Vec<T> {
}
}
/*=================*
* DECODER/ENCODER *
*=================*/
fn new_length_prefixed_framed<T, B>(inner: T) -> length_delimited::Framed<T, B>
where
T: AsyncRead + AsyncWrite,
B: IntoBuf,
{
length_delimited::Builder::new()
.length_field_length(4)
.little_endian()
.new_framed(inner)
}
struct ServerResponseDecoder;
impl Decoder for ServerResponseDecoder {
type Item = ServerResponse;
type Error = DecodeError;
fn decode(&mut self, buf: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
unimplemented!();
}
}
/*=======*
* TESTS *
*=======*/


+ 3
- 0
src/proto/mod.rs View File

@ -5,7 +5,10 @@ pub mod peer;
pub mod server;
mod stream;
mod codec;
mod transport;
pub use self::handler::*;
pub use self::packet::*;
pub use self::stream::*;
pub use self::server::{ServerResponse, ServerRequest};
pub use self::transport::ServerTransport;

+ 63
- 0
src/proto/transport.rs View File

@ -0,0 +1,63 @@
use std::io;
use bytes::BytesMut;
use futures::{Async, AsyncSink, Poll, Sink, StartSend, Stream};
use tokio_io::{AsyncRead, AsyncWrite};
use tokio_io::codec::{Decoder, Encoder, length_delimited};
use super::codec::DecodeError;
use super::{ServerResponse, ServerRequest};
pub struct ServerTransport<T> {
framed: length_delimited::Framed<T, BytesMut>,
}
impl<T: AsyncRead + AsyncWrite> ServerTransport<T> {
fn new(io: T) -> ServerTransport<T> {
ServerTransport {
framed: length_delimited::Builder::new()
.length_field_length(4)
.little_endian()
.new_framed(io),
}
}
}
fn decode(bytes: &mut BytesMut) -> ServerResponse {
unimplemented!();
}
fn encode(request: &ServerRequest) -> BytesMut {
unimplemented!();
}
impl<T: AsyncRead> Stream for ServerTransport<T> {
type Item = ServerResponse;
type Error = DecodeError;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
match self.framed.poll() {
Ok(Async::Ready(Some(mut bytes))) => Ok(Async::Ready(Some(decode(&mut bytes)))),
Ok(Async::Ready(None)) => Ok(Async::Ready(None)),
Ok(Async::NotReady) => Ok(Async::NotReady),
Err(err) => Err(DecodeError::from(err)),
}
}
}
impl<T: AsyncWrite> Sink for ServerTransport<T> {
type SinkItem = ServerRequest;
type SinkError = io::Error;
fn start_send(&mut self, item: Self::SinkItem) -> StartSend<Self::SinkItem, Self::SinkError> {
match self.framed.start_send(encode(&item)) {
Ok(AsyncSink::Ready) => Ok(AsyncSink::Ready),
Ok(AsyncSink::NotReady(_)) => Ok(AsyncSink::NotReady(item)),
Err(err) => Err(err),
}
}
fn poll_complete(&mut self) -> Poll<(), Self::SinkError> {
self.framed.poll_complete()
}
}

Loading…
Cancel
Save