Operator visit()

in wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/calcite/converter/WayangMultiConditionJoinVisitor.java [56:134]


    /**
     * Converts a join with a multi-part condition into a Wayang join operator
     * whose output is flattened into a single {@link Record} by a trailing
     * map operator.
     *
     * <p>The join condition is cast to a {@link RexCall} and each of its operands
     * is cast to a {@link RexCall} as well; every operand of those sub-conditions
     * is cast to a {@link RexInputRef}. Any other shape fails with a
     * {@link ClassCastException}.
     *
     * @param wayangRelNode the join node to convert
     * @return the map operator that flattens the join result into a record
     * @throws IllegalArgumentException if a sub-condition does not reference
     *                                  exactly one side of the join with its lowest
     *                                  index and the other side with its highest
     */
    Operator visit(WayangJoin wayangRelNode) {
        final Operator childOpLeft = wayangRelConverter.convert(wayangRelNode.getInput(0));
        final Operator childOpRight = wayangRelConverter.convert(wayangRelNode.getInput(1));
        final RexNode condition = ((Join) wayangRelNode).getCondition();
        final RexCall call = (RexCall) condition;

        // each operand of the top-level call is one sub-condition of the join
        final List<RexCall> subConditions = call.operands.stream()
                .map(RexCall.class::cast)
                .collect(Collectors.toList());

        // calcite generates the RexInputRef indexes via looking at the union
        // field list of the left and right input of a join. The left input always
        // comes first in this joined field list, so indexes below the left field
        // count belong to the left input and all others belong to the right input.
        final int leftFieldCount = wayangRelNode.getLeft().getRowType().getFieldCount();

        final Integer[] leftTableKeyIndexes = new Integer[subConditions.size()];
        final Integer[] rightTableKeyIndexes = new Integer[subConditions.size()];

        for (int i = 0; i < subConditions.size(); i++) {
            final RexCall subCondition = subConditions.get(i);

            if (subCondition.getOperands().isEmpty()) {
                throw new IllegalArgumentException(
                        "Join sub-condition has no operands: " + subCondition);
            }

            // the smallest index is taken as the left key, the largest as the right key
            int lowestIndex = Integer.MAX_VALUE;
            int highestIndex = Integer.MIN_VALUE;
            for (final RexNode operand : subCondition.getOperands()) {
                final int index = ((RexInputRef) operand).getIndex();
                lowestIndex = Math.min(lowestIndex, index);
                highestIndex = Math.max(highestIndex, index);
            }

            // fail fast instead of handing an out-of-range or negative key index
            // to the join operator
            if (lowestIndex >= leftFieldCount || highestIndex < leftFieldCount) {
                throw new IllegalArgumentException(
                        "Join sub-condition must reference one field of each join input, but got: "
                                + subCondition);
            }

            leftTableKeyIndexes[i] = lowestIndex;
            // right input indexes are offset by the number of fields in the left input
            rightTableKeyIndexes[i] = highestIndex - leftFieldCount;
        }

        // NOTE(review): the four trailing string arguments are passed empty;
        // confirm against getJoinOperator that this is intended.
        final JoinOperator<Record, Record, Record> join = this.getJoinOperator(
                leftTableKeyIndexes,
                rightTableKeyIndexes,
                wayangRelNode,
                "",
                "",
                "",
                "");

        childOpLeft.connectTo(0, join, 0);
        childOpRight.connectTo(0, join, 1);

        // Join returns Tuple2 - map to a Record
        final SerializableFunction<Tuple2<Record, Record>, Record> mp = new JoinFlattenResult();

        final MapOperator<Tuple2<Record, Record>, Record> mapOperator = new MapOperator<Tuple2<Record, Record>, Record>(
                mp,
                ReflectionUtils.specify(Tuple2.class),
                Record.class);

        join.connectTo(0, mapOperator, 0);

        return mapOperator;
    }