absl::Status TableValidator::Validate()

in backend/schema/validators/table_validator.cc [238:388]


absl::Status TableValidator::Validate(const Table* table,
                                      SchemaValidationContext* context) {
  ZETASQL_RET_CHECK(!table->name_.empty());
  ZETASQL_RET_CHECK(!table->id_.empty());

  if (table->is_public()) {
    ZETASQL_RETURN_IF_ERROR(
        GlobalSchemaNames::ValidateSchemaName("Table", table->name_));
    if (context->is_postgresql_dialect()) {
      ZETASQL_RET_CHECK(table->postgresql_oid().has_value());
    } else {
      ZETASQL_RET_CHECK(!table->postgresql_oid().has_value());
    }
    if (!table->synonym_.empty()) {
      ZETASQL_RETURN_IF_ERROR(
          GlobalSchemaNames::ValidateSchemaName("Synonym", table->synonym_));
    }
  }

  const std::string object_type = OwningObjectType(table);
  const std::string object_name = OwningObjectName(table);

  // Validate that all columns are unique.
  CaseInsensitiveStringSet unique_columns;
  for (const Column* column : table->columns_) {
    ZETASQL_RET_CHECK_NE(column, nullptr);
    std::string column_name = column->Name();
    ZETASQL_RET_CHECK_EQ(column->table(), table);
    if (unique_columns.contains(column_name)) {
      return error::DuplicateColumnName(column->FullName());
    }
    unique_columns.insert(column_name);
  }

  if (table->columns_.size() > limits::kMaxColumnsPerTable) {
    return error::TooManyColumns(object_type, object_name,
                                 limits::kMaxColumnsPerTable);
  }

  // Validate that all key columns are unique.
  CaseInsensitiveStringSet unique_keys;
  for (const KeyColumn* key_column : table->primary_key_) {
    ZETASQL_RET_CHECK_NE(key_column, nullptr);
    const Column* column = key_column->column();
    ZETASQL_RET_CHECK_NE(column, nullptr);
    const Column* table_column = table->FindColumn(column->Name());
    ZETASQL_RET_CHECK_EQ(table_column, column);
    if (unique_keys.contains(column->Name())) {
      return error::MultipleRefsToKeyColumn(object_type, object_name,
                                            column->Name());
    }
    unique_keys.insert(column->Name());
  }

  if (table->primary_key_.size() > limits::kMaxKeyColumns) {
    return error::TooManyKeys(object_type, object_name,
                              table->primary_key_.size(),
                              limits::kMaxKeyColumns);
  }

  if (!table->indexes_.empty()) {
    ZETASQL_RET_CHECK(!table->columns_.empty());
  }

  for (const Index* index : table->indexes_) {
    ZETASQL_RET_CHECK_NE(index, nullptr);
    ZETASQL_RET_CHECK_EQ(index->indexed_table(), table);
  }

  if (table->indexes_.size() > limits::kMaxIndexesPerTable) {
    const Index* last_index = table->indexes_[limits::kMaxIndexesPerTable];
    return error::TooManyIndicesPerTable(last_index->Name(), table->Name(),
                                         limits::kMaxIndexesPerTable);
  }

  // Check interleave compatibility.
  if (!table->parent_table_) {
    if (table->on_delete_action_.has_value()) {
      return error::SetOnDeleteWithoutInterleaving(table->Name());
    }
  } else {
    bool ignore_nullability = table->owner_index() != nullptr &&
                              table->owner_index()->is_null_filtered();
    ZETASQL_RET_CHECK(table->parent_table_->is_public()
              // Change stream partition table should have interleave
              // compatibility even though it's not a public table.
              || table->parent_table_->owner_change_stream() != nullptr);
    auto parent_pk = table->parent_table_->primary_key();
    for (int i = 0; i < parent_pk.size(); ++i) {
      // The child has fewer primary key parts than the parent.
      if (i >= table->primary_key_.size()) {
        return error::MustReferenceParentKeyColumn(
            OwningObjectType(table), OwningObjectName(table),
            parent_pk[i]->column()->Name());
      }

      ZETASQL_RETURN_IF_ERROR(CheckKeyPartCompatibility(
          table, parent_pk[i], table->primary_key_[i], ignore_nullability));
    }
    ZETASQL_RETURN_IF_ERROR(CheckInterleaveDepthLimit(table));

    // Cannot add a table with no columns as a child.
    if (table->columns_.empty()) {
      return error::NoColumnsTable(OwningObjectType(table),
                                   OwningObjectName(table));
    }
  }

  for (const Table* child : table->child_tables_) {
    ZETASQL_RET_CHECK_NE(child, nullptr);
    ZETASQL_RET_CHECK_EQ(child->parent(), table);
  }

  for (const ForeignKey* foreign_key : table->foreign_keys_) {
    ZETASQL_RET_CHECK_NE(foreign_key, nullptr);
    ZETASQL_RET_CHECK_EQ(foreign_key->referencing_table(), table);
  }
  for (const ForeignKey* referencing_foreign_key :
       table->referencing_foreign_keys_) {
    ZETASQL_RET_CHECK_NE(referencing_foreign_key, nullptr);
    ZETASQL_RET_CHECK_EQ(referencing_foreign_key->referenced_table(), table);
  }

  if (table->owner_index_) {
    ZETASQL_RET_CHECK_EQ(table->indexes_.size(), 0);
    ZETASQL_RET_CHECK_EQ(table->child_tables_.size(), 0);
    ZETASQL_RET_CHECK(!table->columns_.empty());
    ZETASQL_RET_CHECK(!table->primary_key_.empty());
    ZETASQL_RET_CHECK_EQ(table->owner_index_->index_data_table(), table);
  }

  // Validate generated columns.
  GraphDependencyHelper<const Column*, GetColumnName> cycle_detector(
      /*object_type=*/"generated column");
  for (const Column* column : table->columns()) {
    ZETASQL_RETURN_IF_ERROR(cycle_detector.AddNodeIfNotExists(column));
  }
  for (const Column* column : table->columns()) {
    if (column->is_generated()) {
      for (const Column* dep : column->dependent_columns()) {
        ZETASQL_RETURN_IF_ERROR(
            cycle_detector.AddEdgeIfNotExists(column->Name(), dep->Name()));
      }
    }
  }
  ZETASQL_RETURN_IF_ERROR(cycle_detector.DetectCycle());
  ZETASQL_RETURN_IF_ERROR(
      ValidateRowDeletionPolicy(table->row_deletion_policy(), table));

  return absl::OkStatus();
}