fn merge_batch()

in src/columnar_storage/src/read.rs [289:343]


    /// Merge one sorted `RecordBatch` into the stream, combining rows that
    /// share a primary key via `value_operator`.
    ///
    /// The trailing primary-key group is always withheld in `pending_batch`,
    /// because the next incoming batch may begin with the same key. Returns
    /// `Ok(None)` when nothing is ready to emit yet.
    fn merge_batch(&mut self, batch: RecordBatch) -> Result<Option<RecordBatch>> {
        if batch.num_rows() == 0 {
            return Ok(None);
        }

        // Partition the batch into contiguous runs of rows sharing a primary
        // key (input is assumed sorted by primary key). Slices are zero-copy.
        let total_rows = batch.num_rows();
        let mut pk_groups: Vec<RecordBatch> = Vec::new();
        let mut run_start = 0;
        while run_start < total_rows {
            let mut run_end = run_start + 1;
            while run_end < total_rows && self.primary_key_eq(&batch, run_start, &batch, run_end)
            {
                run_end += 1;
            }
            pk_groups.push(batch.slice(run_start, run_end - run_start));
            run_start = run_end;
        }

        let mut merged_outputs = Vec::new();
        // The batch carried over from the previous call may share its last
        // primary key with this batch's first group; if so, splice them into
        // one group, otherwise the carried batch is complete and can be merged.
        if let Some(carried) = self.pending_batch.take() {
            let first_group = &pk_groups[0];
            if self.primary_key_eq(&carried, carried.num_rows() - 1, first_group, 0) {
                pk_groups[0] = concat_batches(&self.stream.schema(), [&carried, first_group])
                    .context("concat batch")?;
            } else {
                merged_outputs.push(self.value_operator.merge(carried)?);
            }
        }

        // The final group may continue into the next batch, so defer it to
        // pending_batch instead of emitting it now.
        self.pending_batch = pk_groups.pop();

        for group in pk_groups {
            merged_outputs.push(self.value_operator.merge(group)?);
        }
        if merged_outputs.is_empty() {
            return Ok(None);
        }

        let mut result = concat_batches(&self.stream.schema(), merged_outputs.iter())
            .context("concat batch")?;
        self.maybe_remove_builtin_columns(&mut result);
        Ok(Some(result))
    }