in flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParserCalcitePlanner.java [462:690]
private RelNode genJoinRelNode(
RelNode leftRel,
String leftTableAlias,
RelNode rightRel,
String rightTableAlias,
JoinType hiveJoinType,
HiveParserASTNode joinCondAst)
throws SemanticException {
HiveParserRowResolver leftRR = relToRowResolver.get(leftRel);
HiveParserRowResolver rightRR = relToRowResolver.get(rightRel);
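        // The row resolvers describe how the Hive column names of each input map
        // to positions in the corresponding Calcite row type; they are used below
        // to resolve column references in the join condition.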
        // 1. Construct the ExprNodeDesc representing the join condition
RexNode joinCondRex;
List<String> namedColumns = null;
if (joinCondAst != null) {
HiveParserJoinTypeCheckCtx jCtx =
new HiveParserJoinTypeCheckCtx(
leftRR, rightRR, hiveJoinType, frameworkConfig, cluster);
jCtx.setUnparseTranslator(semanticAnalyzer.unparseTranslator);
HiveParserRowResolver combinedRR = HiveParserRowResolver.getCombinedRR(leftRR, rightRR);
if (joinCondAst.getType() == HiveASTParser.TOK_TABCOLNAME
&& !hiveJoinType.equals(JoinType.LEFTSEMI)) {
namedColumns = new ArrayList<>();
                // Transform the USING clause so that it looks like an ON clause,
                // i.e. generate a valid ON-clause AST from the USING columns.
HiveParserASTNode and =
(HiveParserASTNode)
HiveASTParseDriver.ADAPTOR.create(HiveASTParser.KW_AND, "and");
HiveParserASTNode equal = null;
int count = 0;
for (Node child : joinCondAst.getChildren()) {
String columnName = ((HiveParserASTNode) child).getText();
                    // dealing with views: register the column identifier with the
                    // unparse translator
if (semanticAnalyzer.unparseTranslator != null
&& semanticAnalyzer.unparseTranslator.isEnabled()) {
semanticAnalyzer.unparseTranslator.addIdentifierTranslation(
(HiveParserASTNode) child);
}
namedColumns.add(columnName);
HiveParserASTNode left =
HiveParserASTBuilder.qualifiedName(leftTableAlias, columnName);
HiveParserASTNode right =
HiveParserASTBuilder.qualifiedName(rightTableAlias, columnName);
equal =
(HiveParserASTNode)
HiveASTParseDriver.ADAPTOR.create(HiveASTParser.EQUAL, "=");
HiveASTParseDriver.ADAPTOR.addChild(equal, left);
HiveASTParseDriver.ADAPTOR.addChild(equal, right);
HiveASTParseDriver.ADAPTOR.addChild(and, equal);
count++;
}
joinCondAst = count > 1 ? and : equal;
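                // For "l JOIN r USING (x, y)" this yields the AST of
                // "l.x = r.x AND l.y = r.y" (one AND node with an EQUAL child per
                // column); with a single USING column the bare EQUAL node is used.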
} else if (semanticAnalyzer.unparseTranslator != null
&& semanticAnalyzer.unparseTranslator.isEnabled()) {
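                // Walk the condition once so that the unparse translator records
                // the identifier translations needed for views; the returned
                // expressions are not used here.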
semanticAnalyzer.genAllExprNodeDesc(joinCondAst, combinedRR, jCtx);
}
Map<HiveParserASTNode, ExprNodeDesc> exprNodes =
HiveParserUtils.genExprNode(joinCondAst, jCtx);
if (jCtx.getError() != null) {
throw new SemanticException(
generateErrorMessage(jCtx.getErrorSrcNode(), jCtx.getError()));
}
ExprNodeDesc joinCondExprNode = exprNodes.get(joinCondAst);
List<RelNode> inputRels = new ArrayList<>();
inputRels.add(leftRel);
inputRels.add(rightRel);
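            // Convert the Hive ExprNodeDesc into a Calcite RexNode over the two
            // inputs; funcConverter then rewrites Hive function calls so that they
            // resolve against Flink's operator table.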
joinCondRex =
HiveParserRexNodeConverter.convert(
cluster,
joinCondExprNode,
inputRels,
relToRowResolver,
relToHiveColNameCalcitePosMap,
false,
funcConverter)
.accept(funcConverter);
} else {
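            // No explicit join condition (e.g. comma-separated tables in the FROM
            // clause): fall back to a literal TRUE predicate.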
joinCondRex = cluster.getRexBuilder().makeLiteral(true);
}
        // 2. Construct Join Rel Node and HiveParserRowResolver for the new Join Node
boolean leftSemiJoin = false;
JoinRelType calciteJoinType;
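        // Map the Hive join type onto Calcite's JoinRelType. LEFTSEMI is tracked
        // separately because a semi join only exposes the left input's columns and
        // needs extra handling below.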
switch (hiveJoinType) {
case LEFTOUTER:
calciteJoinType = JoinRelType.LEFT;
break;
case RIGHTOUTER:
calciteJoinType = JoinRelType.RIGHT;
break;
case FULLOUTER:
calciteJoinType = JoinRelType.FULL;
break;
case LEFTSEMI:
calciteJoinType = JoinRelType.SEMI;
leftSemiJoin = true;
break;
case INNER:
default:
calciteJoinType = JoinRelType.INNER;
break;
}
RelNode topRel;
HiveParserRowResolver topRR;
if (leftSemiJoin) {
List<RelDataTypeField> sysFieldList = new ArrayList<>();
List<RexNode> leftJoinKeys = new ArrayList<>();
List<RexNode> rightJoinKeys = new ArrayList<>();
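            // Split the condition into per-input equi-join keys and a non-equi
            // remainder.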
RexNode nonEquiConds =
HiveRelOptUtil.splitHiveJoinCondition(
sysFieldList,
Arrays.asList(leftRel, rightRel),
joinCondRex,
Arrays.asList(leftJoinKeys, rightJoinKeys),
null,
null);
RelNode[] inputRels = new RelNode[] {leftRel, rightRel};
final List<Integer> leftKeys = new ArrayList<>();
final List<Integer> rightKeys = new ArrayList<>();
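            // Equi-join keys that are expressions rather than plain column
            // references are projected below the join inputs so that the join
            // itself only compares columns; leftKeys/rightKeys receive the
            // resulting key positions.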
RexNode remainingEquiCond =
HiveParserUtils.projectNonColumnEquiConditions(
RelFactories.DEFAULT_PROJECT_FACTORY,
inputRels,
leftJoinKeys,
rightJoinKeys,
0,
leftKeys,
rightKeys);
            // If the previous call re-projected the left input, field references
            // pointing at the right input in nonEquiConds must be shifted by the
            // number of columns added to the left input.
if (inputRels[0] != leftRel) {
nonEquiConds =
RexUtil.shift(
nonEquiConds,
leftRel.getRowType().getFieldCount(),
inputRels[0].getRowType().getFieldCount()
- leftRel.getRowType().getFieldCount());
}
joinCondRex =
remainingEquiCond != null
? RexUtil.composeConjunction(
cluster.getRexBuilder(),
Arrays.asList(remainingEquiCond, nonEquiConds),
false)
: nonEquiConds;
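            // Create the semi join; its row type consists of the left input's
            // fields only.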
topRel =
LogicalJoin.create(
inputRels[0],
inputRels[1],
Collections.emptyList(),
joinCondRex,
Collections.emptySet(),
calciteJoinType);
            // Create the join RR. If projectNonColumnEquiConditions re-projected
            // the left input, the left RR has to be extended with the key columns
            // that were added to it.
if (inputRels[0] != leftRel) {
HiveParserRowResolver newLeftRR = new HiveParserRowResolver();
if (!HiveParserRowResolver.add(newLeftRR, leftRR)) {
LOG.warn("Duplicates detected when adding columns to RR: see previous message");
}
for (int i = leftRel.getRowType().getFieldCount();
i < inputRels[0].getRowType().getFieldCount();
i++) {
ColumnInfo oColInfo =
new ColumnInfo(
getColumnInternalName(i),
HiveParserTypeConverter.convert(
inputRels[0]
.getRowType()
.getFieldList()
.get(i)
.getType()),
null,
false);
newLeftRR.put(oColInfo.getTabAlias(), oColInfo.getInternalName(), oColInfo);
}
HiveParserRowResolver joinRR = new HiveParserRowResolver();
if (!HiveParserRowResolver.add(joinRR, newLeftRR)) {
LOG.warn("Duplicates detected when adding columns to RR: see previous message");
}
relToHiveColNameCalcitePosMap.put(topRel, buildHiveToCalciteColumnMap(joinRR));
relToRowResolver.put(topRel, joinRR);
                // Introduce a top-level project to remove the additional key
                // column(s) that have been introduced on the left input.
List<RexNode> topFields = new ArrayList<>();
List<String> topFieldNames = new ArrayList<>();
for (int i = 0; i < leftRel.getRowType().getFieldCount(); i++) {
final RelDataTypeField field = leftRel.getRowType().getFieldList().get(i);
topFields.add(
leftRel.getCluster().getRexBuilder().makeInputRef(field.getType(), i));
topFieldNames.add(field.getName());
}
topRel =
LogicalProject.create(
topRel, Collections.emptyList(), topFields, topFieldNames);
}
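            // The semi join only exposes the left input's columns, so the result
            // RR is built from the left RR alone.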
topRR = new HiveParserRowResolver();
if (!HiveParserRowResolver.add(topRR, leftRR)) {
LOG.warn("Duplicates detected when adding columns to RR: see previous message");
}
} else {
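            // Regular join: the result row type is the concatenation of the left
            // and right input fields.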
topRel =
LogicalJoin.create(
leftRel,
rightRel,
Collections.emptyList(),
joinCondRex,
Collections.emptySet(),
calciteJoinType);
topRR = HiveParserRowResolver.getCombinedRR(leftRR, rightRR);
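            // Record the USING columns and the two table aliases; downstream
            // column resolution uses this named-join info.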
if (namedColumns != null) {
List<String> tableAliases = new ArrayList<>();
tableAliases.add(leftTableAlias);
tableAliases.add(rightTableAlias);
topRR.setNamedJoinInfo(
new HiveParserNamedJoinInfo(tableAliases, namedColumns, hiveJoinType));
}
}
relToHiveColNameCalcitePosMap.put(topRel, buildHiveToCalciteColumnMap(topRR));
relToRowResolver.put(topRel, topRR);
return topRel;
}