quic/s2n-quic-core/src/buffer/duplex/interposer.rs (234 lines of code) (raw):
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
use crate::{
buffer::{
duplex,
reader::{self, Reader, Storage as _},
writer::{self, Writer},
Error,
},
varint::VarInt,
};
use core::convert::Infallible;
/// A wrapper around an underlying buffer (`duplex`) which will prefer to read/write from a
/// user-provided temporary buffer (`storage`). The underlying buffer (`duplex`)'s current
/// position and total length are updated if needed.
pub struct Interposer<'a, S, D>
where
S: writer::Storage + ?Sized,
D: duplex::Skip<Error = Infallible> + ?Sized,
{
storage: &'a mut S,
duplex: &'a mut D,
}
impl<'a, S, D> Interposer<'a, S, D>
where
S: writer::Storage + ?Sized,
D: duplex::Skip<Error = Infallible> + ?Sized,
{
#[inline]
pub fn new(storage: &'a mut S, duplex: &'a mut D) -> Self {
debug_assert!(
!storage.has_remaining_capacity() || duplex.buffer_is_empty(),
"`duplex` (len={}) should be drained into `storage` (cap={}) before constructing an Interposer",
duplex.buffered_len(),
storage.remaining_capacity()
);
Self { storage, duplex }
}
}
/// Delegates to the inner Duplex
impl<S, D> reader::Storage for Interposer<'_, S, D>
where
S: writer::Storage + ?Sized,
D: duplex::Skip<Error = Infallible> + ?Sized,
{
type Error = D::Error;
#[inline]
fn buffered_len(&self) -> usize {
self.duplex.buffered_len()
}
#[inline]
fn buffer_is_empty(&self) -> bool {
self.duplex.buffer_is_empty()
}
#[inline]
fn read_chunk(&mut self, watermark: usize) -> Result<reader::storage::Chunk<'_>, Self::Error> {
self.duplex.read_chunk(watermark)
}
#[inline]
fn partial_copy_into<Dest>(
&mut self,
dest: &mut Dest,
) -> Result<reader::storage::Chunk<'_>, Self::Error>
where
Dest: writer::Storage + ?Sized,
{
self.duplex.partial_copy_into(dest)
}
#[inline]
fn copy_into<Dest>(&mut self, dest: &mut Dest) -> Result<(), Self::Error>
where
Dest: writer::Storage + ?Sized,
{
self.duplex.copy_into(dest)
}
}
/// Delegates to the inner Duplex
impl<C, D> Reader for Interposer<'_, C, D>
where
C: writer::Storage + ?Sized,
D: duplex::Skip<Error = Infallible> + ?Sized,
{
#[inline]
fn current_offset(&self) -> VarInt {
self.duplex.current_offset()
}
#[inline]
fn final_offset(&self) -> Option<VarInt> {
self.duplex.final_offset()
}
#[inline]
fn has_buffered_fin(&self) -> bool {
self.duplex.has_buffered_fin()
}
#[inline]
fn is_consumed(&self) -> bool {
self.duplex.is_consumed()
}
}
impl<C, D> Writer for Interposer<'_, C, D>
where
C: writer::Storage + ?Sized,
D: duplex::Skip<Error = Infallible> + ?Sized,
{
#[inline]
fn read_from<R>(&mut self, reader: &mut R) -> Result<(), Error<R::Error>>
where
R: Reader + ?Sized,
{
let final_offset = reader.final_offset();
{
// if the storage specializes writing zero-copy Bytes/BytesMut, then just write to the
// receive buffer, since that's what it stores
let mut should_delegate = C::SPECIALIZES_BYTES || C::SPECIALIZES_BYTES_MUT;
// if the storage has no space left then write into the duplex
should_delegate |= !self.storage.has_remaining_capacity();
// if this packet is non-contiguous, then delegate to the wrapped writer
should_delegate |= reader.current_offset() != self.duplex.current_offset();
// if the storage has less than half of the payload, then delegate
should_delegate |= self.storage.remaining_capacity() < (reader.buffered_len() / 2);
if should_delegate {
self.duplex.read_from(reader)?;
// don't copy into `storage` here - let the caller do that later since it can be
// more efficient to pull from `duplex` all in one go.
return Ok(());
}
}
debug_assert!(
self.storage.has_remaining_capacity(),
"this code should only be executed if the storage has capacity"
);
{
// track the number of consumed bytes
let mut reader = reader.track_read();
reader.copy_into(self.storage)?;
let write_len = reader.consumed_len();
let write_len = VarInt::try_from(write_len).map_err(|_| Error::OutOfRange)?;
// notify the duplex that we bypassed it and should skip
self.duplex
.skip(write_len, final_offset)
.map_err(Error::mapped)?;
}
// if we still have some remaining bytes consume the rest in the duplex
if !reader.buffer_is_empty() {
self.duplex.read_from(reader)?;
}
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::{
buffer::{
reader::Reader,
writer::{Storage as _, Writer},
Reassembler,
},
stream::testing::Data,
};
#[test]
fn undersized_storage_test() {
let mut duplex = Reassembler::default();
let mut reader = Data::new(10);
let mut checker = reader;
let mut storage: Vec<u8> = vec![];
{
// limit the storage capacity so we force writing into the duplex
let mut storage = storage.with_write_limit(1);
let mut interposer = Interposer::new(&mut storage, &mut duplex);
interposer.read_from(&mut reader).unwrap();
}
// the storage was too small so we delegated to duplex
assert!(storage.is_empty());
assert_eq!(duplex.buffered_len(), 10);
// move the reassembled bytes into the checker
checker.read_from(&mut duplex).unwrap();
assert_eq!(duplex.current_offset().as_u64(), 10);
assert!(duplex.is_consumed());
}
#[test]
fn out_of_order_test() {
let mut duplex = Reassembler::default();
// first write 5 bytes at offset 5
{
let mut reader = Data::new(10);
// advance the reader by 5 bytes
let _ = reader.send_one(5);
let mut storage: Vec<u8> = vec![];
let mut interposer = Interposer::new(&mut storage, &mut duplex);
interposer.read_from(&mut reader).unwrap();
// make sure we consumed the reader
assert_eq!(reader.current_offset().as_u64(), 10);
assert_eq!(interposer.current_offset().as_u64(), 0);
assert_eq!(interposer.buffered_len(), 0);
// make sure we didn't write to the storage, even if we had capacity, since the
// current_offset doesn't match
assert!(storage.is_empty());
}
// then write 10 bytes at offset 0
{
let mut reader = Data::new(10);
let mut storage: Vec<u8> = vec![];
let mut interposer = Interposer::new(&mut storage, &mut duplex);
interposer.read_from(&mut reader).unwrap();
// make sure we consumed the reader
assert_eq!(reader.current_offset().as_u64(), 10);
assert_eq!(interposer.current_offset().as_u64(), 10);
assert_eq!(interposer.buffered_len(), 0);
// make sure we copied the entire reader
assert_eq!(storage.len(), 10);
assert!(duplex.is_consumed());
}
}
#[test]
fn skip_test() {
let mut duplex = Reassembler::default();
let mut reader = Data::new(10);
let mut checker = reader;
let mut storage: Vec<u8> = vec![];
let mut interposer = Interposer::new(&mut storage, &mut duplex);
interposer.read_from(&mut reader).unwrap();
assert_eq!(storage.len(), 10);
assert_eq!(duplex.current_offset().as_u64(), 10);
checker.receive(&[&storage[..]]);
}
#[test]
fn empty_storage_test() {
let mut duplex = Reassembler::default();
let mut reader = Data::new(10);
let mut checker = reader;
let mut storage = writer::storage::Empty;
let mut interposer = Interposer::new(&mut storage, &mut duplex);
interposer.read_from(&mut reader).unwrap();
assert_eq!(interposer.current_offset().as_u64(), 0);
assert_eq!(interposer.buffered_len(), 10);
checker.read_from(&mut interposer).unwrap();
assert_eq!(interposer.current_offset().as_u64(), 10);
assert!(interposer.buffer_is_empty());
assert_eq!(interposer.buffered_len(), 0);
assert!(interposer.is_consumed());
}
#[test]
fn partial_test() {
let mut duplex = Reassembler::default();
let mut reader = Data::new(10);
let mut checker = reader;
let mut storage: Vec<u8> = vec![];
{
let mut storage = storage.with_write_limit(9);
let mut interposer = Interposer::new(&mut storage, &mut duplex);
interposer.read_from(&mut reader).unwrap();
}
// the storage was at least half the reader
assert_eq!(storage.len(), 9);
assert_eq!(duplex.buffered_len(), 1);
// move the reassembled bytes into the checker
checker.receive(&[&storage]);
checker.read_from(&mut duplex).unwrap();
assert_eq!(duplex.current_offset().as_u64(), 10);
assert!(duplex.is_consumed());
}
}