arrow-row/src/lib.rs (1,717 lines of code) (raw):

// Licensed to the Apache Software Foundation (ASF) under one // or more contributor license agreements. See the NOTICE file // distributed with this work for additional information // regarding copyright ownership. The ASF licenses this file // to you under the Apache License, Version 2.0 (the // "License"); you may not use this file except in compliance // with the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. //! A comparable row-oriented representation of a collection of [`Array`]. //! //! [`Row`]s are [normalized for sorting], and can therefore be very efficiently [compared], //! using [`memcmp`] under the hood, or used in [non-comparison sorts] such as [radix sort]. //! This makes the row format ideal for implementing efficient multi-column sorting, //! grouping, aggregation, windowing and more, as described in more detail //! [in this blog post](https://arrow.apache.org/blog/2022/11/07/multi-column-sorts-in-arrow-rust-part-1/). //! //! For example, given three input [`Array`], [`RowConverter`] creates byte //! sequences that [compare] the same as when using [`lexsort`]. //! //! ```text //! ┌─────┐ ┌─────┐ ┌─────┐ //! │ │ │ │ │ │ //! ├─────┤ ┌ ┼─────┼ ─ ┼─────┼ ┐ ┏━━━━━━━━━━━━━┓ //! │ │ │ │ │ │ ─────────────▶┃ ┃ //! ├─────┤ └ ┼─────┼ ─ ┼─────┼ ┘ ┗━━━━━━━━━━━━━┛ //! │ │ │ │ │ │ //! └─────┘ └─────┘ └─────┘ //! ... //! ┌─────┐ ┌ ┬─────┬ ─ ┬─────┬ ┐ ┏━━━━━━━━┓ //! │ │ │ │ │ │ ─────────────▶┃ ┃ //! └─────┘ └ ┴─────┴ ─ ┴─────┴ ┘ ┗━━━━━━━━┛ //! UInt64 Utf8 F64 //! //! Input Arrays Row Format //! (Columns) //! ``` //! //! 
_[`Rows`] must be generated by the same [`RowConverter`] for the comparison //! to be meaningful._ //! //! # Basic Example //! ``` //! # use std::sync::Arc; //! # use arrow_row::{RowConverter, SortField}; //! # use arrow_array::{ArrayRef, Int32Array, StringArray}; //! # use arrow_array::cast::{AsArray, as_string_array}; //! # use arrow_array::types::Int32Type; //! # use arrow_schema::DataType; //! //! let a1 = Arc::new(Int32Array::from_iter_values([-1, -1, 0, 3, 3])) as ArrayRef; //! let a2 = Arc::new(StringArray::from_iter_values(["a", "b", "c", "d", "d"])) as ArrayRef; //! let arrays = vec![a1, a2]; //! //! // Convert arrays to rows //! let converter = RowConverter::new(vec![ //! SortField::new(DataType::Int32), //! SortField::new(DataType::Utf8), //! ]).unwrap(); //! let rows = converter.convert_columns(&arrays).unwrap(); //! //! // Compare rows //! for i in 0..4 { //! assert!(rows.row(i) <= rows.row(i + 1)); //! } //! assert_eq!(rows.row(3), rows.row(4)); //! //! // Convert rows back to arrays //! let converted = converter.convert_rows(&rows).unwrap(); //! assert_eq!(arrays, converted); //! //! // Compare rows from different arrays //! let a1 = Arc::new(Int32Array::from_iter_values([3, 4])) as ArrayRef; //! let a2 = Arc::new(StringArray::from_iter_values(["e", "f"])) as ArrayRef; //! let arrays = vec![a1, a2]; //! let rows2 = converter.convert_columns(&arrays).unwrap(); //! //! assert!(rows.row(4) < rows2.row(0)); //! assert!(rows.row(4) < rows2.row(1)); //! //! // Convert selection of rows back to arrays //! let selection = [rows.row(0), rows2.row(1), rows.row(2), rows2.row(0)]; //! let converted = converter.convert_rows(selection).unwrap(); //! let c1 = converted[0].as_primitive::<Int32Type>(); //! assert_eq!(c1.values(), &[-1, 4, 0, 3]); //! //! let c2 = converted[1].as_string::<i32>(); //! let c2_values: Vec<_> = c2.iter().flatten().collect(); //! assert_eq!(&c2_values, &["a", "f", "c", "e"]); //! ``` //! //! # Lexsort //! //! 
The row format can also be used to implement a fast multi-column / lexicographic sort //! //! ``` //! # use arrow_row::{RowConverter, SortField}; //! # use arrow_array::{ArrayRef, UInt32Array}; //! fn lexsort_to_indices(arrays: &[ArrayRef]) -> UInt32Array { //! let fields = arrays //! .iter() //! .map(|a| SortField::new(a.data_type().clone())) //! .collect(); //! let converter = RowConverter::new(fields).unwrap(); //! let rows = converter.convert_columns(arrays).unwrap(); //! let mut sort: Vec<_> = rows.iter().enumerate().collect(); //! sort.sort_unstable_by(|(_, a), (_, b)| a.cmp(b)); //! UInt32Array::from_iter_values(sort.iter().map(|(i, _)| *i as u32)) //! } //! ``` //! //! [non-comparison sorts]: https://en.wikipedia.org/wiki/Sorting_algorithm#Non-comparison_sorts //! [radix sort]: https://en.wikipedia.org/wiki/Radix_sort //! [normalized for sorting]: http://wwwlgis.informatik.uni-kl.de/archiv/wwwdvs.informatik.uni-kl.de/courses/DBSREAL/SS2005/Vorlesungsunterlagen/Implementing_Sorting.pdf //! [`memcmp`]: https://www.man7.org/linux/man-pages/man3/memcmp.3.html //! [`lexsort`]: https://docs.rs/arrow-ord/latest/arrow_ord/sort/fn.lexsort.html //! [compared]: PartialOrd //! 
[compare]: PartialOrd #![doc( html_logo_url = "https://arrow.apache.org/img/arrow-logo_chevrons_black-txt_white-bg.svg", html_favicon_url = "https://arrow.apache.org/img/arrow-logo_chevrons_black-txt_transparent-bg.svg" )] #![cfg_attr(docsrs, feature(doc_auto_cfg))] #![warn(missing_docs)] use std::cmp::Ordering; use std::hash::{Hash, Hasher}; use std::sync::Arc; use arrow_array::cast::*; use arrow_array::types::ArrowDictionaryKeyType; use arrow_array::*; use arrow_buffer::{ArrowNativeType, Buffer, OffsetBuffer, ScalarBuffer}; use arrow_data::ArrayDataBuilder; use arrow_schema::*; use variable::{decode_binary_view, decode_string_view}; use crate::fixed::{decode_bool, decode_fixed_size_binary, decode_primitive}; use crate::variable::{decode_binary, decode_string}; mod fixed; mod list; mod variable; /// Converts [`ArrayRef`] columns into a [row-oriented](self) format. /// /// *Note: The encoding of the row format may change from release to release.* /// /// ## Overview /// /// The row format is a variable length byte sequence created by /// concatenating the encoded form of each column. The encoding for /// each column depends on its datatype (and sort options). /// /// The encoding is carefully designed in such a way that escaping is /// unnecessary: it is never ambiguous as to whether a byte is part of /// a sentinel (e.g. null) or a value. /// /// ## Unsigned Integer Encoding /// /// A null integer is encoded as a `0_u8`, followed by a zero-ed number of bytes corresponding /// to the integer's length. /// /// A valid integer is encoded as `1_u8`, followed by the big-endian representation of the /// integer. 
/// /// ```text /// ┌──┬──┬──┬──┐ ┌──┬──┬──┬──┬──┐ /// 3 │03│00│00│00│ │01│00│00│00│03│ /// └──┴──┴──┴──┘ └──┴──┴──┴──┴──┘ /// ┌──┬──┬──┬──┐ ┌──┬──┬──┬──┬──┐ /// 258 │02│01│00│00│ │01│00│00│01│02│ /// └──┴──┴──┴──┘ └──┴──┴──┴──┴──┘ /// ┌──┬──┬──┬──┐ ┌──┬──┬──┬──┬──┐ /// 23423 │7F│5B│00│00│ │01│00│00│5B│7F│ /// └──┴──┴──┴──┘ └──┴──┴──┴──┴──┘ /// ┌──┬──┬──┬──┐ ┌──┬──┬──┬──┬──┐ /// NULL │??│??│??│??│ │00│00│00│00│00│ /// └──┴──┴──┴──┘ └──┴──┴──┴──┴──┘ /// /// 32-bit (4 bytes) Row Format /// Value Little Endian /// ``` /// /// ## Signed Integer Encoding /// /// Signed integers have their most significant sign bit flipped, and are then encoded in the /// same manner as an unsigned integer. /// /// ```text /// ┌──┬──┬──┬──┐ ┌──┬──┬──┬──┐ ┌──┬──┬──┬──┬──┐ /// 5 │05│00│00│00│ │05│00│00│80│ │01│80│00│00│05│ /// └──┴──┴──┴──┘ └──┴──┴──┴──┘ └──┴──┴──┴──┴──┘ /// ┌──┬──┬──┬──┐ ┌──┬──┬──┬──┐ ┌──┬──┬──┬──┬──┐ /// -5 │FB│FF│FF│FF│ │FB│FF│FF│7F│ │01│7F│FF│FF│FB│ /// └──┴──┴──┴──┘ └──┴──┴──┴──┘ └──┴──┴──┴──┴──┘ /// /// Value 32-bit (4 bytes) High bit flipped Row Format /// Little Endian /// ``` /// /// ## Float Encoding /// /// Floats are converted from IEEE 754 representation to a signed integer representation /// by flipping all bar the sign bit if they are negative. /// /// They are then encoded in the same manner as a signed integer. /// /// ## Fixed Length Bytes Encoding /// /// Fixed length bytes are encoded in the same fashion as primitive types above. /// /// For a fixed length array of length `n`: /// /// A null is encoded as `0_u8` null sentinel followed by `n` `0_u8` bytes /// /// A valid value is encoded as `1_u8` followed by the value bytes /// /// ## Variable Length Bytes (including Strings) Encoding /// /// A null is encoded as a `0_u8`. /// /// An empty byte array is encoded as `1_u8`. /// /// A non-null, non-empty byte array is encoded as `2_u8` followed by the byte array /// encoded using a block based scheme described below. 
/// /// The byte array is broken up into fixed-width blocks, each block is written in turn /// to the output, followed by `0xFF_u8`. The final block is padded to 32-bytes /// with `0_u8` and written to the output, followed by the un-padded length in bytes /// of this final block as a `u8`. The first 4 blocks have a length of 8, with subsequent /// blocks using a length of 32, this is to reduce space amplification for small strings. /// /// Note the following example encodings use a block size of 4 bytes for brevity: /// /// ```text /// ┌───┬───┬───┬───┬───┬───┐ /// "MEEP" │02 │'M'│'E'│'E'│'P'│04 │ /// └───┴───┴───┴───┴───┴───┘ /// /// ┌───┐ /// "" │01 | /// └───┘ /// /// NULL ┌───┐ /// │00 │ /// └───┘ /// /// "Defenestration" ┌───┬───┬───┬───┬───┬───┐ /// │02 │'D'│'e'│'f'│'e'│FF │ /// └───┼───┼───┼───┼───┼───┤ /// │'n'│'e'│'s'│'t'│FF │ /// ├───┼───┼───┼───┼───┤ /// │'r'│'a'│'t'│'r'│FF │ /// ├───┼───┼───┼───┼───┤ /// │'a'│'t'│'i'│'o'│FF │ /// ├───┼───┼───┼───┼───┤ /// │'n'│00 │00 │00 │01 │ /// └───┴───┴───┴───┴───┘ /// ``` /// /// This approach is loosely inspired by [COBS] encoding, and chosen over more traditional /// [byte stuffing] as it is more amenable to vectorisation, in particular AVX-256. /// /// ## Dictionary Encoding /// /// Dictionaries are hydrated to their underlying values /// /// ## Struct Encoding /// /// A null is encoded as a `0_u8`. /// /// A valid value is encoded as `1_u8` followed by the row encoding of each child. /// /// This encoding effectively flattens the schema in a depth-first fashion. 
/// /// For example /// /// ```text /// ┌───────┬────────────────────────┬───────┐ /// │ Int32 │ Struct[Int32, Float32] │ Int32 │ /// └───────┴────────────────────────┴───────┘ /// ``` /// /// Is encoded as /// /// ```text /// ┌───────┬───────────────┬───────┬─────────┬───────┐ /// │ Int32 │ Null Sentinel │ Int32 │ Float32 │ Int32 │ /// └───────┴───────────────┴───────┴─────────┴───────┘ /// ``` /// /// ## List Encoding /// /// Lists are encoded by first encoding all child elements to the row format. /// /// A list value is then encoded as the concatenation of each of the child elements, /// separately encoded using the variable length encoding described above, followed /// by the variable length encoding of an empty byte array. /// /// For example given: /// /// ```text /// [1_u8, 2_u8, 3_u8] /// [1_u8, null] /// [] /// null /// ``` /// /// The elements would be converted to: /// /// ```text /// ┌──┬──┐ ┌──┬──┐ ┌──┬──┐ ┌──┬──┐ ┌──┬──┐ /// 1 │01│01│ 2 │01│02│ 3 │01│03│ 1 │01│01│ null │00│00│ /// └──┴──┘ └──┴──┘ └──┴──┘ └──┴──┘ └──┴──┘ ///``` /// /// Which would be encoded as /// /// ```text /// ┌──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┐ /// [1_u8, 2_u8, 3_u8] │02│01│01│00│00│02│02│01│02│00│00│02│02│01│03│00│00│02│01│ /// └──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┘ /// └──── 1_u8 ────┘ └──── 2_u8 ────┘ └──── 3_u8 ────┘ /// /// ┌──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┐ /// [1_u8, null] │02│01│01│00│00│02│02│00│00│00│00│02│01│ /// └──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┘ /// └──── 1_u8 ────┘ └──── null ────┘ /// ///``` /// /// With `[]` represented by an empty byte array, and `null` a null byte array. /// /// # Ordering /// /// ## Float Ordering /// /// Floats are totally ordered in accordance to the `totalOrder` predicate as defined /// in the IEEE 754 (2008 revision) floating point standard. /// /// The ordering established by this does not always agree with the /// [`PartialOrd`] and [`PartialEq`] implementations of `f32`. 
For example, /// they consider negative and positive zero equal, while this does not /// /// ## Null Ordering /// /// The encoding described above will order nulls first, this can be inverted by representing /// nulls as `0xFF_u8` instead of `0_u8` /// /// ## Reverse Column Ordering /// /// The order of a given column can be reversed by negating the encoded bytes of non-null values /// /// [COBS]: https://en.wikipedia.org/wiki/Consistent_Overhead_Byte_Stuffing /// [byte stuffing]: https://en.wikipedia.org/wiki/High-Level_Data_Link_Control#Asynchronous_framing #[derive(Debug)] pub struct RowConverter { fields: Arc<[SortField]>, /// State for codecs codecs: Vec<Codec>, } #[derive(Debug)] enum Codec { /// No additional codec state is necessary Stateless, /// A row converter for the dictionary values /// and the encoding of a row containing only nulls Dictionary(RowConverter, OwnedRow), /// A row converter for the child fields /// and the encoding of a row containing only nulls Struct(RowConverter, OwnedRow), /// A row converter for the child field List(RowConverter), } impl Codec { fn new(sort_field: &SortField) -> Result<Self, ArrowError> { match &sort_field.data_type { DataType::Dictionary(_, values) => { let sort_field = SortField::new_with_options(values.as_ref().clone(), sort_field.options); let converter = RowConverter::new(vec![sort_field])?; let null_array = new_null_array(values.as_ref(), 1); let nulls = converter.convert_columns(&[null_array])?; let owned = OwnedRow { data: nulls.buffer.into(), config: nulls.config, }; Ok(Self::Dictionary(converter, owned)) } d if !d.is_nested() => Ok(Self::Stateless), DataType::List(f) | DataType::LargeList(f) => { // The encoded contents will be inverted if descending is set to true // As such we set `descending` to false and negate nulls first if it // it set to true let options = SortOptions { descending: false, nulls_first: sort_field.options.nulls_first != sort_field.options.descending, }; let field = 
SortField::new_with_options(f.data_type().clone(), options); let converter = RowConverter::new(vec![field])?; Ok(Self::List(converter)) } DataType::Struct(f) => { let sort_fields = f .iter() .map(|x| SortField::new_with_options(x.data_type().clone(), sort_field.options)) .collect(); let converter = RowConverter::new(sort_fields)?; let nulls: Vec<_> = f.iter().map(|x| new_null_array(x.data_type(), 1)).collect(); let nulls = converter.convert_columns(&nulls)?; let owned = OwnedRow { data: nulls.buffer.into(), config: nulls.config, }; Ok(Self::Struct(converter, owned)) } _ => Err(ArrowError::NotYetImplemented(format!( "not yet implemented: {:?}", sort_field.data_type ))), } } fn encoder(&self, array: &dyn Array) -> Result<Encoder<'_>, ArrowError> { match self { Codec::Stateless => Ok(Encoder::Stateless), Codec::Dictionary(converter, nulls) => { let values = array.as_any_dictionary().values().clone(); let rows = converter.convert_columns(&[values])?; Ok(Encoder::Dictionary(rows, nulls.row())) } Codec::Struct(converter, null) => { let v = as_struct_array(array); let rows = converter.convert_columns(v.columns())?; Ok(Encoder::Struct(rows, null.row())) } Codec::List(converter) => { let values = match array.data_type() { DataType::List(_) => as_list_array(array).values(), DataType::LargeList(_) => as_large_list_array(array).values(), _ => unreachable!(), }; let rows = converter.convert_columns(&[values.clone()])?; Ok(Encoder::List(rows)) } } } fn size(&self) -> usize { match self { Codec::Stateless => 0, Codec::Dictionary(converter, nulls) => converter.size() + nulls.data.len(), Codec::Struct(converter, nulls) => converter.size() + nulls.data.len(), Codec::List(converter) => converter.size(), } } } #[derive(Debug)] enum Encoder<'a> { /// No additional encoder state is necessary Stateless, /// The encoding of the child array and the encoding of a null row Dictionary(Rows, Row<'a>), /// The row encoding of the child arrays and the encoding of a null row /// /// It is 
necessary to encode to a temporary [`Rows`] to avoid serializing /// values that are masked by a null in the parent StructArray, otherwise /// this would establish an ordering between semantically null values Struct(Rows, Row<'a>), /// The row encoding of the child array List(Rows), } /// Configure the data type and sort order for a given column #[derive(Debug, Clone, PartialEq, Eq)] pub struct SortField { /// Sort options options: SortOptions, /// Data type data_type: DataType, } impl SortField { /// Create a new column with the given data type pub fn new(data_type: DataType) -> Self { Self::new_with_options(data_type, Default::default()) } /// Create a new column with the given data type and [`SortOptions`] pub fn new_with_options(data_type: DataType, options: SortOptions) -> Self { Self { options, data_type } } /// Return size of this instance in bytes. /// /// Includes the size of `Self`. pub fn size(&self) -> usize { self.data_type.size() + std::mem::size_of::<Self>() - std::mem::size_of::<DataType>() } } impl RowConverter { /// Create a new [`RowConverter`] with the provided schema pub fn new(fields: Vec<SortField>) -> Result<Self, ArrowError> { if !Self::supports_fields(&fields) { return Err(ArrowError::NotYetImplemented(format!( "Row format support not yet implemented for: {fields:?}" ))); } let codecs = fields.iter().map(Codec::new).collect::<Result<_, _>>()?; Ok(Self { fields: fields.into(), codecs, }) } /// Check if the given fields are supported by the row format. 
pub fn supports_fields(fields: &[SortField]) -> bool { fields.iter().all(|x| Self::supports_datatype(&x.data_type)) } fn supports_datatype(d: &DataType) -> bool { match d { _ if !d.is_nested() => true, DataType::List(f) | DataType::LargeList(f) | DataType::Map(f, _) => { Self::supports_datatype(f.data_type()) } DataType::Struct(f) => f.iter().all(|x| Self::supports_datatype(x.data_type())), _ => false, } } /// Convert [`ArrayRef`] columns into [`Rows`] /// /// See [`Row`] for information on when [`Row`] can be compared /// /// # Panics /// /// Panics if the schema of `columns` does not match that provided to [`RowConverter::new`] pub fn convert_columns(&self, columns: &[ArrayRef]) -> Result<Rows, ArrowError> { let num_rows = columns.first().map(|x| x.len()).unwrap_or(0); let mut rows = self.empty_rows(num_rows, 0); self.append(&mut rows, columns)?; Ok(rows) } /// Convert [`ArrayRef`] columns appending to an existing [`Rows`] /// /// See [`Row`] for information on when [`Row`] can be compared /// /// # Panics /// /// Panics if /// * The schema of `columns` does not match that provided to [`RowConverter::new`] /// * The provided [`Rows`] were not created by this [`RowConverter`] /// /// ``` /// # use std::sync::Arc; /// # use std::collections::HashSet; /// # use arrow_array::cast::AsArray; /// # use arrow_array::StringArray; /// # use arrow_row::{Row, RowConverter, SortField}; /// # use arrow_schema::DataType; /// # /// let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap(); /// let a1 = StringArray::from(vec!["hello", "world"]); /// let a2 = StringArray::from(vec!["a", "a", "hello"]); /// /// let mut rows = converter.empty_rows(5, 128); /// converter.append(&mut rows, &[Arc::new(a1)]).unwrap(); /// converter.append(&mut rows, &[Arc::new(a2)]).unwrap(); /// /// let back = converter.convert_rows(&rows).unwrap(); /// let values: Vec<_> = back[0].as_string::<i32>().iter().map(Option::unwrap).collect(); /// assert_eq!(&values, &["hello", 
"world", "a", "a", "hello"]); /// ``` pub fn append(&self, rows: &mut Rows, columns: &[ArrayRef]) -> Result<(), ArrowError> { assert!( Arc::ptr_eq(&rows.config.fields, &self.fields), "rows were not produced by this RowConverter" ); if columns.len() != self.fields.len() { return Err(ArrowError::InvalidArgumentError(format!( "Incorrect number of arrays provided to RowConverter, expected {} got {}", self.fields.len(), columns.len() ))); } let encoders = columns .iter() .zip(&self.codecs) .zip(self.fields.iter()) .map(|((column, codec), field)| { if !column.data_type().equals_datatype(&field.data_type) { return Err(ArrowError::InvalidArgumentError(format!( "RowConverter column schema mismatch, expected {} got {}", field.data_type, column.data_type() ))); } codec.encoder(column.as_ref()) }) .collect::<Result<Vec<_>, _>>()?; let write_offset = rows.num_rows(); let lengths = row_lengths(columns, &encoders); // We initialize the offsets shifted down by one row index. // // As the rows are appended to the offsets will be incremented to match // // For example, consider the case of 3 rows of length 3, 4, and 6 respectively. // The offsets would be initialized to `0, 0, 3, 7` // // Writing the first row entirely would yield `0, 3, 3, 7` // The second, `0, 3, 7, 7` // The third, `0, 3, 7, 13` // // This would be the final offsets for reading // // In this way offsets tracks the position during writing whilst eventually serving // as identifying the offsets of the written rows rows.offsets.reserve(lengths.len()); let mut cur_offset = rows.offsets[write_offset]; for l in lengths { rows.offsets.push(cur_offset); cur_offset = cur_offset.checked_add(l).expect("overflow"); } // Note this will not zero out any trailing data in `rows.buffer`, // e.g. 
resulting from a call to `Rows::clear`, relying instead on the // encoders not assuming a zero-initialized buffer rows.buffer.resize(cur_offset, 0); for ((column, field), encoder) in columns.iter().zip(self.fields.iter()).zip(encoders) { // We encode a column at a time to minimise dispatch overheads encode_column( &mut rows.buffer, &mut rows.offsets[write_offset..], column.as_ref(), field.options, &encoder, ) } if cfg!(debug_assertions) { assert_eq!(*rows.offsets.last().unwrap(), rows.buffer.len()); rows.offsets .windows(2) .for_each(|w| assert!(w[0] <= w[1], "offsets should be monotonic")); } Ok(()) } /// Convert [`Rows`] columns into [`ArrayRef`] /// /// # Panics /// /// Panics if the rows were not produced by this [`RowConverter`] pub fn convert_rows<'a, I>(&self, rows: I) -> Result<Vec<ArrayRef>, ArrowError> where I: IntoIterator<Item = Row<'a>>, { let mut validate_utf8 = false; let mut rows: Vec<_> = rows .into_iter() .map(|row| { assert!( Arc::ptr_eq(&row.config.fields, &self.fields), "rows were not produced by this RowConverter" ); validate_utf8 |= row.config.validate_utf8; row.data }) .collect(); // SAFETY // We have validated that the rows came from this [`RowConverter`] // and therefore must be valid unsafe { self.convert_raw(&mut rows, validate_utf8) } } /// Returns an empty [`Rows`] with capacity for `row_capacity` rows with /// a total length of `data_capacity` /// /// This can be used to buffer a selection of [`Row`] /// /// ``` /// # use std::sync::Arc; /// # use std::collections::HashSet; /// # use arrow_array::cast::AsArray; /// # use arrow_array::StringArray; /// # use arrow_row::{Row, RowConverter, SortField}; /// # use arrow_schema::DataType; /// # /// let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap(); /// let array = StringArray::from(vec!["hello", "world", "a", "a", "hello"]); /// /// // Convert to row format and deduplicate /// let converted = converter.convert_columns(&[Arc::new(array)]).unwrap(); /// let mut 
distinct_rows = converter.empty_rows(3, 100); /// let mut dedup: HashSet<Row> = HashSet::with_capacity(3); /// converted.iter().filter(|row| dedup.insert(*row)).for_each(|row| distinct_rows.push(row)); /// /// // Note: we could skip buffering and feed the filtered iterator directly /// // into convert_rows, this is done for demonstration purposes only /// let distinct = converter.convert_rows(&distinct_rows).unwrap(); /// let values: Vec<_> = distinct[0].as_string::<i32>().iter().map(Option::unwrap).collect(); /// assert_eq!(&values, &["hello", "world", "a"]); /// ``` pub fn empty_rows(&self, row_capacity: usize, data_capacity: usize) -> Rows { let mut offsets = Vec::with_capacity(row_capacity.saturating_add(1)); offsets.push(0); Rows { offsets, buffer: Vec::with_capacity(data_capacity), config: RowConfig { fields: self.fields.clone(), validate_utf8: false, }, } } /// Create a new [Rows] instance from the given binary data. /// /// ``` /// # use std::sync::Arc; /// # use std::collections::HashSet; /// # use arrow_array::cast::AsArray; /// # use arrow_array::StringArray; /// # use arrow_row::{OwnedRow, Row, RowConverter, RowParser, SortField}; /// # use arrow_schema::DataType; /// # /// let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap(); /// let array = StringArray::from(vec!["hello", "world", "a", "a", "hello"]); /// let rows = converter.convert_columns(&[Arc::new(array)]).unwrap(); /// /// // We can convert rows into binary format and back in batch. /// let values: Vec<OwnedRow> = rows.iter().map(|r| r.owned()).collect(); /// let binary = rows.try_into_binary().expect("known-small array"); /// let converted = converter.from_binary(binary.clone()); /// assert!(converted.iter().eq(values.iter().map(|r| r.row()))); /// ``` /// /// # Panics /// /// This function expects the passed [BinaryArray] to contain valid row data as produced by this /// [RowConverter]. It will panic if any rows are null. 
Operations on the returned [Rows] may /// panic if the data is malformed. pub fn from_binary(&self, array: BinaryArray) -> Rows { assert_eq!( array.null_count(), 0, "can't construct Rows instance from array with nulls" ); Rows { buffer: array.values().to_vec(), offsets: array.offsets().iter().map(|&i| i.as_usize()).collect(), config: RowConfig { fields: Arc::clone(&self.fields), validate_utf8: true, }, } } /// Convert raw bytes into [`ArrayRef`] /// /// # Safety /// /// `rows` must contain valid data for this [`RowConverter`] unsafe fn convert_raw( &self, rows: &mut [&[u8]], validate_utf8: bool, ) -> Result<Vec<ArrayRef>, ArrowError> { self.fields .iter() .zip(&self.codecs) .map(|(field, codec)| decode_column(field, rows, codec, validate_utf8)) .collect() } /// Returns a [`RowParser`] that can be used to parse [`Row`] from bytes pub fn parser(&self) -> RowParser { RowParser::new(Arc::clone(&self.fields)) } /// Returns the size of this instance in bytes /// /// Includes the size of `Self`. pub fn size(&self) -> usize { std::mem::size_of::<Self>() + self.fields.iter().map(|x| x.size()).sum::<usize>() + self.codecs.capacity() * std::mem::size_of::<Codec>() + self.codecs.iter().map(Codec::size).sum::<usize>() } } /// A [`RowParser`] can be created from a [`RowConverter`] and used to parse bytes to [`Row`] #[derive(Debug)] pub struct RowParser { config: RowConfig, } impl RowParser { fn new(fields: Arc<[SortField]>) -> Self { Self { config: RowConfig { fields, validate_utf8: true, }, } } /// Creates a [`Row`] from the provided `bytes`. 
/// /// `bytes` must be a [`Row`] produced by the [`RowConverter`] associated with /// this [`RowParser`], otherwise subsequent operations with the produced [`Row`] may panic pub fn parse<'a>(&'a self, bytes: &'a [u8]) -> Row<'a> { Row { data: bytes, config: &self.config, } } } /// The config of a given set of [`Row`] #[derive(Debug, Clone)] struct RowConfig { /// The schema for these rows fields: Arc<[SortField]>, /// Whether to run UTF-8 validation when converting to arrow arrays validate_utf8: bool, } /// A row-oriented representation of arrow data, that is normalized for comparison. /// /// See the [module level documentation](self) and [`RowConverter`] for more details. #[derive(Debug)] pub struct Rows { /// Underlying row bytes buffer: Vec<u8>, /// Row `i` has data `&buffer[offsets[i]..offsets[i+1]]` offsets: Vec<usize>, /// The config for these rows config: RowConfig, } impl Rows { /// Append a [`Row`] to this [`Rows`] pub fn push(&mut self, row: Row<'_>) { assert!( Arc::ptr_eq(&row.config.fields, &self.config.fields), "row was not produced by this RowConverter" ); self.config.validate_utf8 |= row.config.validate_utf8; self.buffer.extend_from_slice(row.data); self.offsets.push(self.buffer.len()) } /// Returns the row at index `row` pub fn row(&self, row: usize) -> Row<'_> { assert!(row + 1 < self.offsets.len()); unsafe { self.row_unchecked(row) } } /// Returns the row at `index` without bounds checking /// /// # Safety /// Caller must ensure that `index` is less than the number of offsets (#rows + 1) pub unsafe fn row_unchecked(&self, index: usize) -> Row<'_> { let end = unsafe { self.offsets.get_unchecked(index + 1) }; let start = unsafe { self.offsets.get_unchecked(index) }; let data = unsafe { self.buffer.get_unchecked(*start..*end) }; Row { data, config: &self.config, } } /// Sets the length of this [`Rows`] to 0 pub fn clear(&mut self) { self.offsets.truncate(1); self.buffer.clear(); } /// Returns the number of [`Row`] in this [`Rows`] pub fn 
num_rows(&self) -> usize { self.offsets.len() - 1 } /// Returns an iterator over the [`Row`] in this [`Rows`] pub fn iter(&self) -> RowsIter<'_> { self.into_iter() } /// Returns the size of this instance in bytes /// /// Includes the size of `Self`. pub fn size(&self) -> usize { // Size of fields is accounted for as part of RowConverter std::mem::size_of::<Self>() + self.buffer.len() + self.offsets.len() * std::mem::size_of::<usize>() } /// Create a [BinaryArray] from the [Rows] data without reallocating the /// underlying bytes. /// /// /// ``` /// # use std::sync::Arc; /// # use std::collections::HashSet; /// # use arrow_array::cast::AsArray; /// # use arrow_array::StringArray; /// # use arrow_row::{OwnedRow, Row, RowConverter, RowParser, SortField}; /// # use arrow_schema::DataType; /// # /// let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap(); /// let array = StringArray::from(vec!["hello", "world", "a", "a", "hello"]); /// let rows = converter.convert_columns(&[Arc::new(array)]).unwrap(); /// /// // We can convert rows into binary format and back. /// let values: Vec<OwnedRow> = rows.iter().map(|r| r.owned()).collect(); /// let binary = rows.try_into_binary().expect("known-small array"); /// let parser = converter.parser(); /// let parsed: Vec<OwnedRow> = /// binary.iter().flatten().map(|b| parser.parse(b).owned()).collect(); /// assert_eq!(values, parsed); /// ``` /// /// # Errors /// /// This function will return an error if there is more data than can be stored in /// a [BinaryArray] -- i.e. if the total data size is more than 2GiB. pub fn try_into_binary(self) -> Result<BinaryArray, ArrowError> { if self.buffer.len() > i32::MAX as usize { return Err(ArrowError::InvalidArgumentError(format!( "{}-byte rows buffer too long to convert into a i32-indexed BinaryArray", self.buffer.len() ))); } // We've checked that the buffer length fits in an i32; so all offsets into that buffer should fit as well. 
let offsets_scalar = ScalarBuffer::from_iter(self.offsets.into_iter().map(i32::usize_as)); // SAFETY: offsets buffer is nonempty, monotonically increasing, and all represent valid indexes into buffer. let array = unsafe { BinaryArray::new_unchecked( OffsetBuffer::new_unchecked(offsets_scalar), Buffer::from_vec(self.buffer), None, ) }; Ok(array) } } impl<'a> IntoIterator for &'a Rows { type Item = Row<'a>; type IntoIter = RowsIter<'a>; fn into_iter(self) -> Self::IntoIter { RowsIter { rows: self, start: 0, end: self.num_rows(), } } } /// An iterator over [`Rows`] #[derive(Debug)] pub struct RowsIter<'a> { rows: &'a Rows, start: usize, end: usize, } impl<'a> Iterator for RowsIter<'a> { type Item = Row<'a>; fn next(&mut self) -> Option<Self::Item> { if self.end == self.start { return None; } // SAFETY: We have checked that `start` is less than `end` let row = unsafe { self.rows.row_unchecked(self.start) }; self.start += 1; Some(row) } fn size_hint(&self) -> (usize, Option<usize>) { let len = self.len(); (len, Some(len)) } } impl ExactSizeIterator for RowsIter<'_> { fn len(&self) -> usize { self.end - self.start } } impl DoubleEndedIterator for RowsIter<'_> { fn next_back(&mut self) -> Option<Self::Item> { if self.end == self.start { return None; } // Safety: We have checked that `start` is less than `end` let row = unsafe { self.rows.row_unchecked(self.end) }; self.end -= 1; Some(row) } } /// A comparable representation of a row. /// /// See the [module level documentation](self) for more details. /// /// Two [`Row`] can only be compared if they both belong to [`Rows`] /// returned by calls to [`RowConverter::convert_columns`] on the same /// [`RowConverter`]. If different [`RowConverter`]s are used, any /// ordering established by comparing the [`Row`] is arbitrary. #[derive(Debug, Copy, Clone)] pub struct Row<'a> { data: &'a [u8], config: &'a RowConfig, } impl<'a> Row<'a> { /// Create owned version of the row to detach it from the shared [`Rows`]. 
pub fn owned(&self) -> OwnedRow { OwnedRow { data: self.data.into(), config: self.config.clone(), } } /// The row's bytes, with the lifetime of the underlying data. pub fn data(&self) -> &'a [u8] { self.data } } // Manually derive these as don't wish to include `fields` impl PartialEq for Row<'_> { #[inline] fn eq(&self, other: &Self) -> bool { self.data.eq(other.data) } } impl Eq for Row<'_> {} impl PartialOrd for Row<'_> { #[inline] fn partial_cmp(&self, other: &Self) -> Option<Ordering> { Some(self.cmp(other)) } } impl Ord for Row<'_> { #[inline] fn cmp(&self, other: &Self) -> Ordering { self.data.cmp(other.data) } } impl Hash for Row<'_> { #[inline] fn hash<H: Hasher>(&self, state: &mut H) { self.data.hash(state) } } impl AsRef<[u8]> for Row<'_> { #[inline] fn as_ref(&self) -> &[u8] { self.data } } /// Owned version of a [`Row`] that can be moved/cloned freely. /// /// This contains the data for the one specific row (not the entire buffer of all rows). #[derive(Debug, Clone)] pub struct OwnedRow { data: Box<[u8]>, config: RowConfig, } impl OwnedRow { /// Get borrowed [`Row`] from owned version. /// /// This is helpful if you want to compare an [`OwnedRow`] with a [`Row`]. pub fn row(&self) -> Row<'_> { Row { data: &self.data, config: &self.config, } } } // Manually derive these as don't wish to include `fields`. Also we just want to use the same `Row` implementations here. 
impl PartialEq for OwnedRow {
    #[inline]
    fn eq(&self, other: &Self) -> bool {
        self.row().eq(&other.row())
    }
}

impl Eq for OwnedRow {}

impl PartialOrd for OwnedRow {
    #[inline]
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl Ord for OwnedRow {
    #[inline]
    fn cmp(&self, other: &Self) -> Ordering {
        self.row().cmp(&other.row())
    }
}

impl Hash for OwnedRow {
    #[inline]
    fn hash<H: Hasher>(&self, state: &mut H) {
        self.row().hash(state)
    }
}

impl AsRef<[u8]> for OwnedRow {
    #[inline]
    fn as_ref(&self) -> &[u8] {
        &self.data
    }
}

/// Returns the byte used to mark a null value: `0` when `options.nulls_first`
/// is set, `0xFF` otherwise
#[inline]
fn null_sentinel(options: SortOptions) -> u8 {
    match options.nulls_first {
        true => 0,
        false => 0xFF,
    }
}

/// Computes the encoded length in bytes of every row across `cols`
///
/// Returns one accumulated length per row; the row count is taken from the
/// first column and is 0 when `cols` is empty. Each column is paired with the
/// encoder at the same position in `encoders`.
fn row_lengths(cols: &[ArrayRef], encoders: &[Encoder]) -> Vec<usize> {
    use fixed::FixedLengthEncoding;

    let num_rows = cols.first().map(|x| x.len()).unwrap_or(0);
    let mut lengths = vec![0; num_rows];

    for (array, encoder) in cols.iter().zip(encoders) {
        match encoder {
            Encoder::Stateless => {
                downcast_primitive_array! {
                    array => lengths.iter_mut().for_each(|x| *x += fixed::encoded_len(array)),
                    DataType::Null => {},
                    DataType::Boolean => lengths.iter_mut().for_each(|x| *x += bool::ENCODED_LEN),
                    DataType::Binary => as_generic_binary_array::<i32>(array)
                        .iter()
                        .zip(lengths.iter_mut())
                        .for_each(|(slice, length)| *length += variable::encoded_len(slice)),
                    DataType::LargeBinary => as_generic_binary_array::<i64>(array)
                        .iter()
                        .zip(lengths.iter_mut())
                        .for_each(|(slice, length)| *length += variable::encoded_len(slice)),
                    DataType::BinaryView => array.as_binary_view().iter().zip(lengths.iter_mut()).for_each(|(slice, length)| {
                            *length += variable::encoded_len(slice)
                        }),
                    DataType::Utf8 => array.as_string::<i32>()
                        .iter()
                        .zip(lengths.iter_mut())
                        .for_each(|(slice, length)| {
                            *length += variable::encoded_len(slice.map(|x| x.as_bytes()))
                        }),
                    DataType::LargeUtf8 => array.as_string::<i64>()
                        .iter()
                        .zip(lengths.iter_mut())
                        .for_each(|(slice, length)| {
                            *length += variable::encoded_len(slice.map(|x| x.as_bytes()))
                        }),
                    DataType::Utf8View => array.as_string_view().iter().zip(lengths.iter_mut()).for_each(|(slice, length)| {
                        *length += variable::encoded_len(slice.map(|x| x.as_bytes()))
                    }),
                    DataType::FixedSizeBinary(len) => {
                        let len = len.to_usize().unwrap();
                        // One extra byte per row in addition to the fixed-size payload
                        lengths.iter_mut().for_each(|x| *x += 1 + len)
                    }
                    _ => unimplemented!("unsupported data type: {}", array.data_type()),
                }
            }
            Encoder::Dictionary(values, null) => {
                downcast_dictionary_array! {
                    array => {
                        // Each key contributes the length of the already-encoded
                        // value row it points at, or of the null row
                        for (v, length) in array.keys().iter().zip(lengths.iter_mut()) {
                            *length += match v {
                                Some(k) => values.row(k.as_usize()).data.len(),
                                None => null.data.len(),
                            }
                        }
                    }
                    _ => unreachable!(),
                }
            }
            Encoder::Struct(rows, null) => {
                let array = as_struct_array(array);
                lengths.iter_mut().enumerate().for_each(|(idx, length)| {
                    // One sentinel byte followed by the encoded child row (or null row)
                    match array.is_valid(idx) {
                        true => *length += 1 + rows.row(idx).as_ref().len(),
                        false => *length += 1 + null.data.len(),
                    }
                });
            }
            Encoder::List(rows) => match array.data_type() {
                DataType::List(_) => {
                    list::compute_lengths(&mut lengths, rows, as_list_array(array))
                }
                DataType::LargeList(_) => {
                    list::compute_lengths(&mut lengths, rows, as_large_list_array(array))
                }
                _ => unreachable!(),
            },
        }
    }

    lengths
}

/// Encodes a column to the provided [`Rows`] incrementing the offsets as it progresses
fn encode_column(
    data: &mut [u8],
    offsets: &mut [usize],
    column: &dyn Array,
    opts: SortOptions,
    encoder: &Encoder<'_>,
) {
    match encoder {
        Encoder::Stateless => {
            downcast_primitive_array! {
                column => {
                    // Only take the null-aware path when there is at least one null
                    if let Some(nulls) = column.nulls().filter(|n| n.null_count() > 0){
                        fixed::encode(data, offsets, column.values(), nulls, opts)
                    } else {
                        fixed::encode_not_null(data, offsets, column.values(), opts)
                    }
                }
                DataType::Null => {}
                DataType::Boolean => {
                    if let Some(nulls) = column.nulls().filter(|n| n.null_count() > 0){
                        fixed::encode_boolean(data, offsets, column.as_boolean().values(), nulls, opts)
                    } else {
                        fixed::encode_boolean_not_null(data, offsets, column.as_boolean().values(), opts)
                    }
                }
                DataType::Binary => {
                    variable::encode(data, offsets, as_generic_binary_array::<i32>(column).iter(), opts)
                }
                DataType::BinaryView => {
                    variable::encode(data, offsets, column.as_binary_view().iter(), opts)
                }
                DataType::LargeBinary => {
                    variable::encode(data, offsets, as_generic_binary_array::<i64>(column).iter(), opts)
                }
                DataType::Utf8 => variable::encode(
                    data, offsets,
                    column.as_string::<i32>().iter().map(|x| x.map(|x| x.as_bytes())),
                    opts,
                ),
                DataType::LargeUtf8 => variable::encode(
                    data, offsets,
                    column.as_string::<i64>()
                        .iter()
                        .map(|x| x.map(|x| x.as_bytes())),
                    opts,
                ),
                DataType::Utf8View => variable::encode(
                    data, offsets,
                    column.as_string_view().iter().map(|x| x.map(|x| x.as_bytes())),
                    opts,
                ),
                DataType::FixedSizeBinary(_) => {
                    let array = column.as_any().downcast_ref().unwrap();
                    fixed::encode_fixed_size_binary(data, offsets, array, opts)
                }
                _ => unimplemented!("unsupported data type: {}", column.data_type()),
            }
        }
        Encoder::Dictionary(values, nulls) => {
            downcast_dictionary_array! {
                column => encode_dictionary_values(data, offsets, column, values, nulls),
                _ => unreachable!()
            }
        }
        Encoder::Struct(rows, null) => {
            let array = as_struct_array(column);
            let null_sentinel = null_sentinel(opts);
            // `offsets[0]` is skipped; entry `idx + 1` holds the write position of row `idx`
            offsets
                .iter_mut()
                .skip(1)
                .enumerate()
                .for_each(|(idx, offset)| {
                    let (row, sentinel) = match array.is_valid(idx) {
                        true => (rows.row(idx), 0x01),
                        false => (*null, null_sentinel),
                    };
                    let end_offset = *offset + 1 + row.as_ref().len();
                    data[*offset] = sentinel;
                    data[*offset + 1..end_offset].copy_from_slice(row.as_ref());
                    *offset = end_offset;
                })
        }
        Encoder::List(rows) => match column.data_type() {
            DataType::List(_) => list::encode(data, offsets, rows, opts, as_list_array(column)),
            DataType::LargeList(_) => {
                list::encode(data, offsets, rows, opts, as_large_list_array(column))
            }
            _ => unreachable!(),
        },
    }
}

/// Encode dictionary values not preserving the dictionary encoding
///
/// For every key, copies the bytes of the referenced row of `values` (or of
/// `null` for a null key) into `data` at that row's offset and advances the offset.
pub fn encode_dictionary_values<K: ArrowDictionaryKeyType>(
    data: &mut [u8],
    offsets: &mut [usize],
    column: &DictionaryArray<K>,
    values: &Rows,
    null: &Row<'_>,
) {
    for (offset, k) in offsets.iter_mut().skip(1).zip(column.keys()) {
        let row = match k {
            Some(k) => values.row(k.as_usize()).data,
            None => null.data,
        };
        let end_offset = *offset + row.len();
        data[*offset..end_offset].copy_from_slice(row);
        *offset = end_offset;
    }
}

// Adapter with the argument shape `downcast_primitive!` passes along in `decode_column`
macro_rules! decode_primitive_helper {
    ($t:ty, $rows:ident, $data_type:ident, $options:ident) => {
        Arc::new(decode_primitive::<$t>($rows, $data_type, $options))
    };
}

/// Decodes the provided `field` from `rows`
///
/// # Safety
///
/// Rows must contain valid data for the provided field
unsafe fn decode_column(
    field: &SortField,
    rows: &mut [&[u8]],
    codec: &Codec,
    validate_utf8: bool,
) -> Result<ArrayRef, ArrowError> {
    let options = field.options;

    let array: ArrayRef = match codec {
        Codec::Stateless => {
            let data_type = field.data_type.clone();
            downcast_primitive! {
                data_type => (decode_primitive_helper, rows, data_type, options),
                DataType::Null => Arc::new(NullArray::new(rows.len())),
                DataType::Boolean => Arc::new(decode_bool(rows, options)),
                DataType::Binary => Arc::new(decode_binary::<i32>(rows, options)),
                DataType::LargeBinary => Arc::new(decode_binary::<i64>(rows, options)),
                DataType::BinaryView => Arc::new(decode_binary_view(rows, options)),
                DataType::FixedSizeBinary(size) => Arc::new(decode_fixed_size_binary(rows, size, options)),
                DataType::Utf8 => Arc::new(decode_string::<i32>(rows, options, validate_utf8)),
                DataType::LargeUtf8 => Arc::new(decode_string::<i64>(rows, options, validate_utf8)),
                DataType::Utf8View => Arc::new(decode_string_view(rows, options, validate_utf8)),
                _ => return Err(ArrowError::NotYetImplemented(format!("unsupported data type: {}", data_type)))
            }
        }
        Codec::Dictionary(converter, _) => {
            let cols = converter.convert_raw(rows, validate_utf8)?;
            cols.into_iter().next().unwrap()
        }
        Codec::Struct(converter, _) => {
            let (null_count, nulls) = fixed::decode_nulls(rows);
            // Drop the leading sentinel byte of every row before decoding the children
            rows.iter_mut().for_each(|row| *row = &row[1..]);
            let children = converter.convert_raw(rows, validate_utf8)?;

            let child_data = children.iter().map(|c| c.to_data()).collect();
            let builder = ArrayDataBuilder::new(field.data_type.clone())
                .len(rows.len())
                .null_count(null_count)
                .null_bit_buffer(Some(nulls))
                .child_data(child_data);

            Arc::new(StructArray::from(builder.build_unchecked()))
        }
        Codec::List(converter) => match &field.data_type {
            DataType::List(_) => {
                Arc::new(list::decode::<i32>(converter, rows, field, validate_utf8)?)
            }
            DataType::LargeList(_) => {
                Arc::new(list::decode::<i64>(converter, rows, field, validate_utf8)?)
            }
            _ => unreachable!(),
        },
    };
    Ok(array)
}

#[cfg(test)]
mod tests {
    use rand::distr::uniform::SampleUniform;
    use rand::distr::{Distribution, StandardUniform};
    use rand::{rng, Rng};

    use arrow_array::builder::*;
    use arrow_array::types::*;
    use arrow_array::*;
    use arrow_buffer::{i256, NullBuffer};
    use arrow_buffer::{Buffer, OffsetBuffer};
    use arrow_cast::display::{ArrayFormatter, FormatOptions};
    use arrow_ord::sort::{LexicographicalComparator, SortColumn};

    use super::*;

    #[test]
    fn test_fixed_width() {
        let cols = [
            Arc::new(Int16Array::from_iter([
                Some(1),
                Some(2),
                None,
                Some(-5),
                Some(2),
                Some(2),
                Some(0),
            ])) as ArrayRef,
            Arc::new(Float32Array::from_iter([
                Some(1.3),
                Some(2.5),
                None,
                Some(4.),
                Some(0.1),
                Some(-4.),
                Some(-0.),
            ])) as ArrayRef,
        ];

        let converter = RowConverter::new(vec![
            SortField::new(DataType::Int16),
            SortField::new(DataType::Float32),
        ])
        .unwrap();
        let rows = converter.convert_columns(&cols).unwrap();

        // Every row is 8 bytes: 3 for the Int16 column and 5 for the Float32 column
        assert_eq!(rows.offsets, &[0, 8, 16, 24, 32, 40, 48, 56]);
        assert_eq!(
            rows.buffer,
            &[
                1, 128, 1, //
                1, 191, 166, 102, 102, //
                1, 128, 2, //
                1, 192, 32, 0, 0, //
                0, 0, 0, //
                0, 0, 0, 0, 0, //
                1, 127, 251, //
                1, 192, 128, 0, 0, //
                1, 128, 2, //
                1, 189, 204, 204, 205, //
                1, 128, 2, //
                1, 63, 127, 255, 255, //
                1, 128, 0, //
                1, 127, 255, 255, 255 //
            ]
        );

        assert!(rows.row(3) < rows.row(6));
        assert!(rows.row(0) < rows.row(1));
        assert!(rows.row(3) < rows.row(0));
        assert!(rows.row(4) < rows.row(1));
        assert!(rows.row(5) < rows.row(4));

        let back = converter.convert_rows(&rows).unwrap();
        for (expected, actual) in cols.iter().zip(&back) {
            assert_eq!(expected, actual);
        }
    }

    #[test]
    fn test_decimal128() {
        let converter = RowConverter::new(vec![SortField::new(DataType::Decimal128(
            DECIMAL128_MAX_PRECISION,
            7,
        ))])
        .unwrap();
        let col = Arc::new(
            Decimal128Array::from_iter([
                None,
                Some(i128::MIN),
                Some(-13),
                Some(46_i128),
                Some(5456_i128),
                Some(i128::MAX),
            ])
            .with_precision_and_scale(38, 7)
            .unwrap(),
        ) as ArrayRef;

        let rows = converter.convert_columns(&[Arc::clone(&col)]).unwrap();
        // The input is listed in ascending order, so the rows must be too
        for i in 0..rows.num_rows() - 1 {
            assert!(rows.row(i) < rows.row(i + 1));
        }

        let back = converter.convert_rows(&rows).unwrap();
        assert_eq!(back.len(), 1);
        assert_eq!(col.as_ref(), back[0].as_ref())
    }

    #[test]
    fn test_decimal256() {
        let converter = RowConverter::new(vec![SortField::new(DataType::Decimal256(
            DECIMAL256_MAX_PRECISION,
            7,
        ))])
        .unwrap();
        let col = Arc::new(
            Decimal256Array::from_iter([
                None,
                Some(i256::MIN),
                Some(i256::from_parts(0, -1)),
                Some(i256::from_parts(u128::MAX, -1)),
                Some(i256::from_parts(u128::MAX, 0)),
                Some(i256::from_parts(0, 46_i128)),
                Some(i256::from_parts(5, 46_i128)),
                Some(i256::MAX),
            ])
            .with_precision_and_scale(DECIMAL256_MAX_PRECISION, 7)
            .unwrap(),
        ) as ArrayRef;

        let rows = converter.convert_columns(&[Arc::clone(&col)]).unwrap();
        // The input is listed in ascending order, so the rows must be too
        for i in 0..rows.num_rows() - 1 {
            assert!(rows.row(i) < rows.row(i + 1));
        }

        let back = converter.convert_rows(&rows).unwrap();
        assert_eq!(back.len(), 1);
        assert_eq!(col.as_ref(), back[0].as_ref())
    }

    #[test]
    fn test_bool() {
        let converter = RowConverter::new(vec![SortField::new(DataType::Boolean)]).unwrap();

        let col = Arc::new(BooleanArray::from_iter([None, Some(false), Some(true)])) as ArrayRef;

        // Default options: null < false < true
        let rows = converter.convert_columns(&[Arc::clone(&col)]).unwrap();
        assert!(rows.row(2) > rows.row(1));
        assert!(rows.row(2) > rows.row(0));
        assert!(rows.row(1) > rows.row(0));

        let cols = converter.convert_rows(&rows).unwrap();
        assert_eq!(&cols[0], &col);

        // Descending with nulls last: true < false < null
        let converter = RowConverter::new(vec![SortField::new_with_options(
            DataType::Boolean,
            SortOptions::default().desc().with_nulls_first(false),
        )])
        .unwrap();

        let rows = converter.convert_columns(&[Arc::clone(&col)]).unwrap();
        assert!(rows.row(2) < rows.row(1));
        assert!(rows.row(2) < rows.row(0));
        assert!(rows.row(1) < rows.row(0));
        let cols = converter.convert_rows(&rows).unwrap();
        assert_eq!(&cols[0], &col);
    }

    #[test]
    fn test_timezone() {
        let a =
            TimestampNanosecondArray::from(vec![1, 2, 3, 4, 5]).with_timezone("+01:00".to_string());
        let d = a.data_type().clone();

        let converter = RowConverter::new(vec![SortField::new(a.data_type().clone())]).unwrap();
        let rows = converter.convert_columns(&[Arc::new(a) as _]).unwrap();
        let back = converter.convert_rows(&rows).unwrap();
        assert_eq!(back.len(), 1);
        assert_eq!(back[0].data_type(), &d);

        // Test dictionary
        let mut a = PrimitiveDictionaryBuilder::<Int32Type, TimestampNanosecondType>::new();
        a.append(34).unwrap();
        a.append_null();
        a.append(345).unwrap();

        // Construct dictionary with a timezone
        let dict = a.finish();
        let values = TimestampNanosecondArray::from(dict.values().to_data());
        let dict_with_tz = dict.with_values(Arc::new(values.with_timezone("+02:00")));
        let v = DataType::Timestamp(TimeUnit::Nanosecond, Some("+02:00".into()));
        let d = DataType::Dictionary(Box::new(DataType::Int32), Box::new(v.clone()));

        assert_eq!(dict_with_tz.data_type(), &d);
        let converter = RowConverter::new(vec![SortField::new(d.clone())]).unwrap();
        let rows = converter
            .convert_columns(&[Arc::new(dict_with_tz) as _])
            .unwrap();
        let back = converter.convert_rows(&rows).unwrap();
        assert_eq!(back.len(), 1);
        // The round trip yields the dictionary's value type, not the dictionary type
        assert_eq!(back[0].data_type(), &v);
    }

    #[test]
    fn test_null_encoding() {
        let col = Arc::new(NullArray::new(10));
        let converter = RowConverter::new(vec![SortField::new(DataType::Null)]).unwrap();
        let rows = converter.convert_columns(&[col]).unwrap();
        assert_eq!(rows.num_rows(), 10);
        assert_eq!(rows.row(1).data.len(), 0);
    }

    #[test]
    fn test_variable_width() {
        let col = Arc::new(StringArray::from_iter([
            Some("hello"),
            Some("he"),
            None,
            Some("foo"),
            Some(""),
        ])) as ArrayRef;

        let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
        let rows = converter.convert_columns(&[Arc::clone(&col)]).unwrap();

        assert!(rows.row(1) < rows.row(0));
        assert!(rows.row(2) < rows.row(4));
        assert!(rows.row(3) < rows.row(0));
        assert!(rows.row(3) < rows.row(1));

        let cols = converter.convert_rows(&rows).unwrap();
        assert_eq!(&cols[0], &col);

        // Values listed in ascending order, with lengths around the block size constants
        let col = Arc::new(BinaryArray::from_iter([
            None,
            Some(vec![0_u8; 0]),
            Some(vec![0_u8; 6]),
            Some(vec![0_u8; variable::MINI_BLOCK_SIZE]),
            Some(vec![0_u8; variable::MINI_BLOCK_SIZE + 1]),
            Some(vec![0_u8; variable::BLOCK_SIZE]),
            Some(vec![0_u8; variable::BLOCK_SIZE + 1]),
            Some(vec![1_u8; 6]),
            Some(vec![1_u8; variable::MINI_BLOCK_SIZE]),
            Some(vec![1_u8; variable::MINI_BLOCK_SIZE + 1]),
            Some(vec![1_u8; variable::BLOCK_SIZE]),
            Some(vec![1_u8; variable::BLOCK_SIZE + 1]),
            Some(vec![0xFF_u8; 6]),
            Some(vec![0xFF_u8; variable::MINI_BLOCK_SIZE]),
            Some(vec![0xFF_u8; variable::MINI_BLOCK_SIZE + 1]),
            Some(vec![0xFF_u8; variable::BLOCK_SIZE]),
            Some(vec![0xFF_u8; variable::BLOCK_SIZE + 1]),
        ])) as ArrayRef;

        let converter = RowConverter::new(vec![SortField::new(DataType::Binary)]).unwrap();
        let rows = converter.convert_columns(&[Arc::clone(&col)]).unwrap();

        for i in 0..rows.num_rows() {
            for j in i + 1..rows.num_rows() {
                assert!(
                    rows.row(i) < rows.row(j),
                    "{} < {} - {:?} < {:?}",
                    i,
                    j,
                    rows.row(i),
                    rows.row(j)
                );
            }
        }

        let cols = converter.convert_rows(&rows).unwrap();
        assert_eq!(&cols[0], &col);

        // Descending with nulls last reverses every pairwise comparison
        let converter = RowConverter::new(vec![SortField::new_with_options(
            DataType::Binary,
            SortOptions::default().desc().with_nulls_first(false),
        )])
        .unwrap();
        let rows = converter.convert_columns(&[Arc::clone(&col)]).unwrap();

        for i in 0..rows.num_rows() {
            for j in i + 1..rows.num_rows() {
                assert!(
                    rows.row(i) > rows.row(j),
                    "{} > {} - {:?} > {:?}",
                    i,
                    j,
                    rows.row(i),
                    rows.row(j)
                );
            }
        }

        let cols = converter.convert_rows(&rows).unwrap();
        assert_eq!(&cols[0], &col);
    }

    /// Asserts that `a` equals `b`, comparing logically when `b` is dictionary-encoded:
    /// `b` is first cast to its value type, which must match the type of `a`
    fn dictionary_eq(a: &dyn Array, b: &dyn Array) {
        match b.data_type() {
            DataType::Dictionary(_, v) => {
                assert_eq!(a.data_type(), v.as_ref());
                let b = arrow_cast::cast(b, v).unwrap();
                assert_eq!(a, b.as_ref())
            }
            _ => assert_eq!(a, b),
        }
    }

    #[test]
    fn test_string_dictionary() {
        let a = Arc::new(DictionaryArray::<Int32Type>::from_iter([
            Some("foo"),
            Some("hello"),
            Some("he"),
            None,
            Some("hello"),
            Some(""),
            Some("hello"),
            Some("hello"),
        ])) as ArrayRef;

        let field = SortField::new(a.data_type().clone());
        let converter = RowConverter::new(vec![field]).unwrap();
        let rows_a = converter.convert_columns(&[Arc::clone(&a)]).unwrap();

        assert!(rows_a.row(3) < rows_a.row(5));
        assert!(rows_a.row(2) < rows_a.row(1));
        assert!(rows_a.row(0) < rows_a.row(1));
        assert!(rows_a.row(3) < rows_a.row(0));

        assert_eq!(rows_a.row(1), rows_a.row(4));
        assert_eq!(rows_a.row(1), rows_a.row(6));
        assert_eq!(rows_a.row(1), rows_a.row(7));

        let cols = converter.convert_rows(&rows_a).unwrap();
        dictionary_eq(&cols[0], &a);

        // Rows from a second array converted by the same converter compare with the first
        let b = Arc::new(DictionaryArray::<Int32Type>::from_iter([
            Some("hello"),
            None,
            Some("cupcakes"),
        ])) as ArrayRef;

        let rows_b = converter.convert_columns(&[Arc::clone(&b)]).unwrap();
        assert_eq!(rows_a.row(1), rows_b.row(0));
        assert_eq!(rows_a.row(3), rows_b.row(1));
        assert!(rows_b.row(2) < rows_a.row(0));

        let cols = converter.convert_rows(&rows_b).unwrap();
        dictionary_eq(&cols[0], &b);

        let converter = RowConverter::new(vec![SortField::new_with_options(
            a.data_type().clone(),
            SortOptions::default().desc().with_nulls_first(false),
        )])
        .unwrap();

        let rows_c = converter.convert_columns(&[Arc::clone(&a)]).unwrap();
        assert!(rows_c.row(3) > rows_c.row(5));
        assert!(rows_c.row(2) > rows_c.row(1));
        assert!(rows_c.row(0) > rows_c.row(1));
        assert!(rows_c.row(3) > rows_c.row(0));

        let cols = converter.convert_rows(&rows_c).unwrap();
        dictionary_eq(&cols[0], &a);

        let converter = RowConverter::new(vec![SortField::new_with_options(
            a.data_type().clone(),
            SortOptions::default().desc().with_nulls_first(true),
        )])
        .unwrap();

        let rows_c = converter.convert_columns(&[Arc::clone(&a)]).unwrap();
        assert!(rows_c.row(3) < rows_c.row(5));
        assert!(rows_c.row(2) > rows_c.row(1));
        assert!(rows_c.row(0) > rows_c.row(1));
        assert!(rows_c.row(3) < rows_c.row(0));

        let cols = converter.convert_rows(&rows_c).unwrap();
        dictionary_eq(&cols[0], &a);
    }

#[test]
    fn test_struct() {
        // Test basic
        let a = Arc::new(Int32Array::from(vec![1, 1, 2, 2])) as ArrayRef;
        let a_f = Arc::new(Field::new("int", DataType::Int32, false));
        let u = Arc::new(StringArray::from(vec!["a", "b", "c", "d"])) as ArrayRef;
        let u_f = Arc::new(Field::new("s", DataType::Utf8, false));
        let s1 = Arc::new(StructArray::from(vec![(a_f, a), (u_f, u)])) as ArrayRef;

        let sort_fields = vec![SortField::new(s1.data_type().clone())];
        let converter = RowConverter::new(sort_fields).unwrap();
        let r1 = converter.convert_columns(&[Arc::clone(&s1)]).unwrap();

        // The struct values are strictly increasing
        for (a, b) in r1.iter().zip(r1.iter().skip(1)) {
            assert!(a < b);
        }

        let back = converter.convert_rows(&r1).unwrap();
        assert_eq!(back.len(), 1);
        assert_eq!(&back[0], &s1);

        // Test struct nullability
        // The validity mask 0b00001010 marks entries 1 and 3 valid, 0 and 2 null
        let data = s1
            .to_data()
            .into_builder()
            .null_bit_buffer(Some(Buffer::from_slice_ref([0b00001010])))
            .null_count(2)
            .build()
            .unwrap();

        let s2 = Arc::new(StructArray::from(data)) as ArrayRef;
        let r2 = converter.convert_columns(&[Arc::clone(&s2)]).unwrap();
        assert_eq!(r2.row(0), r2.row(2)); // Nulls equal
        assert!(r2.row(0) < r2.row(1)); // Nulls first
        assert_ne!(r1.row(0), r2.row(0)); // Value does not equal null
        assert_eq!(r1.row(1), r2.row(1)); // Values equal

        let back = converter.convert_rows(&r2).unwrap();
        assert_eq!(back.len(), 1);
        assert_eq!(&back[0], &s2);

        back[0].to_data().validate_full().unwrap();
    }

    #[test]
    fn test_primitive_dictionary() {
        let mut builder = PrimitiveDictionaryBuilder::<Int32Type, Int32Type>::new();
        builder.append(2).unwrap();
        builder.append(3).unwrap();
        builder.append(0).unwrap();
        builder.append_null();
        builder.append(5).unwrap();
        builder.append(3).unwrap();
        builder.append(-1).unwrap();

        let a = builder.finish();
        let data_type = a.data_type().clone();
        let columns = [Arc::new(a) as ArrayRef];

        let field = SortField::new(data_type.clone());
        let converter = RowConverter::new(vec![field]).unwrap();
        let rows = converter.convert_columns(&columns).unwrap();
        assert!(rows.row(0) < rows.row(1));
        assert!(rows.row(2) < rows.row(0));
        assert!(rows.row(3) < rows.row(2));
        assert!(rows.row(6) < rows.row(2));
        assert!(rows.row(3) < rows.row(6));
    }

    #[test]
    fn test_dictionary_nulls() {
        // Nulls appear both as null keys and as keys pointing at null values
        let values = Int32Array::from_iter([Some(1), Some(-1), None, Some(4), None]).into_data();
        let keys =
            Int32Array::from_iter([Some(0), Some(0), Some(1), Some(2), Some(4), None]).into_data();

        let data_type = DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Int32));
        let data = keys
            .into_builder()
            .data_type(data_type.clone())
            .child_data(vec![values])
            .build()
            .unwrap();

        let columns = [Arc::new(DictionaryArray::<Int32Type>::from(data)) as ArrayRef];
        let field = SortField::new(data_type.clone());
        let converter = RowConverter::new(vec![field]).unwrap();
        let rows = converter.convert_columns(&columns).unwrap();

        assert_eq!(rows.row(0), rows.row(1));
        assert_eq!(rows.row(3), rows.row(4));
        assert_eq!(rows.row(4), rows.row(5));
        assert!(rows.row(3) < rows.row(0));
    }

    #[test]
    #[should_panic(expected = "Encountered non UTF-8 data")]
    fn test_invalid_utf8() {
        let converter = RowConverter::new(vec![SortField::new(DataType::Binary)]).unwrap();
        let array = Arc::new(BinaryArray::from_iter_values([&[0xFF]])) as _;
        let rows = converter.convert_columns(&[array]).unwrap();
        let binary_row = rows.row(0);

        // Re-interpret the binary row bytes with a Utf8 converter
        let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
        let parser = converter.parser();
        let utf8_row = parser.parse(binary_row.as_ref());

        converter.convert_rows(std::iter::once(utf8_row)).unwrap();
    }

    #[test]
    #[should_panic(expected = "Encountered non UTF-8 data")]
    fn test_invalid_utf8_array() {
        let converter = RowConverter::new(vec![SortField::new(DataType::Binary)]).unwrap();
        let array = Arc::new(BinaryArray::from_iter_values([&[0xFF]])) as _;
        let rows = converter.convert_columns(&[array]).unwrap();
        let binary_rows = rows.try_into_binary().expect("known-small rows");

        let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
        let parsed = converter.from_binary(binary_rows);

        converter.convert_rows(parsed.iter()).unwrap();
    }

    #[test]
    #[should_panic(expected = "index out of bounds")]
    fn test_invalid_empty() {
        let binary_row: &[u8] = &[];

        let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
        let parser = converter.parser();
        let utf8_row = parser.parse(binary_row.as_ref());

        converter.convert_rows(std::iter::once(utf8_row)).unwrap();
    }

    #[test]
    #[should_panic(expected = "index out of bounds")]
    fn test_invalid_empty_array() {
        let row: &[u8] = &[];
        let binary_rows = BinaryArray::from(vec![row]);

        let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
        let parsed = converter.from_binary(binary_rows);

        converter.convert_rows(parsed.iter()).unwrap();
    }

    #[test]
    #[should_panic(expected = "index out of bounds")]
    fn test_invalid_truncated() {
        let binary_row: &[u8] = &[0x02];

        let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
        let parser = converter.parser();
        let utf8_row = parser.parse(binary_row.as_ref());

        converter.convert_rows(std::iter::once(utf8_row)).unwrap();
    }

    #[test]
    #[should_panic(expected = "index out of bounds")]
    fn test_invalid_truncated_array() {
        let row: &[u8] = &[0x02];
        let binary_rows = BinaryArray::from(vec![row]);

        let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
        let parsed = converter.from_binary(binary_rows);

        converter.convert_rows(parsed.iter()).unwrap();
    }

    #[test]
    #[should_panic(expected = "rows were not produced by this RowConverter")]
    fn test_different_converter() {
        let values = Arc::new(Int32Array::from_iter([Some(1), Some(-1)]));
        let converter = RowConverter::new(vec![SortField::new(DataType::Int32)]).unwrap();
        let rows = converter.convert_columns(&[values]).unwrap();

        // A second converter with an identical schema must still reject the rows
        let converter = RowConverter::new(vec![SortField::new(DataType::Int32)]).unwrap();
        let _ = converter.convert_rows(&rows);
    }

    /// Round-trips a single-level list column under several sort options and checks
    /// the relative order of the encoded rows.
    ///
    /// The list built below is:
    /// `[[32, 52, 32], [32, 52, 12], [32, 52], null, [32, null], []]`
    fn test_single_list<O: OffsetSizeTrait>() {
        let mut builder = GenericListBuilder::<O, _>::new(Int32Builder::new());
        builder.values().append_value(32);
        builder.values().append_value(52);
        builder.values().append_value(32);
        builder.append(true);
        builder.values().append_value(32);
        builder.values().append_value(52);
        builder.values().append_value(12);
        builder.append(true);
        builder.values().append_value(32);
        builder.values().append_value(52);
        builder.append(true);
        builder.values().append_value(32); // MASKED
        builder.values().append_value(52); // MASKED
        builder.append(false);
        builder.values().append_value(32);
        builder.values().append_null();
        builder.append(true);
        builder.append(true);

        let list = Arc::new(builder.finish()) as ArrayRef;
        let d = list.data_type().clone();

        let converter = RowConverter::new(vec![SortField::new(d.clone())]).unwrap();

        let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();
        assert!(rows.row(0) > rows.row(1)); // [32, 52, 32] > [32, 52, 12]
        assert!(rows.row(2) < rows.row(1)); // [32, 52] < [32, 52, 12]
        assert!(rows.row(3) < rows.row(2)); // null < [32, 52]
        assert!(rows.row(4) < rows.row(2)); // [32, null] < [32, 52]
        assert!(rows.row(5) < rows.row(2)); // [] < [32, 52]
        assert!(rows.row(3) < rows.row(5)); // null < []

        let back = converter.convert_rows(&rows).unwrap();
        assert_eq!(back.len(), 1);
        back[0].to_data().validate_full().unwrap();
        assert_eq!(&back[0], &list);

        let options = SortOptions::default().asc().with_nulls_first(false);
        let field = SortField::new_with_options(d.clone(), options);
        let converter = RowConverter::new(vec![field]).unwrap();
        let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();

        assert!(rows.row(0) > rows.row(1)); // [32, 52, 32] > [32, 52, 12]
        assert!(rows.row(2) < rows.row(1)); // [32, 52] < [32, 52, 12]
        assert!(rows.row(3) > rows.row(2)); // null > [32, 52]
        assert!(rows.row(4) > rows.row(2)); // [32, null] > [32, 52]
        assert!(rows.row(5) < rows.row(2)); // [] < [32, 52]
        assert!(rows.row(3) > rows.row(5)); // null > []

        let back = converter.convert_rows(&rows).unwrap();
        assert_eq!(back.len(), 1);
        back[0].to_data().validate_full().unwrap();
        assert_eq!(&back[0], &list);

        let options = SortOptions::default().desc().with_nulls_first(false);
        let field = SortField::new_with_options(d.clone(), options);
        let converter = RowConverter::new(vec![field]).unwrap();
        let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();

        assert!(rows.row(0) < rows.row(1)); // [32, 52, 32] < [32, 52, 12]
        assert!(rows.row(2) > rows.row(1)); // [32, 52] > [32, 52, 12]
        assert!(rows.row(3) > rows.row(2)); // null > [32, 52]
        assert!(rows.row(4) > rows.row(2)); // [32, null] > [32, 52]
        assert!(rows.row(5) > rows.row(2)); // [] > [32, 52]
        assert!(rows.row(3) > rows.row(5)); // null > []

        let back = converter.convert_rows(&rows).unwrap();
        assert_eq!(back.len(), 1);
        back[0].to_data().validate_full().unwrap();
        assert_eq!(&back[0], &list);

        let options = SortOptions::default().desc().with_nulls_first(true);
        let field = SortField::new_with_options(d, options);
        let converter = RowConverter::new(vec![field]).unwrap();
        let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();

        assert!(rows.row(0) < rows.row(1)); // [32, 52, 32] < [32, 52, 12]
        assert!(rows.row(2) > rows.row(1)); // [32, 52] > [32, 52, 12]
        assert!(rows.row(3) < rows.row(2)); // null < [32, 52]
        assert!(rows.row(4) < rows.row(2)); // [32, null] < [32, 52]
        assert!(rows.row(5) > rows.row(2)); // [] > [32, 52]
        assert!(rows.row(3) < rows.row(5)); // null < []

        let back = converter.convert_rows(&rows).unwrap();
        assert_eq!(back.len(), 1);
        back[0].to_data().validate_full().unwrap();
        assert_eq!(&back[0], &list);
    }

    /// Round-trips a list-of-lists column under several sort options and checks the
    /// relative order of the encoded rows.
    fn test_nested_list<O: OffsetSizeTrait>() {
        let mut builder =
            GenericListBuilder::<O, _>::new(GenericListBuilder::<O, _>::new(Int32Builder::new()));

        builder.values().values().append_value(1);
        builder.values().values().append_value(2);
        builder.values().append(true);
        builder.values().values().append_value(1);
        builder.values().values().append_null();
        builder.values().append(true);
        builder.append(true);

        builder.values().values().append_value(1);
        builder.values().values().append_null();
        builder.values().append(true);
        builder.values().values().append_value(1);
        builder.values().values().append_null();
        builder.values().append(true);
        builder.append(true);

        builder.values().values().append_value(1);
        builder.values().values().append_null();
        builder.values().append(true);
        builder.values().append(false);
        builder.append(true);
        builder.append(false);

        builder.values().values().append_value(1);
        builder.values().values().append_value(2);
        builder.values().append(true);
        builder.append(true);

        let list = Arc::new(builder.finish()) as ArrayRef;
        let d = list.data_type().clone();

        // [
        //   [[1, 2], [1, null]],
        //   [[1, null], [1, null]],
        //   [[1, null], null],
        //   null,
        //   [[1, 2]],
        // ]
        let options = SortOptions::default().asc().with_nulls_first(true);
        let field = SortField::new_with_options(d.clone(), options);
        let converter = RowConverter::new(vec![field]).unwrap();
        let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();

        assert!(rows.row(0) > rows.row(1));
        assert!(rows.row(1) > rows.row(2));
        assert!(rows.row(2) > rows.row(3));
        assert!(rows.row(4) < rows.row(0));
        assert!(rows.row(4) > rows.row(1));

        let back = converter.convert_rows(&rows).unwrap();
        assert_eq!(back.len(), 1);
        back[0].to_data().validate_full().unwrap();
        assert_eq!(&back[0], &list);

        let options = SortOptions::default().desc().with_nulls_first(true);
        let field = SortField::new_with_options(d.clone(), options);
        let converter = RowConverter::new(vec![field]).unwrap();
        let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();

        assert!(rows.row(0) > rows.row(1));
        assert!(rows.row(1) > rows.row(2));
        assert!(rows.row(2) > rows.row(3));
        assert!(rows.row(4) > rows.row(0));
        assert!(rows.row(4) > rows.row(1));

        let back = converter.convert_rows(&rows).unwrap();
        assert_eq!(back.len(), 1);
        back[0].to_data().validate_full().unwrap();
        assert_eq!(&back[0], &list);

        let options = SortOptions::default().desc().with_nulls_first(false);
        let field = SortField::new_with_options(d, options);
        let converter = RowConverter::new(vec![field]).unwrap();
        let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();

        assert!(rows.row(0) < rows.row(1));
        assert!(rows.row(1) < rows.row(2));
        assert!(rows.row(2) < rows.row(3));
        assert!(rows.row(4) > rows.row(0));
        assert!(rows.row(4) < rows.row(1));

        let back = converter.convert_rows(&rows).unwrap();
        assert_eq!(back.len(), 1);
        back[0].to_data().validate_full().unwrap();
        assert_eq!(&back[0], &list);
    }

    #[test]
    fn test_list() {
        test_single_list::<i32>();
        test_nested_list::<i32>();
    }

    #[test]
    fn test_large_list() {
        test_single_list::<i64>();
        test_nested_list::<i64>();
    }

    /// Generates a random primitive array of `len` entries, each one non-null with
    /// probability `valid_percent`
    fn generate_primitive_array<K>(len: usize, valid_percent: f64) -> PrimitiveArray<K>
    where
        K: ArrowPrimitiveType,
        StandardUniform: Distribution<K::Native>,
    {
        let mut rng = rng();
        (0..len)
            .map(|_| rng.random_bool(valid_percent).then(|| rng.random()))
            .collect()
    }

    /// Generates a random string array of `len` entries, each one non-null with
    /// probability `valid_percent`; strings are ASCII and shorter than 100 bytes
    fn generate_strings<O: OffsetSizeTrait>(
        len: usize,
        valid_percent: f64,
    ) -> GenericStringArray<O> {
        let mut rng = rng();
        (0..len)
            .map(|_| {
                rng.random_bool(valid_percent).then(|| {
                    let len = rng.random_range(0..100);
                    let bytes = (0..len).map(|_| rng.random_range(0..128)).collect();
                    String::from_utf8(bytes).unwrap()
                })
            })
            .collect()
    }

    /// Same as `generate_strings`, but collects into a [`StringViewArray`]
    fn generate_string_view(len: usize, valid_percent: f64) -> StringViewArray {
        let mut rng = rng();
        (0..len)
            .map(|_| {
                rng.random_bool(valid_percent).then(|| {
                    let len = rng.random_range(0..100);
                    let bytes = (0..len).map(|_| rng.random_range(0..128)).collect();
                    String::from_utf8(bytes).unwrap()
                })
            })
            .collect()
    }

    /// Generates a random [`BinaryViewArray`] of `len` entries, each one non-null with
    /// probability `valid_percent`; values are shorter than 100 bytes, each byte below 128
    fn generate_byte_view(len: usize, valid_percent: f64) -> BinaryViewArray {
        let mut rng = rng();
        (0..len)
            .map(|_| {
                rng.random_bool(valid_percent).then(|| {
                    let len = rng.random_range(0..100);
                    let bytes: Vec<_> = (0..len).map(|_| rng.random_range(0..128)).collect();
                    bytes
                })
            })
            .collect()
    }

    fn generate_dictionary<K>(
        values: ArrayRef,
        len: usize,
valid_percent: f64, ) -> DictionaryArray<K> where K: ArrowDictionaryKeyType, K::Native: SampleUniform, { let mut rng = rng(); let min_key = K::Native::from_usize(0).unwrap(); let max_key = K::Native::from_usize(values.len()).unwrap(); let keys: PrimitiveArray<K> = (0..len) .map(|_| { rng.random_bool(valid_percent) .then(|| rng.random_range(min_key..max_key)) }) .collect(); let data_type = DataType::Dictionary(Box::new(K::DATA_TYPE), Box::new(values.data_type().clone())); let data = keys .into_data() .into_builder() .data_type(data_type) .add_child_data(values.to_data()) .build() .unwrap(); DictionaryArray::from(data) } fn generate_fixed_size_binary(len: usize, valid_percent: f64) -> FixedSizeBinaryArray { let mut rng = rng(); let width = rng.random_range(0..20); let mut builder = FixedSizeBinaryBuilder::new(width); let mut b = vec![0; width as usize]; for _ in 0..len { match rng.random_bool(valid_percent) { true => { b.iter_mut().for_each(|x| *x = rng.random()); builder.append_value(&b).unwrap(); } false => builder.append_null(), } } builder.finish() } fn generate_struct(len: usize, valid_percent: f64) -> StructArray { let mut rng = rng(); let nulls = NullBuffer::from_iter((0..len).map(|_| rng.random_bool(valid_percent))); let a = generate_primitive_array::<Int32Type>(len, valid_percent); let b = generate_strings::<i32>(len, valid_percent); let fields = Fields::from(vec![ Field::new("a", DataType::Int32, true), Field::new("b", DataType::Utf8, true), ]); let values = vec![Arc::new(a) as _, Arc::new(b) as _]; StructArray::new(fields, values, Some(nulls)) } fn generate_list<F>(len: usize, valid_percent: f64, values: F) -> ListArray where F: FnOnce(usize) -> ArrayRef, { let mut rng = rng(); let offsets = OffsetBuffer::<i32>::from_lengths((0..len).map(|_| rng.random_range(0..10))); let values_len = offsets.last().unwrap().to_usize().unwrap(); let values = values(values_len); let nulls = NullBuffer::from_iter((0..len).map(|_| rng.random_bool(valid_percent))); let field = 
Arc::new(Field::new_list_field(values.data_type().clone(), true)); ListArray::new(field, offsets, values, Some(nulls)) } fn generate_column(len: usize) -> ArrayRef { let mut rng = rng(); match rng.random_range(0..16) { 0 => Arc::new(generate_primitive_array::<Int32Type>(len, 0.8)), 1 => Arc::new(generate_primitive_array::<UInt32Type>(len, 0.8)), 2 => Arc::new(generate_primitive_array::<Int64Type>(len, 0.8)), 3 => Arc::new(generate_primitive_array::<UInt64Type>(len, 0.8)), 4 => Arc::new(generate_primitive_array::<Float32Type>(len, 0.8)), 5 => Arc::new(generate_primitive_array::<Float64Type>(len, 0.8)), 6 => Arc::new(generate_strings::<i32>(len, 0.8)), 7 => Arc::new(generate_dictionary::<Int64Type>( // Cannot test dictionaries containing null values because of #2687 Arc::new(generate_strings::<i32>(rng.random_range(1..len), 1.0)), len, 0.8, )), 8 => Arc::new(generate_dictionary::<Int64Type>( // Cannot test dictionaries containing null values because of #2687 Arc::new(generate_primitive_array::<Int64Type>( rng.random_range(1..len), 1.0, )), len, 0.8, )), 9 => Arc::new(generate_fixed_size_binary(len, 0.8)), 10 => Arc::new(generate_struct(len, 0.8)), 11 => Arc::new(generate_list(len, 0.8, |values_len| { Arc::new(generate_primitive_array::<Int64Type>(values_len, 0.8)) })), 12 => Arc::new(generate_list(len, 0.8, |values_len| { Arc::new(generate_strings::<i32>(values_len, 0.8)) })), 13 => Arc::new(generate_list(len, 0.8, |values_len| { Arc::new(generate_struct(values_len, 0.8)) })), 14 => Arc::new(generate_string_view(len, 0.8)), 15 => Arc::new(generate_byte_view(len, 0.8)), _ => unreachable!(), } } fn print_row(cols: &[SortColumn], row: usize) -> String { let t: Vec<_> = cols .iter() .map(|x| match x.values.is_valid(row) { true => { let opts = FormatOptions::default().with_null("NULL"); let formatter = ArrayFormatter::try_new(x.values.as_ref(), &opts).unwrap(); formatter.value(row).to_string() } false => "NULL".to_string(), }) .collect(); t.join(",") } fn 
print_col_types(cols: &[SortColumn]) -> String { let t: Vec<_> = cols .iter() .map(|x| x.values.data_type().to_string()) .collect(); t.join(",") } #[test] #[cfg_attr(miri, ignore)] fn fuzz_test() { for _ in 0..100 { let mut rng = rng(); let num_columns = rng.random_range(1..5); let len = rng.random_range(5..100); let arrays: Vec<_> = (0..num_columns).map(|_| generate_column(len)).collect(); let options: Vec<_> = (0..num_columns) .map(|_| SortOptions { descending: rng.random_bool(0.5), nulls_first: rng.random_bool(0.5), }) .collect(); let sort_columns: Vec<_> = options .iter() .zip(&arrays) .map(|(o, c)| SortColumn { values: Arc::clone(c), options: Some(*o), }) .collect(); let comparator = LexicographicalComparator::try_new(&sort_columns).unwrap(); let columns: Vec<SortField> = options .into_iter() .zip(&arrays) .map(|(o, a)| SortField::new_with_options(a.data_type().clone(), o)) .collect(); let converter = RowConverter::new(columns).unwrap(); let rows = converter.convert_columns(&arrays).unwrap(); for i in 0..len { for j in 0..len { let row_i = rows.row(i); let row_j = rows.row(j); let row_cmp = row_i.cmp(&row_j); let lex_cmp = comparator.compare(i, j); assert_eq!( row_cmp, lex_cmp, "({:?} vs {:?}) vs ({:?} vs {:?}) for types {}", print_row(&sort_columns, i), print_row(&sort_columns, j), row_i, row_j, print_col_types(&sort_columns) ); } } let back = converter.convert_rows(&rows).unwrap(); for (actual, expected) in back.iter().zip(&arrays) { actual.to_data().validate_full().unwrap(); dictionary_eq(actual, expected) } // Check that we can convert let rows = rows.try_into_binary().expect("reasonable size"); let parser = converter.parser(); let back = converter .convert_rows(rows.iter().map(|b| parser.parse(b.expect("valid bytes")))) .unwrap(); for (actual, expected) in back.iter().zip(&arrays) { actual.to_data().validate_full().unwrap(); dictionary_eq(actual, expected) } let rows = converter.from_binary(rows); let back = converter.convert_rows(&rows).unwrap(); for 
(actual, expected) in back.iter().zip(&arrays) { actual.to_data().validate_full().unwrap(); dictionary_eq(actual, expected) } } } #[test] fn test_clear() { let converter = RowConverter::new(vec![SortField::new(DataType::Int32)]).unwrap(); let mut rows = converter.empty_rows(3, 128); let first = Int32Array::from(vec![None, Some(2), Some(4)]); let second = Int32Array::from(vec![Some(2), None, Some(4)]); let arrays = [Arc::new(first) as ArrayRef, Arc::new(second) as ArrayRef]; for array in arrays.iter() { rows.clear(); converter.append(&mut rows, &[array.clone()]).unwrap(); let back = converter.convert_rows(&rows).unwrap(); assert_eq!(&back[0], array); } let mut rows_expected = converter.empty_rows(3, 128); converter.append(&mut rows_expected, &arrays[1..]).unwrap(); for (i, (actual, expected)) in rows.iter().zip(rows_expected.iter()).enumerate() { assert_eq!( actual, expected, "For row {}: expected {:?}, actual: {:?}", i, expected, actual ); } } #[test] fn test_append_codec_dictionary_binary() { use DataType::*; // Dictionary RowConverter let converter = RowConverter::new(vec![SortField::new(Dictionary( Box::new(Int32), Box::new(Binary), ))]) .unwrap(); let mut rows = converter.empty_rows(4, 128); let keys = Int32Array::from_iter_values([0, 1, 2, 3]); let values = BinaryArray::from(vec![ Some("a".as_bytes()), Some(b"b"), Some(b"c"), Some(b"d"), ]); let dict_array = DictionaryArray::new(keys, Arc::new(values)); rows.clear(); let array = Arc::new(dict_array) as ArrayRef; converter.append(&mut rows, &[array.clone()]).unwrap(); let back = converter.convert_rows(&rows).unwrap(); dictionary_eq(&back[0], &array); } #[test] fn test_list_prefix() { let mut a = ListBuilder::new(Int8Builder::new()); a.append_value([None]); a.append_value([None, None]); let a = a.finish(); let converter = RowConverter::new(vec![SortField::new(a.data_type().clone())]).unwrap(); let rows = converter.convert_columns(&[Arc::new(a) as _]).unwrap(); assert_eq!(rows.row(0).cmp(&rows.row(1)), 
Ordering::Less); } }