in flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/copy/HiveParserBaseSemanticAnalyzer.java [1695:1804]
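/**
 * Builds the AggInfo for a single UDAF call in the AST: converts the call's parameters to
 * ExprNodeDesc, records the DISTINCT / '*' flags, and resolves the UDAF return type via
 * ranking-function rules, the UDAF evaluator, or a GenericUDF fallback.
 */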
public static AggInfo getHiveAggInfo(
HiveParserASTNode aggAst,
int aggFnLstArgIndx,
HiveParserRowResolver inputRR,
HiveParserWindowingSpec.WindowFunctionSpec winFuncSpec,
HiveParserSemanticAnalyzer semanticAnalyzer,
FrameworkConfig frameworkConfig,
RelOptCluster cluster)
throws SemanticException {
// 1. Convert the UDAF parameters to ExprNodeDesc
ArrayList<ExprNodeDesc> aggParameters = new ArrayList<>();
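// Child 0 of the function node is the UDAF name, so the parameters occupy child
// indexes 1..aggFnLstArgIndx.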
for (int i = 1; i <= aggFnLstArgIndx; i++) {
HiveParserASTNode paraExpr = (HiveParserASTNode) aggAst.getChild(i);
ExprNodeDesc paraExprNode = semanticAnalyzer.genExprNodeDesc(paraExpr, inputRR);
aggParameters.add(paraExprNode);
}
// 2. Is this a DISTINCT UDAF?
boolean isDistinct = aggAst.getType() == HiveASTParser.TOK_FUNCTIONDI;
// 3. Determine the return type of the UDAF
TypeInfo udafRetType = null;
// 3.1 Obtain UDAF name
String aggName = unescapeIdentifier(aggAst.getChild(0).getText());
boolean isAllColumns = false;
// 3.2 Ranking functions return 'int', except percent_rank which returns 'double'
if (FunctionRegistry.isRankingFunction(aggName)) {
if (aggName.equalsIgnoreCase("percent_rank")) {
udafRetType = TypeInfoFactory.doubleTypeInfo;
} else {
udafRetType = TypeInfoFactory.intTypeInfo;
}
// use the window's ORDER BY expressions as arguments for the ranking function
for (OrderExpression orderExpr : winFuncSpec.windowSpec.getOrder().getExpressions()) {
aggParameters.add(
semanticAnalyzer.genExprNodeDesc(orderExpr.getExpression(), inputRR));
}
} else {
// 3.3 Try obtaining the UDAF evaluator to determine the return type
try {
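// TOK_FUNCTIONSTAR marks a '*' invocation such as count(*).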
isAllColumns = aggAst.getType() == HiveASTParser.TOK_FUNCTIONSTAR;
// 3.3.1 Get UDAF Evaluator
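// COMPLETE mode (raw input straight to the final result) is sufficient here because
// the evaluator is only used to derive the return type.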
GenericUDAFEvaluator.Mode amode =
HiveParserUtils.groupByDescModeToUDAFMode(
GroupByDesc.Mode.COMPLETE, isDistinct);
GenericUDAFEvaluator genericUDAFEvaluator;
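// LEAD/LAG go through the dedicated windowing evaluator, which reports a list return
// type; the per-row result type is the list's element type.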
if (aggName.equalsIgnoreCase(FunctionRegistry.LEAD_FUNC_NAME)
|| aggName.equalsIgnoreCase(FunctionRegistry.LAG_FUNC_NAME)) {
ArrayList<ObjectInspector> originalParameterTypeInfos =
HiveParserUtils.getWritableObjectInspector(aggParameters);
genericUDAFEvaluator =
FunctionRegistry.getGenericWindowingEvaluator(
aggName, originalParameterTypeInfos, isDistinct, isAllColumns);
HiveParserBaseSemanticAnalyzer.GenericUDAFInfo udaf =
HiveParserUtils.getGenericUDAFInfo(
genericUDAFEvaluator, amode, aggParameters);
udafRetType = ((ListTypeInfo) udaf.returnType).getListElementTypeInfo();
} else {
genericUDAFEvaluator =
HiveParserUtils.getGenericUDAFEvaluator(
aggName,
aggParameters,
aggAst,
isDistinct,
isAllColumns,
frameworkConfig.getOperatorTable());
// 3.3.2 Get UDAF Info using UDAF Evaluator
HiveParserBaseSemanticAnalyzer.GenericUDAFInfo udaf =
HiveParserUtils.getGenericUDAFInfo(
genericUDAFEvaluator, amode, aggParameters);
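// UDAFs whose windowing result is "pivoted" (a list covering the partition, later
// mapped back to one value per row) report a list type; unwrap it to the element type.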
if (HiveParserUtils.pivotResult(aggName)) {
udafRetType = ((ListTypeInfo) udaf.returnType).getListElementTypeInfo();
} else {
udafRetType = udaf.returnType;
}
}
} catch (Exception e) {
LOG.debug(
"CBO: Couldn't Obtain UDAF evaluators for "
+ aggName
+ ", trying to translate to GenericUDF");
}
// 3.4 Fall back to GenericUDF translation to determine the return type
if (udafRetType == null) {
HiveParserTypeCheckCtx tcCtx =
new HiveParserTypeCheckCtx(inputRR, frameworkConfig, cluster);
// We allow stateful functions in the SELECT list (but nowhere else)
tcCtx.setAllowStatefulFunctions(true);
tcCtx.setAllowDistinctFunctions(false);
ExprNodeDesc exp =
semanticAnalyzer.genExprNodeDesc(
(HiveParserASTNode) aggAst.getChild(0), inputRR, tcCtx);
udafRetType = exp.getTypeInfo();
}
}
// 4. Construct and return the AggInfo
return new AggInfo(aggParameters, udafRetType, aggName, isDistinct, isAllColumns, null);
}