in flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParserUtils.java [1129:1254]
/**
 * Builds an equi-join condition in which every join key is a plain input column, projecting
 * any non-column key expressions on top of the join inputs first.
 *
 * <p>Key pairs are matched by position in {@code leftJoinKeys} / {@code rightJoinKeys}. A pair
 * whose two sides are both {@code RexInputRef}s is used directly. Any other pair is appended
 * as an extra expression to a projection over the respective input, and the condition then
 * compares the two appended columns.
 *
 * <p>Input references in the returned condition are numbered as if the joined row were laid
 * out as: system columns, original left fields, appended left keys, original right fields,
 * appended right keys.
 *
 * @param factory factory used to create the projections over the inputs
 * @param inputRels two-element array holding the left input at index 0 and the right input at
 *     index 1; when at least one key pair is not column-to-column, both entries are replaced
 *     in place by the new projections, otherwise the array is not written to
 * @param leftJoinKeys key expressions over the left input
 * @param rightJoinKeys key expressions over the right input, paired by position with {@code
 *     leftJoinKeys}
 * @param systemColCount offset added to every input reference of the returned condition
 * @param leftKeys output list; receives the left key ordinals (relative to the possibly
 *     projected left input), column-to-column pairs first, appended keys afterwards
 * @param rightKeys output list; receives the right key ordinals in the same order
 * @return the conjunction of all key equalities, or {@code null} if there are no key pairs
 */
public static RexNode projectNonColumnEquiConditions(
        RelFactories.ProjectFactory factory,
        RelNode[] inputRels,
        List<RexNode> leftJoinKeys,
        List<RexNode> rightJoinKeys,
        int systemColCount,
        List<Integer> leftKeys,
        List<Integer> rightKeys) {
    final RelNode leftInput = inputRels[0];
    final RelNode rightInput = inputRels[1];
    final RexBuilder rexBuilder = leftInput.getCluster().getRexBuilder();
    final int leftFieldCount = leftInput.getRowType().getFieldCount();
    final int rightFieldCount = rightInput.getRowType().getFieldCount();

    // Start both projections as identity projections of their input.
    final List<RexNode> leftProjects = new ArrayList<>();
    final List<String> leftProjectNames = new ArrayList<>();
    final List<RelDataTypeField> leftInputFields = leftInput.getRowType().getFieldList();
    for (int ordinal = 0; ordinal < leftFieldCount; ordinal++) {
        final RelDataTypeField field = leftInputFields.get(ordinal);
        leftProjects.add(rexBuilder.makeInputRef(field.getType(), ordinal));
        leftProjectNames.add(field.getName());
    }

    final List<RexNode> rightProjects = new ArrayList<>();
    final List<String> rightProjectNames = new ArrayList<>();
    final List<RelDataTypeField> rightInputFields = rightInput.getRowType().getFieldList();
    for (int ordinal = 0; ordinal < rightFieldCount; ordinal++) {
        final RelDataTypeField field = rightInputFields.get(ordinal);
        rightProjects.add(rexBuilder.makeInputRef(field.getType(), ordinal));
        rightProjectNames.add(field.getName());
    }

    // First pass: classify each key pair. Column-to-column pairs are only remembered by
    // position, because the right-hand offset used in their condition depends on the total
    // number of appended keys, which is not known until this pass has finished.
    final List<Integer> columnPairPositions = new ArrayList<>();
    int computedKeyCount = 0;
    final int keyPairCount = leftJoinKeys.size();
    for (int pos = 0; pos < keyPairCount; pos++) {
        final RexNode leftKey = leftJoinKeys.get(pos);
        final RexNode rightKey = rightJoinKeys.get(pos);
        if (!(leftKey instanceof RexInputRef) || !(rightKey instanceof RexInputRef)) {
            // Appended without a name; the names are uniquified when the projection is built.
            leftProjects.add(leftKey);
            leftProjectNames.add(null);
            rightProjects.add(rightKey);
            rightProjectNames.add(null);
            computedKeyCount++;
        } else {
            columnPairPositions.add(pos);
        }
    }

    // Second pass: equalities between existing columns. Right-side columns sit after the
    // system columns, all left fields and all appended left keys.
    final int rightColumnOffset = systemColCount + leftFieldCount + computedKeyCount;
    RexNode joinCondition = null;
    for (int pos : columnPairPositions) {
        final RexInputRef leftRef = (RexInputRef) leftJoinKeys.get(pos);
        final RexInputRef rightRef = (RexInputRef) rightJoinKeys.get(pos);
        leftKeys.add(leftRef.getIndex());
        rightKeys.add(rightRef.getIndex());
        final RexNode equality =
                rexBuilder.makeCall(
                        SqlStdOperatorTable.EQUALS,
                        rexBuilder.makeInputRef(
                                leftRef.getType(), systemColCount + leftRef.getIndex()),
                        rexBuilder.makeInputRef(
                                rightRef.getType(), rightColumnOffset + rightRef.getIndex()));
        joinCondition =
                joinCondition == null
                        ? equality
                        : rexBuilder.makeCall(SqlStdOperatorTable.AND, joinCondition, equality);
    }

    // Nothing had to be projected: the inputs stay as they are.
    if (computedKeyCount == 0) {
        return joinCondition;
    }

    // Third pass: equalities between the appended key columns of both projections.
    final int leftComputedOffset = systemColCount + leftFieldCount;
    final int rightComputedOffset = rightColumnOffset + rightFieldCount;
    for (int k = 0; k < computedKeyCount; k++) {
        final int leftOrdinal = leftFieldCount + k;
        final int rightOrdinal = rightFieldCount + k;
        leftKeys.add(leftOrdinal);
        rightKeys.add(rightOrdinal);
        final RexNode equality =
                rexBuilder.makeCall(
                        SqlStdOperatorTable.EQUALS,
                        rexBuilder.makeInputRef(
                                leftProjects.get(leftOrdinal).getType(),
                                leftComputedOffset + k),
                        rexBuilder.makeInputRef(
                                rightProjects.get(rightOrdinal).getType(),
                                rightComputedOffset + k));
        joinCondition =
                joinCondition == null
                        ? equality
                        : rexBuilder.makeCall(SqlStdOperatorTable.AND, joinCondition, equality);
    }

    // Build both projections before publishing either one back to the caller's array.
    final RelNode projectedLeft =
            factory.createProject(
                    leftInput,
                    Collections.emptyList(),
                    leftProjects,
                    SqlValidatorUtil.uniquify(leftProjectNames, false));
    final RelNode projectedRight =
            factory.createProject(
                    rightInput,
                    Collections.emptyList(),
                    rightProjects,
                    SqlValidatorUtil.uniquify(rightProjectNames, false));
    inputRels[0] = projectedLeft;
    inputRels[1] = projectedRight;
    return joinCondition;
}