in flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParserCalcitePlanner.java [1598:1755]
private Pair<Sort, RelNode> genOBLogicalPlan(
HiveParserQB qb, RelNode srcRel, boolean outermostOB) throws SemanticException {
Sort sortRel = null;
RelNode originalOBInput = null;
HiveParserQBParseInfo qbp = qb.getParseInfo();
String dest = qbp.getClauseNames().iterator().next();
HiveParserASTNode obAST = qbp.getOrderByForClause(dest);
if (obAST != null) {
// 1. OB Expr sanity test
// in strict mode, in the presence of order by, limit must be specified
Integer limit = qb.getParseInfo().getDestLimit(dest);
if (limit == null) {
String mapRedMode =
semanticAnalyzer.getConf().getVar(HiveConf.ConfVars.HIVEMAPREDMODE);
boolean banLargeQuery =
Boolean.parseBoolean(
semanticAnalyzer
.getConf()
.get("hive.strict.checks.large.query", "false"));
if ("strict".equalsIgnoreCase(mapRedMode) || banLargeQuery) {
throw new SemanticException(
generateErrorMessage(obAST, "Order by-s without limit"));
}
}
// 2. Walk through OB exprs and extract field collations and additional
// virtual columns needed
final List<RexNode> virtualCols = new ArrayList<>();
final List<RelFieldCollation> fieldCollations = new ArrayList<>();
int fieldIndex;
List<Node> obASTExprLst = obAST.getChildren();
HiveParserASTNode obASTExpr;
HiveParserASTNode nullOrderASTExpr;
List<Pair<HiveParserASTNode, TypeInfo>> vcASTAndType = new ArrayList<>();
HiveParserRowResolver inputRR = relToRowResolver.get(srcRel);
HiveParserRowResolver outputRR = new HiveParserRowResolver();
HiveParserRexNodeConverter converter =
new HiveParserRexNodeConverter(
cluster,
srcRel.getRowType(),
relToHiveColNameCalcitePosMap.get(srcRel),
0,
false,
funcConverter);
int numSrcFields = srcRel.getRowType().getFieldCount();
for (Node node : obASTExprLst) {
// 2.1 Convert AST Expr to ExprNode
obASTExpr = (HiveParserASTNode) node;
nullOrderASTExpr = (HiveParserASTNode) obASTExpr.getChild(0);
HiveParserASTNode ref = (HiveParserASTNode) nullOrderASTExpr.getChild(0);
Map<HiveParserASTNode, ExprNodeDesc> astToExprNodeDesc =
semanticAnalyzer.genAllExprNodeDesc(ref, inputRR);
ExprNodeDesc obExprNodeDesc = astToExprNodeDesc.get(ref);
if (obExprNodeDesc == null) {
throw new SemanticException(
"Invalid order by expression: " + obASTExpr.toString());
}
// 2.2 Convert ExprNode to RexNode
RexNode rexNode = converter.convert(obExprNodeDesc).accept(funcConverter);
// 2.3 Determine the index of ob expr in child schema
// NOTE: Calcite can not take compound exprs in OB without it being
// present in the child (& hence we add a child Project Rel)
if (rexNode instanceof RexInputRef) {
fieldIndex = ((RexInputRef) rexNode).getIndex();
} else {
fieldIndex = numSrcFields + virtualCols.size();
virtualCols.add(rexNode);
vcASTAndType.add(new Pair<>(ref, obExprNodeDesc.getTypeInfo()));
}
// 2.4 Determine the Direction of order by
RelFieldCollation.Direction direction = RelFieldCollation.Direction.DESCENDING;
if (obASTExpr.getType() == HiveASTParser.TOK_TABSORTCOLNAMEASC) {
direction = RelFieldCollation.Direction.ASCENDING;
}
RelFieldCollation.NullDirection nullOrder;
if (nullOrderASTExpr.getType() == HiveASTParser.TOK_NULLS_FIRST) {
nullOrder = RelFieldCollation.NullDirection.FIRST;
} else if (nullOrderASTExpr.getType() == HiveASTParser.TOK_NULLS_LAST) {
nullOrder = RelFieldCollation.NullDirection.LAST;
} else {
throw new SemanticException(
"Unexpected null ordering option: " + nullOrderASTExpr.getType());
}
// 2.5 Add to field collations
fieldCollations.add(new RelFieldCollation(fieldIndex, direction, nullOrder));
}
// 3. Add Child Project Rel if needed, Generate Output RR, input Sel Rel
// for top constraining Sel
RelNode obInputRel = srcRel;
if (!virtualCols.isEmpty()) {
List<RexNode> originalInputRefs =
srcRel.getRowType().getFieldList().stream()
.map(input -> new RexInputRef(input.getIndex(), input.getType()))
.collect(Collectors.toList());
HiveParserRowResolver obSyntheticProjectRR = new HiveParserRowResolver();
if (!HiveParserRowResolver.add(obSyntheticProjectRR, inputRR)) {
throw new SemanticException(
"Duplicates detected when adding columns to RR: see previous message");
}
int vcolPos = inputRR.getRowSchema().getSignature().size();
for (Pair<HiveParserASTNode, TypeInfo> astTypePair : vcASTAndType) {
obSyntheticProjectRR.putExpression(
astTypePair.getKey(),
new ColumnInfo(
getColumnInternalName(vcolPos),
astTypePair.getValue(),
null,
false));
vcolPos++;
}
obInputRel =
genSelectRelNode(
CompositeList.of(originalInputRefs, virtualCols),
obSyntheticProjectRR,
srcRel);
if (outermostOB) {
if (!HiveParserRowResolver.add(outputRR, inputRR)) {
throw new SemanticException(
"Duplicates detected when adding columns to RR: see previous message");
}
} else {
if (!HiveParserRowResolver.add(outputRR, obSyntheticProjectRR)) {
throw new SemanticException(
"Duplicates detected when adding columns to RR: see previous message");
}
}
originalOBInput = srcRel;
} else {
if (!HiveParserRowResolver.add(outputRR, inputRR)) {
throw new SemanticException(
"Duplicates detected when adding columns to RR: see previous message");
}
}
// 4. Construct SortRel
RelTraitSet traitSet = cluster.traitSet();
RelCollation canonizedCollation = traitSet.canonize(RelCollations.of(fieldCollations));
sortRel = LogicalSort.create(obInputRel, canonizedCollation, null, null);
// 5. Update the maps
Map<String, Integer> hiveColNameCalcitePosMap = buildHiveToCalciteColumnMap(outputRR);
relToRowResolver.put(sortRel, outputRR);
relToHiveColNameCalcitePosMap.put(sortRel, hiveColNameCalcitePosMap);
}
return (new Pair<>(sortRel, originalOBInput));
}