in native/core/src/execution/planner.rs [1919:2087]
fn create_window_expr<'a>(
&'a self,
spark_expr: &'a spark_operator::WindowExpr,
input_schema: SchemaRef,
partition_by: &[Arc<dyn PhysicalExpr>],
sort_exprs: &[PhysicalSortExpr],
) -> Result<Arc<dyn WindowExpr>, ExecutionError> {
let window_func_name: String;
let window_args: Vec<Arc<dyn PhysicalExpr>>;
if let Some(func) = &spark_expr.built_in_window_function {
match &func.expr_struct {
Some(ExprStruct::ScalarFunc(f)) => {
window_func_name = f.func.clone();
window_args = f
.args
.iter()
.map(|expr| self.create_expr(expr, Arc::clone(&input_schema)))
.collect::<Result<Vec<_>, ExecutionError>>()?;
}
other => {
return Err(GeneralError(format!(
"{other:?} not supported for window function"
)))
}
};
} else if let Some(agg_func) = &spark_expr.agg_func {
let result = self.process_agg_func(agg_func, Arc::clone(&input_schema))?;
window_func_name = result.0;
window_args = result.1;
} else {
return Err(GeneralError(
"Both func and agg_func are not set".to_string(),
));
}
let window_func = match self.find_df_window_function(&window_func_name) {
Some(f) => f,
_ => {
return Err(GeneralError(format!(
"{window_func_name} not supported for window function"
)))
}
};
let spark_window_frame = match spark_expr
.spec
.as_ref()
.and_then(|inner| inner.frame_specification.as_ref())
{
Some(frame) => frame,
_ => {
return Err(ExecutionError::DeserializeError(
"Cannot deserialize window frame".to_string(),
))
}
};
let units = match spark_window_frame.frame_type() {
WindowFrameType::Rows => WindowFrameUnits::Rows,
WindowFrameType::Range => WindowFrameUnits::Range,
};
let lower_bound: WindowFrameBound = match spark_window_frame
.lower_bound
.as_ref()
.and_then(|inner| inner.lower_frame_bound_struct.as_ref())
{
Some(l) => match l {
LowerFrameBoundStruct::UnboundedPreceding(_) => match units {
WindowFrameUnits::Rows => {
WindowFrameBound::Preceding(ScalarValue::UInt64(None))
}
WindowFrameUnits::Range => {
WindowFrameBound::Preceding(ScalarValue::Int64(None))
}
WindowFrameUnits::Groups => {
return Err(GeneralError(
"WindowFrameUnits::Groups is not supported.".to_string(),
));
}
},
LowerFrameBoundStruct::Preceding(offset) => {
let offset_value = offset.offset.abs();
match units {
WindowFrameUnits::Rows => WindowFrameBound::Preceding(ScalarValue::UInt64(
Some(offset_value as u64),
)),
WindowFrameUnits::Range => {
WindowFrameBound::Preceding(ScalarValue::Int64(Some(offset_value)))
}
WindowFrameUnits::Groups => {
return Err(GeneralError(
"WindowFrameUnits::Groups is not supported.".to_string(),
));
}
}
}
LowerFrameBoundStruct::CurrentRow(_) => WindowFrameBound::CurrentRow,
},
None => match units {
WindowFrameUnits::Rows => WindowFrameBound::Preceding(ScalarValue::UInt64(None)),
WindowFrameUnits::Range => WindowFrameBound::Preceding(ScalarValue::Int64(None)),
WindowFrameUnits::Groups => {
return Err(GeneralError(
"WindowFrameUnits::Groups is not supported.".to_string(),
));
}
},
};
let upper_bound: WindowFrameBound = match spark_window_frame
.upper_bound
.as_ref()
.and_then(|inner| inner.upper_frame_bound_struct.as_ref())
{
Some(u) => match u {
UpperFrameBoundStruct::UnboundedFollowing(_) => match units {
WindowFrameUnits::Rows => {
WindowFrameBound::Following(ScalarValue::UInt64(None))
}
WindowFrameUnits::Range => {
WindowFrameBound::Following(ScalarValue::Int64(None))
}
WindowFrameUnits::Groups => {
return Err(GeneralError(
"WindowFrameUnits::Groups is not supported.".to_string(),
));
}
},
UpperFrameBoundStruct::Following(offset) => match units {
WindowFrameUnits::Rows => {
WindowFrameBound::Following(ScalarValue::UInt64(Some(offset.offset as u64)))
}
WindowFrameUnits::Range => {
WindowFrameBound::Following(ScalarValue::Int64(Some(offset.offset)))
}
WindowFrameUnits::Groups => {
return Err(GeneralError(
"WindowFrameUnits::Groups is not supported.".to_string(),
));
}
},
UpperFrameBoundStruct::CurrentRow(_) => WindowFrameBound::CurrentRow,
},
None => match units {
WindowFrameUnits::Rows => WindowFrameBound::Following(ScalarValue::UInt64(None)),
WindowFrameUnits::Range => WindowFrameBound::Following(ScalarValue::Int64(None)),
WindowFrameUnits::Groups => {
return Err(GeneralError(
"WindowFrameUnits::Groups is not supported.".to_string(),
));
}
},
};
let window_frame = WindowFrame::new_bounds(units, lower_bound, upper_bound);
datafusion::physical_plan::windows::create_window_expr(
&window_func,
window_func_name,
&window_args,
partition_by,
&LexOrdering::new(sort_exprs.to_vec()),
window_frame.into(),
input_schema.as_ref(),
false, // TODO: Ignore nulls
)
.map_err(|e| ExecutionError::DataFusionError(e.to_string()))
}