absl::Status IndexValidator::Validate()

in backend/schema/validators/index_validator.cc [78:225]


// Validates `index` and returns the first violation found, or OK when every
// check passes.
//
// Checks run in a fixed order, so the reported error is always the earliest
// failing one:
//   1. invariants asserted with ZETASQL_RET_CHECK,
//   2. index naming rules,
//   3. key columns,
//   4. stored columns,
//   5. interleaving parent,
//   6. search-index-only rules,
//   7. vector-index-only rules.
//
// `context` is consulted only for the schema dialect.
absl::Status IndexValidator::Validate(const Index* index,
                                      SchemaValidationContext* context) {
  // --- 1. Invariants -------------------------------------------------------
  ZETASQL_RET_CHECK(!index->name_.empty());
  ZETASQL_RET_CHECK_NE(index->indexed_table_, nullptr);
  ZETASQL_RET_CHECK_NE(index->index_data_table_, nullptr);
  // A PostgreSQL OID must be present exactly when the dialect is PostgreSQL.
  if (context->is_postgresql_dialect()) {
    ZETASQL_RET_CHECK(index->postgresql_oid().has_value());
  } else {
    ZETASQL_RET_CHECK(!index->postgresql_oid().has_value());
  }

  const auto& index_name = index->name_;
  const Table* const base_table = index->indexed_table_;
  const bool search_index = index->is_search_index();
  const bool vector_index = index->is_vector_index();

  // --- 2. Naming -----------------------------------------------------------
  ZETASQL_RETURN_IF_ERROR(
      GlobalSchemaNames::ValidateSchemaName("Index", index_name));
  // Only non-managed indexes are required to share the indexed table's
  // schema.
  const bool schema_must_match = !index->is_managed();
  if (schema_must_match &&
      !SDLObjectName::InSameSchema(index_name, base_table->Name())) {
    return error::IndexInDifferentSchema(index_name, base_table->Name());
  }
  // "PRIMARY_KEY" is rejected in any letter case.
  if (absl::EqualsIgnoreCase(index_name, "PRIMARY_KEY")) {
    return error::CannotNameIndexPrimaryKey();
  }
  // The "Dir_" prefix is rejected; this match is case-sensitive.
  if (absl::StartsWith(index_name, "Dir_")) {
    return error::InvalidSchemaName("Index", index_name);
  }

  // --- 3. Key columns ------------------------------------------------------
  if (index->key_columns_.empty()) {
    return error::IndexWithNoKeys(index_name);
  }

  CaseInsensitiveStringSet seen_keys;
  for (const auto* key_column : index->key_columns_) {
    const auto* column = key_column->column();
    const std::string key_name = column->Name();
    if (seen_keys.contains(key_name)) {
      return error::IndexRefsColumnTwice(index_name, key_name);
    }

    const auto* key_type = column->GetType();
    // A token list key of a search index bypasses both the supported-type
    // check and the null-filtered nullability check below.
    const bool is_search_token_key =
        search_index && key_type->IsTokenListType();
    if (!is_search_token_key) {
      if (!IsSupportedKeyColumnType(key_type, vector_index)) {
        return error::IndexRefsUnsupportedColumn(index_name,
                                                 ToString(key_type));
      }
      if (index->is_null_filtered_) {
        ZETASQL_RET_CHECK(!key_column->column()->is_nullable());
      }
    }
    seen_keys.insert(key_name);
  }

  // --- 4. Stored columns ---------------------------------------------------
  CaseInsensitiveStringSet seen_stored;
  for (const auto* stored : index->stored_columns_) {
    const std::string stored_name = stored->Name();
    // A stored column may not repeat an index key ...
    if (seen_keys.contains(stored_name)) {
      return error::IndexRefsKeyAsStoredColumn(index_name, stored_name);
    }
    // ... nor a key column of the indexed table ...
    if (base_table->FindKeyColumn(stored_name) != nullptr) {
      return error::IndexRefsTableKeyAsStoredColumn(index_name, stored_name,
                                                    base_table->Name());
    }
    // ... nor another stored column.
    if (seen_stored.contains(stored_name)) {
      return error::IndexRefsColumnTwice(index_name, stored_name);
    }
    seen_stored.insert(stored_name);
  }

  // --- 5. Interleaving -----------------------------------------------------
  const Table* const interleave_parent = index->parent();
  if (interleave_parent != nullptr) {
    // Climb from the indexed table through its parents until the index's
    // parent is reached or the chain ends.
    const Table* ancestor = base_table;
    while (ancestor != interleave_parent && ancestor->parent() != nullptr) {
      ancestor = ancestor->parent();
    }
    if (ancestor != interleave_parent) {
      return error::IndexInterleaveTableUnacceptable(
          index_name, base_table->Name(), interleave_parent->Name());
    }
  }

  // --- 6. Search index rules -----------------------------------------------
  if (search_index) {
    // Partition-by columns may not be of token list type.
    for (const auto* partition_column : index->partition_by_) {
      if (partition_column->GetType()->IsTokenListType()) {
        return error::SearchIndexNotPartitionByokenListType(
            index_name, partition_column->Name());
      }
    }

    // Order-by columns must be non-nullable (or null filtered) and of
    // integer type; nullability is checked before the type.
    for (const auto* order_column : index->order_by_) {
      const bool may_hold_null =
          order_column->is_nullable() &&
          !index->is_null_filtered_column(order_column);
      if (may_hold_null) {
        return error::SearchIndexSortMustBeNotNullError(order_column->Name(),
                                                        index_name);
      }

      const auto* order_type = order_column->GetType();
      if (!order_type->IsInteger()) {
        return error::SearchIndexOrderByMustBeIntegerType(
            index_name, order_column->Name(), ToString(order_type));
      }
    }
  }

  // --- 7. Vector index rules -----------------------------------------------
  if (vector_index) {
    if (!index->partition_by_.empty()) {
      return error::VectorIndexPartitionByUnsupported(index_name);
    }

    ZETASQL_RET_CHECK(index->key_columns_.size() == 1);
    const auto* vector_column = index->key_columns_[0]->column();
    const auto& vector_column_name = vector_column->Name();

    // The single key must be an array whose elements are float or double.
    const auto* vector_type = vector_column->GetType();
    if (!vector_type->IsArray()) {
      return error::VectorIndexNonArrayKey(vector_column_name, index_name);
    }
    const zetasql::Type* element_type =
        vector_type->AsArray()->element_type();
    const bool is_floating_point =
        element_type->IsFloat() || element_type->IsDouble();
    if (!is_floating_point) {
      return error::VectorIndexArrayKeyMustBeFloatOrDouble(vector_column_name,
                                                           index_name);
    }

    // The key must declare a vector length no greater than the cap below.
    if (!vector_column->has_vector_length()) {
      return error::VectorIndexArrayKeyMustHaveVectorLength(
          vector_column_name, index_name);
    }
    const int32_t max_vector_length = 8000;
    const auto declared_length = vector_column->vector_length();
    if (declared_length > max_vector_length) {
      return error::VectorIndexArrayKeyVectorLengthTooLarge(
          vector_column_name, index_name, declared_length.value(),
          max_vector_length);
    }

    // A nullable key must be listed, by exact name, among the index's null
    // filtered columns.
    const bool key_null_filtered = [&]() {
      for (const auto* filtered : index->null_filtered_columns_) {
        if (vector_column_name == filtered->Name()) {
          return true;
        }
      }
      return false;
    }();
    if (vector_column->is_nullable() && !key_null_filtered) {
      return error::VectorIndexKeyNotNullFiltered(vector_column_name,
                                                  index_name);
    }
  }

  return absl::OkStatus();
}