RowVectorPtr MergeJoin::doGetOutput()

in velox/exec/MergeJoin.cpp [403:594]


// Produces the next batch of join output, or returns nullptr when more input
// is needed on the left (input_) or right (rightInput_) side. All progress is
// kept in member state, so this behaves as a resumable state machine; each
// call proceeds through these phases in order:
//   1. Resume emitting a match whose output was interrupted because the
//      output batch ran out of space (leftMatch_->cursor is set).
//   2. If a match reached the end of a batch, extend it with the newly
//      arrived batch(es) via findEndOfMatch() until its end is found on both
//      sides (or that side has no more input).
//   3. Emit a fully delimited (complete) match via addToOutput().
//   4. If either side has no current batch, handle end-of-input: for left
//      joins, emit the remaining unmatched left rows once the right side is
//      exhausted; otherwise flush any partially filled output batch.
//   5. Merge the two sides: advance whichever side is behind (per compare())
//      and, on equal keys, record the matching row ranges on both sides.
RowVectorPtr MergeJoin::doGetOutput() {
  // Phase 1: check if we ran out of space in the output vector in the middle
  // of the match. A set cursor presumably records where addToOutput() stopped.
  if (leftMatch_ && leftMatch_->cursor) {
    VELOX_CHECK(rightMatch_ && rightMatch_->cursor);

    // Not all rows from the last match fit in the output. Continue producing
    // results from the current match. A true return from addToOutput() is
    // treated as "output_ is ready to be returned".
    if (addToOutput()) {
      return std::move(output_);
    }
  }

  // Phase 2: there is no output-in-progress match, but there could be an
  // incomplete match, i.e. one whose key range reached the end of a batch on
  // the left and/or right side.
  if (leftMatch_) {
    VELOX_CHECK(rightMatch_);

    if (input_) {
      // Look for continuation of a match on the left and/or right sides.
      // A false return means the end of the match was not found in this
      // batch: give up the batch (presumably retained in leftMatch_->inputs
      // by findEndOfMatch()) and wait for the next left batch.
      if (!findEndOfMatch(leftMatch_.value(), input_, leftKeys_)) {
        // Continue looking for the end of the match.
        input_ = nullptr;
        return nullptr;
      }

      // If the current batch is the match's last batch, the match ended
      // inside it: resume scanning the left side right after the matched
      // rows. Otherwise index_ is left as is (presumably the match ended
      // before this batch's first row).
      if (leftMatch_->inputs.back() == input_) {
        index_ = leftMatch_->endIndex;
      }
    } else if (noMoreInput_) {
      // No more left input: the rows collected so far form the whole match.
      leftMatch_->complete = true;
    } else {
      // Need more input.
      return nullptr;
    }

    // Same as above, for the right side.
    if (rightInput_) {
      if (!findEndOfMatch(rightMatch_.value(), rightInput_, rightKeys_)) {
        // Continue looking for the end of the match.
        rightInput_ = nullptr;
        return nullptr;
      }
      if (rightMatch_->inputs.back() == rightInput_) {
        rightIndex_ = rightMatch_->endIndex;
      }
    } else if (noMoreRightInput_) {
      rightMatch_->complete = true;
    } else {
      // Need more input.
      return nullptr;
    }
  }

  // Phase 3: there is no output-in-progress match, but there can be a
  // complete match ready for output. If leftMatch_ is still set here, both
  // sides must have been fully delimited above, as the checks below assert.
  if (leftMatch_) {
    VELOX_CHECK(leftMatch_->complete);
    VELOX_CHECK(rightMatch_ && rightMatch_->complete);

    if (addToOutput()) {
      return std::move(output_);
    }
  }

  // Phase 4: at least one side has no current batch, so no new match can be
  // started in this call.
  if (!input_ || !rightInput_) {
    if (isLeftJoin(joinType_)) {
      // Left join with the right side exhausted: every remaining left row is
      // unmatched and is emitted via addOutputRowForLeftJoin() (presumably
      // with null right-side columns).
      if (input_ && noMoreRightInput_) {
        prepareOutput();
        while (true) {
          // Check capacity before adding a row so a full batch is returned
          // first; the loop resumes from index_ on the next call.
          if (outputSize_ == outputBatchSize_) {
            return std::move(output_);
          }

          addOutputRowForLeftJoin();

          ++index_;
          if (index_ == input_->size()) {
            // Ran out of rows on the left side.
            input_ = nullptr;
            return nullptr;
          }
        }
      }

      // Left input has ended: flush the partially filled output batch,
      // trimmed to the number of rows actually written.
      if (noMoreInput_ && output_) {
        output_->resize(outputSize_);
        return std::move(output_);
      }
    } else {
      // Non-left joins: once either side has ended, flush any partially
      // filled output batch; when no output is pending, drop the current left
      // batch (with the right side ended, its rows have nothing to match).
      // NOTE(review): when only noMoreInput_ is set, this relies on input_
      // already being null in that state — confirm against the operator's
      // needsInput()/noMoreInput() contract.
      if (noMoreInput_ || noMoreRightInput_) {
        if (output_) {
          output_->resize(outputSize_);
          return std::move(output_);
        }
        input_ = nullptr;
      }
    }

    // Wait for more input on the missing side.
    return nullptr;
  }

  // Phase 5: look for a new match starting with index_ row on the left and
  // rightIndex_ row on the right. compare() < 0 means the left row is behind
  // the right row, > 0 means the right row is behind, 0 means keys are equal.
  auto compareResult = compare();

  // The loop alternates between advancing the side that is behind and
  // handling matches; it is only exited via return.
  for (;;) {
    // Catch up input_ with rightInput_. For left joins, each skipped left row
    // has no match on the right and is emitted on its own.
    while (compareResult < 0) {
      if (isLeftJoin(joinType_)) {
        prepareOutput();

        if (outputSize_ == outputBatchSize_) {
          return std::move(output_);
        }

        addOutputRowForLeftJoin();
      }

      ++index_;
      if (index_ == input_->size()) {
        // Ran out of rows on the left side.
        input_ = nullptr;
        return nullptr;
      }
      compareResult = compare();
    }

    // Catch up rightInput_ with input_. Skipped right rows produce no output.
    // This loop may stop with compareResult < 0, in which case the outer loop
    // goes back to advancing the left side.
    while (compareResult > 0) {
      ++rightIndex_;
      if (rightIndex_ == rightInput_->size()) {
        // Ran out of rows on the right side.
        rightInput_ = nullptr;
        return nullptr;
      }
      compareResult = compare();
    }

    if (compareResult == 0) {
      // Found a match. Identify all rows on the left and right that have the
      // matching keys: rows [index_, endIndex) of input_ and rows
      // [rightIndex_, endRightIndex) of rightInput_.
      vector_size_t endIndex = index_ + 1;
      while (endIndex < input_->size() && compareLeft(endIndex) == 0) {
        ++endIndex;
      }

      if (endIndex == input_->size()) {
        // Matches continue in subsequent input. Load all lazies.
        // The batch stays referenced from leftMatch_ across calls, so its
        // lazy columns are presumably materialized here while still valid.
        loadColumns(input_, *operatorCtx_->execCtx());
      }
      // The match is complete only if its end lies strictly inside this
      // batch; if it reaches the batch end, later batches may continue it.
      leftMatch_ = Match{
          {input_}, index_, endIndex, endIndex < input_->size(), std::nullopt};

      vector_size_t endRightIndex = rightIndex_ + 1;
      while (endRightIndex < rightInput_->size() &&
             compareRight(endRightIndex) == 0) {
        ++endRightIndex;
      }

      // NOTE(review): unlike the left side, the right batch is not passed to
      // loadColumns() when its match reaches the batch end — confirm this is
      // intended.
      rightMatch_ = Match{
          {rightInput_},
          rightIndex_,
          endRightIndex,
          endRightIndex < rightInput_->size(),
          std::nullopt};

      // The match may extend into later batch(es). Give up the side(s) whose
      // range reached the batch end to signal that more input is needed;
      // phase 2 of a later call finishes delimiting the match.
      if (!leftMatch_->complete || !rightMatch_->complete) {
        if (!leftMatch_->complete) {
          // Need to continue looking for the end of match.
          input_ = nullptr;
        }

        if (!rightMatch_->complete) {
          // Need to continue looking for the end of match.
          rightInput_ = nullptr;
        }
        return nullptr;
      }

      // Both ranges end strictly inside their batches, so endIndex and
      // endRightIndex are valid row positions. Advance past the match before
      // emitting it so scanning resumes after the matched rows.
      index_ = endIndex;
      rightIndex_ = endRightIndex;

      if (addToOutput()) {
        return std::move(output_);
      }

      compareResult = compare();
    }
  }

  VELOX_UNREACHABLE();
}