in src/columnar_storage/src/manifest/mod.rs [457:508]
fn test_merge_manifest() {
let root_dir = temp_dir::TempDir::new()
.unwrap()
.path()
.to_string_lossy()
.to_string();
let snapshot_path = Path::from(format!("{root_dir}/{PREFIX_PATH}/{SNAPSHOT_FILENAME}"));
let delta_dir = Path::from(format!("{root_dir}/{PREFIX_PATH}/{DELTA_PREFIX}"));
let runtime = Arc::new(tokio::runtime::Runtime::new().unwrap());
let rt = runtime.clone();
rt.block_on(async move {
let store: ObjectStoreRef = Arc::new(LocalFileSystem::new());
let manifest = Manifest::try_new(
root_dir,
store.clone(),
runtime.clone(),
ManifestConfig {
merge_interval_seconds: 1,
..Default::default()
},
)
.await
.unwrap();
// Add manifest files
for i in 0..20 {
let time_range = (i..i + 1).into();
let meta = FileMeta {
max_sequence: i as u64,
num_rows: i as u32,
size: i as u32,
time_range,
};
manifest.add_file(i as u64, meta).await.unwrap();
}
// Wait for merge manifest to finish
sleep(Duration::from_secs(2)).await;
let mut mem_ssts = manifest.ssts.read().await.clone();
let snapshot = read_snapshot(&store, &snapshot_path).await.unwrap();
let mut ssts = snapshot.into_ssts();
mem_ssts.sort_by_key(|a| a.id());
ssts.sort_by_key(|a| a.id());
assert_eq!(mem_ssts, ssts);
let delta_paths = list_delta_paths(&store, &delta_dir).await.unwrap();
assert!(delta_paths.is_empty());
})
}