in ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java [2828:3031]
private boolean validatePTFOperator(PTFOperator op, VectorizationContext vContext,
    VectorPTFDesc vectorPTFDesc)
    throws HiveException {
  if (!isPtfVectorizationEnabled) {
    setNodeIssue("Vectorization of PTF is not enabled (" +
        HiveConf.ConfVars.HIVE_VECTORIZATION_PTF_ENABLED.varname + " IS false)");
    return false;
  }
  PTFDesc ptfDesc = op.getConf();
  boolean isMapSide = ptfDesc.isMapSide();
  if (isMapSide) {
    setOperatorIssue("PTF Mapper not supported");
    return false;
  }
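  // Walk through an optional SELECT parent to confirm the PTF sits directly on top of a
  // reduce-shuffle (ReduceSinkOperator).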
  List<Operator<? extends OperatorDesc>> ptfParents = op.getParentOperators();
  if (ptfParents != null && ptfParents.size() > 0) {
    Operator<? extends OperatorDesc> ptfParent = op.getParentOperators().get(0);
    if (!(ptfParent instanceof ReduceSinkOperator)) {
      boolean isReduceShufflePtf = false;
      if (ptfParent instanceof SelectOperator) {
        ptfParents = ptfParent.getParentOperators();
        if (ptfParents == null || ptfParents.size() == 0) {
          isReduceShufflePtf = true;
        } else {
          ptfParent = ptfParent.getParentOperators().get(0);
          isReduceShufflePtf = (ptfParent instanceof ReduceSinkOperator);
        }
      }
      if (!isReduceShufflePtf) {
        setOperatorIssue("Only PTF directly under reduce-shuffle is supported");
        return false;
      }
    }
  }
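  // Only windowing invocations backed by a WindowTableFunctionDef are vectorized;
  // NOOP and other table functions are rejected.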
  boolean forNoop = ptfDesc.forNoop();
  if (forNoop) {
    setOperatorIssue("NOOP not supported");
    return false;
  }
  boolean forWindowing = ptfDesc.forWindowing();
  if (!forWindowing) {
    setOperatorIssue("Windowing required");
    return false;
  }
  PartitionedTableFunctionDef funcDef = ptfDesc.getFuncDef();
  boolean isWindowTableFunctionDef = (funcDef instanceof WindowTableFunctionDef);
  if (!isWindowTableFunctionDef) {
    setOperatorIssue("Must be a WindowTableFunctionDef");
    return false;
  }

  // We collect information in VectorPTFDesc that doesn't need the VectorizationContext.
  // We use this information for validation. Later when creating the vector operator
  // we create an additional object VectorPTFInfo.
  try {
    createVectorPTFDesc(
        op, ptfDesc, vContext, vectorPTFDesc, vectorizedPTFMaxMemoryBufferingBatchCount);
  } catch (HiveException e) {
    setOperatorIssue("exception: " + VectorizationContext.getStackTraceAsSingleLine(e));
    return false;
  }

  // Output columns ok?
  String[] outputColumnNames = vectorPTFDesc.getOutputColumnNames();
  TypeInfo[] outputTypeInfos = vectorPTFDesc.getOutputTypeInfos();
  final int outputCount = outputColumnNames.length;
  for (int i = 0; i < outputCount; i++) {
    String typeName = outputTypeInfos[i].getTypeName();
    boolean ret = validateDataType(
        typeName, VectorExpressionDescriptor.Mode.PROJECTION, /* allowComplex */ false);
    if (!ret) {
      setExpressionIssue("PTF Output Columns",
          "Data type " + typeName + " of column " + outputColumnNames[i] + " not supported");
      return false;
    }
  }
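  // Validate each window function evaluator: the function itself, DISTINCT support,
  // its window frame, and its argument expressions.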
  boolean[] distinctEvaluator = vectorPTFDesc.getEvaluatorsAreDistinct();
  String[] evaluatorFunctionNames = vectorPTFDesc.getEvaluatorFunctionNames();
  final int count = evaluatorFunctionNames.length;
  WindowFrameDef[] evaluatorWindowFrameDefs = vectorPTFDesc.getEvaluatorWindowFrameDefs();
  List<ExprNodeDesc>[] evaluatorInputExprNodeDescLists =
      vectorPTFDesc.getEvaluatorInputExprNodeDescLists();
  for (int i = 0; i < count; i++) {
    String functionName = evaluatorFunctionNames[i].toLowerCase();
    SupportedFunctionType supportedFunctionType =
        VectorPTFDesc.supportedFunctionsMap.get(functionName);
    if (supportedFunctionType == null) {
      setOperatorIssue(functionName + " not in supported functions " +
          VectorPTFDesc.supportedFunctionNames);
      return false;
    }
    if (distinctEvaluator[i] && !supportedFunctionType.isSupportDistinct()) {
      setOperatorIssue(functionName + " distinct is not supported ");
      return false;
    }
    WindowFrameDef windowFrameDef = evaluatorWindowFrameDefs[i];
    List<ExprNodeDesc> exprNodeDescList = evaluatorInputExprNodeDescLists[i];
    final boolean isSingleParameter =
        (exprNodeDescList != null &&
         exprNodeDescList.size() == 1);
    final ExprNodeDesc singleExprNodeDesc =
        (isSingleParameter ? exprNodeDescList.get(0) : null);
    final TypeInfo singleTypeInfo =
        (isSingleParameter ? singleExprNodeDesc.getTypeInfo() : null);
    final PrimitiveCategory singlePrimitiveCategory =
        (singleTypeInfo instanceof PrimitiveTypeInfo ?
            ((PrimitiveTypeInfo) singleTypeInfo).getPrimitiveCategory() : null);
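    // Frame shape restrictions: a RANGE frame must end at CURRENT ROW; a ROWS frame must have
    // an unbounded end, unless it ends at CURRENT ROW for AVG/MAX/MIN/SUM over a single
    // primitive-typed argument.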
    switch (windowFrameDef.getWindowType()) {
    case RANGE:
      if (!windowFrameDef.getEnd().isCurrentRow()) {
        setOperatorIssue(functionName + " only CURRENT ROW end frame is supported for RANGE");
        return false;
      }
      break;
    case ROWS:
      {
        boolean isRowEndCurrent =
            (windowFrameDef.getEnd().isCurrentRow() &&
            (supportedFunctionType == SupportedFunctionType.AVG ||
             supportedFunctionType == SupportedFunctionType.MAX ||
             supportedFunctionType == SupportedFunctionType.MIN ||
             supportedFunctionType == SupportedFunctionType.SUM) &&
            isSingleParameter &&
            singlePrimitiveCategory != null);
        if (!isRowEndCurrent && !windowFrameDef.isEndUnbounded()) {
          setOperatorIssue(
              functionName + " UNBOUNDED end frame is required for ROWS window type");
          return false;
        }
      }
      break;
    default:
      throw new RuntimeException("Unexpected window type " + windowFrameDef.getWindowType());
    }
    // RANK/DENSE_RANK don't care about columns.
    if (supportedFunctionType != SupportedFunctionType.RANK &&
        supportedFunctionType != SupportedFunctionType.DENSE_RANK) {
      if (exprNodeDescList != null) {
        // LEAD and LAG now support multiple arguments in vectorized mode.
        if (exprNodeDescList.size() > 1 && supportedFunctionType != SupportedFunctionType.LAG
            && supportedFunctionType != SupportedFunctionType.LEAD) {
          setOperatorIssue(
              "More than 1 argument expression of aggregation function " + functionName);
          return false;
        }
        ExprNodeDesc exprNodeDesc = exprNodeDescList.get(0);
        if (containsLeadLag(exprNodeDesc)) {
          setOperatorIssue("lead and lag function not supported in argument expression of " +
              "aggregation function " + functionName);
          return false;
        }
        if (supportedFunctionType != SupportedFunctionType.COUNT) {
          // COUNT does not care about column types. The rest do.
          TypeInfo typeInfo = exprNodeDesc.getTypeInfo();
          Category category = typeInfo.getCategory();
          boolean isSupportedType;
          if (category != Category.PRIMITIVE) {
            isSupportedType = false;
          } else {
            ColumnVector.Type colVecType =
                VectorizationContext.getColumnVectorTypeFromTypeInfo(typeInfo);
            switch (colVecType) {
            case LONG:
            case DOUBLE:
            case DECIMAL:
              isSupportedType = true;
              break;
            default:
              isSupportedType = false;
              break;
            }
          }
          if (!isSupportedType) {
            setOperatorIssue(typeInfo.getTypeName() + " data type not supported in argument " +
                "expression of aggregation function " + functionName);
            return false;
          }
        }
      }
    }
    if (vectorPTFDesc.getOrderExprNodeDescs().length > 1) {
      /*
       * Currently, we need to rule out all cases here where a range boundary scanner can run,
       * basically: 1. bounded start, 2. bounded end which is not current row.
       */
      if (windowFrameDef.getWindowType() == WindowType.RANGE
          && (!windowFrameDef.isStartUnbounded()
              || !(windowFrameDef.getEnd().isCurrentRow() || windowFrameDef.isEndUnbounded()))) {
        setOperatorIssue(
            "Multi-column ordered RANGE boundary scanner is not supported in vectorized mode"
                + " (window: " + windowFrameDef + ")");
        return false;
      }
    }
  }
  return true;
}