|
|
|
@ -1,142 +1,134 @@ |
|
|
|
//! This module provides tokio Codec implementations for protocol messages.
|
|
|
|
//! This module provides a codec implementation for protocol frames.
|
|
|
|
//!
|
|
|
|
//! Specifically, the following types:
|
|
|
|
//!
|
|
|
|
//! * proto::peer::Message
|
|
|
|
//! * proto::server::ServerRequest
|
|
|
|
//! * proto::server::ServerResponse
|
|
|
|
//!
|
|
|
|
//! This enables wrapping AsyncRead and AsyncWrite objects into Stream and Sink
|
|
|
|
//! objects using tokio_codec's FramedRead and FramedWrite adapters.
|
|
|
|
|
|
|
|
// TODO: Refactor all this into futures and remove tokio dependency.
|
|
|
|
//! The goal of this codec is to transform byte streams into value streams.
|
|
|
|
|
|
|
|
use std::io;
|
|
|
|
use std::convert::TryInto;
|
|
|
|
use std::marker;
|
|
|
|
|
|
|
|
use bytes::BytesMut;
|
|
|
|
use tokio_codec;
|
|
|
|
|
|
|
|
use super::peer::Message;
|
|
|
|
use super::server::{ServerRequest, ServerResponse};
|
|
|
|
use super::u32::U32_BYTE_LEN;
|
|
|
|
use super::value_codec::{ValueDecode, ValueDecoder, ValueEncode, ValueEncoder};
|
|
|
|
use thiserror::Error;
|
|
|
|
|
|
|
|
use super::prefix::Prefixer;
|
|
|
|
use super::u32::{decode_u32, U32_BYTE_LEN};
|
|
|
|
use super::value_codec::{
|
|
|
|
ValueDecode, ValueDecodeError, ValueDecoder, ValueEncode, ValueEncodeError, ValueEncoder,
|
|
|
|
};
|
|
|
|
|
|
|
|
#[derive(Debug, Error, PartialEq)]
|
|
|
|
pub enum FrameEncodeError {
|
|
|
|
#[error("encoded value length {length} is too large")]
|
|
|
|
ValueTooLarge {
|
|
|
|
/// The length of the encoded value.
|
|
|
|
length: usize,
|
|
|
|
},
|
|
|
|
|
|
|
|
#[error("failed to encode value: {0}")]
|
|
|
|
ValueEncodeError(#[from] ValueEncodeError),
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Implements tokio's Encoder trait for types that implement ValueEncode.
|
|
|
|
pub struct LengthPrefixedEncoder<T> {
|
|
|
|
/// Encodes entire protocol frames containing values of type `T`.
|
|
|
|
pub struct FrameEncoder<T> {
|
|
|
|
phantom: marker::PhantomData<T>,
|
|
|
|
}
|
|
|
|
|
|
|
|
impl<T> LengthPrefixedEncoder<T> {
|
|
|
|
impl<T: ValueEncode> FrameEncoder<T> {
|
|
|
|
pub fn new() -> Self {
|
|
|
|
Self {
|
|
|
|
phantom: marker::PhantomData,
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
impl<T: ValueEncode> tokio_codec::Encoder for LengthPrefixedEncoder<T> {
|
|
|
|
type Item = T;
|
|
|
|
type Error = io::Error;
|
|
|
|
fn encode_to(&mut self, value: &T, buffer: &mut BytesMut) -> Result<(), FrameEncodeError> {
|
|
|
|
let mut prefixer = Prefixer::new(buffer);
|
|
|
|
|
|
|
|
fn encode(&mut self, item: Self::Item, dst: &mut BytesMut) -> Result<(), Self::Error> {
|
|
|
|
// Encode the message.
|
|
|
|
// Note that this is ugly right now, but will get better once we switch
|
|
|
|
// off of Tokio and onto regular futures.
|
|
|
|
let mut buffer = vec![];
|
|
|
|
ValueEncoder::new(&mut buffer).encode(&item)?;
|
|
|
|
ValueEncoder::new(prefixer.suffix_mut()).encode(value)?;
|
|
|
|
|
|
|
|
// Encode the message length.
|
|
|
|
let mut prefix = vec![];
|
|
|
|
ValueEncoder::new(&mut prefix).encode_u32(buffer.len() as u32)?;
|
|
|
|
if let Err(prefixer) = prefixer.finalize() {
|
|
|
|
return Err(FrameEncodeError::ValueTooLarge {
|
|
|
|
length: prefixer.suffix().len(),
|
|
|
|
});
|
|
|
|
}
|
|
|
|
|
|
|
|
dst.reserve(prefix.len() + buffer.len());
|
|
|
|
dst.extend_from_slice(&prefix);
|
|
|
|
dst.extend_from_slice(&buffer);
|
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Implements tokio's Decoder trait for types that implement ValueDecode.
|
|
|
|
pub struct LengthPrefixedDecoder<T> {
|
|
|
|
// The length, as a number of bytes, of the next item to decode.
|
|
|
|
// None if we have not read the length prefix yet.
|
|
|
|
// Some(n) if we read the length prefix, and are now waiting for `n` bytes
|
|
|
|
// to be available.
|
|
|
|
length: Option<usize>,
|
|
|
|
|
|
|
|
/// Decodes entire protocol frames containing values of type `T`.
|
|
|
|
pub struct FrameDecoder<T> {
|
|
|
|
// Only here to enable parameterizing `Decoder` by `T`.
|
|
|
|
phantom: marker::PhantomData<T>,
|
|
|
|
}
|
|
|
|
|
|
|
|
impl<T> LengthPrefixedDecoder<T> {
|
|
|
|
impl<T: ValueDecode> FrameDecoder<T> {
|
|
|
|
pub fn new() -> Self {
|
|
|
|
Self {
|
|
|
|
length: None,
|
|
|
|
phantom: marker::PhantomData,
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// If necessary, atempts to decode a length prefix from `src`.
|
|
|
|
//
|
|
|
|
// Helper for decode() below.
|
|
|
|
//
|
|
|
|
// If self.length is not None, returns Ok(()).
|
|
|
|
// If there are not enough bytes in `src`, returns Ok(()).
|
|
|
|
// Otherwise, splits off the length prefix bytes from `src`, and:
|
|
|
|
// - returns an error if decoding the value failed.
|
|
|
|
// - sets self.length to Some(length) and returns Ok(()) otherwise.
|
|
|
|
fn maybe_decode_length(&mut self, src: &mut BytesMut) -> io::Result<()> {
|
|
|
|
if self.length.is_some() {
|
|
|
|
return Ok(()); // Aready read length.
|
|
|
|
}
|
|
|
|
|
|
|
|
if src.len() < U32_BYTE_LEN {
|
|
|
|
return Ok(()); // Not enough bytes yet.
|
|
|
|
/// Attempts to decode an entire frame from the given buffer.
|
|
|
|
///
|
|
|
|
/// Returns `Ok(Some(frame))` if successful, in which case the frame's bytes
|
|
|
|
/// have been split off from the left of `bytes`.
|
|
|
|
///
|
|
|
|
/// Returns `Ok(None)` if not enough bytes are available to decode an entire
|
|
|
|
/// frame yet, in which case `bytes` is untouched.
|
|
|
|
///
|
|
|
|
/// Returns an error if the length prefix or the framed value are malformed,
|
|
|
|
/// in which case `bytes` is untouched.
|
|
|
|
pub fn decode_from(&mut self, bytes: &mut BytesMut) -> Result<Option<T>, ValueDecodeError> {
|
|
|
|
if bytes.len() < U32_BYTE_LEN {
|
|
|
|
return Ok(None); // Not enough bytes yet.
|
|
|
|
}
|
|
|
|
|
|
|
|
let prefix = src.split_to(U32_BYTE_LEN);
|
|
|
|
let length = ValueDecoder::new(&prefix).decode::<u32>()?;
|
|
|
|
|
|
|
|
self.length = Some(length as usize);
|
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
impl<T: ValueDecode> tokio_codec::Decoder for LengthPrefixedDecoder<T> {
|
|
|
|
type Item = T;
|
|
|
|
type Error = io::Error;
|
|
|
|
|
|
|
|
fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
|
|
|
|
// If we have not read the length prefix yet, attempt to do so first.
|
|
|
|
self.maybe_decode_length(src)?;
|
|
|
|
|
|
|
|
let length = match self.length {
|
|
|
|
None => return Ok(None), // Not enough bytes yet.
|
|
|
|
Some(n) => n,
|
|
|
|
};
|
|
|
|
|
|
|
|
if src.len() < length {
|
|
|
|
// Split the prefix off. After this:
|
|
|
|
//
|
|
|
|
// | bytes (len 4) | suffix |
|
|
|
|
//
|
|
|
|
// NOTE: This method would be simpler if we could use split_to() instead
|
|
|
|
// here such that `bytes` contained the suffix. At the end, we would not
|
|
|
|
// have to replace `bytes` with `suffix`. However, that would require
|
|
|
|
// calling `prefix.unsplit(*bytes)`, and that does not work since
|
|
|
|
// `bytes` is only borrowed, and unsplit() takes its argument by value.
|
|
|
|
let mut suffix = bytes.split_off(U32_BYTE_LEN);
|
|
|
|
|
|
|
|
// unwrap() cannot panic because `bytes` is of the exact right length.
|
|
|
|
let array: [u8; U32_BYTE_LEN] = bytes.as_ref().try_into().unwrap();
|
|
|
|
let length = decode_u32(array) as usize;
|
|
|
|
|
|
|
|
if suffix.len() < length {
|
|
|
|
// Re-assemble `bytes` as it first was.
|
|
|
|
bytes.unsplit(suffix);
|
|
|
|
return Ok(None); // Not enough bytes yet.
|
|
|
|
}
|
|
|
|
|
|
|
|
// Split off the right amount of bytes from the buffer.
|
|
|
|
let buf = src.split_to(length);
|
|
|
|
self.length = None;
|
|
|
|
// Split off the right amount of bytes from the buffer. After this:
|
|
|
|
//
|
|
|
|
// | bytes (len 4) | contents | suffix |
|
|
|
|
//
|
|
|
|
let mut contents = suffix.split_to(length);
|
|
|
|
|
|
|
|
// Attempt to decode the value.
|
|
|
|
let item = ValueDecoder::new(&buf).decode()?;
|
|
|
|
let item = match ValueDecoder::new(&contents).decode() {
|
|
|
|
Ok(item) => item,
|
|
|
|
Err(error) => {
|
|
|
|
// Re-assemble `bytes` as it first was.
|
|
|
|
contents.unsplit(suffix);
|
|
|
|
bytes.unsplit(contents);
|
|
|
|
return Err(error);
|
|
|
|
}
|
|
|
|
};
|
|
|
|
|
|
|
|
// Remove the decoded bytes from the left of `bytes`.
|
|
|
|
*bytes = suffix;
|
|
|
|
Ok(Some(item))
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
mod tests {
|
|
|
|
use bytes::BytesMut;
|
|
|
|
use tokio_codec::{Decoder, Encoder};
|
|
|
|
|
|
|
|
use crate::proto::ValueEncode;
|
|
|
|
|
|
|
|
use super::{LengthPrefixedDecoder, LengthPrefixedEncoder};
|
|
|
|
use super::{FrameDecoder, FrameEncoder};
|
|
|
|
|
|
|
|
// Test value: [1, 3, 3, 7] in little-endian.
|
|
|
|
const U32_1337: u32 = 1 + (3 << 8) + (3 << 16) + (7 << 24);
|
|
|
|
@ -144,8 +136,9 @@ mod tests { |
|
|
|
#[test]
|
|
|
|
fn encode_u32() {
|
|
|
|
let mut bytes = BytesMut::new();
|
|
|
|
LengthPrefixedEncoder::new()
|
|
|
|
.encode(U32_1337, &mut bytes)
|
|
|
|
|
|
|
|
FrameEncoder::new()
|
|
|
|
.encode_to(&U32_1337, &mut bytes)
|
|
|
|
.unwrap();
|
|
|
|
|
|
|
|
assert_eq!(
|
|
|
|
@ -157,12 +150,31 @@ mod tests { |
|
|
|
);
|
|
|
|
}
|
|
|
|
|
|
|
|
#[test]
|
|
|
|
fn encode_appends() {
|
|
|
|
let mut bytes = BytesMut::new();
|
|
|
|
|
|
|
|
let mut encoder = FrameEncoder::new();
|
|
|
|
encoder.encode_to(&U32_1337, &mut bytes).unwrap();
|
|
|
|
encoder.encode_to(&U32_1337, &mut bytes).unwrap();
|
|
|
|
|
|
|
|
assert_eq!(
|
|
|
|
bytes,
|
|
|
|
vec![
|
|
|
|
4, 0, 0, 0, // 1 32-bit integer = 4 bytes.
|
|
|
|
1, 3, 3, 7, // Little-endian integer.
|
|
|
|
4, 0, 0, 0, // Repeated.
|
|
|
|
1, 3, 3, 7,
|
|
|
|
]
|
|
|
|
);
|
|
|
|
}
|
|
|
|
|
|
|
|
#[test]
|
|
|
|
fn encode_vec() {
|
|
|
|
let v: Vec<u32> = vec![1, 3, 3, 7];
|
|
|
|
|
|
|
|
let mut bytes = BytesMut::new();
|
|
|
|
LengthPrefixedEncoder::new().encode(v, &mut bytes).unwrap();
|
|
|
|
FrameEncoder::new().encode_to(&v, &mut bytes).unwrap();
|
|
|
|
|
|
|
|
assert_eq!(
|
|
|
|
bytes,
|
|
|
|
@ -178,17 +190,33 @@ mod tests { |
|
|
|
}
|
|
|
|
|
|
|
|
#[test]
|
|
|
|
fn decode_not_enough_data() {
|
|
|
|
fn decode_not_enough_data_for_prefix() {
|
|
|
|
let mut bytes = BytesMut::from(vec![
|
|
|
|
4, 0, 0, // Incomplete 32-bit length prefix.
|
|
|
|
]);
|
|
|
|
|
|
|
|
let value: Option<u32> = LengthPrefixedDecoder::new().decode(&mut bytes).unwrap();
|
|
|
|
let value: Option<u32> = FrameDecoder::new().decode_from(&mut bytes).unwrap();
|
|
|
|
|
|
|
|
assert_eq!(value, None);
|
|
|
|
assert_eq!(bytes, vec![4, 0, 0]); // Untouched.
|
|
|
|
}
|
|
|
|
|
|
|
|
#[test]
|
|
|
|
fn decode_not_enough_data_for_contents() {
|
|
|
|
let initial_bytes = vec![
|
|
|
|
4, 0, 0, 0, // Length 4.
|
|
|
|
1, 2, 3, // But there are only 3 bytes!
|
|
|
|
];
|
|
|
|
|
|
|
|
let mut bytes = BytesMut::new();
|
|
|
|
bytes.extend_from_slice(&initial_bytes);
|
|
|
|
|
|
|
|
let value: Option<u32> = FrameDecoder::new().decode_from(&mut bytes).unwrap();
|
|
|
|
|
|
|
|
assert_eq!(value, None);
|
|
|
|
assert_eq!(bytes, initial_bytes); // Untouched.
|
|
|
|
}
|
|
|
|
|
|
|
|
#[test]
|
|
|
|
fn decode_u32() {
|
|
|
|
let mut bytes = BytesMut::from(vec![
|
|
|
|
@ -197,7 +225,7 @@ mod tests { |
|
|
|
4, 2, // Trailing bytes.
|
|
|
|
]);
|
|
|
|
|
|
|
|
let value = LengthPrefixedDecoder::new().decode(&mut bytes).unwrap();
|
|
|
|
let value = FrameDecoder::new().decode_from(&mut bytes).unwrap();
|
|
|
|
|
|
|
|
assert_eq!(value, Some(U32_1337));
|
|
|
|
assert_eq!(bytes, vec![4, 2]); // Decoded bytes were split off.
|
|
|
|
@ -215,47 +243,13 @@ mod tests { |
|
|
|
4, 2, // Trailing bytes.
|
|
|
|
]);
|
|
|
|
|
|
|
|
let value = LengthPrefixedDecoder::new().decode(&mut bytes).unwrap();
|
|
|
|
let value = FrameDecoder::new().decode_from(&mut bytes).unwrap();
|
|
|
|
|
|
|
|
let expected_value: Vec<u32> = vec![1, 3, 3, 7];
|
|
|
|
assert_eq!(value, Some(expected_value));
|
|
|
|
assert_eq!(bytes, vec![4, 2]); // Decoded bytes were split off.
|
|
|
|
}
|
|
|
|
|
|
|
|
#[test]
|
|
|
|
fn decode_stateful() {
|
|
|
|
let mut decoder = LengthPrefixedDecoder::new();
|
|
|
|
|
|
|
|
let mut bytes = BytesMut::from(vec![
|
|
|
|
4, 0, 0, 0, // 32-bit integer = 4 bytes.
|
|
|
|
1, 3, // Incomplete integer.
|
|
|
|
]);
|
|
|
|
|
|
|
|
let value = decoder.decode(&mut bytes).unwrap();
|
|
|
|
|
|
|
|
assert_eq!(value, None);
|
|
|
|
assert_eq!(bytes, vec![1, 3]); // Decoded bytes were split off.
|
|
|
|
|
|
|
|
bytes.extend_from_slice(&[
|
|
|
|
3, 7, // End of integer.
|
|
|
|
4, 0, 0, 0, // Second identical message waiting to be read.
|
|
|
|
1, 3, 3, 7, //
|
|
|
|
4, 2, // Trailing bytes.
|
|
|
|
]);
|
|
|
|
|
|
|
|
// Decoder has state, remembers that the length prefix was 4.
|
|
|
|
let value = decoder.decode(&mut bytes).unwrap();
|
|
|
|
|
|
|
|
assert_eq!(value, Some(U32_1337));
|
|
|
|
|
|
|
|
// Decoder state resets after entire item is decoded.
|
|
|
|
// Decode the second message now.
|
|
|
|
let value = decoder.decode(&mut bytes).unwrap();
|
|
|
|
|
|
|
|
assert_eq!(value, Some(U32_1337));
|
|
|
|
assert_eq!(bytes, vec![4, 2]); // Decoded bytes were split off.
|
|
|
|
}
|
|
|
|
|
|
|
|
#[test]
|
|
|
|
fn roundtrip() {
|
|
|
|
let value: Vec<String> = vec![
|
|
|
|
@ -267,10 +261,8 @@ mod tests { |
|
|
|
|
|
|
|
let mut buffer = BytesMut::new();
|
|
|
|
|
|
|
|
LengthPrefixedEncoder::new()
|
|
|
|
.encode(value.clone(), &mut buffer)
|
|
|
|
.unwrap();
|
|
|
|
let decoded = LengthPrefixedDecoder::new().decode(&mut buffer).unwrap();
|
|
|
|
FrameEncoder::new().encode_to(&value, &mut buffer).unwrap();
|
|
|
|
let decoded = FrameDecoder::new().decode_from(&mut buffer).unwrap();
|
|
|
|
|
|
|
|
assert_eq!(decoded, Some(value));
|
|
|
|
assert_eq!(buffer, vec![]);
|
|
|
|
|