arrow-buffer/src/buffer/run.rs (126 lines of code) (raw):

// Licensed to the Apache Software Foundation (ASF) under one // or more contributor license agreements. See the NOTICE file // distributed with this work for additional information // regarding copyright ownership. The ASF licenses this file // to you under the Apache License, Version 2.0 (the // "License"); you may not use this file except in compliance // with the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. use crate::buffer::ScalarBuffer; use crate::ArrowNativeType; /// A slice-able buffer of monotonically increasing, positive integers used to store run-ends /// /// # Logical vs Physical /// /// A [`RunEndBuffer`] is used to encode runs of the same value, the index of each run is /// called the physical index. The logical index is then the corresponding index in the logical /// run-encoded array, i.e. a single run of length `3`, would have the logical indices `0..3`. /// /// Each value in [`RunEndBuffer::values`] is the cumulative length of all runs in the /// logical array, up to that physical index. /// /// Consider a [`RunEndBuffer`] containing `[3, 4, 6]`. The maximum physical index is `2`, /// as there are `3` values, and the maximum logical index is `5`, as the maximum run end /// is `6`. The physical indices are therefore `[0, 0, 0, 1, 2, 2]` /// /// ```text /// ┌─────────┐ ┌─────────┐ ┌─────────┐ /// │ 3 │ │ 0 │ ─┬──────▶ │ 0 │ /// ├─────────┤ ├─────────┤ │ ├─────────┤ /// │ 4 │ │ 1 │ ─┤ ┌────▶ │ 1 │ /// ├─────────┤ ├─────────┤ │ │ ├─────────┤ /// │ 6 │ │ 2 │ ─┘ │ ┌──▶ │ 2 │ /// └─────────┘ ├─────────┤ │ │ └─────────┘ /// run ends │ 3 │ ───┘ │ physical indices /// ├─────────┤ │ /// │ 4 │ ─────┤ /// ├─────────┤ │ /// │ 5 │ ─────┘ /// └─────────┘ /// logical indices /// ``` /// /// # Slicing /// /// In order to provide zero-copy slicing, this container stores a separate offset and length /// /// For example, a [`RunEndBuffer`] containing values `[3, 6, 8]` with offset and length `4` would /// describe the physical indices `1, 1, 2, 2` /// /// For example, a [`RunEndBuffer`] containing values `[6, 8, 9]` with offset `2` and length `5` /// would describe the physical indices `0, 0, 0, 0, 1` /// /// [Run-End encoded layout]: https://arrow.apache.org/docs/format/Columnar.html#run-end-encoded-layout #[derive(Debug, Clone)] pub struct RunEndBuffer<E: ArrowNativeType> { run_ends: ScalarBuffer<E>, len: usize, offset: usize, } impl<E> RunEndBuffer<E> where E: ArrowNativeType, { /// Create a new [`RunEndBuffer`] from a [`ScalarBuffer`], an `offset` and `len` /// /// # Panics /// /// - `buffer` does not contain strictly increasing values greater than zero /// - the last value of `buffer` is less than `offset + len` pub fn new(run_ends: ScalarBuffer<E>, offset: usize, len: usize) -> Self { assert!( run_ends.windows(2).all(|w| w[0] < w[1]), "run-ends not strictly increasing" ); if len != 0 { assert!(!run_ends.is_empty(), "non-empty slice but empty run-ends"); let end = E::from_usize(offset.saturating_add(len)).unwrap(); assert!( *run_ends.first().unwrap() > E::usize_as(0), "run-ends not greater than 0" ); assert!( *run_ends.last().unwrap() >= end, "slice beyond bounds of run-ends" ); } Self { run_ends, offset, len, } } /// Create a new [`RunEndBuffer`] from an [`ScalarBuffer`], an `offset` and `len` /// /// # Safety /// /// - `buffer` must contain strictly increasing values greater than zero /// - The last value of `buffer` must be greater than or equal to `offset + len` pub unsafe fn new_unchecked( run_ends: ScalarBuffer<E>, offset: usize, len: usize, ) -> Self { Self { run_ends, offset, len, } } /// Returns the logical offset into the run-ends stored by this buffer #[inline] pub fn offset(&self) -> usize { self.offset } /// Returns the logical length of the run-ends stored by this buffer #[inline] pub fn len(&self) -> usize { self.len } /// Returns true if this buffer is empty #[inline] pub fn is_empty(&self) -> bool { self.len == 0 } /// Returns the values of this [`RunEndBuffer`] not including any offset #[inline] pub fn values(&self) -> &[E] { &self.run_ends } /// Returns the maximum run-end encoded in the underlying buffer #[inline] pub fn max_value(&self) -> usize { self.values().last().copied().unwrap_or_default().as_usize() } /// Performs a binary search to find the physical index for the given logical index /// /// The result is arbitrary if `logical_index >= self.len()` pub fn get_physical_index(&self, logical_index: usize) -> usize { let logical_index = E::usize_as(self.offset + logical_index); let cmp = |p: &E| p.partial_cmp(&logical_index).unwrap(); match self.run_ends.binary_search_by(cmp) { Ok(idx) => idx + 1, Err(idx) => idx, } } /// Returns the physical index at which the logical array starts pub fn get_start_physical_index(&self) -> usize { if self.offset == 0 || self.len == 0 { return 0; } // Fallback to binary search self.get_physical_index(0) } /// Returns the physical index at which the logical array ends pub fn get_end_physical_index(&self) -> usize { if self.len == 0 { return 0; } if self.max_value() == self.offset + self.len { return self.values().len() - 1; } // Fallback to binary search self.get_physical_index(self.len - 1) } /// Slices this [`RunEndBuffer`] by the provided `offset` and `length` pub fn slice(&self, offset: usize, len: usize) -> Self { assert!( offset.saturating_add(len) <= self.len, "the length + offset of the sliced RunEndBuffer cannot exceed the existing length" ); Self { run_ends: self.run_ends.clone(), offset: self.offset + offset, len, } } /// Returns the inner [`ScalarBuffer`] pub fn inner(&self) -> &ScalarBuffer<E> { &self.run_ends } /// Returns the inner [`ScalarBuffer`], consuming self pub fn into_inner(self) -> ScalarBuffer<E> { self.run_ends } } #[cfg(test)] mod tests { use crate::buffer::RunEndBuffer; #[test] fn test_zero_length_slice() { let buffer = RunEndBuffer::new(vec![1_i32, 4_i32].into(), 0, 4); assert_eq!(buffer.get_start_physical_index(), 0); assert_eq!(buffer.get_end_physical_index(), 1); assert_eq!(buffer.get_physical_index(3), 1); for offset in 0..4 { let sliced = buffer.slice(offset, 0); assert_eq!(sliced.get_start_physical_index(), 0); assert_eq!(sliced.get_end_physical_index(), 0); } let buffer = RunEndBuffer::new(Vec::<i32>::new().into(), 0, 0); assert_eq!(buffer.get_start_physical_index(), 0); assert_eq!(buffer.get_end_physical_index(), 0); } }