in crates/iceberg/src/spec/snapshot_summary.rs [866:1011]
/// Exercises `SnapshotSummaryCollector::merge` in two scenarios:
///
/// 1. Two collectors that each saw one data file (10 and 20 records) are
///    merged; the built properties must report 2 added data files and 30
///    added records.
/// 2. A collector fed through `add_manifest` (1 added file, 5 added rows) is
///    merged with a collector fed through `add_file` (1 record); the built
///    properties must report 2 added data files and 6 added records, and no
///    property key may start with `CHANGED_PARTITION_PREFIX`.
fn test_snapshot_summary_collector_merge() {
    let fields = vec![
        NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
        NestedField::required(2, "name", Type::Primitive(PrimitiveType::String)).into(),
    ];
    let schema = Arc::new(Schema::builder().with_fields(fields).build().unwrap());

    let year_field = UnboundPartitionField::builder()
        .source_id(2)
        .name("year".to_string())
        .transform(Transform::Identity)
        .build();
    let partition_spec = Arc::new(
        PartitionSpec::builder(Arc::clone(&schema))
            .add_unbound_fields(vec![year_field])
            .unwrap()
            .with_spec_id(1)
            .build()
            .unwrap(),
    );

    // Every data file in this test is identical apart from its record count
    // and byte size, so build them from a single template.
    let data_file = |record_count, file_size_in_bytes| DataFile {
        content: DataContentType::Data,
        file_path: "test.parquet".into(),
        file_format: DataFileFormat::Parquet,
        partition: Struct::from_iter(vec![]),
        record_count,
        file_size_in_bytes,
        column_sizes: HashMap::new(),
        value_counts: HashMap::new(),
        null_value_counts: HashMap::new(),
        nan_value_counts: HashMap::new(),
        lower_bounds: HashMap::new(),
        upper_bounds: HashMap::new(),
        key_metadata: None,
        split_offsets: vec![],
        equality_ids: vec![],
        sort_order_id: None,
        partition_spec_id: 0,
        first_row_id: None,
        referenced_data_file: None,
        content_offset: None,
        content_size_in_bytes: None,
    };

    // Scenario 1: both sides of the merge were populated via `add_file`.
    let mut file_collector_a = SnapshotSummaryCollector::default();
    let mut file_collector_b = SnapshotSummaryCollector::default();
    file_collector_a.add_file(
        &data_file(10, 100),
        Arc::clone(&schema),
        Arc::clone(&partition_spec),
    );
    file_collector_b.add_file(
        &data_file(20, 200),
        Arc::clone(&schema),
        Arc::clone(&partition_spec),
    );
    file_collector_a.merge(file_collector_b);

    let merged_files = file_collector_a.build();
    assert_eq!(merged_files.get(ADDED_DATA_FILES).unwrap(), "2");
    assert_eq!(merged_files.get(ADDED_RECORDS).unwrap(), "30");

    // Scenario 2: one side was populated via `add_manifest`, the other via
    // `add_file`.
    let manifest = ManifestFile {
        manifest_path: "test.manifest".to_string(),
        manifest_length: 0,
        partition_spec_id: 0,
        content: ManifestContentType::Data,
        sequence_number: 0,
        min_sequence_number: 0,
        added_snapshot_id: 0,
        added_files_count: Some(1),
        existing_files_count: Some(0),
        deleted_files_count: Some(0),
        added_rows_count: Some(5),
        existing_rows_count: Some(0),
        deleted_rows_count: Some(0),
        partitions: Vec::new(),
        key_metadata: Vec::new(),
    };
    let mut manifest_collector = SnapshotSummaryCollector::default();
    let mut file_collector_c = SnapshotSummaryCollector::default();
    manifest_collector.add_manifest(&manifest);
    file_collector_c.add_file(
        &data_file(1, 10),
        Arc::clone(&schema),
        Arc::clone(&partition_spec),
    );
    manifest_collector.merge(file_collector_c);

    let merged_mixed = manifest_collector.build();
    assert_eq!(merged_mixed.get(ADDED_DATA_FILES).unwrap(), "2");
    assert_eq!(merged_mixed.get(ADDED_RECORDS).unwrap(), "6");
    // The mixed merge must not emit any per-partition summary keys.
    assert!(!merged_mixed
        .keys()
        .any(|key| key.starts_with(CHANGED_PARTITION_PREFIX)));
}