in flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParserCalcitePlanner.java [1782:1877]
private Pair<RexNode, TypeInfo> getWindowRexAndType(
HiveParserWindowingSpec.WindowExpressionSpec winExprSpec, RelNode srcRel)
throws SemanticException {
RexNode window;
if (winExprSpec instanceof HiveParserWindowingSpec.WindowFunctionSpec) {
HiveParserWindowingSpec.WindowFunctionSpec wFnSpec =
(HiveParserWindowingSpec.WindowFunctionSpec) winExprSpec;
HiveParserASTNode windowProjAst = wFnSpec.getExpression();
// TODO: do we need to get to child?
int wndSpecASTIndx = getWindowSpecIndx(windowProjAst);
// 2. Get Hive Aggregate Info
AggInfo hiveAggInfo =
getHiveAggInfo(
windowProjAst,
wndSpecASTIndx - 1,
relToRowResolver.get(srcRel),
(HiveParserWindowingSpec.WindowFunctionSpec) winExprSpec,
semanticAnalyzer,
frameworkConfig,
cluster);
// 3. Get Calcite Return type for Agg Fn
RelDataType calciteAggFnRetType =
HiveParserUtils.toRelDataType(
hiveAggInfo.getReturnType(), cluster.getTypeFactory());
// 4. Convert Agg Fn args to Calcite
Map<String, Integer> posMap = relToHiveColNameCalcitePosMap.get(srcRel);
HiveParserRexNodeConverter converter =
new HiveParserRexNodeConverter(
cluster, srcRel.getRowType(), posMap, 0, false, funcConverter);
List<RexNode> calciteAggFnArgs = new ArrayList<>();
List<RelDataType> calciteAggFnArgTypes = new ArrayList<>();
for (int i = 0; i < hiveAggInfo.getAggParams().size(); i++) {
calciteAggFnArgs.add(converter.convert(hiveAggInfo.getAggParams().get(i)));
calciteAggFnArgTypes.add(
HiveParserUtils.toRelDataType(
hiveAggInfo.getAggParams().get(i).getTypeInfo(),
cluster.getTypeFactory()));
}
// 5. Get Calcite Agg Fn
final SqlAggFunction calciteAggFn =
HiveParserSqlFunctionConverter.getCalciteAggFn(
hiveAggInfo.getUdfName(),
hiveAggInfo.isDistinct(),
calciteAggFnArgTypes,
calciteAggFnRetType);
// 6. Translate Window spec
HiveParserRowResolver inputRR = relToRowResolver.get(srcRel);
HiveParserWindowingSpec.WindowSpec wndSpec =
((HiveParserWindowingSpec.WindowFunctionSpec) winExprSpec).getWindowSpec();
List<RexNode> partitionKeys =
getPartitionKeys(
wndSpec.getPartition(),
converter,
inputRR,
new HiveParserTypeCheckCtx(inputRR, frameworkConfig, cluster),
semanticAnalyzer);
List<RexFieldCollation> orderKeys =
getOrderKeys(
wndSpec.getOrder(),
converter,
inputRR,
new HiveParserTypeCheckCtx(inputRR, frameworkConfig, cluster),
semanticAnalyzer);
RexWindowBound lowerBound = getBound(wndSpec.getWindowFrame().getStart(), cluster);
RexWindowBound upperBound = getBound(wndSpec.getWindowFrame().getEnd(), cluster);
boolean isRows =
wndSpec.getWindowFrame().getWindowType()
== HiveParserWindowingSpec.WindowType.ROWS;
window =
HiveParserUtils.makeOver(
cluster.getRexBuilder(),
calciteAggFnRetType,
calciteAggFn,
calciteAggFnArgs,
partitionKeys,
orderKeys,
lowerBound,
upperBound,
isRows,
true,
false,
false,
false);
window = window.accept(funcConverter);
} else {
throw new SemanticException("Unsupported window Spec");
}
return new Pair<>(window, HiveParserTypeConverter.convert(window.getType()));
}