absl::Status SchemaUpdaterImpl::ValidateSequenceOptions()

in backend/schema/updater/schema_updater.cc [4284:4360]


absl::Status SchemaUpdaterImpl::ValidateSequenceOptions(
    const ::google::protobuf::RepeatedPtrField<ddl::SetOption>& set_options,
    const Sequence* current_sequence) {
  absl::flat_hash_map<std::string, std::optional<int64_t>> int64_options;
  int64_options.reserve(3);
  for (const ddl::SetOption& option : set_options) {
    if (option.option_name() == kSequenceKindOptionName) {
      if (!option.has_string_value()) {
        return error::InvalidSequenceOptionValue(kSequenceKindOptionName,
                                                 "string");
      }
      if (option.string_value() != kSequenceKindBitReversedPositive) {
        return error::UnsupportedSequenceKind(option.string_value());
      }
    } else if (option.option_name() == kSequenceStartWithCounterOptionName ||
               option.option_name() == kSequenceSkipRangeMinOptionName ||
               option.option_name() == kSequenceSkipRangeMaxOptionName) {
      // In this block, we're only assigning values to the temporary
      // `int64_options` map as we see them. We check for their validity
      // after this for loop.

      // Checking for null value here is necessary, since a sequence can be
      // altered to clear the option.
      if (option.has_null_value()) {
        int64_options[option.option_name()] = std::nullopt;
      } else {
        if (!option.has_int64_value()) {
          return error::InvalidSequenceOptionValue(option.option_name(),
                                                   "integer");
        }
        int64_options[option.option_name()] = option.int64_value();
      }
    } else {
      return error::UnsupportedSequenceOption(option.option_name());
    }
  }

  if (int64_options.contains(kSequenceStartWithCounterOptionName)) {
    if (int64_options[kSequenceStartWithCounterOptionName].has_value() &&
        int64_options[kSequenceStartWithCounterOptionName].value() < 1) {
      return error::InvalidSequenceStartWithCounterValue();
    }
  }

  std::optional<int64_t> skip_range_min, skip_range_max;
  if (current_sequence != nullptr) {
    // We are altering the sequence to set options. So we're loading the
    // current values of these options to compare against the new values we're
    // seeing.
    skip_range_min = current_sequence->skip_range_min();
    skip_range_max = current_sequence->skip_range_max();
  }
  // In this condition and below, if skip_range_min (or skip_range_max) is
  // explicitly set to null, we are overwriting its current value here.
  if (int64_options.contains(kSequenceSkipRangeMinOptionName)) {
    skip_range_min = int64_options[kSequenceSkipRangeMinOptionName];
  }
  if (int64_options.contains(kSequenceSkipRangeMaxOptionName)) {
    skip_range_max = int64_options[kSequenceSkipRangeMaxOptionName];
  }

  if (skip_range_min.has_value() != skip_range_max.has_value()) {
    return error::SequenceSkipRangeMinMaxNotSetTogether();
  }
  if (!skip_range_min.has_value()) {
    // Skipped range is not set, safely return.
    return absl::OkStatus();
  }
  if (skip_range_min.value() > skip_range_max.value()) {
    return error::SequenceSkipRangeMinLargerThanMax();
  }
  if (skip_range_max.value() < 1) {
    return error::SequenceSkippedRangeHasAtleastOnePositiveNumber();
  }

  return absl::OkStatus();
}