datafusion/physical-expr/src/analysis.rs (159 lines of code) (raw):
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//! Interval and selectivity in [`AnalysisContext`]
use crate::expressions::Column;
use crate::intervals::cp_solver::PropagationResult;
use crate::intervals::{cardinality_ratio, ExprIntervalGraph, Interval, IntervalBound};
use crate::utils::collect_columns;
use crate::PhysicalExpr;
use arrow::datatypes::Schema;
use datafusion_common::{ColumnStatistics, DataFusionError, Result, ScalarValue};
use std::fmt::Debug;
use std::sync::Arc;
/// The shared context used during the analysis of an expression. Includes
/// the boundaries for all known columns.
#[derive(Clone, Debug, PartialEq)]
pub struct AnalysisContext {
    // A list of known column boundaries, ordered by the index of the column
    // in the current schema. `None` indicates that no boundary information
    // is available (e.g. before statistics have been gathered).
    pub boundaries: Option<Vec<ExprBoundaries>>,
    /// The estimated percentage of rows that this expression would select, if
    /// it were to be used as a boolean predicate on a filter. The value will be
    /// between 0.0 (selects nothing) and 1.0 (selects everything).
    pub selectivity: Option<f64>,
}
impl AnalysisContext {
    /// Creates a new `AnalysisContext` with the given column boundaries and
    /// no selectivity estimate yet.
    pub fn new(boundaries: Vec<ExprBoundaries>) -> Self {
        Self {
            boundaries: Some(boundaries),
            selectivity: None,
        }
    }

    /// Builder-style setter for the selectivity estimate; consumes and
    /// returns `self` so calls can be chained.
    pub fn with_selectivity(mut self, selectivity: f64) -> Self {
        self.selectivity = Some(selectivity);
        self
    }

    /// Create a new analysis context from column statistics.
    ///
    /// Each statistics entry is paired with the schema field at the same
    /// position. Using `zip` instead of indexing avoids a panic if the
    /// `statistics` slice is longer than the schema's field list; extra
    /// entries on either side are ignored.
    pub fn from_statistics(
        input_schema: &Schema,
        statistics: &[ColumnStatistics],
    ) -> Self {
        let column_boundaries = statistics
            .iter()
            .zip(input_schema.fields())
            .enumerate()
            .map(|(idx, (stats, field))| {
                ExprBoundaries::from_column(stats, field.name().clone(), idx)
            })
            .collect();
        Self::new(column_boundaries)
    }
}
/// Represents the boundaries (min/max interval and distinct-value count) of
/// the value a physical expression would produce if it were to be evaluated.
#[derive(Clone, Debug, PartialEq)]
pub struct ExprBoundaries {
    // The column this boundary information belongs to (name + schema index).
    pub column: Column,
    /// Minimum and maximum values this expression can have.
    pub interval: Interval,
    /// Maximum number of distinct values this expression can produce, if known.
    pub distinct_count: Option<usize>,
}
impl ExprBoundaries {
    /// Builds an `ExprBoundaries` for the column named `col` at position
    /// `index`, turning the column-level min/max statistics into a closed
    /// interval. Missing statistics degrade to `ScalarValue::Null` bounds.
    pub fn from_column(stats: &ColumnStatistics, col: String, index: usize) -> Self {
        // Unknown bounds are represented as NULL scalars.
        let min = stats.min_value.clone().unwrap_or(ScalarValue::Null);
        let max = stats.max_value.clone().unwrap_or(ScalarValue::Null);
        // Both endpoints are inclusive (the `false` flags mean "not open").
        let interval =
            Interval::new(IntervalBound::new(min, false), IntervalBound::new(max, false));
        Self {
            column: Column::new(&col, index),
            interval,
            distinct_count: stats.distinct_count,
        }
    }
}
/// Attempts to refine column boundaries and compute a selectivity value.
///
/// The function accepts boundaries of the input columns in the `context` parameter.
/// It then tries to tighten these boundaries based on the provided `expr`.
/// The resulting selectivity value is calculated by comparing the initial and final boundaries.
/// The computation assumes that the data within the column is uniformly distributed and not sorted.
///
/// # Arguments
///
/// * `context` - The context holding input column boundaries.
/// * `expr` - The expression used to shrink the column boundaries.
///
/// # Returns
///
/// * `AnalysisContext` constructed by pruned boundaries and a selectivity value.
pub fn analyze(
    expr: &Arc<dyn PhysicalExpr>,
    context: AnalysisContext,
) -> Result<AnalysisContext> {
    // Column boundaries are mandatory input for the analysis.
    let target_boundaries = context.boundaries.ok_or_else(|| {
        DataFusionError::Internal("No column exists at the input to filter".to_string())
    })?;

    // Build the interval graph for the predicate expression.
    let mut graph = ExprIntervalGraph::try_new(expr.clone())?;

    // Collect all columns referenced by the expression and locate the
    // corresponding nodes inside the graph.
    let columns: Vec<Arc<dyn PhysicalExpr>> = collect_columns(expr)
        .into_iter()
        .map(|c| Arc::new(c) as Arc<dyn PhysicalExpr>)
        .collect();
    let target_expr_and_indices: Vec<(Arc<dyn PhysicalExpr>, usize)> =
        graph.gather_node_indices(columns.as_slice());

    // Seed each column node with the interval of the matching input boundary.
    let mut target_indices_and_boundaries: Vec<(usize, Interval)> =
        Vec::with_capacity(target_expr_and_indices.len());
    for (node_expr, node_index) in &target_expr_and_indices {
        if let Some(node_column) = node_expr.as_any().downcast_ref::<Column>() {
            if let Some(bound) = target_boundaries
                .iter()
                .find(|bound| bound.column.eq(node_column))
            {
                target_indices_and_boundaries.push((*node_index, bound.interval.clone()));
            }
        }
    }

    // Propagate the constraints through the graph and react to the outcome.
    match graph.update_ranges(&mut target_indices_and_boundaries)? {
        // Propagation tightened the intervals; derive a selectivity from them.
        PropagationResult::Success => {
            shrink_boundaries(expr, graph, target_boundaries, target_expr_and_indices)
        }
        // The predicate can never be satisfied: nothing is selected.
        PropagationResult::Infeasible => {
            Ok(AnalysisContext::new(target_boundaries).with_selectivity(0.0))
        }
        // Nothing could be inferred: conservatively assume everything passes.
        PropagationResult::CannotPropagate => {
            Ok(AnalysisContext::new(target_boundaries).with_selectivity(1.0))
        }
    }
}
/// If the `PropagationResult` indicates success, this function calculates the
/// selectivity value by comparing the initial and final column boundaries.
/// Following this, it constructs and returns a new `AnalysisContext` with the
/// updated parameters.
fn shrink_boundaries(
    expr: &Arc<dyn PhysicalExpr>,
    mut graph: ExprIntervalGraph,
    mut target_boundaries: Vec<ExprBoundaries>,
    target_expr_and_indices: Vec<(Arc<dyn PhysicalExpr>, usize)>,
) -> Result<AnalysisContext> {
    // Snapshot the pre-propagation boundaries so we can measure how much
    // each column interval shrank.
    let initial_boundaries = target_boundaries.clone();

    // Copy the propagated interval of every column node back into the
    // matching entry of `target_boundaries`.
    for (node_expr, node_index) in &target_expr_and_indices {
        if let Some(node_column) = node_expr.as_any().downcast_ref::<Column>() {
            if let Some(bound) = target_boundaries
                .iter_mut()
                .find(|bound| bound.column.eq(node_column))
            {
                bound.interval = graph.get_interval(*node_index);
            }
        }
    }

    // Fetch the interval computed for the root of the predicate expression.
    let graph_nodes = graph.gather_node_indices(&[expr.clone()]);
    let (_, root_index) = graph_nodes.first().ok_or_else(|| {
        DataFusionError::Internal("Error in constructing predicate graph".to_string())
    })?;
    let root_interval = graph.get_interval(*root_index);

    // Derive the selectivity from the initial vs. shrunk boundaries.
    let selectivity = calculate_selectivity(
        &root_interval.lower.value,
        &root_interval.upper.value,
        &target_boundaries,
        &initial_boundaries,
    )?;

    // A selectivity outside [0, 1] signals an internal inconsistency.
    if !(0.0..=1.0).contains(&selectivity) {
        return Err(DataFusionError::Internal(format!(
            "Selectivity is out of limit: {}",
            selectivity
        )));
    }

    Ok(AnalysisContext::new(target_boundaries).with_selectivity(selectivity))
}
/// This function calculates the filter predicate's selectivity by comparing
/// the initial and pruned column boundaries. Selectivity is defined as the
/// ratio of rows in a table that satisfy the filter's predicate.
///
/// An exact propagation result at the root, i.e. `[true, true]` or `[false, false]`,
/// leads to early exit (returning a selectivity value of either 1.0 or 0.0). In such
/// a case, `[true, true]` indicates that all data values satisfy the predicate (hence,
/// selectivity is 1.0), and `[false, false]` suggests that no data value meets the
/// predicate (therefore, selectivity is 0.0).
/// Computes the overall selectivity from the root interval of the predicate
/// and the initial/pruned column boundaries.
///
/// # Arguments
///
/// * `lower_value` / `upper_value` - Lower/upper bound of the predicate's
///   root interval after propagation.
/// * `target_boundaries` - Column boundaries after pruning.
/// * `initial_boundaries` - Column boundaries before pruning; must correspond
///   positionally to `target_boundaries`.
///
/// # Returns
///
/// The estimated fraction of rows selected, or an error if a cardinality
/// ratio cannot be computed.
fn calculate_selectivity(
    lower_value: &ScalarValue,
    upper_value: &ScalarValue,
    target_boundaries: &[ExprBoundaries],
    initial_boundaries: &[ExprBoundaries],
) -> Result<f64> {
    match (lower_value, upper_value) {
        // An exact [true, true] root interval means every row passes.
        (ScalarValue::Boolean(Some(true)), ScalarValue::Boolean(Some(true))) => Ok(1.0),
        // An exact [false, false] root interval means no row passes.
        (ScalarValue::Boolean(Some(false)), ScalarValue::Boolean(Some(false))) => Ok(0.0),
        _ => {
            // Since the intervals are assumed uniform and the values are not
            // correlated, multiply the per-column selectivities to obtain the
            // overall selectivity. Pairing the slices with `zip` (instead of
            // indexing `initial_boundaries[i]`) removes the panic path that
            // would fire if the two slices ever differed in length.
            initial_boundaries.iter().zip(target_boundaries).try_fold(
                1.0,
                |acc, (initial, target)| {
                    Ok(acc * cardinality_ratio(&initial.interval, &target.interval)?)
                },
            )
        }
    }
}