fn test_merge_manifest()

in src/columnar_storage/src/manifest/mod.rs [457:508]


    fn test_merge_manifest() {
        let root_dir = temp_dir::TempDir::new()
            .unwrap()
            .path()
            .to_string_lossy()
            .to_string();
        let snapshot_path = Path::from(format!("{root_dir}/{PREFIX_PATH}/{SNAPSHOT_FILENAME}"));
        let delta_dir = Path::from(format!("{root_dir}/{PREFIX_PATH}/{DELTA_PREFIX}"));
        let runtime = Arc::new(tokio::runtime::Runtime::new().unwrap());
        let rt = runtime.clone();

        rt.block_on(async move {
            let store: ObjectStoreRef = Arc::new(LocalFileSystem::new());
            let manifest = Manifest::try_new(
                root_dir,
                store.clone(),
                runtime.clone(),
                ManifestConfig {
                    merge_interval_seconds: 1,
                    ..Default::default()
                },
            )
            .await
            .unwrap();

            // Add manifest files
            for i in 0..20 {
                let time_range = (i..i + 1).into();
                let meta = FileMeta {
                    max_sequence: i as u64,
                    num_rows: i as u32,
                    size: i as u32,
                    time_range,
                };
                manifest.add_file(i as u64, meta).await.unwrap();
            }

            // Wait for merge manifest to finish
            sleep(Duration::from_secs(2)).await;

            let mut mem_ssts = manifest.ssts.read().await.clone();
            let snapshot = read_snapshot(&store, &snapshot_path).await.unwrap();
            let mut ssts = snapshot.into_ssts();

            mem_ssts.sort_by_key(|a| a.id());
            ssts.sort_by_key(|a| a.id());
            assert_eq!(mem_ssts, ssts);

            let delta_paths = list_delta_paths(&store, &delta_dir).await.unwrap();
            assert!(delta_paths.is_empty());
        })
    }