in datafusion/datasource/src/file_scan_config.rs [1889:2160]
fn test_split_groups_by_statistics() -> Result<()> {
use chrono::TimeZone;
use datafusion_common::DFSchema;
use datafusion_expr::execution_props::ExecutionProps;
use object_store::{path::Path, ObjectMeta};
struct File {
name: &'static str,
date: &'static str,
statistics: Vec<Option<(f64, f64)>>,
}
impl File {
fn new(
name: &'static str,
date: &'static str,
statistics: Vec<Option<(f64, f64)>>,
) -> Self {
Self {
name,
date,
statistics,
}
}
}
struct TestCase {
name: &'static str,
file_schema: Schema,
files: Vec<File>,
sort: Vec<SortExpr>,
expected_result: Result<Vec<Vec<&'static str>>, &'static str>,
}
use datafusion_expr::col;
let cases = vec![
TestCase {
name: "test sort",
file_schema: Schema::new(vec![Field::new(
"value".to_string(),
DataType::Float64,
false,
)]),
files: vec![
File::new("0", "2023-01-01", vec![Some((0.00, 0.49))]),
File::new("1", "2023-01-01", vec![Some((0.50, 1.00))]),
File::new("2", "2023-01-02", vec![Some((0.00, 1.00))]),
],
sort: vec![col("value").sort(true, false)],
expected_result: Ok(vec![vec!["0", "1"], vec!["2"]]),
},
// same input but file '2' is in the middle
// test that we still order correctly
TestCase {
name: "test sort with files ordered differently",
file_schema: Schema::new(vec![Field::new(
"value".to_string(),
DataType::Float64,
false,
)]),
files: vec![
File::new("0", "2023-01-01", vec![Some((0.00, 0.49))]),
File::new("2", "2023-01-02", vec![Some((0.00, 1.00))]),
File::new("1", "2023-01-01", vec![Some((0.50, 1.00))]),
],
sort: vec![col("value").sort(true, false)],
expected_result: Ok(vec![vec!["0", "1"], vec!["2"]]),
},
TestCase {
name: "reverse sort",
file_schema: Schema::new(vec![Field::new(
"value".to_string(),
DataType::Float64,
false,
)]),
files: vec![
File::new("0", "2023-01-01", vec![Some((0.00, 0.49))]),
File::new("1", "2023-01-01", vec![Some((0.50, 1.00))]),
File::new("2", "2023-01-02", vec![Some((0.00, 1.00))]),
],
sort: vec![col("value").sort(false, true)],
expected_result: Ok(vec![vec!["1", "0"], vec!["2"]]),
},
// reject nullable sort columns
TestCase {
name: "no nullable sort columns",
file_schema: Schema::new(vec![Field::new(
"value".to_string(),
DataType::Float64,
true, // should fail because nullable
)]),
files: vec![
File::new("0", "2023-01-01", vec![Some((0.00, 0.49))]),
File::new("1", "2023-01-01", vec![Some((0.50, 1.00))]),
File::new("2", "2023-01-02", vec![Some((0.00, 1.00))]),
],
sort: vec![col("value").sort(true, false)],
expected_result: Err("construct min/max statistics for split_groups_by_statistics\ncaused by\nbuild min rows\ncaused by\ncreate sorting columns\ncaused by\nError during planning: cannot sort by nullable column")
},
TestCase {
name: "all three non-overlapping",
file_schema: Schema::new(vec![Field::new(
"value".to_string(),
DataType::Float64,
false,
)]),
files: vec![
File::new("0", "2023-01-01", vec![Some((0.00, 0.49))]),
File::new("1", "2023-01-01", vec![Some((0.50, 0.99))]),
File::new("2", "2023-01-02", vec![Some((1.00, 1.49))]),
],
sort: vec![col("value").sort(true, false)],
expected_result: Ok(vec![vec!["0", "1", "2"]]),
},
TestCase {
name: "all three overlapping",
file_schema: Schema::new(vec![Field::new(
"value".to_string(),
DataType::Float64,
false,
)]),
files: vec![
File::new("0", "2023-01-01", vec![Some((0.00, 0.49))]),
File::new("1", "2023-01-01", vec![Some((0.00, 0.49))]),
File::new("2", "2023-01-02", vec![Some((0.00, 0.49))]),
],
sort: vec![col("value").sort(true, false)],
expected_result: Ok(vec![vec!["0"], vec!["1"], vec!["2"]]),
},
TestCase {
name: "empty input",
file_schema: Schema::new(vec![Field::new(
"value".to_string(),
DataType::Float64,
false,
)]),
files: vec![],
sort: vec![col("value").sort(true, false)],
expected_result: Ok(vec![]),
},
TestCase {
name: "one file missing statistics",
file_schema: Schema::new(vec![Field::new(
"value".to_string(),
DataType::Float64,
false,
)]),
files: vec![
File::new("0", "2023-01-01", vec![Some((0.00, 0.49))]),
File::new("1", "2023-01-01", vec![Some((0.00, 0.49))]),
File::new("2", "2023-01-02", vec![None]),
],
sort: vec![col("value").sort(true, false)],
expected_result: Err("construct min/max statistics for split_groups_by_statistics\ncaused by\ncollect min/max values\ncaused by\nget min/max for column: 'value'\ncaused by\nError during planning: statistics not found"),
},
];
for case in cases {
let table_schema = Arc::new(Schema::new(
case.file_schema
.fields()
.clone()
.into_iter()
.cloned()
.chain(Some(Arc::new(Field::new(
"date".to_string(),
DataType::Utf8,
false,
))))
.collect::<Vec<_>>(),
));
let sort_order = LexOrdering::from(
case.sort
.into_iter()
.map(|expr| {
create_physical_sort_expr(
&expr,
&DFSchema::try_from(table_schema.as_ref().clone())?,
&ExecutionProps::default(),
)
})
.collect::<Result<Vec<_>>>()?,
);
let partitioned_files = FileGroup::new(
case.files.into_iter().map(From::from).collect::<Vec<_>>(),
);
let result = FileScanConfig::split_groups_by_statistics(
&table_schema,
&[partitioned_files.clone()],
&sort_order,
);
let results_by_name = result
.as_ref()
.map(|file_groups| {
file_groups
.iter()
.map(|file_group| {
file_group
.iter()
.map(|file| {
partitioned_files
.iter()
.find_map(|f| {
if f.object_meta == file.object_meta {
Some(
f.object_meta
.location
.as_ref()
.rsplit('/')
.next()
.unwrap()
.trim_end_matches(".parquet"),
)
} else {
None
}
})
.unwrap()
})
.collect::<Vec<_>>()
})
.collect::<Vec<_>>()
})
.map_err(|e| e.strip_backtrace().leak() as &'static str);
assert_eq!(results_by_name, case.expected_result, "{}", case.name);
}
return Ok(());
impl From<File> for PartitionedFile {
fn from(file: File) -> Self {
PartitionedFile {
object_meta: ObjectMeta {
location: Path::from(format!(
"data/date={}/{}.parquet",
file.date, file.name
)),
last_modified: chrono::Utc.timestamp_nanos(0),
size: 0,
e_tag: None,
version: None,
},
partition_values: vec![ScalarValue::from(file.date)],
range: None,
statistics: Some(Arc::new(Statistics {
num_rows: Precision::Absent,
total_byte_size: Precision::Absent,
column_statistics: file
.statistics
.into_iter()
.map(|stats| {
stats
.map(|(min, max)| ColumnStatistics {
min_value: Precision::Exact(ScalarValue::from(
min,
)),
max_value: Precision::Exact(ScalarValue::from(
max,
)),
..Default::default()
})
.unwrap_or_default()
})
.collect::<Vec<_>>(),
})),
extensions: None,
metadata_size_hint: None,
}
}
}
}