fn test_storage_write_and_scan()

in src/columnar_storage/src/storage.rs [392:491]


    fn test_storage_write_and_scan() {
        let schema = arrow_schema!(("pk1", UInt8), ("pk2", UInt8), ("value", Int64));
        let root_dir = temp_dir::TempDir::new().unwrap();
        let store = Arc::new(LocalFileSystem::new());
        let runtimes = build_runtimes();
        runtimes.sst_compact_runtime.clone().block_on(async move {
            let storage = ObjectBasedStorage::try_new(
                root_dir.path().to_string_lossy().to_string(),
                Duration::from_hours(2),
                store,
                schema.clone(),
                2, // num_primary_keys
                StorageConfig::default(),
                runtimes,
            )
            .await
            .unwrap();

            let batch = record_batch!(
                ("pk1", UInt8, vec![11, 11, 9, 10, 5]),
                ("pk2", UInt8, vec![100, 100, 1, 2, 3]),
                ("value", Int64, vec![2, 7, 4, 6, 1])
            )
            .unwrap();
            storage
                .write(WriteRequest {
                    batch,
                    time_range: (1..10).into(),
                    enable_check: true,
                })
                .await
                .unwrap();

            let batch = record_batch!(
                ("pk1", UInt8, vec![11, 11, 9, 10]),
                ("pk2", UInt8, vec![100, 99, 1, 2]),
                ("value", Int64, vec![22, 77, 44, 66])
            )
            .unwrap();
            storage
                .write(WriteRequest {
                    batch,
                    time_range: (10..20).into(),
                    enable_check: true,
                })
                .await
                .unwrap();

            let result_stream = storage
                .scan(ScanRequest {
                    range: TimeRange::new(Timestamp(0), Timestamp::MAX),
                    predicate: vec![],
                    projections: None,
                })
                .await
                .unwrap();
            let expected_batch = [
                record_batch!(
                    ("pk1", UInt8, vec![5, 9, 10, 11]),
                    ("pk2", UInt8, vec![3, 1, 2, 99]),
                    ("value", Int64, vec![1, 44, 66, 77])
                )
                .unwrap(),
                record_batch!(
                    ("pk1", UInt8, vec![11]),
                    ("pk2", UInt8, vec![100]),
                    ("value", Int64, vec![22])
                )
                .unwrap(),
            ];

            check_stream(result_stream, expected_batch).await;

            // test with predicate
            let expr = col("pk1").eq(lit(11_u8));
            let result_stream = storage
                .scan(ScanRequest {
                    range: TimeRange::new(Timestamp(0), Timestamp::MAX),
                    predicate: vec![expr],
                    projections: None,
                })
                .await
                .unwrap();
            let expected_batch = [
                record_batch!(
                    ("pk1", UInt8, vec![11]),
                    ("pk2", UInt8, vec![99]),
                    ("value", Int64, vec![77])
                )
                .unwrap(),
                record_batch!(
                    ("pk1", UInt8, vec![11]),
                    ("pk2", UInt8, vec![100]),
                    ("value", Int64, vec![22])
                )
                .unwrap(),
            ];
            check_stream(result_stream, expected_batch).await;
        });
    }