fn new()

in src/columnar_storage/src/read.rs [207:249]


    /// Builds a new instance around `stream`, deriving the schema stored in
    /// `arrow_schema` from the stream's own schema.
    ///
    /// * When `keep_builtin` is `true`, the stream schema is used unchanged,
    ///   after checking that it contains both the `SEQ_COLUMN_NAME` and
    ///   `RESERVED_COLUMN_NAME` fields.
    /// * When `keep_builtin` is `false`, every field for which
    ///   `StorageSchema::is_builtin_field` returns `true` is dropped; the
    ///   remaining fields keep their order and the schema metadata is copied.
    ///
    /// `pending_batch` always starts out as `None`.
    ///
    /// # Panics
    ///
    /// Panics if `keep_builtin` is `true` and the stream schema has no field
    /// named `SEQ_COLUMN_NAME` or no field named `RESERVED_COLUMN_NAME`.
    fn new(
        stream: SendableRecordBatchStream,
        num_primary_keys: usize,
        value_operator: MergeOperatorRef,
        keep_builtin: bool,
    ) -> Self {
        // Take one handle to the stream schema and derive everything from it.
        let input_schema = stream.schema();

        let arrow_schema = if keep_builtin {
            // The schema is passed through untouched, so both builtin columns
            // must already be part of it.
            assert!(
                input_schema
                    .fields()
                    .iter()
                    .any(|field| field.name() == SEQ_COLUMN_NAME),
                "Sequence column not found"
            );
            assert!(
                input_schema
                    .fields()
                    .iter()
                    .any(|field| field.name() == RESERVED_COLUMN_NAME),
                "Reserved column not found"
            );
            input_schema
        } else {
            // Keep only the non-builtin fields, preserving their order.
            let visible_fields = input_schema
                .fields()
                .into_iter()
                .filter_map(|field| {
                    (!StorageSchema::is_builtin_field(field)).then(|| field.clone())
                })
                .collect_vec();
            let metadata = input_schema.metadata.clone();
            Arc::new(Schema::new_with_metadata(visible_fields, metadata))
        };

        Self {
            stream,
            num_primary_keys,
            value_operator,
            keep_builtin,
            pending_batch: None,
            arrow_schema,
        }
    }