netbench/src/multiplex/buffer.rs (177 lines of code) (raw):
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
use super::Frame;
use crate::Result;
use bytes::{Buf, BufMut, Bytes, BytesMut};
use core::{
mem::MaybeUninit,
task::{Context, Poll},
};
use futures::ready;
use std::{collections::VecDeque, io::IoSlice};
use tokio::io::ReadBuf;
/// Capacity, in bytes, requested whenever a buffer in this module allocates a fresh chunk.
const CHUNK_CAPACITY: usize = 4096;
/// Chunked buffer of received bytes.
///
/// Bytes are appended with `read` and consumed through the `Buf` implementation.
#[derive(Debug, Default)]
pub struct ReadBuffer {
/// Number of unread bytes held across `head` and `tail`; this is what `remaining` returns.
len: usize,
/// Chunk that `read` writes newly received bytes into.
tail: BytesMut,
/// Chunks previously rotated out of `tail`; the `Buf` implementation consumes these before `tail`.
head: VecDeque<BytesMut>,
}
impl ReadBuffer {
    /// Appends bytes to the buffer by invoking the `read` callback on the tail chunk.
    ///
    /// The callback is handed a [`ReadBuf`] backed by the memory returned from the tail's
    /// `chunk_mut`. Whatever the callback reports as filled is committed to the tail and added
    /// to the buffer's length.
    ///
    /// If the callback returns `Poll::Pending` or an error, that result is returned as-is and
    /// no bytes are committed.
    pub fn read<F: FnOnce(&mut ReadBuf) -> Poll<Result<()>>>(
        &mut self,
        read: F,
    ) -> Poll<Result<()>> {
        let capacity = self.tail.capacity();

        if capacity == 0 {
            // nothing has been allocated for the tail (or it has been used up entirely)
            self.tail.reserve(CHUNK_CAPACITY);
        } else if capacity < 32 {
            // The tail reports too little capacity to be worth reading into: retire it to
            // `head`, where its contents stay readable, and start a fresh chunk.
            let tail = core::mem::replace(&mut self.tail, BytesMut::with_capacity(CHUNK_CAPACITY));
            self.head.push_back(tail);
        }

        let buf = self.tail.chunk_mut();
        // SAFETY: this reinterprets the `UninitSlice` returned by `chunk_mut` as the
        // `[MaybeUninit<u8>]` it wraps. It relies on `UninitSlice` being a transparent wrapper
        // around that slice type (TODO confirm against the `bytes` version in use). The memory
        // is only ever handed to `ReadBuf::uninit`, which treats it as uninitialized.
        let buf = unsafe { &mut *(buf as *mut _ as *mut [MaybeUninit<u8>]) };
        let mut buf = ReadBuf::uninit(buf);

        ready!(read(&mut buf))?;

        let len = buf.filled().len();
        if len == 0 {
            return Ok(()).into();
        }

        // SAFETY: `len` is the size of the filled region of the `ReadBuf` that was built over
        // exactly the slice returned by `self.tail.chunk_mut()`, so it does not exceed the
        // tail's spare capacity and covers only bytes the callback reported as written.
        unsafe {
            self.tail.advance_mut(len);
        }
        self.len += len;

        Ok(()).into()
    }

    /// In builds with debug assertions enabled, asserts that the cached `len` equals the sum
    /// of the lengths of every chunk in `head` plus `tail`.
    fn check_consistency(&self) {
        if cfg!(debug_assertions) {
            let actual_len =
                self.head.iter().map(|chunk| chunk.len()).sum::<usize>() + self.tail.len();
            assert_eq!(actual_len, self.remaining(), "{self:?}");
        }
    }
}
impl Buf for ReadBuffer {
fn remaining(&self) -> usize {
self.len
}
fn chunk(&self) -> &[u8] {
for chunk in self.head.iter() {
if !chunk.is_empty() {
return chunk.chunk();
}
}
self.tail.chunk()
}
fn advance(&mut self, mut cnt: usize) {
while cnt > 0 {
let len = if let Some(front) = self.head.front_mut() {
let len = front.len().min(cnt);
front.advance(len);
if front.is_empty() {
let _ = self.head.pop_front();
}
len
} else {
self.tail.advance(cnt);
cnt
};
cnt -= len;
self.len -= len;
}
self.check_consistency();
}
fn copy_to_bytes(&mut self, len: usize) -> Bytes {
while let Some(mut front) = self.head.pop_front() {
if front.is_empty() {
continue;
}
self.len -= len;
if front.len() == len {
self.check_consistency();
return front.freeze();
}
let out = front.split_to(len);
self.head.push_front(front);
self.check_consistency();
return out.freeze();
}
let out = self.tail.split_to(len).freeze();
self.len -= len;
self.check_consistency();
out
}
}
/// Queue of encoded frame chunks waiting to be written.
#[derive(Debug, Default)]
pub struct WriteBuffer {
/// Scratch buffer that frame headers are written into before being split off into `queue`.
unfilled: BytesMut,
/// Pending chunks, in the order they are exposed by `chunks` and `pop_front`.
queue: VecDeque<Bytes>,
/// Set when `request_push` refuses a push; cleared by `notify`, which then wakes the task.
push_interest: bool,
}
impl WriteBuffer {
    /// Reports whether the queue currently holds fewer than `max_queue_len` chunks.
    ///
    /// When it does not, interest is recorded so that the next call to [`Self::notify`] wakes
    /// the task.
    pub fn request_push(&mut self, max_queue_len: usize) -> bool {
        let can_push = max_queue_len > self.queue.len();
        self.push_interest |= !can_push;
        can_push
    }

    /// Appends `frame` to the back of the queue: its header first, then its body, if any.
    pub fn push(&mut self, frame: Frame) {
        let header = self.encode_header(&frame);
        self.queue.push_back(header);
        if let Some(data) = frame.body() {
            self.queue.push_back(data);
        }
    }

    /// Inserts `frame` at the front of the queue so that it precedes everything already queued.
    ///
    /// NOTE(review): this does not check whether the current front chunk has already been
    /// partially consumed by [`Self::advance`]; presumably callers only use this on a frame
    /// boundary — confirm.
    pub fn push_priority(&mut self, frame: Frame) {
        let header = self.encode_header(&frame);
        // push the body first so that the header ends up in front of it
        if let Some(data) = frame.body() {
            self.queue.push_front(data);
        }
        self.queue.push_front(header);
    }

    /// Writes the header of `frame` into the scratch buffer and splits it off as its own chunk.
    fn encode_header(&mut self, frame: &Frame) -> Bytes {
        let capacity = self.unfilled.capacity();
        if capacity == 0 {
            self.unfilled.reserve(CHUNK_CAPACITY);
        } else if capacity < 32 {
            // too little room is left in the current allocation; start a new one
            self.unfilled = BytesMut::with_capacity(CHUNK_CAPACITY);
        }
        frame.write_header(&mut self.unfilled);
        self.unfilled.split().freeze()
    }

    /// Returns `true` if no chunks are queued.
    pub fn is_empty(&self) -> bool {
        self.queue.is_empty()
    }

    /// Removes and returns the chunk at the front of the queue.
    pub fn pop_front(&mut self) -> Option<Bytes> {
        self.queue.pop_front()
    }

    /// Puts `chunk` at the front of the queue.
    pub fn push_front(&mut self, chunk: Bytes) {
        self.queue.push_front(chunk);
    }

    /// Wakes the task behind `cx` if an earlier [`Self::request_push`] was refused, and clears
    /// the recorded interest.
    pub fn notify(&mut self, cx: &mut Context) {
        if self.push_interest {
            self.push_interest = false;
            cx.waker().wake_by_ref();
        }
    }

    /// Discards `len` bytes from the front of the queue.
    ///
    /// Chunks that are covered entirely are dropped; a chunk that is only partially covered is
    /// trimmed and put back at the front. If `len` exceeds the number of queued bytes, the
    /// queue is simply emptied.
    pub fn advance(&mut self, mut len: usize) {
        while let Some(mut chunk) = self.queue.pop_front() {
            let chunk_len = chunk.len();
            if len > chunk_len {
                // this chunk is consumed entirely and more remains to be discarded
                len -= chunk_len;
                continue;
            }
            if len < chunk_len {
                chunk.advance(len);
                self.queue.push_front(chunk);
            }
            return;
        }
    }

    /// Returns one [`IoSlice`] per queued chunk, in queue order.
    pub fn chunks(&self) -> Vec<IoSlice<'_>> {
        self.queue.iter().map(|v| IoSlice::new(v)).collect()
    }
}