in parquet/src/record/triplet.rs [291:358]
/// Moves to the next triplet, refilling the internal buffers from the column
/// reader whenever the current batch has been fully consumed.
///
/// Returns `Ok(true)` (and sets `has_next`) when a triplet is available at
/// `curr_triplet_index`. Returns `Ok(false)` (and clears `has_next`) when the
/// reader reports zero records, values and levels. Errors from `read_records`
/// are propagated, and an error is returned if the reader yields more values
/// than levels.
fn read_next(&mut self) -> Result<bool> {
    self.curr_triplet_index += 1;

    // A single refill is not enough: with a batch size of 1, reaching the end
    // of a record makes `read_records` return `Ok((1, 0, 0))`, which does not
    // advance `self.triplets_left`, so we keep reading until it does.
    while self.curr_triplet_index >= self.triplets_left {
        // Start every refill from empty buffers.
        self.values.clear();
        if let Some(levels) = self.def_levels.as_mut() {
            levels.clear();
        }
        if let Some(levels) = self.rep_levels.as_mut() {
            levels.clear();
        }

        // Buffer the next batch of triplets.
        let (num_records, num_values, num_levels) = self.reader.read_records(
            self.batch_size,
            self.def_levels.as_mut(),
            self.rep_levels.as_mut(),
            &mut self.values,
        )?;

        // Nothing at all was read: the column is exhausted.
        let exhausted = num_records == 0 && num_values == 0 && num_levels == 0;
        if exhausted {
            self.has_next = false;
            return Ok(false);
        }

        // Number of triplets available in the freshly buffered batch.
        let batch_len = if num_levels == 0 || num_values == num_levels {
            // Either no levels were read (required column) or every level has
            // a value, so the values buffer is already laid out correctly.
            num_values
        } else if num_values > num_levels {
            // There should never be more values than levels.
            return Err(general_err!(
                "Spacing of values/levels is wrong, values_read: {}, levels_read: {}",
                values_read = num_values,
                levels_read = num_levels
            ));
        } else {
            // Fewer values than levels: spread the densely packed values out so
            // that each one sits at the position whose definition level equals
            // the maximum definition level. This relies on the column reader
            // having produced values and levels that line up.
            //
            // Walking backwards moves each value to its final slot before that
            // slot could be needed as a source. When `num_values` is 0 no level
            // is expected to match, so no swap happens.
            let mut next_value = num_values;
            let def_levels = self.def_levels.as_ref().unwrap();
            self.values.resize(num_levels, T::T::default());
            for slot in (0..num_levels).rev() {
                if def_levels[slot] == self.max_def_level {
                    // Decrement before use so the index stays a valid `usize`.
                    next_value -= 1;
                    self.values.swap(slot, next_value);
                }
            }
            num_levels
        };

        self.curr_triplet_index = 0;
        self.triplets_left = batch_len;
    }

    self.has_next = true;
    Ok(true)
}