in src/columnar_storage/src/operator.rs [60:110]
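/// Merges every row of `batch` into a single output row: binary value
/// columns are concatenated element-wise into one value, while all other
/// (primary key) columns keep only their first element, since the key is
/// identical across rows.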
fn merge(&self, batch: RecordBatch) -> Result<RecordBatch> {
    assert!(batch.num_rows() > 0);
    for idx in &self.value_idxes {
        let data_type = batch.column(*idx).data_type();
        ensure!(
            data_type == &DataType::Binary,
            "MergeOperator only supports binary columns, current: {data_type}"
        );
    }
    debug!(batch = ?batch, "BytesMergeOperator merge");
    let schema = batch.schema();
    let columns = batch
        .columns()
        .iter()
        .enumerate()
        .map(|(idx, column)| {
            if self.value_idxes.contains(&idx) {
                // For value columns, concatenate every row's bytes into a
                // single element.
                let binary_array = column.as_any().downcast_ref::<BinaryArray>().unwrap();
                if binary_array.is_empty() {
                    return column.clone();
                }
                // The array may be a slice of a larger buffer, so compute the
                // byte range its offsets actually cover.
                let offsets = binary_array.offsets();
                let start = offsets[0] as usize;
                let length = offsets[offsets.len() - 1] as usize - start;
                if length == 0 {
                    return column.clone();
                }
                // Slicing the values buffer is zero-copy, so this is cheap.
                let byte_buffer = binary_array.values().slice_with_length(start, length);
                debug!(byte_buffer = ?byte_buffer, offsets = ?offsets, "BytesMergeOperator merge");
                // A single offset span covering the whole buffer yields a
                // one-row BinaryArray holding the concatenated bytes.
                let offsets = OffsetBuffer::from_lengths([byte_buffer.len()]);
                let concatenated_column = BinaryArray::new(offsets, byte_buffer, None);
                Arc::new(concatenated_column)
            } else {
                // For other columns, take just the first element, since the
                // primary key columns are identical across rows.
                column.slice(0, 1)
            }
        })
        .collect();
    let merged_batch = RecordBatch::try_new(schema, columns)
        .context("failed to construct RecordBatch in BytesMergeOperator.")?;
    Ok(merged_batch)
}
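
// Below is a minimal, self-contained sketch of the same concatenation trick,
// written against the public arrow-rs API (arrow::array / arrow::buffer).
// The function name `concat_binary_rows` is hypothetical and not part of
// operator.rs; it only illustrates how a single offset span over a sliced
// values buffer collapses a BinaryArray into one merged row.
//
// use std::sync::Arc;
//
// use arrow::array::{Array, BinaryArray};
// use arrow::buffer::OffsetBuffer;
//
// fn concat_binary_rows(input: &BinaryArray) -> Arc<BinaryArray> {
//     // Byte range covered by this array's offsets (start is nonzero if the
//     // array is itself a slice of a larger array).
//     let offsets = input.offsets();
//     let start = offsets[0] as usize;
//     let length = offsets[offsets.len() - 1] as usize - start;
//     // Zero-copy slice of the underlying values buffer.
//     let values = input.values().slice_with_length(start, length);
//     // Offsets [0, values.len()] describe exactly one row.
//     Arc::new(BinaryArray::new(
//         OffsetBuffer::from_lengths([values.len()]),
//         values,
//         None,
//     ))
// }
//
// fn main() {
//     let rows = BinaryArray::from(vec![&b"v1"[..], &b"v2"[..], &b"v3"[..]]);
//     let merged = concat_binary_rows(&rows);
//     assert_eq!(merged.len(), 1);
//     assert_eq!(merged.value(0), b"v1v2v3".as_slice());
// }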