arrow/src/util/data_gen.rs (624 lines of code) (raw):
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//! Utilities to generate random arrays and batches
use std::sync::Arc;
use rand::{
distr::uniform::{SampleRange, SampleUniform},
Rng,
};
use crate::array::*;
use crate::error::{ArrowError, Result};
use crate::{
buffer::{Buffer, MutableBuffer},
datatypes::*,
};
use super::{bench_util::*, bit_util, test_util::seedable_rng};
/// Create a random [RecordBatch] that conforms to `schema`.
///
/// One column is generated per schema field via [create_random_array], each with
/// `size` rows, using the same `null_density` and `true_density` for every column.
/// The batch is assembled with
/// `RecordBatchOptions::with_match_field_names(false)`.
///
/// # Errors
///
/// Returns the first error produced while generating a column, or any error from
/// [RecordBatch::try_new_with_options].
pub fn create_random_batch(
    schema: SchemaRef,
    size: usize,
    null_density: f32,
    true_density: f32,
) -> Result<RecordBatch> {
    let mut columns: Vec<ArrayRef> = Vec::with_capacity(schema.fields().len());
    for field in schema.fields().iter() {
        // Stop at the first field that cannot be generated.
        columns.push(create_random_array(field, size, null_density, true_density)?);
    }
    let options = RecordBatchOptions::new().with_match_field_names(false);
    RecordBatch::try_new_with_options(schema, columns, &options)
}
/// Create a random [ArrayRef] for `field` containing `size` elements.
///
/// * `null_density` — the fraction of slots expected to be null.
///   - Leaf (non-nested) types honour it only when `field` is nullable. A
///     non-nullable leaf field always yields an array without nulls.
///   - `List`, `LargeList`, `Struct` and `Map` apply it to their own validity when
///     nullable, and pass the caller's value unchanged to their children. Each child
///     then applies its own field's nullability.
/// * `true_density` — forwarded to [BooleanArray] generation. This is presumably the
///   probability of a `true` value; see `create_boolean_array`.
///
/// `Dictionary` types are produced by generating an array of the value type, using a
/// field with the same name and nullability, and casting it to the dictionary type.
/// This is only attempted when `crate::compute::can_cast_types` reports the cast as
/// supported.
///
/// # Errors
///
/// * [ArrowError::NotYetImplemented] for `Float16` and for any data type not handled
///   below, including dictionaries whose value type cannot be cast.
/// * [ArrowError::InvalidArgumentError] for `Time32` units other than seconds and
///   milliseconds, and for `Time64` units other than microseconds and nanoseconds.
/// * Any error returned while generating nested children or casting dictionary values.
pub fn create_random_array(
    field: &Field,
    size: usize,
    null_density: f32,
    true_density: f32,
) -> Result<ArrayRef> {
    // Leaf (non-nested) arrays of a non-nullable field must not contain nulls, so
    // they use a density of 0.0. Nested types below receive the caller's
    // `null_density` instead, because their children carry their own nullability.
    let primitive_null_density = match field.is_nullable() {
        true => null_density,
        false => 0.0,
    };
    use DataType::*;
    Ok(match field.data_type() {
        Null => Arc::new(NullArray::new(size)) as ArrayRef,
        Boolean => Arc::new(create_boolean_array(
            size,
            primitive_null_density,
            true_density,
        )),
        Int8 => Arc::new(create_primitive_array::<Int8Type>(
            size,
            primitive_null_density,
        )),
        Int16 => Arc::new(create_primitive_array::<Int16Type>(
            size,
            primitive_null_density,
        )),
        Int32 => Arc::new(create_primitive_array::<Int32Type>(
            size,
            primitive_null_density,
        )),
        Int64 => Arc::new(create_primitive_array::<Int64Type>(
            size,
            primitive_null_density,
        )),
        UInt8 => Arc::new(create_primitive_array::<UInt8Type>(
            size,
            primitive_null_density,
        )),
        UInt16 => Arc::new(create_primitive_array::<UInt16Type>(
            size,
            primitive_null_density,
        )),
        UInt32 => Arc::new(create_primitive_array::<UInt32Type>(
            size,
            primitive_null_density,
        )),
        UInt64 => Arc::new(create_primitive_array::<UInt64Type>(
            size,
            primitive_null_density,
        )),
        // No random generator is wired up for half-precision floats.
        Float16 => {
            return Err(ArrowError::NotYetImplemented(
                "Float16 is not implemented".to_string(),
            ))
        }
        Float32 => Arc::new(create_primitive_array::<Float32Type>(
            size,
            primitive_null_density,
        )),
        Float64 => Arc::new(create_primitive_array::<Float64Type>(
            size,
            primitive_null_density,
        )),
        // Timestamps draw values from `RandomTemporalValue::value_range` and keep the
        // field's timezone (if any) so the array's type matches the field exactly.
        Timestamp(unit, tz) => match unit {
            TimeUnit::Second => Arc::new(
                create_random_temporal_array::<TimestampSecondType>(size, primitive_null_density)
                    .with_timezone_opt(tz.clone()),
            ),
            TimeUnit::Millisecond => Arc::new(
                create_random_temporal_array::<TimestampMillisecondType>(
                    size,
                    primitive_null_density,
                )
                .with_timezone_opt(tz.clone()),
            ),
            TimeUnit::Microsecond => Arc::new(
                create_random_temporal_array::<TimestampMicrosecondType>(
                    size,
                    primitive_null_density,
                )
                .with_timezone_opt(tz.clone()),
            ),
            TimeUnit::Nanosecond => Arc::new(
                create_random_temporal_array::<TimestampNanosecondType>(
                    size,
                    primitive_null_density,
                )
                .with_timezone_opt(tz.clone()),
            ),
        },
        Date32 => Arc::new(create_random_temporal_array::<Date32Type>(
            size,
            primitive_null_density,
        )),
        Date64 => Arc::new(create_random_temporal_array::<Date64Type>(
            size,
            primitive_null_density,
        )),
        // Only the units handled here have a `RandomTemporalValue` impl for Time32.
        Time32(unit) => match unit {
            TimeUnit::Second => Arc::new(create_random_temporal_array::<Time32SecondType>(
                size,
                primitive_null_density,
            )) as ArrayRef,
            TimeUnit::Millisecond => Arc::new(
                create_random_temporal_array::<Time32MillisecondType>(size, primitive_null_density),
            ),
            _ => {
                return Err(ArrowError::InvalidArgumentError(format!(
                    "Unsupported unit {unit:?} for Time32"
                )))
            }
        },
        // Only the units handled here have a `RandomTemporalValue` impl for Time64.
        Time64(unit) => match unit {
            TimeUnit::Microsecond => Arc::new(
                create_random_temporal_array::<Time64MicrosecondType>(size, primitive_null_density),
            ) as ArrayRef,
            TimeUnit::Nanosecond => Arc::new(create_random_temporal_array::<Time64NanosecondType>(
                size,
                primitive_null_density,
            )),
            _ => {
                return Err(ArrowError::InvalidArgumentError(format!(
                    "Unsupported unit {unit:?} for Time64"
                )))
            }
        },
        Utf8 => Arc::new(create_string_array::<i32>(size, primitive_null_density)),
        LargeUtf8 => Arc::new(create_string_array::<i64>(size, primitive_null_density)),
        // Both view types are produced by the string-view generator. BinaryView
        // converts its output with `to_binary_view`.
        Utf8View => Arc::new(create_string_view_array_with_len(
            size,
            primitive_null_density,
            4,
            false,
        )),
        Binary => Arc::new(create_binary_array::<i32>(size, primitive_null_density)),
        LargeBinary => Arc::new(create_binary_array::<i64>(size, primitive_null_density)),
        FixedSizeBinary(len) => Arc::new(create_fsb_array(
            size,
            primitive_null_density,
            *len as usize,
        )),
        BinaryView => Arc::new(
            create_string_view_array_with_len(size, primitive_null_density, 4, false)
                .to_binary_view(),
        ),
        // Nested types recurse with the caller's (unmodified) null_density.
        List(_) => create_random_list_array(field, size, null_density, true_density)?,
        LargeList(_) => create_random_list_array(field, size, null_density, true_density)?,
        Struct(_) => create_random_struct_array(field, size, null_density, true_density)?,
        // Generate the dictionary's value type and cast it to the dictionary type.
        // Dictionaries that cannot be produced by a cast fall through to `other`.
        d @ Dictionary(_, value_type) if crate::compute::can_cast_types(value_type, d) => {
            let f = Field::new(
                field.name(),
                value_type.as_ref().clone(),
                field.is_nullable(),
            );
            let v = create_random_array(&f, size, null_density, true_density)?;
            crate::compute::cast(&v, d)?
        }
        Map(_, _) => create_random_map_array(field, size, null_density, true_density)?,
        other => {
            return Err(ArrowError::NotYetImplemented(format!(
                "Generating random arrays not yet implemented for {other:?}"
            )))
        }
    })
}
/// Create a random [ListArray] or [LargeListArray] for a `List`/`LargeList` `field`.
///
/// Each list holds between 0 and 4 elements, because per-list lengths are drawn
/// from `0..5`. The child values are generated recursively from the list's item
/// field with the caller's `null_density` and `true_density`; the item field's own
/// nullability decides whether the children may contain nulls. The list itself only
/// gets a validity buffer when `field` is nullable.
///
/// The resulting [ArrayData] is built with [ArrayData::try_new]. Offsets, child type
/// and child nullability are therefore validated, and a malformed layout is reported
/// as an error rather than producing an unchecked array.
///
/// # Errors
///
/// * [ArrowError::InvalidArgumentError] if `field` is not a `List` or `LargeList`.
/// * Any error from generating the child values or from validating the list data.
#[inline]
fn create_random_list_array(
    field: &Field,
    size: usize,
    null_density: f32,
    true_density: f32,
) -> Result<ArrayRef> {
    let (list_field, offsets, child_len) = match field.data_type() {
        DataType::List(f) => {
            let (offsets, child_len) = create_random_offsets::<i32>(size, 0, 5);
            (f, Buffer::from(offsets.to_byte_slice()), child_len as usize)
        }
        DataType::LargeList(f) => {
            let (offsets, child_len) = create_random_offsets::<i64>(size, 0, 5);
            (f, Buffer::from(offsets.to_byte_slice()), child_len as usize)
        }
        _ => {
            return Err(ArrowError::InvalidArgumentError(format!(
                "Cannot create list array for field {field:?}"
            )))
        }
    };
    // The child length equals the last offset, so the offsets index exactly into it.
    let child_data =
        create_random_array(list_field, child_len, null_density, true_density)?.to_data();
    // Only nullable lists get a validity buffer; non-nullable lists never contain nulls.
    let null_buffer = field
        .is_nullable()
        .then(|| create_random_null_buffer(size, null_density));
    // Validate instead of using `new_unchecked`: `field` comes from the caller, so
    // the generated layout is not trusted blindly.
    let list_data = ArrayData::try_new(
        field.data_type().clone(),
        size,
        null_buffer,
        0,
        vec![offsets],
        vec![child_data],
    )?;
    Ok(make_array(list_data))
}
/// Create a random [StructArray] for a `Struct` `field`.
///
/// Every child column is generated with [create_random_array] using `size`,
/// `null_density` and `true_density`. When `field` is nullable the struct also gets
/// its own validity derived from `null_density`; otherwise it has no null buffer.
///
/// # Errors
///
/// * [ArrowError::InvalidArgumentError] if `field` is not a `Struct`.
/// * Any error from generating a child or from [StructArray::try_new].
#[inline]
fn create_random_struct_array(
    field: &Field,
    size: usize,
    null_density: f32,
    true_density: f32,
) -> Result<ArrayRef> {
    let DataType::Struct(struct_fields) = field.data_type() else {
        return Err(ArrowError::InvalidArgumentError(format!(
            "Cannot create struct array for field {field:?}"
        )));
    };

    let mut child_arrays = Vec::with_capacity(struct_fields.len());
    for child_field in struct_fields.iter() {
        child_arrays.push(create_random_array(
            child_field,
            size,
            null_density,
            true_density,
        )?);
    }

    // Validity only exists for nullable structs.
    let null_buffer = field.is_nullable().then(|| {
        let validity_bits = create_random_null_buffer(size, null_density);
        arrow_buffer::BooleanBuffer::new(validity_bits, 0, size).into()
    });

    let array = StructArray::try_new(struct_fields.clone(), child_arrays, null_buffer)?;
    Ok(Arc::new(array))
}
/// Create a random [MapArray] for a `Map` `field`.
///
/// Each map holds between 0 and 4 entries, because per-map lengths are drawn from
/// `0..5`. The entries are generated recursively from the map's entries field with the
/// caller's `null_density` and `true_density`, so key/value nullability follows the
/// entries field's children. The map itself only gets a validity buffer when `field`
/// is nullable.
///
/// The resulting [ArrayData] is built with [ArrayData::try_new]. Offsets, entry type
/// and entry nullability are therefore validated, and a malformed layout is reported
/// as an error rather than producing an unchecked array.
///
/// # Errors
///
/// * [ArrowError::InvalidArgumentError] if `field` is not a `Map`.
/// * Any error from generating the entries or from validating the map data.
#[inline]
fn create_random_map_array(
    field: &Field,
    size: usize,
    null_density: f32,
    true_density: f32,
) -> Result<ArrayRef> {
    let DataType::Map(entries_field, _) = field.data_type() else {
        return Err(ArrowError::InvalidArgumentError(format!(
            "Cannot create map array for field {field:?}"
        )));
    };
    let (offsets, child_len) = create_random_offsets::<i32>(size, 0, 5);
    let offsets = Buffer::from(offsets.to_byte_slice());
    // The entries length equals the last offset, so the offsets index exactly into it.
    let entries = create_random_array(
        entries_field,
        child_len as usize,
        null_density,
        true_density,
    )?
    .to_data();
    // Only nullable maps get a validity buffer; non-nullable maps never contain nulls.
    let null_buffer = field
        .is_nullable()
        .then(|| create_random_null_buffer(size, null_density));
    // Validate instead of using `new_unchecked`: `field` comes from the caller, so
    // the generated layout is not trusted blindly.
    let map_data = ArrayData::try_new(
        field.data_type().clone(),
        size,
        null_buffer,
        0,
        vec![offsets],
        vec![entries],
    )?;
    Ok(make_array(map_data))
}
/// Generate `size + 1` monotonically non-decreasing offsets for list-like arrays.
///
/// The first offset is zero. Each subsequent offset adds a length drawn from
/// `min..max` (exclusive upper bound). Returns the offsets together with the final
/// offset, which is the total number of child values required.
///
/// rand's `random_range` panics on an empty range, so callers must pass `min < max`.
fn create_random_offsets<T: OffsetSizeTrait + SampleUniform>(
    size: usize,
    min: T,
    max: T,
) -> (Vec<T>, T) {
    let mut rng = seedable_rng();
    let mut total = T::zero();
    let mut offsets = Vec::with_capacity(size + 1);
    offsets.push(total);
    for _ in 0..size {
        total += rng.random_range(min..max);
        offsets.push(total);
    }
    (offsets, total)
}
/// Build a validity bitmap of `size` bits in which each bit is set (valid) unless a
/// random `f32` draw falls below `null_density`.
///
/// The buffer starts zeroed (`MutableBuffer::new_null`), so unset bits mark nulls.
fn create_random_null_buffer(size: usize, null_density: f32) -> Buffer {
    let mut rng = seedable_rng();
    let mut validity = MutableBuffer::new_null(size);
    let bits = validity.as_slice_mut();
    for idx in 0..size {
        if rng.random::<f32>() >= null_density {
            bit_util::set_bit(bits, idx);
        }
    }
    validity.into()
}
/// Random value generation for Arrow temporal types, intended for testing.
///
/// Each implementation supplies a fixed [`SampleRange`] through
/// [`value_range`](Self::value_range). These ranges are chosen for convenience and
/// are not meant to be representative of the full range the type can hold.
pub trait RandomTemporalValue: ArrowTemporalType {
    /// The range of native values that random values are drawn from.
    fn value_range() -> impl SampleRange<Self::Native>;

    /// Draw a random value from [`Self::value_range`] using `rng`.
    fn gen_range<R: Rng>(rng: &mut R) -> Self::Native
    where
        Self::Native: SampleUniform,
    {
        let range = Self::value_range();
        rng.random_range(range)
    }

    /// Generate a random value of the type; delegates to [`Self::gen_range`].
    fn random<R: Rng>(rng: &mut R) -> Self::Native
    where
        Self::Native: SampleUniform,
    {
        <Self as RandomTemporalValue>::gen_range(rng)
    }
}
/// Number of seconds in one day.
const SECONDS_PER_DAY: i64 = 24 * 60 * 60;
/// Days covered by the "100 years since the epoch" ranges (100 × 365; leap days are
/// not counted).
const DAYS_IN_RANGE: i64 = 100 * 365;

impl RandomTemporalValue for TimestampSecondType {
    /// Seconds since the UNIX epoch, covering 100 × 365 days (roughly 100 years).
    fn value_range() -> impl SampleRange<Self::Native> {
        0..SECONDS_PER_DAY * DAYS_IN_RANGE
    }
}
impl RandomTemporalValue for TimestampMillisecondType {
    /// Milliseconds since the UNIX epoch, covering 100 × 365 days (roughly 100 years).
    fn value_range() -> impl SampleRange<Self::Native> {
        0..1_000 * SECONDS_PER_DAY * DAYS_IN_RANGE
    }
}
impl RandomTemporalValue for TimestampMicrosecondType {
    /// Microseconds since the UNIX epoch, covering 100 × 365 days (roughly 100 years).
    fn value_range() -> impl SampleRange<Self::Native> {
        0..1_000_000 * SECONDS_PER_DAY * DAYS_IN_RANGE
    }
}
impl RandomTemporalValue for TimestampNanosecondType {
    /// Nanoseconds since the UNIX epoch, covering 100 × 365 days (roughly 100 years).
    /// The upper bound (about 3.15e18) still fits in an `i64`.
    fn value_range() -> impl SampleRange<Self::Native> {
        0..1_000_000_000 * SECONDS_PER_DAY * DAYS_IN_RANGE
    }
}
impl RandomTemporalValue for Date32Type {
    /// Days since the UNIX epoch, covering 100 × 365 days (roughly 100 years).
    fn value_range() -> impl SampleRange<Self::Native> {
        0..100 * 365
    }
}
impl RandomTemporalValue for Date64Type {
    /// Milliseconds since the UNIX epoch, covering 100 × 365 days (roughly 100 years).
    fn value_range() -> impl SampleRange<Self::Native> {
        0..1_000 * SECONDS_PER_DAY * DAYS_IN_RANGE
    }
}
impl RandomTemporalValue for Time32SecondType {
    /// Seconds since midnight, from 0 up to (but excluding) 24 hours.
    fn value_range() -> impl SampleRange<Self::Native> {
        0..24 * 60 * 60
    }
}
impl RandomTemporalValue for Time32MillisecondType {
    /// Milliseconds since midnight, from 0 up to (but excluding) 24 hours.
    fn value_range() -> impl SampleRange<Self::Native> {
        0..1_000 * 24 * 60 * 60
    }
}
impl RandomTemporalValue for Time64MicrosecondType {
    /// Microseconds since midnight, from 0 up to (but excluding) 24 hours.
    fn value_range() -> impl SampleRange<Self::Native> {
        0..1_000_000 * SECONDS_PER_DAY
    }
}
impl RandomTemporalValue for Time64NanosecondType {
    /// Nanoseconds since midnight, from 0 up to (but excluding) 24 hours.
    fn value_range() -> impl SampleRange<Self::Native> {
        0..1_000_000_000 * SECONDS_PER_DAY
    }
}
/// Create a [PrimitiveArray] of `size` random temporal values of type `T`.
///
/// For every slot, an `f32` draw below `null_density` makes the slot null. Otherwise
/// a value is drawn with [`RandomTemporalValue::random`].
fn create_random_temporal_array<T>(size: usize, null_density: f32) -> PrimitiveArray<T>
where
    T: RandomTemporalValue,
    <T as ArrowPrimitiveType>::Native: SampleUniform,
{
    let mut rng = seedable_rng();
    (0..size)
        .map(|_| {
            // Draw the null decision first; a value is only drawn for valid slots.
            let is_null = rng.random::<f32>() < null_density;
            (!is_null).then(|| T::random(&mut rng))
        })
        .collect()
}
#[cfg(test)]
mod tests {
    // Tests for the random array/batch generators: shapes, lengths and how
    // `null_density` interacts with field nullability at each nesting level.
    use super::*;
    // A batch with primitive and timestamp columns (with and without a timezone)
    // must match the requested schema and row count.
    #[test]
    fn test_create_batch() {
        let size = 32;
        let fields = vec![
            Field::new("a", DataType::Int32, true),
            Field::new(
                "timestamp_without_timezone",
                DataType::Timestamp(TimeUnit::Nanosecond, None),
                true,
            ),
            Field::new(
                "timestamp_with_timezone",
                DataType::Timestamp(TimeUnit::Nanosecond, Some("UTC".into())),
                true,
            ),
        ];
        let schema = Schema::new(fields);
        let schema_ref = Arc::new(schema);
        let batch = create_random_batch(schema_ref.clone(), size, 0.35, 0.7).unwrap();
        assert_eq!(batch.schema(), schema_ref);
        assert_eq!(batch.num_columns(), schema_ref.fields().len());
        for array in batch.columns() {
            assert_eq!(array.len(), size);
        }
    }
    // With every field non-nullable, no column (nor the list's child values) may
    // contain nulls, even though a non-zero null_density is requested.
    #[test]
    fn test_create_batch_non_null() {
        let size = 32;
        // NOTE(review): two fields share the name "a"; presumably intentional to show
        // duplicate names are accepted — confirm.
        let fields = vec![
            Field::new("a", DataType::Int32, false),
            Field::new(
                "b",
                DataType::List(Arc::new(Field::new_list_field(DataType::LargeUtf8, false))),
                false,
            ),
            Field::new("a", DataType::Int32, false),
        ];
        let schema = Schema::new(fields);
        let schema_ref = Arc::new(schema);
        let batch = create_random_batch(schema_ref.clone(), size, 0.35, 0.7).unwrap();
        assert_eq!(batch.schema(), schema_ref);
        assert_eq!(batch.num_columns(), schema_ref.fields().len());
        for array in batch.columns() {
            assert_eq!(array.null_count(), 0);
            assert_eq!(array.logical_null_count(), 0);
        }
        // Test that the list's child values are non-null
        let b_array = batch.column(1);
        let list_array = b_array.as_list::<i32>();
        let child_array = list_array.values();
        assert_eq!(child_array.null_count(), 0);
        // There should be more values than the list, to show that it's a list
        assert!(child_array.len() > list_array.len());
    }
    // Deeply nested struct: LargeList<List<FixedSizeBinary>> and an inner struct.
    #[test]
    fn test_create_struct_array() {
        let size = 32;
        let struct_fields = Fields::from(vec![
            Field::new("b", DataType::Boolean, true),
            Field::new(
                "c",
                DataType::LargeList(Arc::new(Field::new_list_field(
                    DataType::List(Arc::new(Field::new_list_field(
                        DataType::FixedSizeBinary(6),
                        true,
                    ))),
                    false,
                ))),
                true,
            ),
            Field::new(
                "d",
                DataType::Struct(Fields::from(vec![
                    Field::new("d_x", DataType::Int32, true),
                    Field::new("d_y", DataType::Float32, false),
                    Field::new("d_z", DataType::Binary, true),
                ])),
                true,
            ),
        ]);
        let field = Field::new("struct", DataType::Struct(struct_fields), true);
        let array = create_random_array(&field, size, 0.2, 0.5).unwrap();
        assert_eq!(array.len(), 32);
        let struct_array = array.as_any().downcast_ref::<StructArray>().unwrap();
        assert_eq!(struct_array.columns().len(), 3);
        // Test that the nested list makes sense,
        // i.e. its children's values are more than the parent, to show repetition
        let col_c = struct_array.column_by_name("c").unwrap();
        let col_c = col_c.as_any().downcast_ref::<LargeListArray>().unwrap();
        assert_eq!(col_c.len(), size);
        let col_c_list = col_c.values().as_list::<i32>();
        assert!(col_c_list.len() > size);
        // Its values should be FixedSizeBinary(6)
        let fsb = col_c_list.values();
        assert_eq!(fsb.data_type(), &DataType::FixedSizeBinary(6));
        assert!(fsb.len() > col_c_list.len());
        // Test nested struct
        let col_d = struct_array.column_by_name("d").unwrap();
        let col_d = col_d.as_any().downcast_ref::<StructArray>().unwrap();
        let col_d_y = col_d.column_by_name("d_y").unwrap();
        assert_eq!(col_d_y.data_type(), &DataType::Float32);
        assert_eq!(col_d_y.null_count(), 0);
    }
    // A non-nullable list has no nulls itself, but its nullable items still do.
    #[test]
    fn test_create_list_array_nested_nullability() {
        let list_field = Field::new_list(
            "not_null_list",
            Field::new_list_field(DataType::Boolean, true),
            false,
        );
        let list_array = create_random_array(&list_field, 100, 0.95, 0.5).unwrap();
        assert_eq!(list_array.null_count(), 0);
        assert!(list_array.as_list::<i32>().values().null_count() > 0);
    }
    // A non-nullable struct has no nulls itself; each child follows its own field's
    // nullability.
    #[test]
    fn test_create_struct_array_nested_nullability() {
        let struct_child_fields = vec![
            Field::new("null_int", DataType::Int32, true),
            Field::new("int", DataType::Int32, false),
        ];
        let struct_field = Field::new_struct("not_null_struct", struct_child_fields, false);
        let struct_array = create_random_array(&struct_field, 100, 0.95, 0.5).unwrap();
        assert_eq!(struct_array.null_count(), 0);
        assert!(
            struct_array
                .as_struct()
                .column_by_name("null_int")
                .unwrap()
                .null_count()
                > 0
        );
        assert_eq!(
            struct_array
                .as_struct()
                .column_by_name("int")
                .unwrap()
                .null_count(),
            0
        );
    }
    // Nullability is resolved independently at each level:
    // list (non-null) -> struct items (nullable) -> children.
    #[test]
    fn test_create_list_array_nested_struct_nullability() {
        let struct_child_fields = vec![
            Field::new("null_int", DataType::Int32, true),
            Field::new("int", DataType::Int32, false),
        ];
        let list_item_field =
            Field::new_list_field(DataType::Struct(struct_child_fields.into()), true);
        let list_field = Field::new_list("not_null_list", list_item_field, false);
        let list_array = create_random_array(&list_field, 100, 0.95, 0.5).unwrap();
        assert_eq!(list_array.null_count(), 0);
        assert!(list_array.as_list::<i32>().values().null_count() > 0);
        assert!(
            list_array
                .as_list::<i32>()
                .values()
                .as_struct()
                .column_by_name("null_int")
                .unwrap()
                .null_count()
                > 0
        );
        assert_eq!(
            list_array
                .as_list::<i32>()
                .values()
                .as_struct()
                .column_by_name("int")
                .unwrap()
                .null_count(),
            0
        );
    }
    // A non-nullable map with non-nullable keys and nullable values.
    #[test]
    fn test_create_map_array() {
        let map_field = Field::new_map(
            "map",
            "entries",
            Field::new("key", DataType::Utf8, false),
            Field::new("value", DataType::Utf8, true),
            false,
            false,
        );
        let array = create_random_array(&map_field, 100, 0.8, 0.5).unwrap();
        assert_eq!(array.len(), 100);
        // Map field is not null
        assert_eq!(array.null_count(), 0);
        assert_eq!(array.logical_null_count(), 0);
        // Maps have multiple values like a list, so internal arrays are longer
        assert!(array.as_map().keys().len() > array.len());
        assert!(array.as_map().values().len() > array.len());
        // Keys are not nullable
        assert_eq!(array.as_map().keys().null_count(), 0);
        // Values are nullable
        assert!(array.as_map().values().null_count() > 0);
        assert_eq!(array.as_map().keys().data_type(), &DataType::Utf8);
        assert_eq!(array.as_map().values().data_type(), &DataType::Utf8);
    }
}