From b55307f123e4d54b010916af437b1d4ca131a264 Mon Sep 17 00:00:00 2001 From: Titouan Rigoudy Date: Sat, 24 Nov 2018 21:03:40 +0000 Subject: [PATCH] Introduce length-prefixed Tokio decoder. --- src/proto/codec.rs | 228 ++++++++++++++++++++++++++++++++++++++------- 1 file changed, 194 insertions(+), 34 deletions(-) diff --git a/src/proto/codec.rs b/src/proto/codec.rs index bb5a8d7..566d882 100644 --- a/src/proto/codec.rs +++ b/src/proto/codec.rs @@ -1,24 +1,28 @@ use std::io; use std::marker; -use tokio_codec; use bytes::BytesMut; +use tokio_codec; -use super::base_codec::{Decode, ProtoEncode, ProtoEncoder, U32_BYTE_LEN}; -use super::server::{ServerRequest,ServerResponse}; +use super::base_codec::{ProtoDecode, ProtoDecoder, ProtoEncode, ProtoEncoder, U32_BYTE_LEN}; use super::peer::Message; +use super::server::{ServerRequest, ServerResponse}; /*===================================* * TOKIO CODEC TRAIT IMPLEMENTATIONS * *===================================*/ // Encodes types that implement ProtoEncode with a length prefix. -struct Encoder { - phantom: marker::PhantomData +pub struct Encoder { + phantom: marker::PhantomData, } impl Encoder { - fn new() -> Self { Self{phantom: marker::PhantomData} } + pub fn new() -> Self { + Self { + phantom: marker::PhantomData, + } + } } impl tokio_codec::Encoder for Encoder { @@ -42,48 +46,104 @@ impl tokio_codec::Encoder for Encoder { } } -pub type ServerRequestEncoder = Encoder; -pub type ServerResponseEncoder = Encoder; -pub type PeerMessageEncoder = Encoder; +// Decodes length-prefixed values from byte buffers. +pub struct Decoder { + // The length, as a number of bytes, of the next item to decode. + // None if we have not read the length prefix yet. + // Some(n) if we read the length prefix, and are now waiting for `n` bytes + // to be available. + length: Option, + + // Only here to enable parameterizing `Decoder` by `T`. + phantom: marker::PhantomData, +} + +impl Decoder { + pub fn new() -> Self { + Self { + length: None, + phantom: marker::PhantomData, + } + } + + // If necessary, atempts to decode a length prefix from `src`. + // + // Helper for decode() below. + // + // If self.length is not None, returns Ok(()). + // If there are not enough bytes in `src`, returns Ok(()). + // Otherwise, splits off the length prefix bytes from `src`, and: + // - returns an error if decoding the value failed. + // - sets self.length to Some(length) and returns Ok(()) otherwise. + fn maybe_decode_length(&mut self, src: &mut BytesMut) -> io::Result<()> { + if self.length.is_some() { + return Ok(()); // Aready read length. + } + + if src.len() < U32_BYTE_LEN { + return Ok(()); // Not enough bytes yet. + } -struct Decoder { - data: marker::PhantomData + let prefix = src.split_to(U32_BYTE_LEN); + let length = ProtoDecoder::new(&prefix).decode::()?; + + self.length = Some(length as usize); + Ok(()) + } } -impl tokio_codec::Decoder for Decoder -where BytesMut: Decode { +impl tokio_codec::Decoder for Decoder { type Item = T; type Error = io::Error; fn decode(&mut self, src: &mut BytesMut) -> Result, Self::Error> { - Ok(Some(src.decode()?)) + // If we have not read the length prefix yet, attempt to do so first. + self.maybe_decode_length(src)?; + + let length = match self.length { + None => return Ok(None), // Not enough bytes yet. + Some(n) => n, + }; + + if src.len() < length { + return Ok(None); // Not enough bytes yet. + } + + // Split off the right amount of bytes from the buffer. + let buf = src.split_to(length); + self.length = None; + + // Attempt to decode the value. + let item = ProtoDecoder::new(&buf).decode()?; + Ok(Some(item)) } } -pub type ServerRequestDecoder = Decoder; -pub type ServerResponseDecoder = Decoder; -pub type PeerMessageDecoder = Decoder; - mod tests { use bytes::BytesMut; - use tokio_codec::Encoder; + use tokio_codec::{Decoder, Encoder}; use proto::ProtoEncode; - // Avoid name conflict with tokio_codec::Encoder. + // Avoid name conflict with tokio_codec traits. + use super::Decoder as MyDecoder; use super::Encoder as MyEncoder; + // Test value: [1, 3, 3, 7] in little-endian. + const U32_1337: u32 = 1 + (3 << 8) + (3 << 16) + (7 << 24); + #[test] fn encode_u32() { - let val: u32 = 13 + 37*256; - let mut bytes = BytesMut::new(); - MyEncoder::new().encode(val, &mut bytes).unwrap(); + MyEncoder::new().encode(U32_1337, &mut bytes).unwrap(); - assert_eq!(bytes, vec![ - 4, 0, 0, 0, - 13, 37, 0, 0, - ]); + assert_eq!( + bytes, + vec![ + 4, 0, 0, 0, // 1 32-bit integer = 4 bytes. + 1, 3, 3, 7, // Little-endian integer. + ] + ); } #[test] @@ -93,13 +153,113 @@ mod tests { let mut bytes = BytesMut::new(); MyEncoder::new().encode(v, &mut bytes).unwrap(); - assert_eq!(bytes, vec![ - 20, 0, 0, 0, // 5 32-bit integers = 20 bytes. - 4, 0, 0, 0, - 1, 0, 0, 0, - 3, 0, 0, 0, - 3, 0, 0, 0, - 7, 0, 0, 0 + assert_eq!( + bytes, + vec![ + 20, 0, 0, 0, // 5 32-bit integers = 20 bytes. + 4, 0, 0, 0, // 4 elements in the vector. + 1, 0, 0, 0, // Little-endian vector elements. + 3, 0, 0, 0, // + 3, 0, 0, 0, // + 7, 0, 0, 0, // + ] + ); + } + + #[test] + fn decode_not_enough_data() { + let mut bytes = BytesMut::from(vec![ + 4, 0, 0, // Incomplete 32-bit length prefix. + ]); + + let value: Option = MyDecoder::new().decode(&mut bytes).unwrap(); + + assert_eq!(value, None); + assert_eq!(bytes, vec![4, 0, 0]); // Untouched. + } + + #[test] + fn decode_u32() { + let mut bytes = BytesMut::from(vec![ + 4, 0, 0, 0, // 1 32-bit integer = 4 bytes. + 1, 3, 3, 7, // Little-endian integer. + 4, 2, // Trailing bytes. + ]); + + let value = MyDecoder::new().decode(&mut bytes).unwrap(); + + assert_eq!(value, Some(U32_1337)); + assert_eq!(bytes, vec![4, 2]); // Decoded bytes were split off. + } + + #[test] + fn decode_vec() { + let mut bytes = BytesMut::from(vec![ + 20, 0, 0, 0, // 5 32-bit integers = 20 bytes. + 4, 0, 0, 0, // 4 elements in the vector. + 1, 0, 0, 0, // Little-endian vector elements. + 3, 0, 0, 0, // + 3, 0, 0, 0, // + 7, 0, 0, 0, // + 4, 2, // Trailing bytes. + ]); + + let value = MyDecoder::new().decode(&mut bytes).unwrap(); + + let expected_value: Vec = vec![1, 3, 3, 7]; + assert_eq!(value, Some(expected_value)); + assert_eq!(bytes, vec![4, 2]); // Decoded bytes were split off. + } + + #[test] + fn decode_stateful() { + let mut decoder = MyDecoder::new(); + + let mut bytes = BytesMut::from(vec![ + 4, 0, 0, 0, // 32-bit integer = 4 bytes. + 1, 3, // Incomplete integer. ]); + + let value = decoder.decode(&mut bytes).unwrap(); + + assert_eq!(value, None); + assert_eq!(bytes, vec![1, 3]); // Decoded bytes were split off. + + bytes.extend_from_slice(&[ + 3, 7, // End of integer. + 4, 0, 0, 0, // Second identical message waiting to be read. + 1, 3, 3, 7, // + 4, 2, // Trailing bytes. + ]); + + // Decoder has state, remembers that the length prefix was 4. + let value = decoder.decode(&mut bytes).unwrap(); + + assert_eq!(value, Some(U32_1337)); + + // Decoder state resets after entire item is decoded. + // Decode the second message now. + let value = decoder.decode(&mut bytes).unwrap(); + + assert_eq!(value, Some(U32_1337)); + assert_eq!(bytes, vec![4, 2]); // Decoded bytes were split off. + } + + #[test] + fn roundtrip() { + let value: Vec = vec![ + "apples".to_string(), // + "bananas".to_string(), // + "oranges".to_string(), // + "and cheese!".to_string(), // + ]; + + let mut buffer = BytesMut::new(); + + MyEncoder::new().encode(value.clone(), &mut buffer).unwrap(); + let decoded = MyDecoder::new().decode(&mut buffer).unwrap(); + + assert_eq!(decoded, Some(value)); + assert_eq!(buffer, vec![]); } }