in flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParserCalcitePlanner.java [1226:1396]
private RelNode genGBLogicalPlan(HiveParserQB qb, RelNode srcRel) throws SemanticException {
RelNode gbRel = null;
HiveParserQBParseInfo qbp = qb.getParseInfo();
// 1. Gather GB Expressions (AST) (GB + Aggregations)
// NOTE: Multi Insert is not supported
String detsClauseName = qbp.getClauseNames().iterator().next();
HiveParserASTNode selExprList = qb.getParseInfo().getSelForClause(detsClauseName);
HiveParserSubQueryUtils.checkForTopLevelSubqueries(selExprList);
if (selExprList.getToken().getType() == HiveASTParser.TOK_SELECTDI
&& selExprList.getChildCount() == 1
&& selExprList.getChild(0).getChildCount() == 1) {
HiveParserASTNode node = (HiveParserASTNode) selExprList.getChild(0).getChild(0);
if (node.getToken().getType() == HiveASTParser.TOK_ALLCOLREF) {
srcRel = genSelectLogicalPlan(qb, srcRel, srcRel, null, null);
HiveParserRowResolver rr = relToRowResolver.get(srcRel);
qbp.setSelExprForClause(detsClauseName, HiveParserUtils.genSelectDIAST(rr));
}
}
// Select DISTINCT + windowing; GBy handled by genSelectForWindowing
if (selExprList.getToken().getType() == HiveASTParser.TOK_SELECTDI
&& !qb.getAllWindowingSpecs().isEmpty()) {
return null;
}
List<HiveParserASTNode> gbAstExprs = getGroupByForClause(qbp, detsClauseName);
HashMap<String, HiveParserASTNode> aggregationTrees =
qbp.getAggregationExprsForClause(detsClauseName);
boolean hasGrpByAstExprs = !gbAstExprs.isEmpty();
boolean hasAggregationTrees = aggregationTrees != null && !aggregationTrees.isEmpty();
final boolean cubeRollupGrpSetPresent =
!qbp.getDestRollups().isEmpty()
|| !qbp.getDestGroupingSets().isEmpty()
|| !qbp.getDestCubes().isEmpty();
// 2. Sanity check
if (semanticAnalyzer.getConf().getBoolVar(HiveConf.ConfVars.HIVEGROUPBYSKEW)
&& qbp.getDistinctFuncExprsForClause(detsClauseName).size() > 1) {
throw new SemanticException(ErrorMsg.UNSUPPORTED_MULTIPLE_DISTINCTS.getMsg());
}
if (hasGrpByAstExprs || hasAggregationTrees) {
ArrayList<ExprNodeDesc> gbExprNodeDescs = new ArrayList<>();
ArrayList<String> outputColNames = new ArrayList<>();
// 3. Input, Output Row Resolvers
HiveParserRowResolver inputRR = relToRowResolver.get(srcRel);
HiveParserRowResolver outputRR = new HiveParserRowResolver();
outputRR.setIsExprResolver(true);
if (hasGrpByAstExprs) {
// 4. Construct GB Keys (ExprNode)
for (HiveParserASTNode gbAstExpr : gbAstExprs) {
Map<HiveParserASTNode, ExprNodeDesc> astToExprNodeDesc =
semanticAnalyzer.genAllExprNodeDesc(gbAstExpr, inputRR);
ExprNodeDesc grpbyExprNDesc = astToExprNodeDesc.get(gbAstExpr);
if (grpbyExprNDesc == null) {
throw new SemanticException(
"Invalid Column Reference: " + gbAstExpr.dump());
}
addToGBExpr(
outputRR,
inputRR,
gbAstExpr,
grpbyExprNDesc,
gbExprNodeDescs,
outputColNames);
}
}
// 5. GroupingSets, Cube, Rollup
int numGroupCols = gbExprNodeDescs.size();
List<Integer> groupingSets = null;
if (cubeRollupGrpSetPresent) {
if (qbp.getDestRollups().contains(detsClauseName)) {
groupingSets = getGroupingSetsForRollup(gbAstExprs.size());
} else if (qbp.getDestCubes().contains(detsClauseName)) {
groupingSets = getGroupingSetsForCube(gbAstExprs.size());
} else if (qbp.getDestGroupingSets().contains(detsClauseName)) {
groupingSets = getGroupingSets(gbAstExprs, qbp, detsClauseName);
}
}
// 6. Construct aggregation function Info
ArrayList<AggInfo> aggInfos = new ArrayList<>();
if (hasAggregationTrees) {
for (HiveParserASTNode value : aggregationTrees.values()) {
// 6.1 Determine type of UDAF
// This is the GenericUDAF name
String aggName = unescapeIdentifier(value.getChild(0).getText());
boolean isDistinct = value.getType() == HiveASTParser.TOK_FUNCTIONDI;
boolean isAllColumns = value.getType() == HiveASTParser.TOK_FUNCTIONSTAR;
// 6.2 Convert UDAF Params to ExprNodeDesc
ArrayList<ExprNodeDesc> aggParameters = new ArrayList<>();
for (int i = 1; i < value.getChildCount(); i++) {
HiveParserASTNode paraExpr = (HiveParserASTNode) value.getChild(i);
ExprNodeDesc paraExprNode =
semanticAnalyzer.genExprNodeDesc(paraExpr, inputRR);
aggParameters.add(paraExprNode);
}
GenericUDAFEvaluator.Mode aggMode =
HiveParserUtils.groupByDescModeToUDAFMode(
GroupByDesc.Mode.COMPLETE, isDistinct);
GenericUDAFEvaluator genericUDAFEvaluator =
HiveParserUtils.getGenericUDAFEvaluator(
aggName,
aggParameters,
value,
isDistinct,
isAllColumns,
frameworkConfig.getOperatorTable());
assert (genericUDAFEvaluator != null);
HiveParserBaseSemanticAnalyzer.GenericUDAFInfo udaf =
HiveParserUtils.getGenericUDAFInfo(
genericUDAFEvaluator, aggMode, aggParameters);
String aggAlias = null;
if (value.getParent().getType() == HiveASTParser.TOK_SELEXPR
&& value.getParent().getChildCount() == 2) {
aggAlias =
unescapeIdentifier(
value.getParent().getChild(1).getText().toLowerCase());
}
AggInfo aggInfo =
new AggInfo(
aggParameters,
udaf.returnType,
aggName,
isDistinct,
isAllColumns,
aggAlias);
aggInfos.add(aggInfo);
String field =
aggAlias == null
? getColumnInternalName(numGroupCols + aggInfos.size() - 1)
: aggAlias;
outputColNames.add(field);
outputRR.putExpression(
value, new ColumnInfo(field, aggInfo.getReturnType(), "", false));
}
}
// 7. If GroupingSets, Cube, Rollup were used, we account grouping__id
// GROUPING__ID is also required by the GROUPING function, so let's always add it for
// grouping sets
if (groupingSets != null && !groupingSets.isEmpty()) {
String field = getColumnInternalName(numGroupCols + aggInfos.size());
outputColNames.add(field);
outputRR.put(
null,
VirtualColumn.GROUPINGID.getName(),
new ColumnInfo(
field,
// flink grouping_id's return type is bigint
TypeInfoFactory.longTypeInfo,
null,
true));
}
// 8. We create the group_by operator
gbRel = genGBRelNode(gbExprNodeDescs, aggInfos, groupingSets, srcRel);
relToHiveColNameCalcitePosMap.put(gbRel, buildHiveToCalciteColumnMap(outputRR));
relToRowResolver.put(gbRel, outputRR);
}
return gbRel;
}