core/src/raw/oio/buf/flex_buf.rs (60 lines of code) (raw):
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
use bytes::Buf;
use bytes::BufMut;
use bytes::Bytes;
use bytes::BytesMut;
/// FlexBuf is a buffer that support frozen bytes and reuse existing allocated memory.
///
/// It's useful when we want to freeze the buffer and reuse the memory for the next buffer.
pub struct FlexBuf {
/// Already allocated memory size of `buf`.
cap: usize,
/// Already written bytes length inside `buf`.
len: usize,
buf: BytesMut,
frozen: Option<Bytes>,
}
impl FlexBuf {
/// Initializes a new `FlexBuf` with the given capacity.
pub fn new(cap: usize) -> Self {
FlexBuf {
cap,
len: 0,
buf: BytesMut::with_capacity(cap),
frozen: None,
}
}
/// Put slice into flex buf.
///
/// Return 0 means the buffer is frozen.
pub fn put(&mut self, bs: &[u8]) -> usize {
if self.frozen.is_some() {
return 0;
}
let n = (self.cap - self.len).min(bs.len());
self.buf.put_slice(&bs[..n]);
self.len += n;
if self.len >= self.cap {
let frozen = self.buf.split();
self.len = 0;
self.frozen = Some(frozen.freeze());
}
n
}
/// Freeze the buffer no matter it's full or not.
///
/// It's a no-op if the buffer has already been frozen.
pub fn freeze(&mut self) {
if self.len == 0 {
return;
}
let frozen = self.buf.split();
self.len = 0;
self.frozen = Some(frozen.freeze());
}
/// Get the frozen buffer.
///
/// Return `None` if the buffer is not frozen.
///
/// # Notes
///
/// This operation did nothing to the buffer. We use `&mut self` just for make
/// the API consistent with other APIs.
pub fn get(&mut self) -> Option<Bytes> {
self.frozen.clone()
}
// Advance the frozen buffer.
///
/// # Panics
///
/// Panic if the buffer is not frozen.
pub fn advance(&mut self, cnt: usize) {
debug_assert!(self.len == 0, "The buffer must be empty during advance");
let Some(bs) = self.frozen.as_mut() else {
unreachable!("It must be a bug to advance on not frozen buffer")
};
bs.advance(cnt);
if bs.is_empty() {
self.clean()
}
}
/// Cleanup the buffer, reset to the initial state.
#[inline]
pub fn clean(&mut self) {
self.frozen = None;
// This reserve cloud be cheap since we can reuse already allocated memory.
// (if all references to the frozen buffer are dropped)
self.buf.reserve(self.cap);
}
}