in src/columnar_storage/src/read.rs [207:249]
/// Wraps `stream` and decides up front which Arrow schema this reader exposes.
///
/// * When `keep_builtin` is `true`, the stream's own schema is used unchanged,
///   after checking that it contains both the `SEQ_COLUMN_NAME` and the
///   `RESERVED_COLUMN_NAME` fields.
/// * When `keep_builtin` is `false`, every field for which
///   `StorageSchema::is_builtin_field` returns `true` is dropped; the remaining
///   fields keep their original order and the schema metadata is carried over.
///
/// `num_primary_keys`, `value_operator` and `keep_builtin` are stored as given,
/// and `pending_batch` starts out as `None`.
///
/// # Panics
///
/// Panics if `keep_builtin` is `true` and the stream's schema has no field named
/// `SEQ_COLUMN_NAME` ("Sequence column not found") or no field named
/// `RESERVED_COLUMN_NAME` ("Reserved column not found").
fn new(
    stream: SendableRecordBatchStream,
    num_primary_keys: usize,
    value_operator: MergeOperatorRef,
    keep_builtin: bool,
) -> Self {
    // Fetch the input schema once; both branches derive the output schema from it.
    let input_schema = stream.schema();

    let arrow_schema = if keep_builtin {
        // The sequence column is checked first so the panic message is the same
        // as before when both builtin columns are missing.
        let has_seq = input_schema
            .fields()
            .iter()
            .any(|f| f.name() == SEQ_COLUMN_NAME);
        assert!(has_seq, "Sequence column not found");

        let has_reserved = input_schema
            .fields()
            .iter()
            .any(|f| f.name() == RESERVED_COLUMN_NAME);
        assert!(has_reserved, "Reserved column not found");

        input_schema
    } else {
        // Keep only the non-builtin fields, cloning just the survivors.
        let fields = input_schema
            .fields()
            .iter()
            .filter(|&f| !StorageSchema::is_builtin_field(f))
            .cloned()
            .collect_vec();

        Arc::new(Schema::new_with_metadata(
            fields,
            input_schema.metadata.clone(),
        ))
    };

    Self {
        stream,
        num_primary_keys,
        value_operator,
        keep_builtin,
        pending_batch: None,
        arrow_schema,
    }
}