RowVectorPtr HashProbe::getOutput()

in velox/exec/HashProbe.cpp [409:502]


/// Produces the next batch of join output for the current probe-side input.
///
/// Behavior, in order of precedence:
///  - No pending input: for right and full joins, once noMoreInput_ is set,
///    returns the result of getNonMatchingOutputForRightJoin() and sets
///    finished_ when that result is nullptr. For all other cases returns
///    nullptr and sets finished_ if noMoreInput_ is set.
///  - replacedWithDynamicFilter_ is set: records the
///    "replacedWithDynamicFilterRows" runtime stat, produces output for all
///    'inputSize' rows via Operator::fillOutput(inputSize, nullptr) and
///    releases the input.
///  - Otherwise: selects candidate probe rows (all rows if the build side is
///    empty, the anti-join survivors, or table_->listJoinResults()), runs
///    evalFilter() over them and materializes the survivors via
///    fillOutput(numOut).
///
/// @return output_ when at least one row was produced for the current input;
/// nullptr when the current input produced no rows (input_ is released in
/// that case) or when there is no input to process.
RowVectorPtr HashProbe::getOutput() {
  clearIdentityProjectedOutput();
  if (!input_) {
    if (noMoreInput_ && (isRightJoin(joinType_) || isFullJoin(joinType_))) {
      auto output = getNonMatchingOutputForRightJoin();
      if (output == nullptr) {
        finished_ = true;
      }
      return output;
    }
    if (noMoreInput_) {
      finished_ = true;
    }
    return nullptr;
  }

  const auto inputSize = input_->size();

  if (replacedWithDynamicFilter_) {
    stats_.addRuntimeStat("replacedWithDynamicFilterRows", inputSize);
    auto output = Operator::fillOutput(inputSize, nullptr);
    input_ = nullptr;
    return output;
  }

  const bool isSemiOrAntiJoin =
      core::isSemiJoin(joinType_) || core::isAntiJoin(joinType_);

  const bool emptyBuildSide = (table_->numDistinct() == 0);

  // Semi and anti joins are always cardinality reducing, e.g. for a given row
  // of input they produce zero or 1 row of output. With an empty build side
  // every probe row maps to exactly one candidate row. In both cases the whole
  // input batch is processed in a single pass of the loop below.
  const bool singlePass = isSemiOrAntiJoin || emptyBuildSide;

  auto outputBatchSize = singlePass ? inputSize : outputBatchSize_;
  auto mapping =
      initializeRowNumberMapping(rowNumberMapping_, outputBatchSize, pool());
  outputRows_.resize(outputBatchSize);

  for (;;) {
    int numOut = 0;

    if (emptyBuildSide) {
      // When build side is empty, anti and left joins return all probe side
      // rows, including ones with null join keys.
      std::iota(mapping.begin(), mapping.end(), 0);
      numOut = inputSize;
    } else if (isAntiJoin(joinType_)) {
      // When build side is not empty, anti join returns probe rows with no
      // nulls in the join key and no match in the build side.
      for (auto i = 0; i < inputSize; i++) {
        if (nonNullRows_.isValid(i) &&
            (!activeRows_.isValid(i) || !lookup_->hits[i])) {
          mapping[numOut] = i;
          ++numOut;
        }
      }
    } else {
      numOut = table_->listJoinResults(
          results_,
          isLeftJoin(joinType_) || isFullJoin(joinType_),
          mapping,
          folly::Range(outputRows_.data(), outputRows_.size()));
    }

    if (!numOut) {
      // No candidate rows are left for this input batch.
      input_ = nullptr;
      return nullptr;
    }
    VELOX_CHECK_LE(numOut, outputRows_.size());

    numOut = evalFilter(numOut);
    if (!numOut) {
      // The filter was false on all rows.
      if (singlePass) {
        // A single pass covers the entire input batch, so there is nothing
        // left to retry. This must include the empty-build-side case: its
        // candidate set is recomputed identically on every iteration, so
        // looping again would never terminate if the filter keeps rejecting
        // all rows.
        // NOTE(review): whether evalFilter() can actually return 0 for
        // left/full joins with an empty build side depends on its
        // implementation, which is not visible here - confirm.
        input_ = nullptr;
        return nullptr;
      }
      // Ask listJoinResults() for the next batch of candidates.
      continue;
    }

    if (isRightJoin(joinType_) || isFullJoin(joinType_)) {
      // Mark build-side rows that have a match on the join condition.
      table_->rows()->setProbedFlag(outputRows_.data(), numOut);
    }

    fillOutput(numOut);

    if (singlePass) {
      // The input batch has been fully consumed by this pass.
      input_ = nullptr;
    }
    return output_;
  }
}