in wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/calcite/converter/WayangMultiConditionJoinVisitor.java [56:134]
Operator visit(WayangJoin wayangRelNode) {
    // Builds the Wayang plan fragment for a join with several sub-conditions:
    // both inputs are converted, a JoinOperator is keyed on one field index per
    // sub-condition and side, and the joined pair is flattened into one Record.
    final Operator childOpLeft = wayangRelConverter.convert(wayangRelNode.getInput(0));
    final Operator childOpRight = wayangRelConverter.convert(wayangRelNode.getInput(1));

    // The condition must be a RexCall whose operands are themselves RexCalls over
    // plain RexInputRefs; any other shape fails with a ClassCastException in the
    // casts below.
    // NOTE(review): the operator kinds (presumably AND over comparisons) are not
    // checked here - confirm that only such joins are dispatched to this visitor.
    final RexNode condition = ((Join) wayangRelNode).getCondition();
    final RexCall call = (RexCall) condition;
    final List<RexCall> subConditions = call.operands.stream()
            .map(RexCall.class::cast)
            .collect(Collectors.toList());

    // Calcite numbers RexInputRefs against the concatenated field list of the
    // left and right join inputs, left fields first. Within each sub-condition
    // the smallest index is therefore taken as the left-side key ...
    final Integer[] leftTableKeyIndexes = subConditions.stream()
            .map(sub -> sub.getOperands().stream()
                    .map(RexInputRef.class::cast)
                    .mapToInt(RexInputRef::getIndex)
                    .min()
                    .getAsInt())
            .toArray(Integer[]::new);

    // ... and the largest index as the right-side key. Right-side references are
    // offset by the number of fields in the left input, so that count is
    // subtracted to obtain an index relative to the right input.
    final int leftFieldCount = wayangRelNode.getLeft().getRowType().getFieldCount();
    final Integer[] rightTableKeyIndexes = subConditions.stream()
            .map(sub -> sub.getOperands().stream()
                    .map(RexInputRef.class::cast)
                    .mapToInt(RexInputRef::getIndex)
                    .max()
                    .getAsInt() - leftFieldCount)
            .toArray(Integer[]::new);

    // The four trailing string arguments are deliberately passed empty; their
    // meaning is defined by getJoinOperator, which is not visible from here.
    final JoinOperator<Record, Record, Record> join = this.getJoinOperator(
            leftTableKeyIndexes,
            rightTableKeyIndexes,
            wayangRelNode,
            "",
            "",
            "",
            "");

    // Left input feeds join slot 0, right input feeds join slot 1.
    childOpLeft.connectTo(0, join, 0);
    childOpRight.connectTo(0, join, 1);

    // The join emits Tuple2<Record, Record>; map each pair to a single Record.
    final SerializableFunction<Tuple2<Record, Record>, Record> flatten = new JoinFlattenResult();
    final MapOperator<Tuple2<Record, Record>, Record> mapOperator = new MapOperator<Tuple2<Record, Record>, Record>(
            flatten,
            ReflectionUtils.specify(Tuple2.class),
            Record.class);
    join.connectTo(0, mapOperator, 0);

    return mapOperator;
}