in parquet/src/column/writer/mod.rs [648:736]
/// Buffers one mini-batch of levels and values into the current data page.
///
/// All level inputs are validated before any writer state is modified. A batch
/// rejected for missing or misaligned levels leaves the level sinks and page
/// metrics untouched.
///
/// * `values` / `values_offset` - the source values and the offset handed to the
///   encoder (or into `value_indices`) for this batch.
/// * `value_indices` - when `Some`, the encoder gathers values through
///   `value_indices[values_offset..values_offset + values_to_write]` instead of
///   writing a contiguous run.
/// * `num_levels` - number of levels in this batch. It is used as the value count
///   when the column has no definition levels, and as the row count when it has
///   no repetition levels.
/// * `def_levels` / `rep_levels` - required when the column's max definition /
///   repetition level is greater than zero.
///
/// Returns the number of values written. That is the count of definition levels
/// equal to the max definition level, or `num_levels` when the max definition
/// level is 0.
///
/// # Errors
///
/// * definition levels are `None` while the max definition level is > 0;
/// * repetition levels are `None` while the max repetition level is > 0;
/// * the first repetition level is non-zero (the batch does not start at a
///   record boundary);
/// * any error returned by the encoder, `add_data_page`, or `dict_fallback`.
///
/// # Panics
///
/// Panics if `value_indices` is `Some` and its length is smaller than
/// `values_offset + values_to_write`.
fn write_mini_batch(
    &mut self,
    values: &E::Values,
    values_offset: usize,
    value_indices: Option<&[usize]>,
    num_levels: usize,
    def_levels: Option<&[i16]>,
    rep_levels: Option<&[i16]>,
) -> Result<usize> {
    // Validate every level input before mutating anything. Previously a bad
    // repetition-level slice was only detected after the definition levels had
    // already been pushed into `def_levels_sink` and counted into the metrics.
    let max_def_level = self.descr.max_def_level();
    let def_levels = if max_def_level > 0 {
        Some(def_levels.ok_or_else(|| {
            general_err!(
                "Definition levels are required, because max definition level = {}",
                max_def_level
            )
        })?)
    } else {
        None
    };

    let max_rep_level = self.descr.max_rep_level();
    let rep_levels = if max_rep_level > 0 {
        let levels = rep_levels.ok_or_else(|| {
            general_err!(
                "Repetition levels are required, because max repetition level = {}",
                max_rep_level
            )
        })?;
        if let Some(&first) = levels.first() {
            if first != 0 {
                return Err(general_err!(
                    "Write must start at a record boundary, got non-zero repetition level of {}",
                    first
                ));
            }
        }
        Some(levels)
    } else {
        None
    };

    // Process definition levels and determine how many values to write.
    let values_to_write = match def_levels {
        Some(levels) => {
            let mut values_to_write = 0;
            for &level in levels {
                if level == max_def_level {
                    values_to_write += 1;
                } else {
                    // We must always compute this as it is used to populate v2 pages
                    self.page_metrics.num_page_nulls += 1;
                }
            }
            self.page_metrics.update_definition_level_histogram(levels);
            self.def_levels_sink.extend_from_slice(levels);
            values_to_write
        }
        // Without definition levels every level carries a value.
        None => num_levels,
    };

    // Process repetition levels and determine how many rows we are about to process.
    match rep_levels {
        Some(levels) => {
            // A row could contain more than one value; a repetition level of 0
            // marks the start of a new row.
            for &level in levels {
                self.page_metrics.num_buffered_rows += (level == 0) as u32;
            }
            self.page_metrics.update_repetition_level_histogram(levels);
            self.rep_levels_sink.extend_from_slice(levels);
        }
        None => {
            // Each level is exactly one row; nulls are counted as rows too.
            // NOTE(review): `as u32` truncates if num_levels exceeds u32::MAX.
            self.page_metrics.num_buffered_rows += num_levels as u32;
        }
    }

    match value_indices {
        Some(indices) => {
            let indices = &indices[values_offset..values_offset + values_to_write];
            self.encoder.write_gather(values, indices)?;
        }
        None => self.encoder.write(values, values_offset, values_to_write)?,
    }

    // NOTE(review): `as u32` truncates if num_levels exceeds u32::MAX.
    self.page_metrics.num_buffered_values += num_levels as u32;

    if self.should_add_data_page() {
        self.add_data_page()?;
    }
    if self.should_dict_fallback() {
        self.dict_fallback()?;
    }
    Ok(values_to_write)
}