in cpp/velox/substrait/SubstraitToVeloxPlanValidator.cc [617:734]
bool SubstraitToVeloxPlanValidator::validate(const ::substrait::WindowRel& windowRel) {
if (windowRel.has_input() && !validate(windowRel.input())) {
LOG_VALIDATION_MSG("WindowRel input fails to validate.");
return false;
}
// Get and validate the input types from extension.
if (!windowRel.has_advanced_extension()) {
LOG_VALIDATION_MSG("Input types are expected in WindowRel.");
return false;
}
const auto& extension = windowRel.advanced_extension();
TypePtr inputRowType;
std::vector<TypePtr> types;
if (!parseVeloxType(extension, inputRowType) || !flattenSingleLevel(inputRowType, types)) {
LOG_VALIDATION_MSG("Validation failed for input types in WindowRel.");
return false;
}
if (types.empty()) {
// See: https://github.com/apache/incubator-gluten/issues/7600.
LOG_VALIDATION_MSG("Validation failed for empty input schema in WindowRel.");
return false;
}
int32_t inputPlanNodeId = 0;
std::vector<std::string> names;
names.reserve(types.size());
for (auto colIdx = 0; colIdx < types.size(); colIdx++) {
names.emplace_back(SubstraitParser::makeNodeName(inputPlanNodeId, colIdx));
}
auto rowType = std::make_shared<RowType>(std::move(names), std::move(types));
// Validate WindowFunction
std::vector<std::string> funcSpecs;
funcSpecs.reserve(windowRel.measures().size());
for (const auto& smea : windowRel.measures()) {
const auto& windowFunction = smea.measure();
funcSpecs.emplace_back(planConverter_.findFuncSpec(windowFunction.function_reference()));
SubstraitParser::parseType(windowFunction.output_type());
for (const auto& arg : windowFunction.arguments()) {
auto typeCase = arg.value().rex_type_case();
switch (typeCase) {
case ::substrait::Expression::RexTypeCase::kSelection:
case ::substrait::Expression::RexTypeCase::kLiteral:
break;
default:
LOG_VALIDATION_MSG("Only field or constant is supported in window functions.");
return false;
}
}
// Validate BoundType and Frame Type
switch (windowFunction.window_type()) {
case ::substrait::WindowType::ROWS:
case ::substrait::WindowType::RANGE:
break;
default:
LOG_VALIDATION_MSG(
"the window type only support ROWS and RANGE, and the input type is " +
std::to_string(windowFunction.window_type()));
return false;
}
bool boundTypeSupported =
validateBoundType(windowFunction.upper_bound()) && validateBoundType(windowFunction.lower_bound());
if (!boundTypeSupported) {
LOG_VALIDATION_MSG(
"Found unsupported Bound Type: upper " + std::to_string(windowFunction.upper_bound().kind_case()) +
", lower " + std::to_string(windowFunction.lower_bound().kind_case()));
return false;
}
}
// Validate groupby expression
const auto& groupByExprs = windowRel.partition_expressions();
std::vector<core::TypedExprPtr> expressions;
expressions.reserve(groupByExprs.size());
for (const auto& expr : groupByExprs) {
auto expression = exprConverter_->toVeloxExpr(expr, rowType);
auto exprField = dynamic_cast<const core::FieldAccessTypedExpr*>(expression.get());
if (exprField == nullptr) {
LOG_VALIDATION_MSG("Only field is supported for partition key in Window Operator!");
return false;
} else {
expressions.emplace_back(expression);
}
}
// Try to compile the expressions. If there is any unregistred funciton or
// mismatched type, exception will be thrown.
exec::ExprSet exprSet(std::move(expressions), execCtx_.get());
// Validate Sort expression
const auto& sorts = windowRel.sorts();
for (const auto& sort : sorts) {
switch (sort.direction()) {
case ::substrait::SortField_SortDirection_SORT_DIRECTION_ASC_NULLS_FIRST:
case ::substrait::SortField_SortDirection_SORT_DIRECTION_ASC_NULLS_LAST:
case ::substrait::SortField_SortDirection_SORT_DIRECTION_DESC_NULLS_FIRST:
case ::substrait::SortField_SortDirection_SORT_DIRECTION_DESC_NULLS_LAST:
break;
default:
LOG_VALIDATION_MSG("in windowRel, unsupported Sort direction " + std::to_string(sort.direction()));
return false;
}
if (sort.has_expr()) {
auto expression = exprConverter_->toVeloxExpr(sort.expr(), rowType);
auto exprField = dynamic_cast<const core::FieldAccessTypedExpr*>(expression.get());
if (!exprField) {
LOG_VALIDATION_MSG("in windowRel, the sorting key in Sort Operator only support field.");
return false;
}
exec::ExprSet exprSet1({std::move(expression)}, execCtx_.get());
}
}
return true;
}