in src/columnar_storage/src/read.rs [289:343]
fn merge_batch(&mut self, batch: RecordBatch) -> Result<Option<RecordBatch>> {
    if batch.num_rows() == 0 {
        return Ok(None);
    }

    // Group consecutive rows that share the same primary keys. The input is
    // sorted by primary key, so equal keys always form a contiguous run.
    let mut groupby_pk_batches = Vec::new();
    let mut start_idx = 0;
    while start_idx < batch.num_rows() {
        let mut end_idx = start_idx + 1;
        while end_idx < batch.num_rows()
            && self.primary_key_eq(&batch, start_idx, &batch, end_idx)
        {
            end_idx += 1;
        }
        groupby_pk_batches.push(batch.slice(start_idx, end_idx - start_idx));
        start_idx = end_idx;
    }

    let rows_with_same_primary_keys = &groupby_pk_batches[0];
    let mut output_batches = Vec::new();
    // A batch held over from the previous call may share its primary keys
    // with the first group here; if so, splice the two into one group,
    // otherwise the pending batch is complete and can be merged on its own.
    if let Some(pending) = self.pending_batch.take() {
        if self.primary_key_eq(
            &pending,
            pending.num_rows() - 1,
            rows_with_same_primary_keys,
            0,
        ) {
            groupby_pk_batches[0] = concat_batches(
                &self.stream.schema(),
                [&pending, rows_with_same_primary_keys],
            )
            .context("concat batch")?;
        } else {
            output_batches.push(self.value_operator.merge(pending)?);
        }
    }

    // The last group may overlap with rows at the head of the next batch, so
    // keep it in pending_batch instead of merging it now.
    self.pending_batch = groupby_pk_batches.pop();
    for batch in groupby_pk_batches {
        output_batches.push(self.value_operator.merge(batch)?);
    }

    if output_batches.is_empty() {
        return Ok(None);
    }
    let mut output_batch = concat_batches(&self.stream.schema(), output_batches.iter())
        .context("concat batch")?;
    self.maybe_remove_builtin_columns(&mut output_batch);
    Ok(Some(output_batch))
}
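
The heart of this function is a single forward pass that splits key-sorted rows into runs of equal primary keys and holds back the final run, since it may continue into the next batch. Below is a minimal standalone sketch of that pattern (hypothetical code, not from the repo; plain tuples stand in for RecordBatch rows, and a plain key comparison stands in for primary_key_eq):

/// Group a key-sorted slice into runs of equal keys, returning the complete
/// runs plus the last run, which stays pending because the next chunk of
/// input may begin with the same key.
fn group_runs(sorted: &[(u32, i64)]) -> (Vec<&[(u32, i64)]>, Option<&[(u32, i64)]>) {
    let mut groups: Vec<&[(u32, i64)]> = Vec::new();
    let mut start = 0;
    while start < sorted.len() {
        let mut end = start + 1;
        // Extend the run while the key still matches the run's first key.
        while end < sorted.len() && sorted[end].0 == sorted[start].0 {
            end += 1;
        }
        groups.push(&sorted[start..end]);
        start = end;
    }
    // Hold back the last run, exactly as merge_batch does with pending_batch.
    let pending = groups.pop();
    (groups, pending)
}

fn main() {
    let rows = [(1, 10), (1, 11), (2, 20), (3, 30), (3, 31)];
    let (complete, pending) = group_runs(&rows);
    assert_eq!(complete.len(), 2);          // runs for keys 1 and 2
    assert_eq!(pending, Some(&rows[3..5])); // the run for key 3 stays pending
}

One consequence of this carry-over, visible in merge_batch itself, is that the last group is never emitted by this function: once the stream is exhausted, whatever remains in pending_batch presumably has to be flushed by the surrounding read loop.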