private void validateJoinQuery()

in samza-sql/src/main/java/org/apache/samza/sql/translator/JoinTranslator.java [198:293]

import org.apache.calcite.rel.RelNode;

  /**
   * Validates that the given join is one this translator accepts, throwing otherwise.
   *
   * <p>Checks are applied in this order, and the first failing check throws:
   * <ol>
   *   <li>The join type must be INNER, LEFT or RIGHT.</li>
   *   <li>Exactly one side must be a stream. Any input type other than
   *       {@code InputType.STREAM} is treated as a table here.</li>
   *   <li>For a LEFT join the table must not be on the left side; for a RIGHT join the table must
   *       not be on the right side.</li>
   *   <li>The list filled in by {@code decomposeAndValidateConjunction} must be non-empty, and no
   *       literal may be encountered while visiting any of its entries.</li>
   *   <li>Only when the table side is a {@code REMOTE_TABLE}: the condition must be exactly one
   *       predicate referencing exactly one table-side column, and that column must be accepted by
   *       {@code isValidRemoteJoinRef} (see SAMZA-2554).</li>
   * </ol>
   *
   * @param join the logical join node to validate
   * @param inputTypeOnLeft input type of the left side of the join
   * @param inputTypeOnRight input type of the right side of the join
   * @throws SamzaException if any of the checks above fails
   */
  private void validateJoinQuery(LogicalJoin join, JoinInputNode.InputType inputTypeOnLeft,
      JoinInputNode.InputType inputTypeOnRight) {
    final JoinRelType joinRelType = join.getJoinType();

    // Enum constants are singletons, so identity comparison is the idiomatic equality check.
    if (joinRelType != JoinRelType.INNER && joinRelType != JoinRelType.LEFT
        && joinRelType != JoinRelType.RIGHT) {
      throw new SamzaException("Query with only INNER and LEFT/RIGHT OUTER join are supported.");
    }

    // Anything that is not a STREAM is considered a table for the purpose of this validation.
    final boolean isTablePosOnLeft = inputTypeOnLeft != JoinInputNode.InputType.STREAM;
    final boolean isTablePosOnRight = inputTypeOnRight != JoinInputNode.InputType.STREAM;

    if (!isTablePosOnLeft && !isTablePosOnRight) {
      throw new SamzaException("Invalid query with both sides of join being denoted as 'stream'. "
          + "Stream-stream join is not yet supported. " + dumpRelPlanForNode(join));
    }

    if (isTablePosOnLeft && isTablePosOnRight) {
      throw new SamzaException("Invalid query with both sides of join being denoted as 'table'. " +
          dumpRelPlanForNode(join));
    }

    // From here on exactly one side is a table and the other one is a stream.
    if (joinRelType == JoinRelType.LEFT && isTablePosOnLeft) {
      throw new SamzaException("Invalid query for outer left join. Left side of the join should be a 'stream' and "
          + "right side of join should be a 'table'. " + dumpRelPlanForNode(join));
    }

    if (joinRelType == JoinRelType.RIGHT && isTablePosOnRight) {
      throw new SamzaException("Invalid query for outer right join. Left side of the join should be a 'table' and "
          + "right side of join should be a 'stream'. " + dumpRelPlanForNode(join));
    }

    final List<RexNode> conjunctionList = new ArrayList<>();
    decomposeAndValidateConjunction(join.getCondition(), conjunctionList);

    if (conjunctionList.isEmpty()) {
      throw new SamzaException("Query results in a cross join, which is not supported. Please optimize the query."
          + " It is expected that the joins should include JOIN ON operator in the sql query.");
    }

    //TODO Not sure why we can not allow literal as part of the join condition will revisit this in another scope
    // The visitor keeps no state, so a single instance is shared across all the conjuncts.
    final RexShuttle literalRejector = new RexShuttle() {
      @Override
      public RexNode visitLiteral(RexLiteral literal) {
        throw new SamzaException(
            "Join Condition can not allow literal " + literal + " join node " + join.getDigest());
      }
    };
    for (RexNode conjunct : conjunctionList) {
      conjunct.accept(literalRejector);
    }

    final JoinInputNode.InputType rootTableInput = isTablePosOnRight ? inputTypeOnRight : inputTypeOnLeft;
    if (rootTableInput != JoinInputNode.InputType.REMOTE_TABLE) {
      // Not a remote table: there is no need to validate the project on the key column.
      return;
    }

    /*
    For a remote table we need to validate the join condition and the project that is above the remote table scan.
     - As of today the filter needs to be exactly one equi-join using the __key__ column (see SAMZA-2554)
     - The Project on top of the remote table has to contain only simple input references to any of the columns
       used in the join.
    */

    // First collect the references to the columns used by the join condition.
    final List<RexInputRef> refCollector = new ArrayList<>();
    join.getCondition().accept(new RexShuttle() {
      @Override
      public RexNode visitInputRef(RexInputRef inputRef) {
        refCollector.add(inputRef);
        return inputRef;
      }
    });

    final int leftFieldCount = join.getLeft().getRowType().getFieldCount();
    // Start index (inclusive) of the remote table fields within the join row.
    final int tableStartIndex = isTablePosOnRight ? leftFieldCount : 0;
    // End index (exclusive) of the remote table fields within the join row.
    final int tableEndIndex = isTablePosOnRight ? join.getRowType().getFieldCount() : leftFieldCount;

    // Keep only the references that fall on the table side, re-based to a 0-based index within the table row.
    // No ordering is needed: the list is only used once it is known to hold exactly one element.
    final List<Integer> tableRefsIdx = refCollector.stream()
        .map(RexInputRef::getIndex)
        .filter(idx -> tableStartIndex <= idx && idx < tableEndIndex)
        .map(idx -> idx - tableStartIndex)
        .collect(Collectors.toList());

    // The condition must be a single predicate holding a single reference to the remote table.
    if (conjunctionList.size() != 1 || tableRefsIdx.size() != 1) {
      //TODO We can relax this by allowing another filter to be evaluated post lookup see SAMZA-2554
      throw new SamzaException(
          "Invalid query for join condition must contain exactly one predicate for remote table on __key__ column "
              + dumpRelPlanForNode(join));
    }

    // Validate the Project: follow the input and ensure that it is a simple ref with no rexCall in the way.
    final RelNode tableSide = isTablePosOnRight ? join.getRight() : join.getLeft();
    if (!isValidRemoteJoinRef(tableRefsIdx.get(0), tableSide)) {
      throw new SamzaException("Invalid query for join condition can not have an expression and must be reference "
          + SamzaSqlRelMessage.KEY_NAME + " column " + dumpRelPlanForNode(join));
    }
  }