in flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParserCalcitePlanner.java [308:460]
/**
 * Builds the logical plan for a set operation (UNION / INTERSECT [ALL] / EXCEPT [ALL])
 * over two already-planned inputs.
 *
 * <p>Validates that both sides have the same column count, derives a common (widened) type
 * per column via Hive's union-all coercion rules, inserts a {@link LogicalProject} over
 * either input when a cast is required (Calcite demands exact type parity on set-op inputs),
 * and registers the resulting row resolver / column-position map for the new node.
 *
 * @param opcode the set operation to build
 * @param alias output alias under which the result columns are registered
 * @param leftalias alias of the left input (used to look up its field map)
 * @param leftRel planned left input
 * @param rightalias alias of the right input
 * @param rightRel planned right input
 * @return the new set-op RelNode (possibly sitting above freshly inserted casts)
 * @throws SemanticException if the schemas are incompatible or the opcode is unsupported
 */
private RelNode genSetOpLogicalPlan(
        HiveParserQBExpr.Opcode opcode,
        String alias,
        String leftalias,
        RelNode leftRel,
        String rightalias,
        RelNode rightRel)
        throws SemanticException {
    // Step 1: recover the row resolvers and per-alias field maps of both inputs.
    HiveParserRowResolver lhsResolver = relToRowResolver.get(leftRel);
    HiveParserRowResolver rhsResolver = relToRowResolver.get(rightRel);
    HashMap<String, ColumnInfo> lhsFieldMap = lhsResolver.getFieldMap(leftalias);
    HashMap<String, ColumnInfo> rhsFieldMap = rhsResolver.getFieldMap(rightalias);

    // Step 2: Hive-level feasibility check — both sides must expose the same column count.
    if (lhsFieldMap.size() != rhsFieldMap.size()) {
        throw new SemanticException("Schema of both sides of union should match.");
    }

    // Step 3: build the output row resolver, widening each column pair to a common type.
    // NOTE: iteration order of the two field maps determines output column order, and the
    // positions fixed here are reused by index in step 4 — keep the order untouched.
    HiveParserRowResolver outputRR = new HiveParserRowResolver();
    Iterator<Map.Entry<String, ColumnInfo>> lhsIt = lhsFieldMap.entrySet().iterator();
    Iterator<Map.Entry<String, ColumnInfo>> rhsIt = rhsFieldMap.entrySet().iterator();
    while (lhsIt.hasNext()) {
        Map.Entry<String, ColumnInfo> lhsEntry = lhsIt.next();
        Map.Entry<String, ColumnInfo> rhsEntry = rhsIt.next();
        ColumnInfo lhsInfo = lhsEntry.getValue();
        ColumnInfo rhsInfo = rhsEntry.getValue();
        String fieldName = lhsEntry.getKey();
        // Widening conversion per Hive's union-all rules; null means the pair is incompatible.
        TypeInfo widenedType =
                FunctionRegistry.getCommonClassForUnionAll(lhsInfo.getType(), rhsInfo.getType());
        if (widenedType == null) {
            // Point the error at the first source table when one is available.
            HiveParserASTNode tabRef =
                    getQB().getAliases().isEmpty()
                            ? null
                            : getQB().getParseInfo()
                                    .getSrcForAlias(getQB().getAliases().get(0));
            throw new SemanticException(
                    generateErrorMessage(
                            tabRef,
                            "Schema of both sides of setop should match: Column "
                                    + fieldName
                                    + " is of type "
                                    + lhsInfo.getType().getTypeName()
                                    + " on first table and type "
                                    + rhsInfo.getType().getTypeName()
                                    + " on second table"));
        }
        ColumnInfo outputCol = new ColumnInfo(lhsInfo);
        outputCol.setType(widenedType);
        outputRR.put(alias, fieldName, outputCol);
    }

    // Step 4: work out, column by column, which side needs a cast. Calcite requires exact
    // type equality on both sides of a SetOp, so any mismatch is coerced to the widened type.
    boolean castLeft = false;
    boolean castRight = false;
    List<RexNode> lhsProjections = new ArrayList<>();
    List<RexNode> rhsProjections = new ArrayList<>();
    List<RelDataTypeField> lhsRowFields = leftRel.getRowType().getFieldList();
    List<RelDataTypeField> rhsRowFields = rightRel.getRowType().getFieldList();
    for (int i = 0; i < lhsRowFields.size(); i++) {
        RelDataType lhsType = lhsRowFields.get(i).getType();
        RelDataType rhsType = rhsRowFields.get(i).getType();
        // Default: project each column through unchanged.
        RelDataType lhsTarget = lhsType;
        RelDataType rhsTarget = rhsType;
        if (!lhsType.equals(rhsType)) {
            // Mismatch: both sides must be coerced to the widened type computed in step 3.
            RelDataType widened =
                    HiveParserUtils.toRelDataType(
                            outputRR.getColumnInfos().get(i).getType(),
                            cluster.getTypeFactory());
            if (!widened.equals(lhsType)) {
                castLeft = true;
            }
            if (!widened.equals(rhsType)) {
                castRight = true;
            }
            lhsTarget = widened;
            rhsTarget = widened;
        }
        lhsProjections.add(
                cluster.getRexBuilder()
                        .ensureType(
                                lhsTarget,
                                cluster.getRexBuilder().makeInputRef(lhsType, i),
                                true));
        rhsProjections.add(
                cluster.getRexBuilder()
                        .ensureType(
                                rhsTarget,
                                cluster.getRexBuilder().makeInputRef(rhsType, i),
                                true));
    }

    // Step 5: insert a casting Project over whichever input needs one.
    if (castLeft) {
        leftRel =
                LogicalProject.create(
                        leftRel,
                        Collections.emptyList(),
                        lhsProjections,
                        leftRel.getRowType().getFieldNames());
    }
    if (castRight) {
        rightRel =
                LogicalProject.create(
                        rightRel,
                        Collections.emptyList(),
                        rhsProjections,
                        rightRel.getRowType().getFieldNames());
    }

    // Step 6: build the set-op node itself. UNION is created with all=true here
    // (distinctness, if any, is presumably handled by a later step — unchanged from before).
    List<RelNode> inputs = Arrays.asList(leftRel, rightRel);
    SetOp setOpRel;
    switch (opcode) {
        case UNION:
            setOpRel = LogicalUnion.create(inputs, true);
            break;
        case INTERSECT:
        case INTERSECTALL:
            setOpRel =
                    LogicalIntersect.create(
                            inputs, opcode == HiveParserQBExpr.Opcode.INTERSECTALL);
            break;
        case EXCEPT:
        case EXCEPTALL:
            setOpRel =
                    LogicalMinus.create(
                            inputs, opcode == HiveParserQBExpr.Opcode.EXCEPTALL);
            break;
        default:
            throw new SemanticException("Unsupported set operator " + opcode.toString());
    }

    // Register bookkeeping for downstream planning.
    relToRowResolver.put(setOpRel, outputRR);
    relToHiveColNameCalcitePosMap.put(setOpRel, buildHiveToCalciteColumnMap(outputRR));
    return setOpRel;
}