void SubstraitToVeloxPlanConverter::constructSubfieldFilters()

in cpp/velox/substrait/SubstraitToVeloxPlan.cc [1994:2160]


// Builds the Velox subfield filter for a single column from the conditions
// collected in 'filterInfo' and stores it in 'filters' keyed by 'inputName'.
//
// NOTE(review): KIND and FilterType are not declared in the visible part of
// this definition; they are presumably the template parameters of this
// function - confirm against the template header.
//
// Parameters:
//   colIdx     - not read by this function.
//   inputName  - column name used to build the common::Subfield key.
//   inputType  - column type; consulted to detect short decimals and to
//                produce error messages.
//   filterInfo - collected conditions (null handling, IN values, not-equal
//                value, lower/upper bounds). Nothing is done when it is not
//                initialized.
//   filters    - output map that receives the constructed filter.
//
// Fails through VELOX_NYI / VELOX_CHECK when the combination of conditions
// is not supported (HUGEINT columns, IN or not-equal combined with ranges,
// no ranges without an IsNull/IsNotNull condition, ...).
void SubstraitToVeloxPlanConverter::constructSubfieldFilters(
    uint32_t colIdx,
    const std::string& inputName,
    const TypePtr& inputType,
    const FilterInfo& filterInfo,
    connector::hive::SubfieldFilters& filters) {
  if (!filterInfo.isInitialized()) {
    return;
  }

  bool nullAllowed = filterInfo.nullAllowed_;
  bool isNull = filterInfo.isNull_;
  // Number of ranges to emit: a range may carry only one of its two bounds, so
  // the longer of the two bound lists decides.
  const size_t rangeSize = std::max(filterInfo.lowerBounds_.size(), filterInfo.upperBounds_.size());

  if constexpr (KIND == facebook::velox::TypeKind::HUGEINT) {
    // TODO: open it when the Velox's modification is ready.
    VELOX_NYI("constructSubfieldFilters not support for HUGEINT type");
  } else if constexpr (KIND == facebook::velox::TypeKind::BOOLEAN) {
    // Handle bool type filters.
    if (filterInfo.notValue_) {
      // Not equal: for a bool this is the same as "equal to the negated value".
      filters[common::Subfield(inputName)] =
          std::make_unique<common::BoolValue>(!filterInfo.notValue_.value().value<bool>(), nullAllowed);
    } else if (rangeSize == 0) {
      // IsNull/IsNotNull.
      if (!nullAllowed) {
        filters[common::Subfield(inputName)] = std::make_unique<common::IsNotNull>();
      } else if (isNull) {
        filters[common::Subfield(inputName)] = std::make_unique<common::IsNull>();
      } else {
        VELOX_NYI("Only IsNotNull and IsNull are supported in constructSubfieldFilters when no other filter ranges.");
      }
      return;
    } else {
      // Equal: expressed as a lower and an upper bound holding the same value.
      // rangeSize is the max of both sizes, so either list may still be empty
      // here; verify both bounds exist before indexing them.
      VELOX_CHECK(
          !filterInfo.lowerBounds_.empty() && !filterInfo.upperBounds_.empty() &&
              filterInfo.lowerBounds_[0].has_value() && filterInfo.upperBounds_[0].has_value(),
          "Bool equal filter requires both a lower and an upper bound.");
      auto value = filterInfo.lowerBounds_[0].value().value<bool>();
      VELOX_CHECK(value == filterInfo.upperBounds_[0].value().value<bool>(), "invalid state of bool equal");
      filters[common::Subfield(inputName)] = std::make_unique<common::BoolValue>(value, nullAllowed);
    }
  } else if constexpr (KIND == facebook::velox::TypeKind::ARRAY || KIND == facebook::velox::TypeKind::MAP) {
    // Only IsNotNull and IsNull are supported for array and map types. Any
    // range information present for such a column is ignored.
    if (rangeSize == 0) {
      if (!nullAllowed) {
        filters[common::Subfield(inputName)] = std::make_unique<common::IsNotNull>();
      } else if (isNull) {
        filters[common::Subfield(inputName)] = std::make_unique<common::IsNull>();
      } else {
        VELOX_NYI(
            "Only IsNotNull and IsNull are supported in constructSubfieldFilters for input type '{}'.",
            inputType->toString());
      }
    }
  } else {
    using NativeType = typename RangeTraits<KIND>::NativeType;
    using RangeType = typename RangeTraits<KIND>::RangeType;
    using MultiRangeType = typename RangeTraits<KIND>::MultiRangeType;

    // Handle 'in' filter.
    if (filterInfo.values_.size() > 0) {
      // Currently, In cannot coexist with other filter conditions
      // due to multirange is in 'OR' relation but 'AND' is needed.
      // Validate before touching 'filters' so a rejected combination does not
      // leave a half-applied filter behind.
      VELOX_CHECK(rangeSize == 0, "LowerBounds or upperBounds conditons cannot be supported after IN filter.");
      VELOX_CHECK(!filterInfo.notValue_.has_value(), "Not equal cannot be supported after IN filter.");
      // To filter out null is a default behaviour of Spark IN expression.
      nullAllowed = false;
      setInFilter<KIND>(filterInfo.values_, nullAllowed, inputName, filters);
      return;
    }

    // Construct the Filters.
    std::vector<std::unique_ptr<FilterType>> colFilters;

    // Handle not(equal) filter.
    if (filterInfo.notValue_) {
      // Currently, Not-equal cannot coexist with other filter conditions
      // due to multirange is in 'OR' relation but 'AND' is needed.
      VELOX_CHECK(rangeSize == 0, "LowerBounds or upperBounds conditons cannot be supported after not-equal filter.");
      variant notVariant = filterInfo.notValue_.value();
      createNotEqualFilter<KIND, FilterType>(notVariant, filterInfo.nullAllowed_, colFilters);
      if (colFilters.size() == 1) {
        // A single filter needs no multi-range wrapper.
        filters[common::Subfield(inputName)] = std::move(colFilters.front());
      } else if constexpr (std::is_same_v<MultiRangeType, common::MultiRange>) {
        filters[common::Subfield(inputName)] =
            std::make_unique<common::MultiRange>(std::move(colFilters), nullAllowed, true /*nanAllowed*/);
      } else {
        filters[common::Subfield(inputName)] = std::make_unique<MultiRangeType>(std::move(colFilters), nullAllowed);
      }
      return;
    }

    // Handle null filtering.
    if (rangeSize == 0) {
      if (!nullAllowed) {
        filters[common::Subfield(inputName)] = std::make_unique<common::IsNotNull>();
      } else if (isNull) {
        filters[common::Subfield(inputName)] = std::make_unique<common::IsNull>();
      } else {
        VELOX_NYI("Only IsNotNull and IsNull are supported in constructSubfieldFilters when no other filter ranges.");
      }
      return;
    }

    // Widest bounds for this column, used for whichever side of a range is
    // missing. Short decimals are stored as BIGINT but have a narrower domain.
    NativeType widestLowerBound;
    NativeType widestUpperBound;
    if constexpr (KIND == facebook::velox::TypeKind::BIGINT) {
      if (inputType->isShortDecimal()) {
        widestLowerBound = DecimalUtil::kShortDecimalMin;
        widestUpperBound = DecimalUtil::kShortDecimalMax;
      } else {
        widestLowerBound = getLowest<NativeType>();
        widestUpperBound = getMax<NativeType>();
      }
    } else {
      widestLowerBound = getLowest<NativeType>();
      widestUpperBound = getMax<NativeType>();
    }

    // Handle other filter ranges.
    colFilters.reserve(rangeSize);
    for (size_t idx = 0; idx < rangeSize; ++idx) {
      // Every range starts from a clean, fully unbounded state. Keeping this
      // state across iterations would let a range that omits one side inherit
      // the bound (and exclusiveness) of the previous range.
      NativeType lowerBound = widestLowerBound;
      NativeType upperBound = widestUpperBound;
      bool lowerUnbounded = true;
      bool upperUnbounded = true;
      bool lowerExclusive = false;
      bool upperExclusive = false;

      // NOTE(review): lowerExclusives_/upperExclusives_ are assumed to run
      // parallel to lowerBounds_/upperBounds_ - confirm in FilterInfo.
      if (idx < filterInfo.lowerBounds_.size() && filterInfo.lowerBounds_[idx]) {
        lowerUnbounded = false;
        const variant& lowerVariant = filterInfo.lowerBounds_[idx].value();
        lowerBound = lowerVariant.value<NativeType>();
        lowerExclusive = filterInfo.lowerExclusives_[idx];
      }

      if (idx < filterInfo.upperBounds_.size() && filterInfo.upperBounds_[idx]) {
        upperUnbounded = false;
        const variant& upperVariant = filterInfo.upperBounds_[idx].value();
        upperBound = upperVariant.value<NativeType>();
        upperExclusive = filterInfo.upperExclusives_[idx];
      }

      std::unique_ptr<FilterType> filter;
      if constexpr (std::is_same_v<RangeType, common::BigintRange>) {
        // BigintRange is built from two inclusive bounds here, so an exclusive
        // bound is converted by stepping one value inwards.
        // NOTE(review): an exclusive bound sitting exactly on the numeric
        // limit would overflow in this adjustment - confirm callers never
        // produce that.
        filter = std::make_unique<common::BigintRange>(
            lowerExclusive ? lowerBound + 1 : lowerBound, upperExclusive ? upperBound - 1 : upperBound, nullAllowed);
      } else {
        filter = std::make_unique<RangeType>(
            lowerBound, lowerUnbounded, lowerExclusive, upperBound, upperUnbounded, upperExclusive, nullAllowed);
      }

      colFilters.emplace_back(std::move(filter));
    }

    // Set the SubfieldFilter.
    setSubfieldFilter<KIND, FilterType>(std::move(colFilters), inputName, filterInfo.nullAllowed_, filters);
  }
}