in parquet/src/arrow/array_reader/list_array.rs [84:231]
/// Consumes the values buffered by the item reader and assembles them into a
/// `GenericListArray<OffsetSize>` of type `self.data_type`.
///
/// The child values returned by the item reader contain one padding slot for every
/// null or empty list; those slots are removed here so that null and empty lists
/// both become zero-length slices in the output offsets.
///
/// # Errors
///
/// Returns an error if the item reader exposes no definition or repetition levels,
/// if the number of child values cannot be represented by `OffsetSize`, if the first
/// repetition level of the batch is not 0, if a repetition level above this list's
/// level is paired with a definition level below `self.def_level`, or if the
/// reconstructed child length does not match the computed offsets.
///
/// # Panics
///
/// Panics (via `assert_eq!`) if the validity bitmap length diverges from the number
/// of list slots, which would indicate an internal bug.
fn consume_batch(&mut self) -> Result<ArrayRef> {
    let next_batch_array = self.item_reader.consume_batch()?;
    if next_batch_array.is_empty() {
        return Ok(new_empty_array(&self.data_type));
    }

    let def_levels = self
        .item_reader
        .get_def_levels()
        .ok_or_else(|| general_err!("item_reader def levels are None."))?;

    let rep_levels = self
        .item_reader
        .get_rep_levels()
        .ok_or_else(|| general_err!("item_reader rep levels are None."))?;

    // Offsets pushed below count kept child values, so they are expected to stay
    // within the child length validated here.
    if OffsetSize::from_usize(next_batch_array.len()).is_none() {
        return Err(general_err!(
            "offset of {} would overflow list array",
            next_batch_array.len()
        ));
    }

    if !rep_levels.is_empty() && rep_levels[0] != 0 {
        // This implies either the source data was invalid, or the leaf column
        // reader did not correctly delimit semantic records
        return Err(general_err!("first repetition level of batch must be 0"));
    }

    // A non-nullable list has a single definition level indicating if the list is empty
    //
    // A nullable list has two definition levels associated with it:
    //
    // The first identifies if the list is null
    // The second identifies if the list is empty
    //
    // The child data returned above is padded with a value for each not-fully defined level.
    // Therefore null and empty lists will correspond to a value in the child array.
    //
    // Whilst nulls may have a non-zero slice in the offsets array, empty lists must
    // be of zero length. As a result we MUST filter out values corresponding to empty
    // lists, and for consistency we do the same for nulls.

    // The output offsets for the computed ListArray
    let mut list_offsets: Vec<OffsetSize> = Vec::with_capacity(next_batch_array.len() + 1);

    // The validity mask of the computed ListArray if nullable
    let mut validity = self
        .nullable
        .then(|| BooleanBufferBuilder::new(next_batch_array.len()));

    // The offset into the filtered child data of the current level being considered
    let mut cur_offset = 0;

    // Identifies the start of a run of values to copy from the source child data
    let mut filter_start = None;

    // The number of child values skipped due to empty lists or nulls
    let mut skipped = 0;

    // Half-open `(start, end)` runs of source child indices to keep, in order.
    // Recording the runs instead of copying eagerly means no `MutableArrayData`
    // (and none of its preallocated buffers) is created when nothing is filtered.
    let mut kept_runs: Vec<(usize, usize)> = Vec::new();

    for (d, r) in def_levels.iter().zip(rep_levels) {
        match r.cmp(&self.rep_level) {
            Ordering::Greater => {
                // Repetition level greater than current => already handled by inner array
                if *d < self.def_level {
                    return Err(general_err!(
                        "Encountered repetition level too large for definition level"
                    ));
                }
            }
            Ordering::Equal => {
                // New value in the current list
                cur_offset += 1;
            }
            Ordering::Less => {
                // Create new array slice
                // Already checked that this cannot overflow
                list_offsets.push(OffsetSize::from_usize(cur_offset).unwrap());

                if *d >= self.def_level {
                    // Fully defined value: start a kept run if one is not open
                    filter_start.get_or_insert(cur_offset + skipped);
                    cur_offset += 1;
                    if let Some(validity) = validity.as_mut() {
                        validity.append(true)
                    }
                } else {
                    // Null or empty list: close the open kept run (if any) so the
                    // padding slot at source index `cur_offset + skipped` is dropped
                    if let Some(start) = filter_start.take() {
                        kept_runs.push((start, cur_offset + skipped));
                    }
                    if let Some(validity) = validity.as_mut() {
                        // Valid if empty list
                        validity.append(*d + 1 == self.def_level)
                    }
                    skipped += 1;
                }
            }
        }
    }

    list_offsets.push(OffsetSize::from_usize(cur_offset).unwrap());

    let data = next_batch_array.to_data();
    let child_data = if skipped == 0 {
        // No filtered values - reuse the original child data as-is
        data
    } else {
        // One or more filtered values - copy only the kept runs into a new array
        if let Some(start) = filter_start {
            kept_runs.push((start, cur_offset + skipped));
        }
        let mut child_data_builder = MutableArrayData::new(vec![&data], false, cur_offset);
        for (start, end) in kept_runs {
            child_data_builder.extend(0, start, end);
        }
        child_data_builder.freeze()
    };

    if cur_offset != child_data.len() {
        return Err(general_err!("Failed to reconstruct list from level data"));
    }

    let value_offsets = Buffer::from(list_offsets.to_byte_slice());

    let mut data_builder = ArrayData::builder(self.get_data_type().clone())
        .len(list_offsets.len() - 1)
        .add_buffer(value_offsets)
        .add_child_data(child_data);

    if let Some(builder) = validity {
        assert_eq!(builder.len(), list_offsets.len() - 1);
        data_builder = data_builder.null_bit_buffer(Some(builder.into()))
    }

    // SAFETY: the offsets are non-decreasing because `cur_offset` only ever
    // increases, the final offset equals the child length (checked above), and the
    // validity bitmap (if any) has one bit per list slot (asserted above).
    // NOTE(review): this also assumes `self.get_data_type()` is the list type whose
    // offset width is `OffsetSize` and whose child type matches the item reader's
    // output; that is not re-checked here.
    let list_data = unsafe { data_builder.build_unchecked() };

    let result_array = GenericListArray::<OffsetSize>::from(list_data);
    Ok(Arc::new(result_array))
}