in crates/core/src/table/mod.rs [1531:1600]
/// Verifies incremental reads against the `V6SimplekeygenNonhivestyleOverwritetable`
/// sample table, whose timeline is expected to hold exactly three completed commits.
///
/// For every URL the sample table provides, the records changed within several commit
/// windows are read, merged into one batch, and compared (ordered by id) with the
/// expected rows.
fn test_simplekeygen_nonhivestyle_overwritetable() -> Result<()> {
    for base_url in SampleTable::V6SimplekeygenNonhivestyleOverwritetable.urls() {
        let hudi_table = Table::new_blocking(base_url.path())?;

        let timestamps: Vec<_> = hudi_table
            .timeline
            .completed_commits
            .iter()
            .map(|instant| instant.timestamp.as_str())
            .collect();
        assert_eq!(timestamps.len(), 3);
        let (first_commit, second_commit, third_commit) =
            (timestamps[0], timestamps[1], timestamps[2]);

        // Reads the batches changed within the given commit window and merges them into
        // a single batch, using the schema of the first batch returned. Indexing the
        // first batch panics when the read yields nothing, so only use this for windows
        // that are expected to contain changes.
        let read_merged = |start: &str, end: Option<&str>| -> Result<_> {
            let batches = hudi_table.read_incremental_records_blocking(start, end)?;
            let schema = batches[0].schema();
            let merged = concat_batches(&schema, &batches)?;
            Ok(merged)
        };

        // Window: from the beginning up to the 1st commit.
        let batch = read_merged("19700101000000", Some(first_commit))?;
        assert_eq!(
            SampleTable::sample_data_order_by_id(&batch),
            vec![(1, "Alice", true), (2, "Bob", false), (3, "Carol", true)],
            "Should return 3 records inserted in the 1st commit"
        );

        // Window: from the 1st commit to the 2nd commit.
        let batch = read_merged(first_commit, Some(second_commit))?;
        assert_eq!(
            SampleTable::sample_data_order_by_id(&batch),
            vec![(1, "Alice", false), (4, "Diana", true)],
            "Should return 2 records inserted or updated in the 2nd commit"
        );

        // Window: from the 2nd commit to the 3rd commit.
        let batch = read_merged(second_commit, Some(third_commit))?;
        assert_eq!(
            SampleTable::sample_data_order_by_id(&batch),
            vec![(4, "Diana", false)],
            "Should return 1 record insert-overwritten in the 3rd commit"
        );

        // Window: from the 1st commit with no explicit end.
        let batch = read_merged(first_commit, None)?;
        assert_eq!(
            SampleTable::sample_data_order_by_id(&batch),
            vec![(4, "Diana", false)],
            "Should return 1 record insert-overwritten in the 3rd commit"
        );

        // Window: from the 3rd commit with no explicit end. Nothing is expected back,
        // so the raw batches are checked directly instead of being merged.
        let after_latest = hudi_table.read_incremental_records_blocking(third_commit, None)?;
        assert!(
            after_latest.is_empty(),
            "Should return 0 record as it's the latest commit"
        );
    }
    Ok(())
}