in crates/core/src/merge/record_merger.rs [310:354]
/// Merges two batches under `OVERWRITE_WITH_LATEST` with ordering field `ts`,
/// where null `ts` values appear on both sides of a key collision.
///
/// Inputs (columns in schema order: batch tag, key, `ts`, value):
///
/// | key | batch 1 (`s1`)  | batch 2 (`s2`)  | expected surviving row |
/// |-----|-----------------|-----------------|------------------------|
/// | k1  | ts=1,    v=10   | ts=null, v=40   | batch 1                |
/// | k2  | ts=null, v=20   | ts=5,    v=50   | batch 2                |
/// | k3  | ts=3,    v=30   | (absent)        | batch 1                |
fn test_merge_records_nulls() {
    let schema = create_test_schema(true);

    // Assembles one input batch from its four columns, in schema order.
    let make_batch =
        |tags: Vec<&str>, keys: Vec<&str>, ts: Vec<Option<i32>>, values: Vec<i32>| {
            RecordBatch::try_new(
                schema.clone(),
                vec![
                    Arc::new(StringArray::from(tags)),
                    Arc::new(StringArray::from(keys)),
                    Arc::new(Int32Array::from(ts)),
                    Arc::new(Int32Array::from(values)),
                ],
            )
            .unwrap()
        };

    // Older batch: k2 carries a null ordering value.
    let base = make_batch(
        vec!["s1", "s1", "s1"],
        vec!["k1", "k2", "k3"],
        vec![Some(1), None, Some(3)],
        vec![10, 20, 30],
    );
    // Newer batch: updates k1 (with a null ordering value) and k2.
    let updates = make_batch(
        vec!["s2", "s2"],
        vec!["k1", "k2"],
        vec![None, Some(5)],
        vec![40, 50],
    );

    let merger = RecordMerger::new(Arc::new(create_configs(
        "OVERWRITE_WITH_LATEST",
        true,
        Some("ts"),
    )));
    let merged = merger
        .merge_record_batches(&schema, &[base, updates])
        .unwrap();

    // One row per distinct key survives the merge.
    assert_eq!(merged.num_rows(), 3);

    let expected: Vec<_> = vec![
        ("s1", "k1", 1, 10), // k1: batch-2 ts is null, so the batch-1 row is kept
        ("s2", "k2", 5, 50), // k2: batch-1 ts is null, batch-2 ts=5 takes over
        ("s1", "k3", 3, 30), // k3: only present in batch 1, unchanged
    ]
    .into_iter()
    .map(|(tag, key, ts, value)| (tag.to_string(), key.to_string(), ts, value))
    .collect();
    assert_eq!(get_sorted_rows(&merged), expected);
}