public Operator visitJoin()

in iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java [1818:2007]


  public Operator visitJoin(JoinNode node, LocalExecutionPlanContext context) {
    List<TSDataType> dataTypes = getOutputColumnTypes(node, context.getTypeProvider());

    Operator leftChild = node.getLeftChild().accept(this, context);
    Operator rightChild = node.getRightChild().accept(this, context);

    ImmutableMap<Symbol, Integer> leftColumnNamesMap =
        makeLayoutFromOutputSymbols(node.getLeftChild().getOutputSymbols());
    int[] leftOutputSymbolIdx = new int[node.getLeftOutputSymbols().size()];
    for (int i = 0; i < leftOutputSymbolIdx.length; i++) {
      Integer index = leftColumnNamesMap.get(node.getLeftOutputSymbols().get(i));
      if (index == null) {
        throw new IllegalStateException(
            "Left child of JoinNode doesn't contain LeftOutputSymbol "
                + node.getLeftOutputSymbols().get(i));
      }
      leftOutputSymbolIdx[i] = index;
    }

    ImmutableMap<Symbol, Integer> rightColumnNamesMap =
        makeLayoutFromOutputSymbols(node.getRightChild().getOutputSymbols());
    int[] rightOutputSymbolIdx = new int[node.getRightOutputSymbols().size()];
    for (int i = 0; i < rightOutputSymbolIdx.length; i++) {
      Integer index = rightColumnNamesMap.get(node.getRightOutputSymbols().get(i));
      if (index == null) {
        throw new IllegalStateException(
            "Right child of JoinNode doesn't contain RightOutputSymbol "
                + node.getLeftOutputSymbols().get(i));
      }
      rightOutputSymbolIdx[i] = index;
    }

    // cross join does not need time column
    if (node.isCrossJoin()) {
      OperatorContext operatorContext =
          context
              .getDriverContext()
              .addOperatorContext(
                  context.getNextOperatorId(),
                  node.getPlanNodeId(),
                  SimpleNestedLoopCrossJoinOperator.class.getSimpleName());
      return new SimpleNestedLoopCrossJoinOperator(
          operatorContext,
          leftChild,
          rightChild,
          leftOutputSymbolIdx,
          rightOutputSymbolIdx,
          dataTypes);
    }

    semanticCheckForJoin(node);

    JoinNode.AsofJoinClause asofJoinClause = node.getAsofCriteria().orElse(null);
    int equiSize = node.getCriteria().size();
    int size = equiSize + (asofJoinClause == null ? 0 : 1);
    int[] leftJoinKeyPositions = new int[size];
    for (int i = 0; i < equiSize; i++) {
      Integer leftJoinKeyPosition = leftColumnNamesMap.get(node.getCriteria().get(i).getLeft());
      if (leftJoinKeyPosition == null) {
        throw new IllegalStateException("Left child of JoinNode doesn't contain left join key.");
      }
      leftJoinKeyPositions[i] = leftJoinKeyPosition;
    }

    List<Type> joinKeyTypes = new ArrayList<>(size);
    int[] rightJoinKeyPositions = new int[size];
    for (int i = 0; i < equiSize; i++) {
      Integer rightJoinKeyPosition = rightColumnNamesMap.get(node.getCriteria().get(i).getRight());
      if (rightJoinKeyPosition == null) {
        throw new IllegalStateException("Right child of JoinNode doesn't contain right join key.");
      }
      rightJoinKeyPositions[i] = rightJoinKeyPosition;

      Type leftJoinKeyType =
          context.getTypeProvider().getTableModelType(node.getCriteria().get(i).getLeft());
      checkIfJoinKeyTypeMatches(
          leftJoinKeyType,
          context.getTypeProvider().getTableModelType(node.getCriteria().get(i).getRight()));
      joinKeyTypes.add(leftJoinKeyType);
    }

    if (asofJoinClause != null) {
      Integer leftAsofJoinKeyPosition = leftColumnNamesMap.get(asofJoinClause.getLeft());
      if (leftAsofJoinKeyPosition == null) {
        throw new IllegalStateException(
            "Left child of JoinNode doesn't contain left ASOF main join key.");
      }
      leftJoinKeyPositions[equiSize] = leftAsofJoinKeyPosition;
      Integer rightAsofJoinKeyPosition = rightColumnNamesMap.get(asofJoinClause.getRight());
      if (rightAsofJoinKeyPosition == null) {
        throw new IllegalStateException(
            "Right child of JoinNode doesn't contain right ASOF main join key.");
      }
      rightJoinKeyPositions[equiSize] = rightAsofJoinKeyPosition;

      if (context.getTypeProvider().getTableModelType(asofJoinClause.getLeft()) != TIMESTAMP) {
        throw new IllegalStateException("Type of left ASOF Join key is not TIMESTAMP");
      }
      if (context.getTypeProvider().getTableModelType(asofJoinClause.getRight()) != TIMESTAMP) {
        throw new IllegalStateException("Type of right ASOF Join key is not TIMESTAMP");
      }

      ComparisonExpression.Operator asofOperator = asofJoinClause.getOperator();

      if (requireNonNull(node.getJoinType()) == JoinNode.JoinType.INNER) {
        OperatorContext operatorContext =
            context
                .getDriverContext()
                .addOperatorContext(
                    context.getNextOperatorId(),
                    node.getPlanNodeId(),
                    AsofMergeSortInnerJoinOperator.class.getSimpleName());
        return new AsofMergeSortInnerJoinOperator(
            operatorContext,
            leftChild,
            leftJoinKeyPositions,
            leftOutputSymbolIdx,
            rightChild,
            rightJoinKeyPositions,
            rightOutputSymbolIdx,
            JoinKeyComparatorFactory.getAsofComparators(
                joinKeyTypes,
                asofOperator == ComparisonExpression.Operator.LESS_THAN_OR_EQUAL
                    || asofOperator == ComparisonExpression.Operator.GREATER_THAN_OR_EQUAL,
                !asofJoinClause.isOperatorContainsGreater()),
            dataTypes);
      } else {
        throw new IllegalStateException("Unsupported ASOF join type: " + node.getJoinType());
      }
    }

    if (requireNonNull(node.getJoinType()) == JoinNode.JoinType.INNER) {
      OperatorContext operatorContext =
          context
              .getDriverContext()
              .addOperatorContext(
                  context.getNextOperatorId(),
                  node.getPlanNodeId(),
                  MergeSortInnerJoinOperator.class.getSimpleName());
      return new MergeSortInnerJoinOperator(
          operatorContext,
          leftChild,
          leftJoinKeyPositions,
          leftOutputSymbolIdx,
          rightChild,
          rightJoinKeyPositions,
          rightOutputSymbolIdx,
          JoinKeyComparatorFactory.getComparators(joinKeyTypes, true),
          dataTypes);
    } else if (requireNonNull(node.getJoinType()) == JoinNode.JoinType.FULL) {
      OperatorContext operatorContext =
          context
              .getDriverContext()
              .addOperatorContext(
                  context.getNextOperatorId(),
                  node.getPlanNodeId(),
                  MergeSortFullOuterJoinOperator.class.getSimpleName());
      return new MergeSortFullOuterJoinOperator(
          operatorContext,
          leftChild,
          leftJoinKeyPositions,
          leftOutputSymbolIdx,
          rightChild,
          rightJoinKeyPositions,
          rightOutputSymbolIdx,
          JoinKeyComparatorFactory.getComparators(joinKeyTypes, true),
          dataTypes,
          joinKeyTypes.stream().map(this::buildUpdateLastRowFunction).collect(Collectors.toList()));
    } else if (requireNonNull(node.getJoinType()) == JoinNode.JoinType.LEFT) {
      OperatorContext operatorContext =
          context
              .getDriverContext()
              .addOperatorContext(
                  context.getNextOperatorId(),
                  node.getPlanNodeId(),
                  MergeSortLeftJoinOperator.class.getSimpleName());
      return new MergeSortLeftJoinOperator(
          operatorContext,
          leftChild,
          leftJoinKeyPositions,
          leftOutputSymbolIdx,
          rightChild,
          rightJoinKeyPositions,
          rightOutputSymbolIdx,
          JoinKeyComparatorFactory.getComparators(joinKeyTypes, true),
          dataTypes);
    }

    throw new IllegalStateException("Unsupported join type: " + node.getJoinType());
  }