fn merge()

in src/columnar_storage/src/operator.rs [60:110]


    fn merge(&self, batch: RecordBatch) -> Result<RecordBatch> {
        assert!(batch.num_rows() > 0);

        for idx in &self.value_idxes {
            let data_type = batch.column(*idx).data_type();
            ensure!(
                data_type == &DataType::Binary,
                "MergeOperator is only used for binary column, current:{data_type}"
            );
        }
        debug!(batch = ?batch, "BytesMergeOperator merge");

        let schema = batch.schema();
        let columns = batch
            .columns()
            .iter()
            .enumerate()
            .map(|(idx, column)| {
                if self.value_idxes.contains(&idx) {
                    // For value columns, concatenate the bytes of every row into a
                    // single element.
                    let binary_array = column.as_any().downcast_ref::<BinaryArray>().unwrap();
                    if binary_array.is_empty() {
                        return column.clone();
                    }

                    let offsets = binary_array.offsets();
                    let start = offsets[0] as usize;
                    let length = offsets[offsets.len() - 1] as usize - start;
                    if length == 0 {
                        return column.clone();
                    }

                    // Slicing the values buffer is cheap: it is zero-copy and only
                    // bumps the underlying reference count.
                    let byte_buffer = binary_array.values().slice_with_length(start, length);
                    debug!(byte_buffer = ?byte_buffer, offset = ?offsets, "BytesMergeOperator merge");
                    let offsets = OffsetBuffer::from_lengths([byte_buffer.len()]);
                    let concatenated_column = BinaryArray::new(offsets, byte_buffer, None);
                    Arc::new(concatenated_column)
                } else {
                    // For other columns, we just take the first element since the primary key
                    // columns are the same.
                    column.slice(0, 1)
                }
            })
            .collect();

        let merged_batch = RecordBatch::try_new(schema, columns)
            .context("failed to construct RecordBatch in BytesMergeOperator.")?;

        Ok(merged_batch)
    }
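
For illustration, here is a minimal test-style sketch of the expected behavior: a batch whose rows share the same primary key collapses into a single row, with the binary value column concatenated in row order and the other columns keeping their first value. The `BytesMergeOperator::new(vec![...])` constructor and the `use super::*;` import are assumptions about the surrounding operator.rs; the schema and array types are standard arrow-rs.

#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use arrow::array::{ArrayRef, BinaryArray, UInt64Array};
    use arrow::datatypes::{DataType, Field, Schema};
    use arrow::record_batch::RecordBatch;

    use super::*;

    #[test]
    fn merge_concatenates_binary_values() {
        let schema = Arc::new(Schema::new(vec![
            Field::new("pk", DataType::UInt64, false),
            Field::new("value", DataType::Binary, false),
        ]));
        // Three rows sharing the same primary key, each with its own payload.
        let pk: ArrayRef = Arc::new(UInt64Array::from(vec![1, 1, 1]));
        let value: ArrayRef = Arc::new(BinaryArray::from(vec![
            b"ab".as_ref(),
            b"cd".as_ref(),
            b"ef".as_ref(),
        ]));
        let batch = RecordBatch::try_new(schema, vec![pk, value]).unwrap();

        // Assumed constructor: column index 1 is the binary value column.
        let operator = BytesMergeOperator::new(vec![1]);
        let merged = operator.merge(batch).unwrap();

        // One row remains; the value column is the concatenation "abcdef".
        assert_eq!(merged.num_rows(), 1);
        let merged_value = merged
            .column(1)
            .as_any()
            .downcast_ref::<BinaryArray>()
            .unwrap();
        assert_eq!(merged_value.value(0), b"abcdef".as_ref());
    }
}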