fn write_mini_batch()

in parquet/src/column/writer/mod.rs [648:736]


    /// Buffers one mini batch of levels and values, then flushes a data page and/or
    /// performs the dictionary fallback if the writer reports that either is due.
    ///
    /// # Arguments
    ///
    /// * `values` - source values handed to the encoder.
    /// * `values_offset` - start position of this mini batch: an offset into `values`
    ///   when `value_indices` is `None`, otherwise an offset into `value_indices`.
    /// * `value_indices` - when `Some`, the entries
    ///   `values_offset..values_offset + values_to_write` are passed to the encoder's
    ///   `write_gather` instead of writing a contiguous run of `values`.
    /// * `num_levels` - number of levels in this mini batch; added to the buffered value
    ///   count and, for columns without repetition levels, to the buffered row count.
    /// * `def_levels` - required when the column's max definition level is > 0. Entries
    ///   equal to the max definition level count as values to write, all others as nulls.
    /// * `rep_levels` - required when the column's max repetition level is > 0. The first
    ///   entry, if any, must be `0`.
    ///
    /// # Returns
    ///
    /// The number of values handed to the encoder.
    ///
    /// # Errors
    ///
    /// Returns an error if required levels are missing, if the repetition levels do not
    /// start at a record boundary, if `value_indices` does not cover the requested range,
    /// or if the encoder, page flush or dictionary fallback fails. Level validation runs
    /// before any writer state is modified; the remaining failures can only be detected
    /// after the levels of this mini batch have already been buffered.
    fn write_mini_batch(
        &mut self,
        values: &E::Values,
        values_offset: usize,
        value_indices: Option<&[usize]>,
        num_levels: usize,
        def_levels: Option<&[i16]>,
        rep_levels: Option<&[i16]>,
    ) -> Result<usize> {
        let max_def_level = self.descr.max_def_level();
        let max_rep_level = self.descr.max_rep_level();

        // Validate the level inputs up front so that a rejected call leaves the level
        // sinks and page metrics untouched.
        let def_levels = if max_def_level > 0 {
            let levels = def_levels.ok_or_else(|| {
                general_err!(
                    "Definition levels are required, because max definition level = {}",
                    max_def_level
                )
            })?;
            Some(levels)
        } else {
            None
        };

        let rep_levels = if max_rep_level > 0 {
            // A row could contain more than one value.
            let levels = rep_levels.ok_or_else(|| {
                general_err!(
                    "Repetition levels are required, because max repetition level = {}",
                    max_rep_level
                )
            })?;

            if let Some(&first) = levels.first() {
                if first != 0 {
                    return Err(general_err!(
                        "Write must start at a record boundary, got non-zero repetition level of {}",
                        first
                    ));
                }
            }
            Some(levels)
        } else {
            None
        };

        // Process definition levels and determine how many values to write.
        let values_to_write = match def_levels {
            Some(levels) => {
                let mut values_to_write = 0;
                for &level in levels {
                    if level == max_def_level {
                        values_to_write += 1;
                    } else {
                        // We must always compute this as it is used to populate v2 pages
                        self.page_metrics.num_page_nulls += 1;
                    }
                }

                // Update histogram
                self.page_metrics.update_definition_level_histogram(levels);

                self.def_levels_sink.extend_from_slice(levels);
                values_to_write
            }
            None => num_levels,
        };

        // Process repetition levels and determine how many rows we are about to process.
        match rep_levels {
            Some(levels) => {
                // Count the occasions where we start a new row
                for &level in levels {
                    self.page_metrics.num_buffered_rows += (level == 0) as u32;
                }

                // Update histogram
                self.page_metrics.update_repetition_level_histogram(levels);

                self.rep_levels_sink.extend_from_slice(levels);
            }
            None => {
                // Each value is exactly one row.
                // Equals to the number of values, we count nulls as well.
                // NOTE(review): `as u32` truncates silently if `num_levels` exceeds
                // `u32::MAX`; presumably callers cap the mini batch size - confirm.
                self.page_metrics.num_buffered_rows += num_levels as u32;
            }
        }

        match value_indices {
            Some(indices) => {
                // Checked range selection: a short `indices` slice (or an overflowing
                // end offset) is reported as an error instead of panicking.
                let selected = values_offset
                    .checked_add(values_to_write)
                    .and_then(|end| indices.get(values_offset..end))
                    .ok_or_else(|| {
                        general_err!(
                            "Value indices out of range: requested {} indices starting at offset {}, but only {} were provided",
                            values_to_write,
                            values_offset,
                            indices.len()
                        )
                    })?;
                self.encoder.write_gather(values, selected)?;
            }
            None => self.encoder.write(values, values_offset, values_to_write)?,
        }

        // Buffered values are counted per level, so nulls are included.
        // NOTE(review): same truncating `as u32` cast as for the row count - confirm.
        self.page_metrics.num_buffered_values += num_levels as u32;

        if self.should_add_data_page() {
            self.add_data_page()?;
        }

        if self.should_dict_fallback() {
            self.dict_fallback()?;
        }

        Ok(values_to_write)
    }