backend/schema/validators/table_validator.cc (409 lines of code) (raw):

// // Copyright 2020 Google LLC // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. // #include "backend/schema/validators/table_validator.h" #include <algorithm> #include <optional> #include <string> #include <vector> #include "zetasql/public/type.h" #include "absl/algorithm/container.h" #include "absl/container/flat_hash_map.h" #include "absl/functional/function_ref.h" #include "absl/memory/memory.h" #include "absl/status/status.h" #include "absl/strings/match.h" #include "absl/strings/str_cat.h" #include "absl/strings/str_join.h" #include "absl/strings/substitute.h" #include "absl/synchronization/mutex.h" #include "backend/common/case.h" #include "backend/common/graph_dependency_helper.h" #include "backend/datamodel/types.h" #include "backend/schema/catalog/column.h" #include "backend/schema/catalog/foreign_key.h" #include "backend/schema/updater/global_schema_names.h" #include "common/errors.h" #include "common/limits.h" #include "zetasql/base/ret_check.h" #include "absl/status/status.h" #include "zetasql/base/status_macros.h" namespace google { namespace spanner { namespace emulator { namespace backend { namespace { absl::Status CheckKeyPartCompatibility(const Table* interleaved_table, const KeyColumn* parent_key, const KeyColumn* child_key, bool ignore_nullability) { const std::string object_type = OwningObjectType(interleaved_table); const std::string object_name = OwningObjectName(interleaved_table); const Column* parent_key_col = parent_key->column(); const Column* child_key_col = child_key->column(); if (child_key_col->Name() != parent_key_col->Name()) { // The parent key column does not match the child key column. But perhaps // the child declared the key column in a different position. Provide a // more helpful error message in this case (as they do refer to the parent // column, just at the wrong location). auto child_pk = interleaved_table->owner_index() ? interleaved_table->owner_index()->key_columns() : interleaved_table->primary_key(); for (int i = 0; i < child_pk.size(); ++i) { if (child_pk[i]->column()->Name() == parent_key_col->Name()) { return error::IncorrectParentKeyPosition(object_type, object_name, parent_key_col->Name(), i); } } return error::MustReferenceParentKeyColumn(object_type, object_name, parent_key_col->Name()); } // Parent and child key sort orders should match. if (child_key->is_descending() != parent_key->is_descending()) { return error::IncorrectParentKeyOrder( object_type, object_name, parent_key_col->Name(), child_key->is_descending() ? "ASC" : "DESC"); } // Parent and child key column types should match. if (!child_key_col->GetType()->Equals(parent_key_col->GetType())) { return error::IncorrectParentKeyType(object_type, object_name, parent_key_col->Name(), ToString(child_key_col->GetType()), ToString(parent_key_col->GetType())); } // We already checked the type. Check the type length. if (child_key_col->declared_max_length() != parent_key_col->declared_max_length()) { auto column_length = [](const Column* column) { return column->declared_max_length().has_value() ? absl::StrCat(column->declared_max_length().value()) : "MAX"; }; return error::IncorrectParentKeyLength( object_type, object_name, parent_key_col->Name(), column_length(child_key_col), column_length(parent_key_col)); } // We ignore nullability check for scenarios where the child table // keys belong to a null filtered interleaved index. if (ignore_nullability) { return absl::OkStatus(); } // Parent and child key column nullability should match. if (child_key_col->is_nullable() != parent_key_col->is_nullable()) { return error::IncorrectParentKeyNullability( object_type, object_name, parent_key_col->Name(), parent_key_col->is_nullable() ? "nullable" : "not null", child_key_col->is_nullable() ? "nullable" : "not null"); } return absl::Status(); } absl::Status CheckInterleaveDepthLimit(const Table* table) { int depth = 1; const Table* to_test = table; while (to_test->parent()) { to_test = to_test->parent(); ++depth; if (depth > limits::kMaxInterleavingDepth) { return error::DeepNesting(OwningObjectType(table), OwningObjectName(table), limits::kMaxInterleavingDepth); } } return absl::OkStatus(); } absl::string_view GetColumnName(const Column* const& column) { return column->Name(); } // Validate descedants of table using level-order traversal. absl::Status ValidateDescendantTables( const Table* table, absl::FunctionRef<absl::Status(const Table*)> validateFn) { ZETASQL_RET_CHECK(table != nullptr); std::vector<const Table*> queue; for (auto* children : table->children()) { queue.push_back(children); } while (!queue.empty()) { const Table* child = queue.back(); queue.pop_back(); absl::Status s = validateFn(child); if (!s.ok()) { return s; } for (auto* grandchildren : child->children()) { queue.push_back(grandchildren); } } return absl::OkStatus(); } absl::Status ValidateRowDeletionPolicy( std::optional<ddl::RowDeletionPolicy> row_deletion_policy, const Table* table) { if (!row_deletion_policy.has_value()) { return absl::OkStatus(); } auto table_name = table->Name(); auto column_name = row_deletion_policy->column_name(); auto* column = table->FindColumn(column_name); if (column == nullptr) { return error::RowDeletionPolicyOnColumnDoesNotExist(column_name, table_name); } if (!column->GetType()->IsTimestamp()) { return error::RowDeletionPolicyOnNonTimestampColumn(column_name, table_name); } ZETASQL_RETURN_IF_ERROR(ValidateDescendantTables(table, [&](const Table* children) { if (children->on_delete_action() == Table::OnDeleteAction::kNoAction) { return error::RowDeletionPolicyHasChildWithOnDeleteNoAction( table_name, children->Name()); } else { return absl::OkStatus(); } })); auto is_enforced = [](const ForeignKey* fk) { return fk->enforced(); }; const int enforced_count = absl::c_count_if(table->referencing_foreign_keys(), is_enforced); if (enforced_count == 0) { return absl::OkStatus(); } std::vector<const ForeignKey*> enforced_fks(enforced_count); absl::c_copy_if(table->referencing_foreign_keys(), enforced_fks.begin(), is_enforced); return error::ForeignKeyRowDeletionPolicyAddNotAllowed( table_name, absl::StrJoin(enforced_fks, ",", [](std::string* out, auto fk) { absl::StrAppend(out, fk->Name()); })); } absl::Status ValidateUpdateRowDeletionPolicy(const Table* table, const Table* old_table) { ZETASQL_RETURN_IF_ERROR( ValidateRowDeletionPolicy(table->row_deletion_policy(), old_table)); // This handles the case when an alter only affects the child tables. if (table->on_delete_action() != Table::OnDeleteAction::kCascade && table->parent() != nullptr && table->parent()->row_deletion_policy().has_value()) { return error::RowDeletionPolicyOnAncestors(table->Name(), table->parent()->Name()); } return absl::OkStatus(); } } // namespace absl::Status TableValidator::Validate(const Table* table, SchemaValidationContext* context) { ZETASQL_RET_CHECK(!table->name_.empty()); ZETASQL_RET_CHECK(!table->id_.empty()); if (table->is_public()) { ZETASQL_RETURN_IF_ERROR( GlobalSchemaNames::ValidateSchemaName("Table", table->name_)); if (context->is_postgresql_dialect()) { ZETASQL_RET_CHECK(table->postgresql_oid().has_value()); } else { ZETASQL_RET_CHECK(!table->postgresql_oid().has_value()); } if (!table->synonym_.empty()) { ZETASQL_RETURN_IF_ERROR( GlobalSchemaNames::ValidateSchemaName("Synonym", table->synonym_)); } } const std::string object_type = OwningObjectType(table); const std::string object_name = OwningObjectName(table); // Validate that all columns are unique. CaseInsensitiveStringSet unique_columns; for (const Column* column : table->columns_) { ZETASQL_RET_CHECK_NE(column, nullptr); std::string column_name = column->Name(); ZETASQL_RET_CHECK_EQ(column->table(), table); if (unique_columns.contains(column_name)) { return error::DuplicateColumnName(column->FullName()); } unique_columns.insert(column_name); } if (table->columns_.size() > limits::kMaxColumnsPerTable) { return error::TooManyColumns(object_type, object_name, limits::kMaxColumnsPerTable); } // Validate that all key columns are unique. CaseInsensitiveStringSet unique_keys; for (const KeyColumn* key_column : table->primary_key_) { ZETASQL_RET_CHECK_NE(key_column, nullptr); const Column* column = key_column->column(); ZETASQL_RET_CHECK_NE(column, nullptr); const Column* table_column = table->FindColumn(column->Name()); ZETASQL_RET_CHECK_EQ(table_column, column); if (unique_keys.contains(column->Name())) { return error::MultipleRefsToKeyColumn(object_type, object_name, column->Name()); } unique_keys.insert(column->Name()); } if (table->primary_key_.size() > limits::kMaxKeyColumns) { return error::TooManyKeys(object_type, object_name, table->primary_key_.size(), limits::kMaxKeyColumns); } if (!table->indexes_.empty()) { ZETASQL_RET_CHECK(!table->columns_.empty()); } for (const Index* index : table->indexes_) { ZETASQL_RET_CHECK_NE(index, nullptr); ZETASQL_RET_CHECK_EQ(index->indexed_table(), table); } if (table->indexes_.size() > limits::kMaxIndexesPerTable) { const Index* last_index = table->indexes_[limits::kMaxIndexesPerTable]; return error::TooManyIndicesPerTable(last_index->Name(), table->Name(), limits::kMaxIndexesPerTable); } // Check interleave compatibility. if (!table->parent_table_) { if (table->on_delete_action_.has_value()) { return error::SetOnDeleteWithoutInterleaving(table->Name()); } } else { bool ignore_nullability = table->owner_index() != nullptr && table->owner_index()->is_null_filtered(); ZETASQL_RET_CHECK(table->parent_table_->is_public() // Change stream partition table should have interleave // compatibility even though it's not a public table. || table->parent_table_->owner_change_stream() != nullptr); auto parent_pk = table->parent_table_->primary_key(); for (int i = 0; i < parent_pk.size(); ++i) { // The child has fewer primary key parts than the parent. if (i >= table->primary_key_.size()) { return error::MustReferenceParentKeyColumn( OwningObjectType(table), OwningObjectName(table), parent_pk[i]->column()->Name()); } ZETASQL_RETURN_IF_ERROR(CheckKeyPartCompatibility( table, parent_pk[i], table->primary_key_[i], ignore_nullability)); } ZETASQL_RETURN_IF_ERROR(CheckInterleaveDepthLimit(table)); // Cannot add a table with no columns as a child. if (table->columns_.empty()) { return error::NoColumnsTable(OwningObjectType(table), OwningObjectName(table)); } } for (const Table* child : table->child_tables_) { ZETASQL_RET_CHECK_NE(child, nullptr); ZETASQL_RET_CHECK_EQ(child->parent(), table); } for (const ForeignKey* foreign_key : table->foreign_keys_) { ZETASQL_RET_CHECK_NE(foreign_key, nullptr); ZETASQL_RET_CHECK_EQ(foreign_key->referencing_table(), table); } for (const ForeignKey* referencing_foreign_key : table->referencing_foreign_keys_) { ZETASQL_RET_CHECK_NE(referencing_foreign_key, nullptr); ZETASQL_RET_CHECK_EQ(referencing_foreign_key->referenced_table(), table); } if (table->owner_index_) { ZETASQL_RET_CHECK_EQ(table->indexes_.size(), 0); ZETASQL_RET_CHECK_EQ(table->child_tables_.size(), 0); ZETASQL_RET_CHECK(!table->columns_.empty()); ZETASQL_RET_CHECK(!table->primary_key_.empty()); ZETASQL_RET_CHECK_EQ(table->owner_index_->index_data_table(), table); } // Validate generated columns. GraphDependencyHelper<const Column*, GetColumnName> cycle_detector( /*object_type=*/"generated column"); for (const Column* column : table->columns()) { ZETASQL_RETURN_IF_ERROR(cycle_detector.AddNodeIfNotExists(column)); } for (const Column* column : table->columns()) { if (column->is_generated()) { for (const Column* dep : column->dependent_columns()) { ZETASQL_RETURN_IF_ERROR( cycle_detector.AddEdgeIfNotExists(column->Name(), dep->Name())); } } } ZETASQL_RETURN_IF_ERROR(cycle_detector.DetectCycle()); ZETASQL_RETURN_IF_ERROR( ValidateRowDeletionPolicy(table->row_deletion_policy(), table)); return absl::OkStatus(); } absl::Status TableValidator::ValidateUpdate(const Table* table, const Table* old_table, SchemaValidationContext* context) { if (table->is_deleted()) { ZETASQL_RET_CHECK(!table->owner_index_ || table->owner_index_->is_deleted()); if (!table->child_tables_.empty()) { // Build a sorted list of interleaved child tables and indexes. std::vector<std::string> interleaved_tables; std::vector<std::string> interleaved_indices; for (const auto& entry : table->child_tables_) { if (entry->owner_index()) { interleaved_indices.push_back(entry->owner_index()->Name()); } else { interleaved_tables.push_back(entry->Name()); } } std::sort(interleaved_tables.begin(), interleaved_tables.end()); std::sort(interleaved_indices.begin(), interleaved_indices.end()); // Cannot drop a table with interleaved child tables or indexes. if (!interleaved_tables.empty()) { return error::DropTableWithInterleavedTables( table->name_, absl::StrJoin(interleaved_tables, ",")); } else if (!interleaved_indices.empty()) { return error::DropTableWithDependentIndices( table->name_, absl::StrJoin(interleaved_indices, ",")); } } if (!table->indexes_.empty()) { return error::DropTableWithDependentIndices( table->name_, absl::StrJoin(table->indexes_.begin(), table->indexes_.end(), ",", [](std::string* out, const Index* child) { return out->append(child->Name()); })); } if (!table->change_streams_explicitly_tracking_table().empty()) { return error::DropTableWithDependentChangeStreams( table->name_, absl::StrJoin(table->change_streams().begin(), table->change_streams().end(), ",", [](std::string* out, const ChangeStream* child) { return out->append(child->Name()); })); } context->global_names()->RemoveName(table->Name()); if (!table->synonym().empty()) { context->global_names()->RemoveName(table->synonym()); } return absl::OkStatus(); } // ID should not change during cloning, but the name can. ZETASQL_RET_CHECK_EQ(table->id(), old_table->id()); if (table->is_public() && context->is_postgresql_dialect()) { ZETASQL_RET_CHECK(table->postgresql_oid().has_value()); ZETASQL_RET_CHECK(old_table->postgresql_oid().has_value()); ZETASQL_RET_CHECK_EQ(table->postgresql_oid().value(), old_table->postgresql_oid().value()); } else { ZETASQL_RET_CHECK(!table->postgresql_oid().has_value()); ZETASQL_RET_CHECK(!old_table->postgresql_oid().has_value()); } if (table->owner_change_stream_) { ZETASQL_RET_CHECK(!table->owner_change_stream_->is_deleted()); } if (table->owner_index_) { ZETASQL_RET_CHECK(!table->owner_index_->is_deleted()); } // Check additional constraints on new columns. for (const Column* column : table->columns()) { // Ignore old columns. if (old_table->FindColumn(column->Name()) != nullptr) { continue; } // New columns cannot be nullable unless it is a generated column or // it has a default value. Index stored columns of the index data table // could be as non nullable as the source table is, but these checks // (nullable, generated, default value) do not apply. if (!table->owner_index_ && !column->is_nullable() && !column->is_generated() && !column->has_default_value()) { return error::AddingNotNullColumn(table->name_, column->Name()); } } // Cannot drop key columns, change their order or nullability. ZETASQL_RET_CHECK_EQ(table->primary_key_.size(), old_table->primary_key_.size()); for (int i = 0; i < table->primary_key_.size(); ++i) { if (table->primary_key_[i]->is_deleted()) { return error::InvalidDropKeyColumn( table->primary_key_[i]->column()->Name(), table->name_); } ZETASQL_RET_CHECK_EQ(table->primary_key_[i]->is_descending(), old_table->primary_key()[i]->is_descending()); if (table->primary_key_[i]->column()->is_nullable() != old_table->primary_key()[i]->column()->is_nullable()) { std::string reason = absl::Substitute( "from $0 to $1", old_table->primary_key()[i]->column()->is_nullable() ? "NULL" : "NOT NULL", table->primary_key_[i]->column()->is_nullable() ? "NULL" : "NOT NULL"); return error::CannotChangeKeyColumn( absl::StrCat(table->name_, ".", table->primary_key_[i]->column()->Name()), reason); } } if (!table->synonym().empty() && !old_table->synonym().empty() && table->synonym() != old_table->synonym()) { return error::CannotAlterSynonym(table->synonym(), table->name_); } ZETASQL_RETURN_IF_ERROR(ValidateUpdateRowDeletionPolicy(table, old_table)); return absl::OkStatus(); } } // namespace backend } // namespace emulator } // namespace spanner } // namespace google