in parquet/src/arrow/array_reader/fixed_size_list_array.rs [78:209]
/// Reads the next batch from the item reader and assembles it into a
/// [`FixedSizeListArray`].
///
/// Rows are delimited using the item reader's repetition levels:
/// - a level below `self.rep_level` starts a new row;
/// - a level equal to it adds an item to the current row;
/// - a level above it belongs to a nested child that the item reader has
///   already assembled.
///
/// A row start whose definition level is `>= self.def_level` is a row with
/// items. Any lower level is a null or empty row, and its child entries are
/// padded with `fixed_size` nulls. When the reader is nullable, a validity bit
/// is recorded per row. That bit is `true` for rows with items and for
/// definition level `self.def_level - 1` (an empty list), and `false` otherwise.
///
/// # Errors
///
/// Returns an error if:
/// - the item reader has no definition or repetition levels;
/// - the first repetition level is not 0;
/// - a nested entry has a definition level below `self.def_level`;
/// - a row's length is wrong. A row with items must hold exactly `fixed_size`
///   items, and a null/empty row must hold none.
fn consume_batch(&mut self) -> Result<ArrayRef> {
    let next_batch_array = self.item_reader.consume_batch()?;
    if next_batch_array.len() == 0 {
        return Ok(new_empty_array(&self.data_type));
    }

    let def_levels = self
        .get_def_levels()
        .ok_or_else(|| general_err!("item_reader def levels are None"))?;
    let rep_levels = self
        .get_rep_levels()
        .ok_or_else(|| general_err!("item_reader rep levels are None"))?;

    if !rep_levels.is_empty() && rep_levels[0] != 0 {
        // This implies either the source data was invalid, or the leaf column
        // reader did not correctly delimit semantic records
        return Err(general_err!("first repetition level of batch must be 0"));
    }

    let fixed_size = self.fixed_size;

    // Validates the length of a row that has just ended.
    // - A row with items must hold exactly `fixed_size` of them.
    // - A null or empty row, and the implicit "row" before the first entry,
    //   must hold none.
    // Checking the second case is what stops items that follow a null row from
    // being silently skipped: the next valid run starts after them, so they
    // would never be copied.
    let check_row_len = |row_len: usize, row_has_items: bool| -> Result<()> {
        let expected = if row_has_items { fixed_size } else { 0 };
        if row_len == expected {
            Ok(())
        } else {
            Err(general_err!(
                "Encountered misaligned row with length {} (expected length {})",
                row_len,
                expected
            ))
        }
    };

    let mut validity = self
        .nullable
        .then(|| BooleanBufferBuilder::new(next_batch_array.len()));

    let data = next_batch_array.to_data();
    let mut child_data_builder =
        MutableArrayData::new(vec![&data], true, next_batch_array.len());

    // The current index into the child array entries
    let mut child_idx = 0;
    // The total number of rows (valid and invalid) in the list array
    let mut list_len = 0;
    // Start of the current run of valid values. This is `Some` exactly while
    // the most recently started row has items: a row with items opens (or
    // continues) a run, and a null/empty row closes it.
    let mut start_idx = None;
    // Number of items seen so far in the most recently started row
    let mut row_len = 0;

    def_levels.iter().zip(rep_levels).try_for_each(|(d, r)| {
        match r.cmp(&self.rep_level) {
            Ordering::Greater => {
                // Repetition level greater than current => already handled by inner array
                if *d < self.def_level {
                    return Err(general_err!(
                        "Encountered repetition level too large for definition level"
                    ));
                }
            }
            Ordering::Equal => {
                // Item inside of the current list
                child_idx += 1;
                row_len += 1;
            }
            Ordering::Less => {
                // Start of a new list row: first validate the row that just ended
                check_row_len(row_len, start_idx.is_some())?;
                list_len += 1;

                if *d >= self.def_level {
                    // Valid list entry; this entry is the row's first item
                    row_len = 1;
                    if let Some(validity) = validity.as_mut() {
                        validity.append(true);
                    }
                    // Start a run of valid rows if not already inside of one
                    start_idx.get_or_insert(child_idx);
                } else {
                    // Null or empty list entry: it contributes no items
                    row_len = 0;
                    if let Some(start) = start_idx.take() {
                        // Flush pending child items
                        child_data_builder.extend(0, start, child_idx);
                    }
                    // Pad list with nulls so the row still spans `fixed_size` entries
                    child_data_builder.extend_nulls(fixed_size);
                    if let Some(validity) = validity.as_mut() {
                        // Valid if empty list
                        validity.append(*d + 1 == self.def_level);
                    }
                }
                // Every row start occupies one slot in the item array. For a
                // null/empty row that slot is skipped: the flush above stops
                // before it, and any later run starts after it.
                child_idx += 1;
            }
        }
        Ok(())
    })?;

    // The loop validates a row only when the next one starts, so check the last row here
    check_row_len(row_len, start_idx.is_some())?;

    let child_data = match start_idx {
        Some(0) => {
            // A single run starting at index 0 that was never closed means no
            // null/empty rows were seen, so the original array can be reused
            next_batch_array.to_data()
        }
        Some(start) => {
            // Flush pending child items
            child_data_builder.extend(0, start, child_idx);
            child_data_builder.freeze()
        }
        None => child_data_builder.freeze(),
    };

    // Verify total number of elements is aligned with fixed list size
    if list_len * fixed_size != child_data.len() {
        return Err(general_err!(
            "fixed-size list length must be a multiple of {} but array contains {} elements",
            fixed_size,
            child_data.len()
        ));
    }

    let mut list_builder = ArrayData::builder(self.get_data_type().clone())
        .len(list_len)
        .add_child_data(child_data);

    if let Some(builder) = validity {
        list_builder = list_builder.null_bit_buffer(Some(builder.into()));
    }

    // SAFETY:
    // - The child length was checked above to equal `list_len * fixed_size`.
    // - The validity builder, when present, received exactly one bit per row
    //   (`list_len` bits in total).
    // NOTE(review): this also relies on the child's data type matching the item
    // field of `self.get_data_type()`. That is established where the reader is
    // constructed, which is not visible here; confirm.
    let list_data = unsafe { list_builder.build_unchecked() };

    let result_array = FixedSizeListArray::from(list_data);
    Ok(Arc::new(result_array))
}