backend/schema/updater/schema_updater.cc (5,429 lines of code) (raw):

// // Copyright 2020 Google LLC // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. // #include "backend/schema/updater/schema_updater.h" #include <algorithm> #include <cstdint> #include <functional> #include <iterator> #include <memory> #include <optional> #include <stack> #include <string> #include <utility> #include <vector> #include "zetasql/base/logging.h" #include "google/spanner/admin/database/v1/common.pb.h" #include "zetasql/public/analyzer.h" #include "zetasql/public/analyzer_options.h" #include "zetasql/public/analyzer_output.h" #include "zetasql/public/catalog.h" #include "zetasql/public/function.h" #include "zetasql/public/function.pb.h" #include "zetasql/public/function_signature.h" #include "zetasql/public/options.pb.h" #include "zetasql/public/simple_catalog.h" #include "zetasql/public/type.h" #include "zetasql/resolved_ast/resolved_ast.h" #include "zetasql/resolved_ast/resolved_node_kind.pb.h" #include "absl/algorithm/container.h" #include "absl/container/flat_hash_map.h" #include "absl/container/flat_hash_set.h" #include "absl/flags/flag.h" #include "absl/functional/function_ref.h" #include "absl/log/log.h" #include "absl/random/random.h" #include "absl/status/status.h" #include "absl/status/statusor.h" #include "absl/strings/ascii.h" #include "absl/strings/match.h" #include "absl/strings/str_cat.h" #include "absl/strings/str_format.h" #include "absl/strings/str_join.h" #include "absl/strings/str_replace.h" #include "absl/strings/string_view.h" #include "absl/strings/strip.h" #include "absl/strings/substitute.h" #include "absl/time/clock.h" #include "absl/time/time.h" #include "absl/types/span.h" #include "backend/common/case.h" #include "backend/common/ids.h" #include "backend/common/utils.h" #include "backend/database/pg_oid_assigner/pg_oid_assigner.h" #include "backend/query/analyzer_options.h" #include "backend/query/catalog.h" #include "backend/query/function_catalog.h" #include "backend/query/prepare_property_graph_catalog.h" #include "backend/schema/backfills/change_stream_backfill.h" #include "backend/schema/backfills/column_value_backfill.h" #include "backend/schema/backfills/index_backfill.h" #include "backend/schema/builders/change_stream_builder.h" #include "backend/schema/builders/check_constraint_builder.h" #include "backend/schema/builders/column_builder.h" #include "backend/schema/builders/database_options_builder.h" #include "backend/schema/builders/foreign_key_builder.h" #include "backend/schema/builders/index_builder.h" #include "backend/schema/builders/locality_group_builder.h" #include "backend/schema/builders/model_builder.h" #include "backend/schema/builders/named_schema_builder.h" #include "backend/schema/builders/placement_builder.h" #include "backend/schema/builders/property_graph_builder.h" #include "backend/schema/builders/sequence_builder.h" #include "backend/schema/builders/table_builder.h" #include "backend/schema/builders/udf_builder.h" #include "backend/schema/builders/view_builder.h" #include "backend/schema/catalog/change_stream.h" #include "backend/schema/catalog/check_constraint.h" #include "backend/schema/catalog/column.h" #include "backend/schema/catalog/database_options.h" #include "backend/schema/catalog/foreign_key.h" #include "backend/schema/catalog/index.h" #include "backend/schema/catalog/locality_group.h" #include "backend/schema/catalog/model.h" #include "backend/schema/catalog/named_schema.h" #include "backend/schema/catalog/placement.h" #include "backend/schema/catalog/property_graph.h" #include "backend/schema/catalog/proto_bundle.h" #include "backend/schema/catalog/schema.h" #include "backend/schema/catalog/sequence.h" #include "backend/schema/catalog/table.h" #include "backend/schema/catalog/udf.h" #include "backend/schema/catalog/view.h" #include "backend/schema/ddl/operations.pb.h" #include "backend/schema/graph/schema_graph.h" #include "backend/schema/graph/schema_graph_editor.h" #include "backend/schema/graph/schema_node.h" #include "backend/schema/parser/ddl_parser.h" #include "backend/schema/updater/ddl_type_conversion.h" #include "backend/schema/updater/global_schema_names.h" #include "backend/schema/updater/schema_validation_context.h" #include "backend/schema/updater/sql_expression_validators.h" #include "backend/schema/verifiers/check_constraint_verifiers.h" #include "backend/schema/verifiers/foreign_key_verifiers.h" #include "backend/storage/storage.h" #include "common/constants.h" #include "common/errors.h" #include "common/feature_flags.h" #include "common/limits.h" #include "third_party/spanner_pg/ddl/ddl_translator.h" #include "third_party/spanner_pg/ddl/pg_to_spanner_ddl_translator.h" #include "third_party/spanner_pg/interface/emulator_parser.h" #include "third_party/spanner_pg/interface/parser_output.h" #include "third_party/spanner_pg/interface/pg_arena.h" #include "third_party/spanner_pg/interface/spangres_translator_interface.h" #include "third_party/spanner_pg/shims/error_shim.h" #include "third_party/spanner_pg/shims/memory_context_pg_arena.h" #include "google/protobuf/repeated_ptr_field.h" #include "zetasql/public/functions/uuid.h" #include "zetasql/base/ret_check.h" #include "zetasql/base/status_macros.h" ABSL_FLAG(bool, cloud_spanner_emulator_disable_cs_retention_check, false, "whether we want to check the retention limit when altering a change " "stream's retention period."); namespace google { namespace spanner { namespace emulator { namespace backend { namespace { namespace database_api = ::google::spanner::admin::database::v1; using ::postgres_translator::interfaces::ExpressionTranslateResult; typedef google::protobuf::RepeatedPtrField<ddl::SetOption> OptionList; // A struct that defines the columns used by an index. struct ColumnsUsedByIndex { std::vector<const KeyColumn*> index_key_columns; std::vector<const Column*> stored_columns; std::vector<const Column*> null_filtered_columns; std::vector<const Column*> partition_by_columns; std::vector<const Column*> order_by_columns; }; // Get all the names of objects in a vector of objects. Useful for creating // error messages with multiple objects, i.e. getting all the names of tables in // a schema. template <typename T> std::vector<std::string> GetObjectNames(absl::Span<const T* const> objects) { std::vector<std::string> names; for (const auto* object : objects) { names.push_back(object->Name()); } return names; } template <typename SetOptions> void SetSequenceOptionsForIdentityColumn( ddl::ColumnDefinition::IdentityColumnDefinition identity_column, SetOptions* set_options) { if (identity_column.has_start_with_counter()) { ddl::SetOption* start_with_counter = set_options->Add(); start_with_counter->set_option_name("start_with_counter"); start_with_counter->set_int64_value(identity_column.start_with_counter()); } if (identity_column.has_skip_range_min()) { ddl::SetOption* skip_range_min = set_options->Add(); skip_range_min->set_option_name("skip_range_min"); skip_range_min->set_int64_value(identity_column.skip_range_min()); ddl::SetOption* skip_range_max = set_options->Add(); skip_range_max->set_option_name("skip_range_max"); skip_range_max->set_int64_value(identity_column.skip_range_max()); } } // A class that processes a set of Cloud Spanner DDL statements, and applies // them to an existing (or empty) `Schema` to obtain the updated `Schema`. // // The effects of the DDL statements are checked for semantic validity during // the process and appropriate errors returned on any violations. // // Implementation note: // Semantic violation checks other than existence checks (required to build // proper reference relationships in the schema graph) should be avoided in this // class and should instead be encoded in the `Validate()` and // `ValidateUpdate()` implementations of `SchemaNode`(s) so that they are // executed during both database schema creation and update. class SchemaUpdaterImpl { public: SchemaUpdaterImpl() = delete; // Private construction only. ~SchemaUpdaterImpl() = default; // Disallow copies but enable moves. SchemaUpdaterImpl(const SchemaUpdaterImpl&) = delete; SchemaUpdaterImpl& operator=(const SchemaUpdaterImpl&) = delete; SchemaUpdaterImpl(SchemaUpdaterImpl&&) = default; // Assignment move is not supported by absl::Time. SchemaUpdaterImpl& operator=(SchemaUpdaterImpl&&) = delete; absl::StatusOr<SchemaUpdaterImpl> static Build( zetasql::TypeFactory* type_factory, TableIDGenerator* table_id_generator, ColumnIDGenerator* column_id_generator, Storage* storage, absl::Time schema_change_ts, PgOidAssigner* pg_oid_assigner, const Schema* existing_schema, std::string_view database_id) { SchemaUpdaterImpl impl(type_factory, table_id_generator, column_id_generator, storage, schema_change_ts, pg_oid_assigner, existing_schema, database_id); ZETASQL_RETURN_IF_ERROR(impl.Init()); return impl; } // Apply DDL statements returning the SchemaValidationContext containing // the schema change actions resulting from each statement. Please note that // it can return an empty vector when all statements are no-op and no changes // are made. absl::StatusOr<std::vector<SchemaValidationContext>> ApplyDDLStatements( const SchemaChangeOperation& schema_change_operation); std::vector<std::unique_ptr<const Schema>> GetIntermediateSchemas() { return std::move(intermediate_schemas_); } private: SchemaUpdaterImpl(zetasql::TypeFactory* type_factory, TableIDGenerator* table_id_generator, ColumnIDGenerator* column_id_generator, Storage* storage, absl::Time schema_change_ts, PgOidAssigner* pg_oid_assigner, const Schema* existing_schema, std::string_view database_id) : type_factory_(type_factory), table_id_generator_(table_id_generator), column_id_generator_(column_id_generator), storage_(storage), schema_change_timestamp_(schema_change_ts), latest_schema_(existing_schema), editor_(nullptr), pg_oid_assigner_(pg_oid_assigner), database_id_(database_id) {} // Initializes potentially failing components after construction. absl::Status Init(); // Applies the given `statement` on to `latest_schema_`. Please note that // it can return a nullptr if the statement is a no-op and no changes are // made. absl::StatusOr<std::unique_ptr<const Schema>> ApplyDDLStatement( absl::string_view statement, absl::string_view proto_descriptor_bytes, const database_api::DatabaseDialect& dialect); // Run any pending schema actions resulting from the schema change statements. absl::Status RunPendingActions( const std::vector<SchemaValidationContext>& pending_work, int* num_succesful); absl::Status InitColumnNameAndTypesFromTable( const Table* table, const ddl::CreateTable* ddl_create_table, std::vector<zetasql::SimpleTable::NameAndType>* name_and_types); absl::Status AnalyzeGeneratedColumn( absl::string_view expression, const std::string& column_name, const zetasql::Type* column_type, const Table* table, const ddl::CreateTable* ddl_create_table, absl::flat_hash_set<std::string>* dependent_column_names, absl::flat_hash_set<const SchemaNode*>* udf_dependencies); absl::Status AnalyzeColumnDefaultValue( absl::string_view expression, const std::string& column_name, const zetasql::Type* column_type, const Table* table, const ddl::CreateTable* ddl_create_table, absl::flat_hash_set<const SchemaNode*>* dependent_sequences, absl::flat_hash_set<const SchemaNode*>* udf_dependencies); absl::Status AnalyzeCheckConstraint( absl::string_view expression, const Table* table, const ddl::CreateTable* ddl_create_table, absl::flat_hash_set<std::string>* dependent_column_names, CheckConstraint::Builder* builder, absl::flat_hash_set<const SchemaNode*>* udf_dependencies); template <typename ColumnModifier> absl::Status SetColumnOptions( const ::google::protobuf::RepeatedPtrField<ddl::SetOption>& set_options, const database_api::DatabaseDialect& dialect, ColumnModifier* modifier); template <typename Modifier> absl::Status ProcessLocalityGroupOption(const ddl::SetOption& option, Modifier* modifier); template <typename TableModifier> absl::Status SetTableOptions( const ::google::protobuf::RepeatedPtrField<ddl::SetOption>& set_options, TableModifier* modifier); template <typename IndexModifier> absl::Status SetIndexOptions( const ::google::protobuf::RepeatedPtrField<ddl::SetOption>& set_options, IndexModifier* modifier, bool allow_other_options = false); template <typename ColumnDef, typename ColumnDefModifer> absl::StatusOr<std::string> TranslatePGExpression( const ColumnDef& ddl_column, const Table* table, const ddl::CreateTable* ddl_create_table, ColumnDefModifer& modifier); template <typename ColumnDefModifier> absl::Status SetColumnDefinition(const ddl::ColumnDefinition& ddl_column, const Table* table, const ddl::CreateTable* ddl_create_table, const database_api::DatabaseDialect& dialect, bool is_alter, ColumnDefModifier* modifier); absl::Status AlterColumnDefinition( const ddl::ColumnDefinition& ddl_column, const Table* table, const database_api::DatabaseDialect& dialect, Column::Editor* editor); absl::Status AlterColumnSetDropDefault( const ddl::AlterTable::AlterColumn& alter_column, const Table* table, const Column* column, const database_api::DatabaseDialect& dialect, Column::Editor* editor); absl::StatusOr<const Column*> CreateColumn( const ddl::ColumnDefinition& ddl_column, const Table* table, const ddl::CreateTable* ddl_table, const database_api::DatabaseDialect& dialect); absl::StatusOr<const KeyColumn*> CreatePrimaryKeyColumn( const ddl::KeyPartClause& ddl_key_part, const Table* table, KeyColumn::Builder* builder); absl::Status CreatePrimaryKeyConstraint( const ddl::KeyPartClause& ddl_key_part, Table::Builder* builder, bool with_oid = true); absl::Status CreateInterleaveConstraint( const ddl::InterleaveClause& interleave, Table::Builder* builder); absl::Status CreateInterleaveConstraint(const Table* parent, Table::OnDeleteAction on_delete, Table::Builder* builder); absl::Status ValidateChangeStreamForClause( const ddl::ChangeStreamForClause& change_stream_for_clause, absl::string_view change_stream_name); absl::Status ValidateChangeStreamLimits( const ddl::ChangeStreamForClause& change_stream_for_clause, absl::string_view change_stream_name); absl::Status ValidateLimitsForTrackedObjects( const ddl::ChangeStreamForClause& change_stream_for_clause, absl::FunctionRef<absl::Status(const Table*)> table_cb, absl::FunctionRef<absl::Status(const Column*)> column_cb); absl::Status RegisterTrackedObjects( const ddl::ChangeStreamForClause& change_stream_for_clause, const ChangeStream* change_stream); absl::Status UnregisterChangeStreamFromTrackedObjects( const ChangeStream* change_stream); absl::Status BuildChangeStreamTrackedObjects( const ddl::ChangeStreamForClause& change_stream_for_clause, const ChangeStream* change_stream, ChangeStream::Builder* builder); absl::Status EditChangeStreamTrackedObjects( const ddl::ChangeStreamForClause& change_stream_for_clause, const ChangeStream* change_stream); absl::Status UnregisterChangeStreamFromTrackedObjects( const ChangeStream* change_stream, absl::FunctionRef<absl::Status(Table::Editor*)> table_cb, absl::FunctionRef<absl::Status(Column::Editor*)> column_cb); absl::Status RegisterTrackedObjects( const ChangeStream* change_stream, const ddl::ChangeStreamForClause& change_stream_for_clause, absl::FunctionRef<absl::Status(Table::Editor*)> table_cb, absl::FunctionRef<absl::Status(Column::Editor*)> column_cb); template <typename PlacementModifier> absl::Status SetPlacementOptions( const ::google::protobuf::RepeatedPtrField<ddl::SetOption>& set_options, PlacementModifier* modifier); template <typename ChangeStreamModifier> absl::Status SetChangeStreamOptions( const ::google::protobuf::RepeatedPtrField<ddl::SetOption>& set_options, ChangeStreamModifier* modifier); template <typename Modifier> absl::Status SetDatabaseOptions( const ::google::protobuf::RepeatedPtrField<ddl::SetOption>& set_options, const database_api::DatabaseDialect& dialect, Modifier* modifier); absl::Status ValidateChangeStreamOptions( const ::google::protobuf::RepeatedPtrField<ddl::SetOption>& set_options); static std::string MakeChangeStreamTvfName( std::string change_stream_name, const database_api::DatabaseDialect& dialect); absl::StatusOr<const Table*> GetInterleaveConstraintTable( const std::string& interleave_in_table_name, const Table::Builder& builder) const; static Table::OnDeleteAction GetInterleaveConstraintOnDelete( const ddl::InterleaveClause& interleave); absl::Status CreateForeignKeyConstraint( const ddl::ForeignKey& ddl_foreign_key, const Table* referencing_table); absl::StatusOr<const ForeignKey*> BuildForeignKeyConstraint( const ddl::ForeignKey& ddl_foreign_key, const Table* referencing_table); absl::Status EvaluateForeignKeyReferencedPrimaryKey( const Table* table, const google::protobuf::RepeatedPtrField<std::string>& column_names, std::vector<int>* column_order, bool* index_required) const; absl::Status EvaluateForeignKeyReferencingPrimaryKey( const Table* table, const google::protobuf::RepeatedPtrField<std::string>& column_names, const std::vector<int>& column_order, bool* index_required) const; absl::StatusOr<const Index*> CreateForeignKeyIndex( const ForeignKey* foreign_key, const Table* table, const std::vector<std::string>& column_names, bool unique); bool CanInterleaveForeignKeyIndex( const Table* table, const std::vector<std::string>& column_names) const; absl::StatusOr<ExpressionTranslateResult> TranslatePostgreSqlExpression( const Table* table, const ddl::CreateTable* ddl_create_table, absl::string_view expression); absl::StatusOr<ExpressionTranslateResult> TranslatePostgreSqlQueryInView( absl::string_view query); absl::Status CreateCheckConstraint( const ddl::CheckConstraint& ddl_check_constraint, const Table* table, const ddl::CreateTable* ddl_create_table); absl::Status CreateRowDeletionPolicy( const ddl::RowDeletionPolicy& row_deletion_policy, Table::Builder* builder); absl::StatusOr<std::shared_ptr<const ProtoBundle>> CreateProtoBundle( const ddl::CreateProtoBundle& ddl_proto_bundle, absl::string_view proto_descriptor_bytes); absl::Status CreateTable(const ddl::CreateTable& ddl_table, const database_api::DatabaseDialect& dialect); absl::StatusOr<const Column*> CreateIndexDataTableColumn( const Table* indexed_table, const std::string& source_column_name, const Table* index_data_table, bool is_null_filtered); absl::Status AddIndexColumnsByName(const std::string& column_name, const Table* indexed_table, bool is_null_filtered, std::vector<const Column*>& columns, Table::Builder& builder); absl::Status AddSearchIndexColumns( const ::google::protobuf::RepeatedPtrField<ddl::KeyPartClause>& key_parts, const Table* indexed_table, bool is_null_filtered, std::vector<const Column*>& columns, Table::Builder& builder); absl::StatusOr<std::unique_ptr<const Table>> CreateIndexDataTable( absl::string_view index_name, const std::vector<ddl::KeyPartClause>& index_pk, const std::string* interleave_in_table, const ::google::protobuf::RepeatedPtrField<ddl::StoredColumnDefinition>& stored_columns, const ::google::protobuf::RepeatedPtrField<ddl::KeyPartClause>* partition_by, const ::google::protobuf::RepeatedPtrField<ddl::KeyPartClause>* order_by, const ::google::protobuf::RepeatedPtrField<std::string>* null_filtered_columns, const Index* index, const Table* indexed_table, ColumnsUsedByIndex* columns_used_by_index); absl::StatusOr<std::unique_ptr<const Table>> CreateChangeStreamDataTable( const ChangeStream* change_stream, const Table* change_stream_partition_table); absl::StatusOr<std::unique_ptr<const Table>> CreateChangeStreamPartitionTable( const ChangeStream* change_stream); absl::StatusOr<const Column*> CreateChangeStreamTableColumn( const std::string& column_name, const Table* change_stream_table, const zetasql::Type* type); absl::StatusOr<const KeyColumn*> CreateChangeStreamTablePKColumn( const std::string& pk_column_name, const Table* change_stream_table); absl::Status CreateChangeStreamTablePKConstraint( const std::string& pk_column_name, Table::Builder* builder); absl::StatusOr<const Index*> CreateIndex( const ddl::CreateIndex& ddl_index, const Table* indexed_table = nullptr); absl::StatusOr<const Index*> CreateVectorIndex( const ddl::CreateVectorIndex& ddl_index, const Table* indexed_table = nullptr); absl::StatusOr<const Index*> CreateSearchIndex( const ddl::CreateSearchIndex& ddl_index, const Table* indexed_table = nullptr); absl::StatusOr<const ChangeStream*> CreateChangeStream( const ddl::CreateChangeStream& ddl_change_stream, const database_api::DatabaseDialect& dialect); absl::StatusOr<const Index*> CreateIndexHelper( const std::string& index_name, const std::string& index_base_name, bool is_unique, bool is_null_filtered, const std::string* interleave_in_table, const std::vector<ddl::KeyPartClause>& table_pk, const ::google::protobuf::RepeatedPtrField<ddl::StoredColumnDefinition>& stored_columns, bool is_search_index, bool is_vector_index, const ::google::protobuf::RepeatedPtrField<ddl::KeyPartClause>* partition_by, const ::google::protobuf::RepeatedPtrField<ddl::KeyPartClause>* order_by, const ::google::protobuf::RepeatedPtrField<std::string>* null_filtered_columns, const ::google::protobuf::RepeatedPtrField<ddl::SetOption>* set_options, const Table* indexed_table); absl::flat_hash_set<const SchemaNode*> GatherTransitiveDependenciesForSchemaNode( const absl::flat_hash_set<const SchemaNode*>& initial_set); bool IsBuiltInFunction(const std::string& function_name); bool CanFunctionReplaceTakenName(const ddl::CreateFunction& ddl_function); std::string GetFunctionKindAsString(const ddl::CreateFunction& ddl_function); absl::Status AnalyzeFunctionDefinition( const ddl::CreateFunction& ddl_function, bool replace, absl::flat_hash_set<const SchemaNode*>* dependencies, std::unique_ptr<zetasql::FunctionSignature>* function_signature, Udf::Determinism* determinism_level); absl::Status AnalyzeFunctionDefinition( const ddl::CreateFunction& ddl_function, bool replace, std::vector<View::Column>* output_columns, absl::flat_hash_set<const SchemaNode*>* dependencies); absl::StatusOr<Udf::Builder> CreateFunctionBuilder( const ddl::CreateFunction& ddl_function, std::unique_ptr<zetasql::FunctionSignature> function_signature, Udf::Determinism determinism_level, absl::flat_hash_set<const SchemaNode*> dependencies); absl::StatusOr<View::Builder> CreateFunctionBuilder( const ddl::CreateFunction& ddl_function, std::vector<View::Column> output_columns, absl::flat_hash_set<const SchemaNode*> dependencies); absl::Status CreateFunction(const ddl::CreateFunction& ddl_function); template <typename SequenceModifier> absl::Status SetSequenceOptions( const ::google::protobuf::RepeatedPtrField<ddl::SetOption>& set_options, SequenceModifier* modifier); absl::Status ValidateSequenceOptions( const ::google::protobuf::RepeatedPtrField<ddl::SetOption>& set_options, const Sequence* current_sequence); void SetOptionsForSequenceClauses( const ddl::CreateSequence& create_sequence, ::google::protobuf::RepeatedPtrField<ddl::SetOption>* set_options); bool IsDefaultSequenceKindSet(); absl::StatusOr<const Sequence*> CreateSequence( const ddl::CreateSequence& create_sequence, const database_api::DatabaseDialect& dialect, bool is_internal_use = false); absl::Status CreateLocalityGroup( const ddl::CreateLocalityGroup& create_locality_group); absl::Status CreateNamedSchema(const ddl::CreateSchema& create_schema); absl::Status AlterRowDeletionPolicy( std::optional<ddl::RowDeletionPolicy> row_deletion_policy, const Table* table); absl::Status ValidateAlterDatabaseOptions( const ::google::protobuf::RepeatedPtrField<ddl::SetOption>& set_options); absl::Status AlterDatabase(const ddl::AlterDatabase& alter_database, const database_api::DatabaseDialect& dialect); absl::Status AlterTable(const ddl::AlterTable& alter_table, const database_api::DatabaseDialect& dialect); absl::Status AlterChangeStream( const ddl::AlterChangeStream& alter_change_stream); absl::Status AlterChangeStreamForClause( const ddl::ChangeStreamForClause& ddl_change_stream_for_clause, const ChangeStream* change_stream); absl::Status AlterIndex(const ddl::AlterIndex& alter_index); absl::Status AlterVectorIndex(const ddl::AlterVectorIndex& alter_index); absl::Status AlterInterleaveAction( const ddl::InterleaveClause::Action& ddl_interleave_action, const Table* table); absl::Status AlterSequence(const ddl::AlterSequence& alter_sequence, const Sequence* current_sequence); absl::Status AlterNamedSchema(const ddl::AlterSchema& alter_schema); absl::Status AlterLocalityGroup( const ddl::AlterLocalityGroup& alter_locality_group); absl::StatusOr<std::shared_ptr<const ProtoBundle>> AlterProtoBundle( const ddl::AlterProtoBundle& ddl_alter_proto_bundle, absl::string_view proto_descriptor_bytes); absl::Status AlterProtoColumnTypes( const ProtoBundle* proto_bundle, const ddl::AlterProtoBundle& ddl_alter_proto_bundle); absl::Status AlterProtoColumnType(const Column* column, const ProtoBundle* proto_bundle, Column::Editor* editor); absl::StatusOr<const zetasql::Type*> GetProtoTypeFromBundle( const zetasql::Type* type, const ProtoBundle* proto_bundle); absl::Status AddCheckConstraint( const ddl::CheckConstraint& ddl_check_constraint, const Table* table); absl::Status AddForeignKey(const ddl::ForeignKey& ddl_foreign_key, const Table* table); absl::Status DropConstraint(const std::string& constraint_name, const Table* table); absl::Status RenameTo(const ddl::AlterTable::RenameTo& rename_to, const Table* table); absl::Status AddSynonym(const std::string& synonym, const Table* table); absl::Status DropSynonym(const ddl::AlterTable::DropSynonym& drop_synonym, const Table* table); absl::Status DropTable(const ddl::DropTable& drop_table); absl::Status DropIndex(const ddl::DropIndex& drop_index); absl::Status DropSearchIndex(const ddl::DropSearchIndex& drop_search_index); absl::Status DropVectorIndex(const ddl::DropVectorIndex& drop_vector_index); absl::Status DropChangeStream( const ddl::DropChangeStream& drop_change_stream); absl::Status DropSequence(const Sequence* drop_sequence); absl::Status DropNamedSchema(const ddl::DropSchema& drop_schema); absl::Status DropLocalityGroup( const ddl::DropLocalityGroup& drop_locality_group); template <typename Modifier> absl::Status AssignNewLocalityGroup(const std::string& locality_group_name, Modifier* modifier); template <typename LocalityGroupModifier> absl::Status SetLocalityGroupOptions( const ::google::protobuf::RepeatedPtrField<ddl::SetOption>& set_options, LocalityGroupModifier* modifier); absl::Status ApplyImplSetColumnOptions( const ddl::SetColumnOptions& set_column_options, const database_api::DatabaseDialect& dialect); absl::StatusOr<std::shared_ptr<const ProtoBundle>> DropProtoBundle(); absl::Status DropFunction(const ddl::DropFunction& drop_function); absl::StatusOr<Model::ModelColumn> CreateModelColumn( const ddl::ColumnDefinition& ddl_column, const Model* model, const ddl::CreateModel* ddl_model, const database_api::DatabaseDialect& dialect); absl::Status CreateModel(const ddl::CreateModel& ddl_model, const database_api::DatabaseDialect& dialect); absl::Status AlterModel(const ddl::AlterModel& alter_model); absl::Status DropModel(const ddl::DropModel& drop_model); template <typename ModelModifier> absl::Status SetModelOptions( const ::google::protobuf::RepeatedPtrField<ddl::SetOption>& set_options, ModelModifier* modifier); std::string GetTimeZone() const; absl::StatusOr<std::unique_ptr<const zetasql::AnalyzerOutput>> AnalyzeCreatePropertyGraph( const ddl::CreatePropertyGraph& ddl_create_property_graph, const zetasql::AnalyzerOptions& analyzer_options, Catalog* catalog); absl::Status CreatePropertyGraph( const ddl::CreatePropertyGraph& ddl_create_property_graph, const database_api::DatabaseDialect& dialect); absl::Status DropPropertyGraph( const ddl::DropPropertyGraph& ddl_drop_property_graph); absl::Status PopulatePropertyGraph( const ddl::CreatePropertyGraph& ddl_create_property_graph, const zetasql::ResolvedCreatePropertyGraphStmt* graph_stmt, PropertyGraph::Builder* property_graph_builder); absl::Status AddGraphElementTable( const zetasql::ResolvedGraphElementTable* element, bool is_node, PropertyGraph::Builder* graph_builder); // Adds a new schema object `node` to the schema copy being edited by // `editor_`. absl::Status AddNode(std::unique_ptr<const SchemaNode> node); // Drops the schema object `node` in `latest_schema_` from the schema copy // being edited by `editor_`. absl::Status DropNode(const SchemaNode* node); // Modifies the schema object `node` in `LatestSchema` in the schema // copy being edited by `editor_`. template <typename T> absl::Status AlterNode(const T* node, const SchemaGraphEditor::EditCallback<T>& alter_cb) { ZETASQL_RET_CHECK_NE(node, nullptr); ZETASQL_RETURN_IF_ERROR(editor_->EditNode<T>(node, alter_cb)); return absl::OkStatus(); } absl::Status AlterInNamedSchema( const std::string& object_name, const SchemaGraphEditor::EditCallback<NamedSchema>& alter_cb); // Type factory for the database. Not owned. zetasql::TypeFactory* const type_factory_; // Unique table ID generator for the database. Not owned. TableIDGenerator* const table_id_generator_; // Unique column ID generator for the database. Not owned. ColumnIDGenerator* const column_id_generator_; // Database's storage. For doing data-dependent validations and index // backfills. Storage* storage_; // The timestamp at which the schema changes should be applied/validated // against the database's contents. const absl::Time schema_change_timestamp_; // The latest schema snapshot corresponding to the statements preceding the // statement currently being applied. Note that that does not guarantee that // any verfication/backfill effects of those statements have been applied. // Not owned. const Schema* latest_schema_; // The intermediate schema snapshots representing the schema state after // applying each statement. std::vector<std::unique_ptr<const Schema>> intermediate_schemas_; // Validation context for the statement being currently processed. // This is also being used in SchemaGraphEditor. Please make sure this is only // passed by reference. SchemaValidationContext* statement_context_; // Editor used to modify the schema graph. std::unique_ptr<SchemaGraphEditor> editor_; // Manages global schema names to prevent and generate unique names. GlobalSchemaNames global_names_; // Assigns OIDs to database objects when dialect is POSTGRESQL. The assigner // is owned by the database and is shared across all schema changes. PgOidAssigner* pg_oid_assigner_; // Holds the database id for this schema updater. std::string database_id_; }; absl::Status SchemaUpdaterImpl::Init() { for (const SchemaNode* node : latest_schema_->GetSchemaGraph()->GetSchemaNodes()) { if (auto name = node->GetSchemaNameInfo(); name && name.value().global) { ZETASQL_RETURN_IF_ERROR( global_names_.AddName(name.value().kind, name.value().name)); } } return absl::OkStatus(); } absl::Status SchemaUpdaterImpl::AddNode( std::unique_ptr<const SchemaNode> node) { ZETASQL_RETURN_IF_ERROR(editor_->AddNode(std::move(node))); return absl::OkStatus(); } absl::Status SchemaUpdaterImpl::DropNode(const SchemaNode* node) { ZETASQL_RET_CHECK_NE(node, nullptr); ZETASQL_RETURN_IF_ERROR(editor_->DeleteNode(node)); return absl::OkStatus(); } absl::Status SchemaUpdaterImpl::AlterInNamedSchema( const std::string& object_name, const SchemaGraphEditor::EditCallback<NamedSchema>& alter_cb) { const absl::string_view schema_name = SDLObjectName::GetSchemaName(object_name); const NamedSchema* named_schema = latest_schema_->FindNamedSchema(std::string(schema_name)); if (named_schema == nullptr) { return error::NamedSchemaNotFound(schema_name); } ZETASQL_RETURN_IF_ERROR(AlterNode<NamedSchema>(named_schema, alter_cb)); return absl::OkStatus(); } absl::Status ValidateDdlStatement(const ddl::DDLStatement& ddl, database_api::DatabaseDialect dialect) { if ((ddl.has_create_function() || ddl.has_drop_function()) && !EmulatorFeatureFlags::instance().flags().enable_views) { return error::ViewsNotSupported(ddl.has_create_function() ? "CREATE" : "DROP"); } if ((ddl.has_create_sequence() || ddl.has_alter_sequence() || ddl.has_drop_sequence()) && dialect == database_api::DatabaseDialect::POSTGRESQL && !EmulatorFeatureFlags::instance() .flags() .enable_bit_reversed_positive_sequences_postgresql) { return error::SequenceNotSupportedInPostgreSQL(); } return absl::OkStatus(); } absl::StatusOr<std::unique_ptr<const Schema>> SchemaUpdaterImpl::ApplyDDLStatement( absl::string_view statement, absl::string_view proto_descriptor_bytes, const database_api::DatabaseDialect& dialect) { if (statement.empty()) { return error::EmptyDDLStatement(); } ZETASQL_RET_CHECK(!editor_->HasModifications()); ZETASQL_ASSIGN_OR_RETURN(std::unique_ptr<ddl::DDLStatement> ddl_statement, ParseDDLByDialect(statement, dialect)); ZETASQL_RETURN_IF_ERROR(ValidateDdlStatement(*ddl_statement, dialect)); // Apply the statement to the schema graph. auto proto_bundle = latest_schema_->proto_bundle(); switch (ddl_statement->statement_case()) { case ddl::DDLStatement::kCreateProtoBundle: { ZETASQL_ASSIGN_OR_RETURN(proto_bundle, CreateProtoBundle(ddl_statement->create_proto_bundle(), proto_descriptor_bytes)); break; } case ddl::DDLStatement::kCreateTable: { ZETASQL_RETURN_IF_ERROR(CreateTable(ddl_statement->create_table(), dialect)); break; } case ddl::DDLStatement::kCreateChangeStream: { ZETASQL_RETURN_IF_ERROR( CreateChangeStream(ddl_statement->create_change_stream(), dialect) .status()); break; } case ddl::DDLStatement::kCreateIndex: { if (global_names_.HasName(ddl_statement->create_index().index_name()) && ddl_statement->create_index().existence_modifier() == ddl::IF_NOT_EXISTS) { break; } ZETASQL_RETURN_IF_ERROR(CreateIndex(ddl_statement->create_index()).status()); break; } case ddl::DDLStatement::kCreateFunction: { if (dialect == database_api::DatabaseDialect::POSTGRESQL) { ddl::CreateFunction* create_function = ddl_statement->mutable_create_function(); ZETASQL_RET_CHECK(create_function->has_sql_body_origin() && create_function->sql_body_origin().has_original_expression()); ZETASQL_ASSIGN_OR_RETURN( ExpressionTranslateResult result, TranslatePostgreSqlQueryInView(absl::StripAsciiWhitespace( create_function->sql_body_origin().original_expression()))); // Overwrite the original_expression to the deparsed (formalized) PG // expression which can be different from the user-input PG expression. create_function->mutable_sql_body_origin()->set_original_expression( result.original_postgresql_expression); create_function->set_sql_body(result.translated_googlesql_expression); } ZETASQL_RETURN_IF_ERROR(CreateFunction(ddl_statement->create_function())); break; } case ddl::DDLStatement::kCreateSequence: { const ddl::CreateSequence& create_sequence = ddl_statement->create_sequence(); if (latest_schema_->FindSequence(create_sequence.sequence_name()) != nullptr && create_sequence.existence_modifier() == ddl::IF_NOT_EXISTS) { // A sequence with the same name already exists, and we have the // IF NOT EXISTS clause in the statement, so return it as a no-op which // would not cause any schema change. return nullptr; } ZETASQL_RETURN_IF_ERROR( CreateSequence(ddl_statement->create_sequence(), dialect).status()); break; } case ddl::DDLStatement::kCreateSchema: { ZETASQL_RETURN_IF_ERROR(CreateNamedSchema(ddl_statement->create_schema())); break; } case ddl::DDLStatement::kCreateVectorIndex: { if (global_names_.HasName( ddl_statement->create_vector_index().index_name()) && ddl_statement->create_vector_index().existence_modifier() == ddl::IF_NOT_EXISTS) { break; } ZETASQL_RETURN_IF_ERROR( CreateVectorIndex(ddl_statement->create_vector_index()).status()); break; } case ddl::DDLStatement::kCreateSearchIndex: { ZETASQL_RETURN_IF_ERROR( CreateSearchIndex(ddl_statement->create_search_index()).status()); break; } case ddl::DDLStatement::kCreateLocalityGroup: { ZETASQL_RETURN_IF_ERROR( CreateLocalityGroup(ddl_statement->create_locality_group())); break; } case ddl::DDLStatement::kAlterDatabase: { ZETASQL_RETURN_IF_ERROR(AlterDatabase(ddl_statement->alter_database(), dialect)); break; } case ddl::DDLStatement::kAlterTable: { ZETASQL_RETURN_IF_ERROR(AlterTable(ddl_statement->alter_table(), dialect)); break; } case ddl::DDLStatement::kAlterChangeStream: { ZETASQL_RETURN_IF_ERROR(AlterChangeStream(ddl_statement->alter_change_stream())); break; } case ddl::DDLStatement::kAlterSequence: { const ddl::AlterSequence& alter_sequence = ddl_statement->alter_sequence(); const Sequence* current_sequence = latest_schema_->FindSequence(alter_sequence.sequence_name(), /*exclude_internal=*/true); if (current_sequence == nullptr) { if (alter_sequence.existence_modifier() == ddl::IF_EXISTS) { // The sequence doesn't exist, but we have the IF EXISTS clause // in the statement, so return without error. break; } return error::SequenceNotFound(alter_sequence.sequence_name()); } ZETASQL_RETURN_IF_ERROR(AlterSequence(alter_sequence, current_sequence)); break; } case ddl::DDLStatement::kAlterIndex: { ZETASQL_RETURN_IF_ERROR(AlterIndex(ddl_statement->alter_index())); break; } case ddl::DDLStatement::kAlterVectorIndex: { ZETASQL_RETURN_IF_ERROR(AlterVectorIndex(ddl_statement->alter_vector_index())); break; } case ddl::DDLStatement::kAlterSchema: { ZETASQL_RETURN_IF_ERROR(AlterNamedSchema(ddl_statement->alter_schema())); break; } case ddl::DDLStatement::kAlterLocalityGroup: { ZETASQL_RETURN_IF_ERROR( AlterLocalityGroup(ddl_statement->alter_locality_group())); break; } case ddl::DDLStatement::kAlterProtoBundle: { ZETASQL_ASSIGN_OR_RETURN(proto_bundle, AlterProtoBundle(ddl_statement->alter_proto_bundle(), proto_descriptor_bytes)); break; } case ddl::DDLStatement::kDropTable: { ZETASQL_RETURN_IF_ERROR(DropTable(ddl_statement->drop_table())); break; } case ddl::DDLStatement::kDropIndex: { ZETASQL_RETURN_IF_ERROR(DropIndex(ddl_statement->drop_index())); break; } case ddl::DDLStatement::kDropSearchIndex: { ZETASQL_RETURN_IF_ERROR(DropSearchIndex(ddl_statement->drop_search_index())); break; } case ddl::DDLStatement::kDropVectorIndex: { ZETASQL_RETURN_IF_ERROR(DropVectorIndex(ddl_statement->drop_vector_index())); break; } case ddl::DDLStatement::kDropChangeStream: { ZETASQL_RETURN_IF_ERROR(DropChangeStream(ddl_statement->drop_change_stream())); break; } case ddl::DDLStatement::kDropFunction: { ZETASQL_RETURN_IF_ERROR(DropFunction(ddl_statement->drop_function())); break; } case ddl::DDLStatement::kDropSequence: { const ddl::DropSequence& drop_sequence = ddl_statement->drop_sequence(); const Sequence* current_sequence = latest_schema_->FindSequence(drop_sequence.sequence_name(), /*exclude_internal=*/true); if (current_sequence == nullptr) { if (drop_sequence.existence_modifier() == ddl::IF_EXISTS) { // The sequence doesn't exist, but we have the IF EXISTS clause // in the statement, so return without error. break; } return error::SequenceNotFound(drop_sequence.sequence_name()); } ZETASQL_RETURN_IF_ERROR(DropSequence(current_sequence)); break; } case ddl::DDLStatement::kDropSchema: { ZETASQL_RETURN_IF_ERROR(DropNamedSchema(ddl_statement->drop_schema())); break; } case ddl::DDLStatement::kDropLocalityGroup: { ZETASQL_RETURN_IF_ERROR(DropLocalityGroup(ddl_statement->drop_locality_group())); break; } case ddl::DDLStatement::kDropProtoBundle: { ZETASQL_ASSIGN_OR_RETURN(proto_bundle, DropProtoBundle()); break; } case ddl::DDLStatement::kAnalyze: // Intentionally no=op. break; case ddl::DDLStatement::kSetColumnOptions: ZETASQL_RETURN_IF_ERROR(ApplyImplSetColumnOptions( ddl_statement->set_column_options(), dialect)); break; case ddl::DDLStatement::kCreateModel: ZETASQL_RETURN_IF_ERROR(CreateModel(ddl_statement->create_model(), dialect)); break; case ddl::DDLStatement::kAlterModel: ZETASQL_RETURN_IF_ERROR(AlterModel(ddl_statement->alter_model())); break; case ddl::DDLStatement::kDropModel: ZETASQL_RETURN_IF_ERROR(DropModel(ddl_statement->drop_model())); break; case ddl::DDLStatement::kCreatePropertyGraph: ZETASQL_RETURN_IF_ERROR( CreatePropertyGraph(ddl_statement->create_property_graph(), dialect)); break; case ddl::DDLStatement::kDropPropertyGraph: ZETASQL_RETURN_IF_ERROR(DropPropertyGraph(ddl_statement->drop_property_graph())); break; case ddl::DDLStatement::kCreatePlacement: { const Placement* placement = latest_schema_->FindPlacement( ddl_statement->create_placement().placement_name()); if (placement != nullptr) { return error::SchemaObjectAlreadyExists( "Placement", ddl_statement->create_placement().placement_name()); } Placement::Builder placement_builder; placement_builder.set_name( ddl_statement->create_placement().placement_name()); const auto& set_options = ddl_statement->create_placement().set_options(); if (!set_options.empty()) { ZETASQL_RETURN_IF_ERROR(AlterNode<Placement>( placement_builder.get(), [this, set_options](Placement::Editor* editor) -> absl::Status { // Set placement options return SetPlacementOptions(set_options, editor); })); } ZETASQL_RETURN_IF_ERROR(AddNode(placement_builder.build())); break; } case ddl::DDLStatement::kAlterPlacement: { const Placement* placement = latest_schema_->FindPlacement( ddl_statement->alter_placement().placement_name()); if (placement == nullptr) { return error::PlacementNotFound( ddl_statement->alter_placement().placement_name()); } const auto& set_options = ddl_statement->alter_placement().set_options(); ddl_statement->create_placement().set_options(); if (!set_options.empty()) { ZETASQL_RETURN_IF_ERROR(AlterNode<Placement>( placement, [this, set_options](Placement::Editor* editor) -> absl::Status { // Set change stream options return SetPlacementOptions(set_options, editor); })); } break; } case ddl::DDLStatement::kDropPlacement: { const Placement* placement = latest_schema_->FindPlacement( ddl_statement->drop_placement().placement_name()); if (placement == nullptr) { return error::PlacementNotFound( ddl_statement->drop_placement().placement_name()); } ZETASQL_RETURN_IF_ERROR(DropNode(placement)); break; } default: ZETASQL_RET_CHECK(false) << "Unsupported ddl statement: " << ddl_statement->statement_case(); } // Since Proto bundle needs to be used for column validation (via // SchemaGraphEditor),this needs to be passed in statement context. // SchemaValidationContext is being used and updated in both editor and // updater and hence needs to be passed by reference. // Use this proto_bundle only during validation (before schema // generation). If there is a need to access proto_bundle after // validation, please use schema->proto_bundle(). statement_context_->set_proto_bundle(proto_bundle); ZETASQL_ASSIGN_OR_RETURN(auto new_schema_graph, editor_->CanonicalizeGraph()); return std::make_unique<const OwningSchema>( std::move(new_schema_graph), proto_bundle, dialect, database_id_); } absl::StatusOr<std::vector<SchemaValidationContext>> SchemaUpdaterImpl::ApplyDDLStatements( const SchemaChangeOperation& schema_change_operation) { std::vector<SchemaValidationContext> pending_work; for (const auto& statement : schema_change_operation.statements) { ZETASQL_VLOG(2) << "Applying statement " << statement; // Set up the SchemaValidationContext before passing it to `editor_`. This // includes setting the old schema snapshot and a callback to construct // a temporary schema snapshot of the pending new schema. The temporary // schema snapshot does not own the new schema nodes but will remain alive // for the lifetime of `editor_` and `statement_context_`. The callback // mechanism is needed because 1) SchemaUpdater doesn't know at which point // SchemaGraphEditor will validate the new schema and 2) SchemaGraphEditor // or SchemaValidationContext cannot take a dependency on Schema. std::unique_ptr<const Schema> new_tmp_schema = nullptr; SchemaValidationContext statement_context{ storage_, &global_names_, type_factory_, schema_change_timestamp_, schema_change_operation.database_dialect}; statement_context_ = &statement_context; statement_context_->SetOldSchemaSnapshot(latest_schema_); statement_context_->SetTempNewSchemaSnapshotConstructor( [this, &new_tmp_schema](const SchemaGraph* unowned_graph) -> const Schema* { new_tmp_schema = std::make_unique<const Schema>( unowned_graph, latest_schema_->proto_bundle(), latest_schema_->dialect(), database_id_); return new_tmp_schema.get(); }); // Initialize the editor that will be used to stage the schema changes. editor_ = std::make_unique<SchemaGraphEditor>( latest_schema_->GetSchemaGraph(), statement_context_); // If there is a semantic validation error, then we return right away. ZETASQL_ASSIGN_OR_RETURN( auto new_schema, ApplyDDLStatement(statement, schema_change_operation.proto_descriptor_bytes, schema_change_operation.database_dialect)); // This indicates that the statement was a no-op, e.g., a CREATE SEQUENCE IF // NOT EXISTS statement for an existent sequence. if (new_schema == nullptr) { continue; } // We save every schema snapshot as verifiers/backfillers from the // current/next statement may need to refer to the previous/current // schema snapshots. statement_context_->SetValidatedNewSchemaSnapshot(new_schema.get()); latest_schema_ = new_schema.get(); intermediate_schemas_.emplace_back(std::move(new_schema)); pg_oid_assigner_->MarkNextPostgresqlOidForIntermediateSchema(); // If everything was OK, make this the new schema snapshot for processing // the next statement and save the pending schema snapshot and backfill // work. pending_work.emplace_back(std::move(statement_context)); } return pending_work; } template <typename Modifier> absl::Status SchemaUpdaterImpl::ProcessLocalityGroupOption( const ddl::SetOption& option, Modifier* modifier) { if (option.has_string_value()) { ZETASQL_RETURN_IF_ERROR(AssignNewLocalityGroup(option.string_value(), modifier)); } else { ZETASQL_RET_CHECK(false) << "Option " << ddl::kLocalityGroupOptionName << " can only take string_value."; } return absl::OkStatus(); } template <typename TableModifier> absl::Status SchemaUpdaterImpl::SetTableOptions( const ::google::protobuf::RepeatedPtrField<ddl::SetOption>& set_options, TableModifier* modifier) { for (const ddl::SetOption& option : set_options) { if (option.option_name() == ddl::kLocalityGroupOptionName) { ZETASQL_RETURN_IF_ERROR(ProcessLocalityGroupOption(option, modifier)); } else { ZETASQL_RET_CHECK(false) << "Invalid table option: " << option.option_name(); } } return absl::OkStatus(); } template <typename IndexModifier> absl::Status SchemaUpdaterImpl::SetIndexOptions( const ::google::protobuf::RepeatedPtrField<ddl::SetOption>& set_options, IndexModifier* modifier, bool allow_other_options) { for (const ddl::SetOption& option : set_options) { if (option.option_name() == ddl::kLocalityGroupOptionName) { ZETASQL_RETURN_IF_ERROR(ProcessLocalityGroupOption(option, modifier)); } else { ZETASQL_RET_CHECK(allow_other_options) << "Invalid index option: " << option.option_name(); } } return absl::OkStatus(); } template <typename ColumnModifier> absl::Status SchemaUpdaterImpl::SetColumnOptions( const ::google::protobuf::RepeatedPtrField<ddl::SetOption>& set_options, const database_api::DatabaseDialect& dialect, ColumnModifier* modifier) { std::optional<bool> allows_commit_timestamp = std::nullopt; std::string commit_timestamp_option_name = ddl::kCommitTimestampOptionName; if (dialect == database_api::DatabaseDialect::POSTGRESQL) { // PG uses spanner.commit_timestamp while in ZetaSQL, the default option // name is allow_commit_timestamp. commit_timestamp_option_name = ddl::kPGCommitTimestampOptionName; } for (const ddl::SetOption& option : set_options) { if (option.option_name() == commit_timestamp_option_name) { if (option.has_bool_value()) { allows_commit_timestamp = option.bool_value(); } else if (option.has_null_value()) { allows_commit_timestamp = std::nullopt; } else { ZETASQL_RET_CHECK(false) << "Option " << commit_timestamp_option_name << " can only take bool_value or null_value."; } modifier->set_allow_commit_timestamp(allows_commit_timestamp); } else if (option.option_name() == ddl::kLocalityGroupOptionName) { ZETASQL_RETURN_IF_ERROR(ProcessLocalityGroupOption(option, modifier)); } else { ZETASQL_RET_CHECK(false) << "Invalid column option: " << option.option_name(); } } return absl::OkStatus(); } template <typename Modifier> absl::Status SchemaUpdaterImpl::SetDatabaseOptions( const ::google::protobuf::RepeatedPtrField<ddl::SetOption>& set_options, const database_api::DatabaseDialect& dialect, Modifier* modifier) { modifier->set_options(set_options); for (const ddl::SetOption& option : set_options) { if (absl::StripPrefix(option.option_name(), "spanner.internal.cloud_") == ddl::kDefaultSequenceKindOptionName) { if (option.has_string_value()) { std::optional<std::string> default_sequence_kind = option.string_value(); modifier->set_default_sequence_kind(default_sequence_kind); } else if (option.has_null_value()) { modifier->set_default_sequence_kind(std::nullopt); } } if (absl::StripPrefix(option.option_name(), "spanner.internal.cloud_") == ddl::kDefaultTimeZoneOptionName) { if (option.has_string_value()) { std::optional<std::string> default_time_zone = option.string_value(); modifier->set_default_time_zone(default_time_zone); } else if (option.has_null_value()) { modifier->set_default_time_zone(std::nullopt); } } } return absl::OkStatus(); } template <typename ChangeStreamModifier> absl::Status SchemaUpdaterImpl::SetChangeStreamOptions( const ::google::protobuf::RepeatedPtrField<ddl::SetOption>& set_options, ChangeStreamModifier* modifier) { modifier->set_options(set_options); for (const ddl::SetOption& option : set_options) { if (option.has_null_value()) { continue; } if (option.option_name() == ddl::kChangeStreamRetentionPeriodOptionName) { std::optional<std::string> retention_period = option.string_value(); modifier->set_retention_period(retention_period); modifier->set_parsed_retention_period( ParseSchemaTimeSpec(*retention_period)); } else if (option.option_name() == ddl::kChangeStreamValueCaptureTypeOptionName) { std::optional<std::string> value_capture_type = option.string_value(); modifier->set_value_capture_type(value_capture_type); } else if (ddl::kChangeStreamBooleanOptions->contains( option.option_name())) { modifier->set_boolean_option(option.option_name(), option.bool_value()); } } return absl::OkStatus(); } template <typename PlacementModifier> absl::Status SchemaUpdaterImpl::SetPlacementOptions( const ::google::protobuf::RepeatedPtrField<ddl::SetOption>& set_options, PlacementModifier* modifier) { modifier->set_options(set_options); for (const ddl::SetOption& option : set_options) { if (option.has_null_value()) { continue; } if (option.option_name() == ddl::kPlacementDefaultLeaderOptionName) { std::optional<std::string> default_leader = option.string_value(); modifier->set_default_leader(default_leader); } else if (option.option_name() == ddl::kPlacementInstancePartitionOptionName) { std::optional<std::string> instance_partition = option.string_value(); modifier->set_instance_partition(instance_partition); } } return absl::OkStatus(); } absl::Status SchemaUpdaterImpl::ValidateChangeStreamOptions( const ::google::protobuf::RepeatedPtrField<ddl::SetOption>& set_options) { // All options reach this step are already checked by ddl parser to ensure // they have valid option names and correct value types. Thus we can directly // validate the values. for (const ddl::SetOption& option : set_options) { // Retention Period Validation if (option.option_name() == ddl::kChangeStreamRetentionPeriodOptionName) { if (option.has_string_value()) { const int64_t retention_seconds = ParseSchemaTimeSpec(option.string_value()); if (retention_seconds == -1) { return error::InvalidTimeDurationFormat(option.string_value()); } if (!absl::GetFlag( FLAGS_cloud_spanner_emulator_disable_cs_retention_check) && (retention_seconds < limits::kChangeStreamsMinRetention || retention_seconds > limits::kChangeStreamsMaxRetention)) { return error::InvalidDataRetentionPeriod(option.string_value()); } } continue; } // Value Capture Type Validation if (option.option_name() == ddl::kChangeStreamValueCaptureTypeOptionName) { if (option.has_string_value()) { const std::string& value_capture_type = option.string_value(); const absl::flat_hash_set<absl::string_view> validCaptureTypes = { kChangeStreamValueCaptureTypeDefault, kChangeStreamValueCaptureTypeNewRow, kChangeStreamValueCaptureTypeNewValues, kChangeStreamValueCaptureTypeNewRowOldValues}; if (validCaptureTypes.contains(value_capture_type)) { continue; } else { return error::InvalidValueCaptureType(value_capture_type); } } } } return absl::OkStatus(); } template <typename Modifier> absl::Status SchemaUpdaterImpl::AssignNewLocalityGroup( const std::string& locality_group_name, Modifier* modifier) { const LocalityGroup* locality_group = latest_schema_->FindLocalityGroup(locality_group_name); if (locality_group == nullptr) { return error::LocalityGroupNotFound(locality_group_name); } const LocalityGroup* old_locality_group = modifier->get()->locality_group(); if (old_locality_group != nullptr) { ZETASQL_RETURN_IF_ERROR(AlterNode<LocalityGroup>( old_locality_group, [](LocalityGroup::Editor* editor) -> absl::Status { editor->decrement_use_count(); return absl::OkStatus(); })); } modifier->set_locality_group(locality_group); ZETASQL_RETURN_IF_ERROR(AlterNode<LocalityGroup>( locality_group, [](LocalityGroup::Editor* editor) -> absl::Status { editor->increment_use_count(); return absl::OkStatus(); })); return absl::OkStatus(); } template <typename LocalityGroupModifier> absl::Status SchemaUpdaterImpl::SetLocalityGroupOptions( const ::google::protobuf::RepeatedPtrField<ddl::SetOption>& set_options, LocalityGroupModifier* modifier) { modifier->set_options(set_options); for (const ddl::SetOption& option : set_options) { if (option.has_null_value()) { continue; } if (option.option_name() == ddl::kInternalLocalityGroupStorageOptionName) { if (option.has_bool_value()) { modifier->set_inflash(option.bool_value()); } } else if (option.option_name() == ddl::kInternalLocalityGroupSpillTimeSpanOptionName) { modifier->set_ssd_to_hdd_spill_timespans(option.string_list_value()); } } return absl::OkStatus(); } // Construct the change stream tvf name for googlesql std::string SchemaUpdaterImpl::MakeChangeStreamTvfName( std::string change_stream_name, const database_api::DatabaseDialect& dialect) { if (dialect == database_api::DatabaseDialect::POSTGRESQL) { return absl::StrCat(kChangeStreamTvfJsonPrefix, change_stream_name); } return absl::StrCat(kChangeStreamTvfStructPrefix, change_stream_name); } absl::Status SchemaUpdaterImpl::AlterColumnDefinition( const ddl::ColumnDefinition& ddl_column, const Table* table, const database_api::DatabaseDialect& dialect, Column::Editor* editor) { ZETASQL_RETURN_IF_ERROR(SetColumnDefinition(ddl_column, table, /*ddl_create_table=*/nullptr, dialect, /*is_alter=*/true, editor)); return absl::OkStatus(); } absl::Status SchemaUpdaterImpl::AlterColumnSetDropDefault( const ddl::AlterTable::AlterColumn& alter_column, const Table* table, const Column* column, const database_api::DatabaseDialect& dialect, Column::Editor* editor) { const ddl::AlterTable::AlterColumn::AlterColumnOp type = alter_column.operation(); ZETASQL_RET_CHECK(type == ddl::AlterTable::AlterColumn::SET_DEFAULT || type == ddl::AlterTable::AlterColumn::DROP_DEFAULT); const ddl::ColumnDefinition& new_column_def = alter_column.column(); absl::flat_hash_set<const SchemaNode*> dependent_sequences; if (type == ddl::AlterTable::AlterColumn::SET_DEFAULT) { if (column->is_generated()) { return error::CannotSetDefaultValueOnGeneratedColumn(column->FullName()); } if (column->is_identity_column() && (new_column_def.has_column_default() || new_column_def.has_generated_column())) { return error::CannotAlterIdentityColumnToGeneratedOrDefaultColumn( table->Name(), new_column_def.column_name()); } std::string expression = new_column_def.column_default().expression(); if (dialect == database_api::DatabaseDialect::POSTGRESQL) { ZETASQL_ASSIGN_OR_RETURN(expression, TranslatePGExpression( new_column_def.column_default(), table, /*ddl_create_table=*/nullptr, *editor)); } ZETASQL_RET_CHECK(new_column_def.has_column_default() && !expression.empty()); absl::flat_hash_set<const SchemaNode*> udf_dependencies; absl::Status s = AnalyzeColumnDefaultValue( expression, column->Name(), column->GetType(), table, /*ddl_create_table=*/nullptr, &dependent_sequences, &udf_dependencies); if (!s.ok()) { return error::ColumnDefaultValueParseError(table->Name(), column->Name(), s.message()); } editor->set_postgresql_oid(pg_oid_assigner_->GetNextPostgresqlOid()); editor->set_expression(expression); editor->set_has_default_value(true); editor->set_udf_dependencies(udf_dependencies); const Column* existing_column = table->FindColumn(alter_column.column().column_name()); if (existing_column == nullptr) { return error::ColumnNotFound(table->Name(), alter_column.column().column_name()); } absl::flat_hash_set<const SchemaNode*> deps; for (const auto& dep : existing_column->udf_dependencies()) { deps.insert(dep); } // Check for a recursive columns by analyzing the transitive set of // dependencies, i.e., if the view is a dependency of itself. auto transitive_deps = GatherTransitiveDependenciesForSchemaNode(deps); if (std::find_if(transitive_deps.begin(), transitive_deps.end(), [existing_column](const SchemaNode* dep) { return (dep->As<const Column>() != nullptr && dep->As<const Column>()->Name() == existing_column->Name()); }) != transitive_deps.end()) { return error::ViewReplaceRecursive(existing_column->Name()); } } else { if (!column->has_default_value() || column->is_identity_column()) { return absl::OkStatus(); } editor->clear_expression(); editor->clear_original_expression(); editor->set_has_default_value(false); if (!column->is_generated()) { // Only default and generated columns need an OID. If the default is // dropped and the column is not generated then unassign the OID. editor->set_postgresql_oid(std::nullopt); } } // Clear all the old sequence dependencies and set the new ones editor->set_sequences_used(dependent_sequences); return absl::OkStatus(); } absl::Status SchemaUpdaterImpl::InitColumnNameAndTypesFromTable( const Table* table, const ddl::CreateTable* ddl_create_table, std::vector<zetasql::SimpleTable::NameAndType>* name_and_types) { if (ddl_create_table != nullptr) { // We are processing a CREATE TABLE statement, so 'const Table* table' may // not have all the columns yet. Add all columns from the ddl. for (const ddl::ColumnDefinition& ddl_column : ddl_create_table->column()) { ZETASQL_ASSIGN_OR_RETURN( const zetasql::Type* type, DDLColumnTypeToGoogleSqlType(ddl_column, type_factory_, latest_schema_->proto_bundle().get())); name_and_types->emplace_back(ddl_column.column_name(), type); } } else { for (const Column* column : table->columns()) { name_and_types->emplace_back(column->Name(), column->GetType()); } } return absl::OkStatus(); } absl::Status SchemaUpdaterImpl::AnalyzeGeneratedColumn( absl::string_view expression, const std::string& column_name, const zetasql::Type* column_type, const Table* table, const ddl::CreateTable* ddl_create_table, absl::flat_hash_set<std::string>* dependent_column_names, absl::flat_hash_set<const SchemaNode*>* udf_dependencies) { std::vector<zetasql::SimpleTable::NameAndType> name_and_types; ZETASQL_RETURN_IF_ERROR(InitColumnNameAndTypesFromTable(table, ddl_create_table, &name_and_types)); // For the case of adding a generated column in ALTER TABLE. if (ddl_create_table == nullptr && table->FindColumn(column_name) == nullptr) { name_and_types.emplace_back(column_name, column_type); } return AnalyzeColumnExpression( expression, column_type, table, latest_schema_, type_factory_, name_and_types, "stored generated columns", dependent_column_names, /*dependent_sequences=*/nullptr, /*allow_volatile_expression=*/false, udf_dependencies); } absl::Status SchemaUpdaterImpl::AnalyzeColumnDefaultValue( absl::string_view expression, const std::string& column_name, const zetasql::Type* column_type, const Table* table, const ddl::CreateTable* ddl_create_table, absl::flat_hash_set<const SchemaNode*>* dependent_sequences, absl::flat_hash_set<const SchemaNode*>* udf_dependencies) { std::vector<zetasql::SimpleTable::NameAndType> name_and_types; ZETASQL_RETURN_IF_ERROR(InitColumnNameAndTypesFromTable(table, ddl_create_table, &name_and_types)); // For the case of adding a default column in ALTER TABLE. if (ddl_create_table == nullptr && table->FindColumn(column_name) == nullptr) { name_and_types.emplace_back(column_name, column_type); } absl::flat_hash_set<std::string> dependent_column_names; ZETASQL_RETURN_IF_ERROR(AnalyzeColumnExpression( expression, column_type, table, latest_schema_, type_factory_, name_and_types, "column default", &dependent_column_names, dependent_sequences, /*allow_volatile_expression=*/true, udf_dependencies)); if (!dependent_column_names.empty()) { return error::DefaultExpressionWithColumnDependency(column_name); } return absl::OkStatus(); } absl::Status SchemaUpdaterImpl::AnalyzeCheckConstraint( absl::string_view expression, const Table* table, const ddl::CreateTable* ddl_create_table, absl::flat_hash_set<std::string>* dependent_column_names, CheckConstraint::Builder* builder, absl::flat_hash_set<const SchemaNode*>* udf_dependencies) { std::vector<zetasql::SimpleTable::NameAndType> name_and_types; ZETASQL_RETURN_IF_ERROR(InitColumnNameAndTypesFromTable(table, ddl_create_table, &name_and_types)); ZETASQL_RETURN_IF_ERROR(AnalyzeColumnExpression( expression, zetasql::types::BoolType(), table, latest_schema_, type_factory_, name_and_types, "check constraints", dependent_column_names, /*dependent_sequences=*/nullptr, /*allow_volatile_expression=*/false, udf_dependencies)); for (const std::string& column_name : *dependent_column_names) { builder->add_dependent_column(table->FindColumn(column_name)); } for (const SchemaNode* udf : *udf_dependencies) { builder->add_dependent_udf(udf); } return absl::OkStatus(); } template <typename ColumnDef, typename ColumnDefModifer> absl::StatusOr<std::string> SchemaUpdaterImpl::TranslatePGExpression( const ColumnDef& ddl_column, const Table* table, const ddl::CreateTable* ddl_create_table, ColumnDefModifer& modifier) { if (ddl_column.has_expression_origin()) { const std::string original_unformalized_expression = std::string(absl::StripAsciiWhitespace( ddl_column.expression_origin().original_expression())); ZETASQL_ASSIGN_OR_RETURN( ExpressionTranslateResult result, TranslatePostgreSqlExpression(table, ddl_create_table, original_unformalized_expression)); // Overwrite the original_expression to the deparsed (formalized) PG // expression which can be different from the user-input PG expression. modifier.set_original_expression(result.original_postgresql_expression); return result.translated_googlesql_expression; } else { return error::Internal(absl::StrCat( "The field 'expression_origin' is empty: ", ddl_column.DebugString())); } } template <typename ColumnDefModifer> absl::Status SchemaUpdaterImpl::SetColumnDefinition( const ddl::ColumnDefinition& ddl_column, const Table* table, const ddl::CreateTable* ddl_create_table, const database_api::DatabaseDialect& dialect, bool is_alter, ColumnDefModifer* modifier) { bool is_generated = false; bool has_default_value = false; bool is_identity_column = false; // Process any changes in column definition. ZETASQL_ASSIGN_OR_RETURN( const zetasql::Type* column_type, DDLColumnTypeToGoogleSqlType(ddl_column, type_factory_, latest_schema_->proto_bundle().get())); modifier->set_type(column_type); if (column_type->IsTokenList() && !ddl_column.hidden()) { return error::NonHiddenTokenlistColumn(table->Name(), ddl_column.column_name()); } // For the case of removing a vector length param in ALTER TABLE ALTER COLUMN. if (ddl_create_table == nullptr) { const Column* column = table->FindColumn(ddl_column.column_name()); if (column != nullptr && column->has_vector_length() && !ddl_column.has_vector_length()) { return error::CannotAlterColumnToRemoveVectorLength( ddl_column.column_name()); } } // Do not allow a column to convert to and stop being an identity column. const Column* old_column = table->FindColumn(ddl_column.column_name()); if (old_column != nullptr && old_column->is_identity_column() != ddl_column.has_identity_column()) { if (ddl_column.has_identity_column()) { return error::CannotAlterToIdentityColumn(table->Name(), ddl_column.column_name()); } else { return error::CannotAlterColumnToDropIdentity(table->Name(), ddl_column.column_name()); } } absl::flat_hash_set<const SchemaNode*> udf_dependencies; absl::flat_hash_set<const SchemaNode*> dependent_sequences; if (ddl_column.has_column_default()) { std::string expression = ddl_column.column_default().expression(); if (dialect == database_api::DatabaseDialect::POSTGRESQL) { ZETASQL_ASSIGN_OR_RETURN(expression, TranslatePGExpression(ddl_column.column_default(), table, ddl_create_table, *modifier)); } has_default_value = true; modifier->set_expression(expression); absl::Status s = AnalyzeColumnDefaultValue( expression, ddl_column.column_name(), column_type, table, ddl_create_table, &dependent_sequences, &udf_dependencies); if (!s.ok()) { return error::ColumnDefaultValueParseError( table->Name(), ddl_column.column_name(), s.message()); } } else if (ddl_column.has_identity_column()) { is_identity_column = true; // The default value is the expression // `GET_NEXT_SEQUENCE_VALUE(sequence_name)` has_default_value = true; std::vector<std::string> parts = absl::StrSplit(modifier->get()->id(), ':'); ZETASQL_RET_CHECK_GE(parts.size(), 2); std::string sequence_name = absl::StrFormat( "_identity_seq_%s", absl::StrReplaceAll(parts[0], {{".", "__"}})); const Sequence* existing_sequence = latest_schema_->FindSequence(sequence_name); if (is_alter) { ZETASQL_RET_CHECK(existing_sequence != nullptr) << "sequence does not exist: " << sequence_name; ddl::AlterSequence alter_sequence; alter_sequence.set_sequence_name(sequence_name); SetSequenceOptionsForIdentityColumn( ddl_column.identity_column(), alter_sequence.mutable_set_options()->mutable_options()); ZETASQL_RETURN_IF_ERROR(AlterSequence(alter_sequence, existing_sequence)); dependent_sequences.insert(existing_sequence); } else { ZETASQL_RET_CHECK(existing_sequence == nullptr) << "sequence already exists: " << sequence_name; // Create the internal sequence. ddl::CreateSequence create_sequence; create_sequence.set_sequence_name(sequence_name); if (ddl_column.identity_column().has_type() && ddl_column.identity_column().type() == ddl::ColumnDefinition::IdentityColumnDefinition:: BIT_REVERSED_POSITIVE) { ddl::SetOption* sequence_kind = create_sequence.add_set_options(); sequence_kind->set_option_name("sequence_kind"); sequence_kind->set_string_value("bit_reversed_positive"); } else if (!IsDefaultSequenceKindSet()) { return error::UnspecifiedIdentityColumnSequenceKind( ddl_column.column_name()); } SetSequenceOptionsForIdentityColumn( ddl_column.identity_column(), create_sequence.mutable_set_options()); ZETASQL_ASSIGN_OR_RETURN( const Sequence* sequence, CreateSequence(create_sequence, dialect, /*is_internal_use=*/true)); std::string expression; if (dialect == database_api::DatabaseDialect::POSTGRESQL) { expression = absl::StrFormat("(GET_NEXT_SEQUENCE_VALUE(\"%s\"))", sequence_name); } else { expression = absl::StrFormat("(GET_NEXT_SEQUENCE_VALUE(SEQUENCE %s))", sequence_name); } modifier->set_expression(expression); dependent_sequences.insert(sequence); } } else if (ddl_column.has_generated_column()) { std::string expression = ddl_column.generated_column().expression(); if (dialect == database_api::DatabaseDialect::POSTGRESQL) { ZETASQL_ASSIGN_OR_RETURN(expression, TranslatePGExpression( ddl_column.generated_column(), table, ddl_create_table, *modifier)); } is_generated = true; modifier->set_expression(expression); absl::flat_hash_set<std::string> dependent_column_names; absl::Status s = AnalyzeGeneratedColumn( expression, ddl_column.column_name(), column_type, table, ddl_create_table, &dependent_column_names, &udf_dependencies); if (!s.ok()) { return error::GeneratedColumnDefinitionParseError( table->Name(), ddl_column.column_name(), s.message()); } // Create a helper map to check if a column is generated. absl::flat_hash_set<std::string> generated_column_set; if (dialect == database_api::DatabaseDialect::POSTGRESQL) { if (ddl_create_table != nullptr) { for (const ddl::ColumnDefinition& column_def : ddl_create_table->column()) { if (column_def.has_generated_column()) { generated_column_set.insert(column_def.column_name()); } } } else { // This is for altering a table definition. for (const Column* column : table->columns()) { if (column->is_generated()) { generated_column_set.insert(column->Name()); } } } } for (const std::string& column_name : dependent_column_names) { if (dialect == database_api::DatabaseDialect::POSTGRESQL && generated_column_set.contains(column_name)) { // Check generated column does not reference to generated for // PostgreSQL schema. return error::DdlInvalidArgumentError( absl::Substitute("A generated column \"$0\" cannot reference " "another generated column \"$1\".", ddl_column.column_name(), column_name)); } modifier->add_dependent_column_name(column_name); } modifier->set_stored(ddl_column.generated_column().stored()); } if (!is_generated && !has_default_value) { // Altering a generated column to a non-generated column is disallowed. In // that case, the expression is cleared here and later validation at // column_validator.cc will block it. modifier->clear_expression(); } else { ZETASQL_RET_CHECK(is_generated != has_default_value); } modifier->set_is_identity_column(is_identity_column); modifier->set_has_default_value(has_default_value); // Set the default values for nullability and length. modifier->set_nullable(!ddl_column.not_null()); modifier->set_declared_max_length(std::nullopt); if (ddl_column.has_length()) { modifier->set_declared_max_length(ddl_column.length()); } else if (ddl_column.type() == ddl::ColumnDefinition::ARRAY && ddl_column.has_array_subtype() && ddl_column.array_subtype().has_length()) { modifier->set_declared_max_length(ddl_column.array_subtype().length()); } modifier->set_sequences_used(dependent_sequences); modifier->set_udf_dependencies(udf_dependencies); if (ddl_column.has_vector_length()) { // For the case of adding `vector_length` param in CREATE TABLE and ALTER // TABLE ADD COLUMN. if (ddl_create_table != nullptr || (ddl_create_table == nullptr && table->FindColumn(ddl_column.column_name()) == nullptr)) { modifier->set_vector_length(ddl_column.vector_length()); } else { // For the case of adding or editing `vector_length` param in ALTER TABLE // ALTER COLUMN. return error::CannotAlterColumnToAddVectorLength( ddl_column.column_name()); } } if (!ddl_column.set_options().empty()) { ZETASQL_RETURN_IF_ERROR( SetColumnOptions(ddl_column.set_options(), dialect, modifier)); } if (is_alter) { const Column* existing_column = table->FindColumn(ddl_column.column_name()); if (existing_column == nullptr) { return error::ColumnNotFound(table->Name(), ddl_column.column_name()); } absl::flat_hash_set<const SchemaNode*> deps; for (const auto& dep : existing_column->udf_dependencies()) { deps.insert(dep); } // Check for a recursive columns by analyzing the transitive set of // dependencies, i.e., if the view is a dependency of itself. auto transitive_deps = GatherTransitiveDependenciesForSchemaNode(deps); if (std::find_if(transitive_deps.begin(), transitive_deps.end(), [existing_column](const SchemaNode* dep) { return (dep->As<const Column>() != nullptr && dep->As<const Column>()->Name() == existing_column->Name()); }) != transitive_deps.end()) { return error::ViewReplaceRecursive(existing_column->Name()); } } return absl::OkStatus(); } absl::StatusOr<const Column*> SchemaUpdaterImpl::CreateColumn( const ddl::ColumnDefinition& ddl_column, const Table* table, const ddl::CreateTable* ddl_table, const database_api::DatabaseDialect& dialect) { const std::string& column_name = ddl_column.column_name(); Column::Builder builder; builder .set_id(column_id_generator_->NextId( absl::StrCat(table->Name(), ".", column_name))) .set_name(column_name); ZETASQL_RETURN_IF_ERROR(SetColumnDefinition(ddl_column, table, ddl_table, dialect, /*is_alter=*/false, &builder)); builder.set_hidden(ddl_column.has_hidden() && ddl_column.hidden()); builder.set_is_placement_key(ddl_column.has_placement_key() && ddl_column.placement_key()); const Column* column = builder.get(); builder.set_table(table); if (column->is_generated() || column->has_default_value()) { statement_context_->AddAction( [column](const SchemaValidationContext* context) { return BackfillGeneratedColumnValue(column, context); }); std::optional<uint32_t> oid = pg_oid_assigner_->GetNextPostgresqlOid(); builder.set_postgresql_oid(oid); if (oid.has_value()) { ZETASQL_VLOG(2) << "Assigned oid " << oid.value() << " to column " << table->Name() << "." << column->Name(); } } ZETASQL_RETURN_IF_ERROR(AddNode(builder.build())); return column; } // Check the number of change streams tracking an object and return error if the // number exceed the limit. absl::Status SchemaUpdaterImpl::ValidateChangeStreamLimits( const ddl::ChangeStreamForClause& change_stream_for_clause, absl::string_view change_stream_name) { if (change_stream_for_clause.all()) { int all_count = 1; for (const ChangeStream* change_stream : latest_schema_->change_streams()) { if (change_stream->Name() != change_stream_name && (change_stream->for_clause() != nullptr && change_stream->for_clause()->all())) { ++all_count; // Number of change streams tracking ALL should not exceed the limit. if (all_count > limits::kMaxChangeStreamsTrackingATableOrColumn) { return error::TooManyChangeStreamsTrackingSameObject( change_stream_name, limits::kMaxChangeStreamsTrackingATableOrColumn, "ALL"); } } } } // Checks the number of change streams tracking the same table. auto validate_table = [&](const Table* table) { absl::flat_hash_set<std::string> all_change_streams; all_change_streams.reserve(table->change_streams().size()); for (auto& change_stream : table->change_streams()) { all_change_streams.insert(change_stream->Name()); } int change_stream_count = table->change_streams().size(); if (!all_change_streams.contains(change_stream_name)) { ++change_stream_count; } if (change_stream_count > limits::kMaxChangeStreamsTrackingATableOrColumn) { return error::TooManyChangeStreamsTrackingSameObject( change_stream_name, limits::kMaxChangeStreamsTrackingATableOrColumn, table->Name()); } return absl::OkStatus(); }; // Checks the number of change streams tracking the same column. auto validate_column = [&](const Column* column) { absl::flat_hash_set<std::string> all_change_streams; all_change_streams.reserve(column->change_streams().size()); for (auto& change_stream : column->change_streams()) { all_change_streams.insert(change_stream->Name()); } // No need to validate this limit for primary key columns because change // streams tracking pk columns only don't count towards the number limit (3) // of change streams per column. int change_stream_count = column->change_streams().size(); if (!all_change_streams.contains(change_stream_name)) { ++change_stream_count; } if (change_stream_count > limits::kMaxChangeStreamsTrackingATableOrColumn) { return error::TooManyChangeStreamsTrackingSameObject( change_stream_name, limits::kMaxChangeStreamsTrackingATableOrColumn, column->Name()); } return absl::OkStatus(); }; ZETASQL_RETURN_IF_ERROR(ValidateLimitsForTrackedObjects( change_stream_for_clause, validate_table, validate_column)); return absl::OkStatus(); } absl::Status SchemaUpdaterImpl::RegisterTrackedObjects( const ChangeStream* change_stream, const ddl::ChangeStreamForClause& change_stream_for_clause, absl::FunctionRef<absl::Status(Table::Editor*)> table_cb, absl::FunctionRef<absl::Status(Column::Editor*)> column_cb) { if (change_stream_for_clause.all()) { for (const auto& table : latest_schema_->tables()) { if (!table->is_trackable_by_change_stream()) { continue; } ZETASQL_RETURN_IF_ERROR(AlterNode<Table>(table, table_cb)); for (const auto& column : table->columns()) { // Skip all key columns when registering/unregistering change stream // tracking columns to prevent modifications on key columns with child // tables. This is safe because primary key columns are guaranteed to be // populated in data change records no matter user specified or not. if (column->is_trackable_by_change_stream() && !table->FindKeyColumn(column->Name())) { ZETASQL_RETURN_IF_ERROR(AlterNode<Column>(column, column_cb)); } } } return absl::OkStatus(); } for (auto& entry : change_stream_for_clause.tracked_tables().table_entry()) { const Table* table = latest_schema_->FindTable(entry.table_name()); ZETASQL_RET_CHECK(table != nullptr); if (!table->is_trackable_by_change_stream()) { return error::TrackUntrackableTables(entry.table_name()); } ZETASQL_RETURN_IF_ERROR(AlterNode<Table>( table, [change_stream](Table::Editor* editor) -> absl::Status { editor->add_change_stream_explicitly_tracking_table(change_stream); return absl::OkStatus(); })); if (entry.has_all_columns()) { ZETASQL_RETURN_IF_ERROR(AlterNode<Table>(table, table_cb)); for (auto& column : table->columns()) { if (column->is_trackable_by_change_stream() && !table->FindKeyColumn(column->Name())) { ZETASQL_RETURN_IF_ERROR(AlterNode<Column>(column, column_cb)); } } } else if (entry.has_tracked_columns()) { for (const std::string& column_name : entry.tracked_columns().column_name()) { const Column* column = table->FindColumn(column_name); if (!column->is_trackable_by_change_stream()) { return error::TrackUntrackableColumns(column_name); } ZETASQL_RET_CHECK(column != nullptr); ZETASQL_RETURN_IF_ERROR(AlterNode<Column>(column, column_cb)); ZETASQL_RETURN_IF_ERROR(AlterNode<Column>( column, [change_stream](Column::Editor* editor) -> absl::Status { editor->add_change_stream_explicitly_tracking_column( change_stream); return absl::OkStatus(); })); } } } return absl::OkStatus(); } absl::Status SchemaUpdaterImpl::UnregisterChangeStreamFromTrackedObjects( const ChangeStream* change_stream, absl::FunctionRef<absl::Status(Table::Editor*)> table_cb, absl::FunctionRef<absl::Status(Column::Editor*)> column_cb) { for (auto& pair : change_stream->tracked_tables_columns()) { std::string table_name = pair.first; std::vector<std::string> column_name_list = pair.second; const Table* table = latest_schema_->FindTable(table_name); if (table->FindChangeStream(change_stream->Name())) { ZETASQL_RETURN_IF_ERROR(AlterNode(table, table_cb)); } for (std::string& column_name : column_name_list) { const Column* column = table->FindColumn(column_name); if (!table->FindKeyColumn(column->Name())) { ZETASQL_RETURN_IF_ERROR(AlterNode(column, column_cb)); } } } return absl::OkStatus(); } absl::Status SchemaUpdaterImpl::RegisterTrackedObjects( const ddl::ChangeStreamForClause& change_stream_for_clause, const ChangeStream* change_stream) { // Register tracked objects. auto register_tracked_table = [change_stream](Table::Editor* table_editor) { table_editor->add_change_stream(change_stream); return absl::OkStatus(); }; auto register_tracked_column = [change_stream](Column::Editor* column_editor) { column_editor->add_change_stream(change_stream); return absl::OkStatus(); }; return RegisterTrackedObjects(change_stream, change_stream_for_clause, register_tracked_table, register_tracked_column); } absl::Status SchemaUpdaterImpl::UnregisterChangeStreamFromTrackedObjects( const ChangeStream* change_stream) { auto unregister_tracked_table = [change_stream](Table::Editor* table_editor) { table_editor->remove_change_stream(change_stream); return absl::OkStatus(); }; auto unregister_tracked_column = [change_stream](Column::Editor* column_editor) { column_editor->remove_change_stream(change_stream); return absl::OkStatus(); }; ZETASQL_RETURN_IF_ERROR(UnregisterChangeStreamFromTrackedObjects( change_stream, unregister_tracked_table, unregister_tracked_column)); return absl::OkStatus(); } absl::Status SchemaUpdaterImpl::ValidateLimitsForTrackedObjects( const ddl::ChangeStreamForClause& change_stream_for_clause, absl::FunctionRef<absl::Status(const Table*)> table_cb, absl::FunctionRef<absl::Status(const Column*)> column_cb) { std::vector<std::string> tables_names_; if (change_stream_for_clause.all()) { for (const auto& table : latest_schema_->tables()) { ZETASQL_RETURN_IF_ERROR(table_cb(table)); for (const auto& column : table->columns()) { if (column->is_trackable_by_change_stream() && !table->FindKeyColumn(column->Name())) { ZETASQL_RETURN_IF_ERROR(column_cb(column)); } } } return absl::OkStatus(); } for (auto& entry : change_stream_for_clause.tracked_tables().table_entry()) { const Table* table = latest_schema_->FindTable(entry.table_name()); ZETASQL_RET_CHECK(table != nullptr); if (entry.has_all_columns()) { ZETASQL_RETURN_IF_ERROR(table_cb(table)); for (const auto& column : table->columns()) { if (column->is_trackable_by_change_stream() && !table->FindKeyColumn(column->Name())) { ZETASQL_RETURN_IF_ERROR(column_cb(column)); } } } else if (!entry.tracked_columns().column_name().empty()) { for (const std::string& column_name : entry.tracked_columns().column_name()) { const Column* column = table->FindColumn(column_name); ZETASQL_RET_CHECK(column != nullptr); ZETASQL_RETURN_IF_ERROR(column_cb(column)); } } else { // Only key columns are tracked implicitly, which doesn't count toward // change streams number limit. continue; } } return absl::OkStatus(); } absl::Status SchemaUpdaterImpl::ValidateChangeStreamForClause( const ddl::ChangeStreamForClause& change_stream_for_clause, absl::string_view change_stream_name) { if (change_stream_for_clause.has_all()) { return absl::OkStatus(); } if (!change_stream_for_clause.has_tracked_tables()) { return error::CreateChangeStreamForClauseInvalidOneof(change_stream_name); } const ddl::ChangeStreamForClause::TrackedTables& tracked_tables = change_stream_for_clause.tracked_tables(); if (tracked_tables.table_entry_size() == 0) { return error::CreateChangeStreamForClauseZeroEntriesInTrackedTables( change_stream_name); } absl::flat_hash_set<absl::string_view> tracked_tables_set; for (const ddl::ChangeStreamForClause::TrackedTables::Entry& entry : tracked_tables.table_entry()) { if (!entry.has_table_name()) { return error:: CreateChangeStreamForClauseTrackedTablesEntryMissingTableName( change_stream_name); } const std::string& table_name = entry.table_name(); // Cannot list the same table more than once. if (!tracked_tables_set.insert(table_name).second) { return error::ChangeStreamDuplicateTable(change_stream_name, table_name); } // Tracking a change stream is not supported. if (latest_schema_->FindChangeStream(table_name) != nullptr) { return error::InvalidTrackedObjectInChangeStream( change_stream_name, "Change Stream", table_name); } // Tracking an index is not supported. if (latest_schema_->FindIndex(table_name) != nullptr) { return error::InvalidTrackedObjectInChangeStream(change_stream_name, "Index", table_name); } // TODO: Return error if the change stream is tracking a // function after function is supported. // TODO: The parser would have returned a parser error if the // user specified a FOR clause with table names starting with SPANNER_SYS.* // or INFORMATION_SCHEMA.*. const Table* table = latest_schema_->FindTable(table_name); if (table == nullptr) { return error::UnsupportedTrackedObjectOrNonExistentTableInChangeStream( change_stream_name, table_name); } // Validate the tracked columns of the change stream if (!entry.has_all_columns()) { if (!entry.has_tracked_columns()) { return error::CreateChangeStreamForClauseTrackedTablesEntryInvalidOneof( change_stream_name); } absl::flat_hash_set<absl::string_view> tracked_columns_set; // Note: entry.tracked_columns() can contain zero columns, indicating only // the primary key columns of the table are tracked. for (const std::string& column_name : entry.tracked_columns().column_name()) { // Cannot list the same column more than once. if (!tracked_columns_set.insert(column_name).second) { return error::ChangeStreamDuplicateColumn(change_stream_name, column_name, table_name); } const Column* column = table->FindColumn(column_name); // Check that the column exists in the table if (column == nullptr) { return error::NonexistentTrackedColumnInChangeStream( change_stream_name, column_name, table_name); } const KeyColumn* key_column = table->FindKeyColumn(column_name); // Primary key column should not be listed in the FOR clause. if (key_column != nullptr) { return error::KeyColumnInChangeStreamForClause( change_stream_name, column_name, table_name); } if (column->is_generated()) { // Tracking a non-key Stored Generated column is not supported return error::InvalidTrackedObjectInChangeStream( change_stream_name, "non-key generated column", absl::StrCat(table_name, ".", column_name)); } } } } return absl::OkStatus(); } absl::Status SchemaUpdaterImpl::CreateInterleaveConstraint( const ddl::InterleaveClause& interleave, Table::Builder* builder) { ZETASQL_ASSIGN_OR_RETURN(const Table* parent, GetInterleaveConstraintTable( interleave.table_name(), *builder)); Table::OnDeleteAction on_delete = GetInterleaveConstraintOnDelete(interleave); return CreateInterleaveConstraint(parent, on_delete, builder); } absl::Status SchemaUpdaterImpl::CreateInterleaveConstraint( const Table* parent, Table::OnDeleteAction on_delete, Table::Builder* builder) { ZETASQL_RET_CHECK_EQ(builder->get()->parent(), nullptr); if (parent->row_deletion_policy().has_value() && on_delete != Table::OnDeleteAction::kCascade) { return error::RowDeletionPolicyOnAncestors(builder->get()->Name(), parent->Name()); } ZETASQL_RETURN_IF_ERROR(AlterNode<Table>( parent, [builder](Table::Editor* parent_editor) -> absl::Status { parent_editor->add_child_table(builder->get()); builder->set_parent_table(parent_editor->get()); return absl::OkStatus(); })); builder->set_on_delete(on_delete); return absl::OkStatus(); } absl::StatusOr<const Table*> SchemaUpdaterImpl::GetInterleaveConstraintTable( const std::string& interleave_in_table_name, const Table::Builder& builder) const { const auto* parent = latest_schema_->FindTableCaseSensitive(interleave_in_table_name); if (parent == nullptr) { const Table* table = builder.get(); if (table->owner_index() == nullptr) { return error::TableNotFound(interleave_in_table_name); } else { return error::IndexInterleaveTableNotFound(table->owner_index()->Name(), interleave_in_table_name); } } return parent; } Table::OnDeleteAction SchemaUpdaterImpl::GetInterleaveConstraintOnDelete( const ddl::InterleaveClause& interleave) { return interleave.on_delete() == ddl::InterleaveClause::CASCADE ? Table::OnDeleteAction::kCascade : Table::OnDeleteAction::kNoAction; } absl::Status SchemaUpdaterImpl::CreatePrimaryKeyConstraint( const ddl::KeyPartClause& ddl_key_part, Table::Builder* builder, bool with_oid) { KeyColumn::Builder key_col_builder; if (with_oid) { // Assign OID for PRIMARY KEY constraint. std::optional<uint32_t> oid = pg_oid_assigner_->GetNextPostgresqlOid(); key_col_builder.set_postgresql_oid(oid); if (oid.has_value()) { ZETASQL_VLOG(2) << "Assigned oid " << oid.value() << " for PRIMARY KEY constraint on table " << builder->get()->Name(); } } ZETASQL_ASSIGN_OR_RETURN( const KeyColumn* key_col, CreatePrimaryKeyColumn(ddl_key_part, builder->get(), &key_col_builder)); builder->add_key_column(key_col); return absl::OkStatus(); } absl::StatusOr<const KeyColumn*> SchemaUpdaterImpl::CreatePrimaryKeyColumn( const ddl::KeyPartClause& ddl_key_part, const Table* table, KeyColumn::Builder* builder) { const std::string& key_column_name = ddl_key_part.key_name(); // References to columns in primary key clause are case-sensitive. const Column* column = table->FindColumnCaseSensitive(key_column_name); if (column == nullptr) { return error::NonExistentKeyColumn( OwningObjectType(table), OwningObjectName(table), key_column_name); } builder->set_column(column); // TODO: Specifying NULLS FIRST/LAST is unsupported in the // emulator. Currently, users cannot specify ASC_NULLS_LAST and // DESC_NULLS_FIRST. if (ddl_key_part.order() == ddl::KeyPartClause::ASC_NULLS_LAST) { builder->set_descending(false).set_nulls_last(true); } else if (ddl_key_part.order() == ddl::KeyPartClause::DESC_NULLS_FIRST) { builder->set_descending(true).set_nulls_last(false); } else { bool is_descending = (ddl_key_part.order() == ddl::KeyPartClause::DESC); builder->set_descending(is_descending); // Ascending direction with NULLs sorted first and descending direction // with NULLs sorted last. builder->set_nulls_last(is_descending); } const KeyColumn* key_col = builder->get(); ZETASQL_RETURN_IF_ERROR(AddNode(builder->build())); return key_col; } absl::Status SchemaUpdaterImpl::CreateForeignKeyConstraint( const ddl::ForeignKey& ddl_foreign_key, const Table* referencing_table) { // Build and add the emulator foreign key before creating any of its backing // indexes. This ensures the foreign key is validated first which in turn // ensures better foreign key error messages instead of obscure index errors. // Not enforced foreign keys doesn't perform referential integrity // checks, so the index on the referencing table is not created. Only the // unique index on the referenced table is created, because the definition of // foreign keys requires them to refer to unique identities. ZETASQL_ASSIGN_OR_RETURN( const ForeignKey* foreign_key, BuildForeignKeyConstraint(ddl_foreign_key, referencing_table)); auto index_column_names = [](const google::protobuf::RepeatedPtrField<std::string>& column_names, const std::vector<int>& column_order) { std::vector<std::string> names; names.reserve(column_names.size()); for (int i = 0; i < column_names.size(); ++i) { names.push_back(column_names[column_order[i]]); } return names; }; bool referenced_index_required = false; std::vector<int> index_column_order; ZETASQL_RETURN_IF_ERROR(EvaluateForeignKeyReferencedPrimaryKey( foreign_key->referenced_table(), ddl_foreign_key.referenced_column_name(), &index_column_order, &referenced_index_required)); if (referenced_index_required) { std::vector<std::string> column_names; ZETASQL_ASSIGN_OR_RETURN( const Index* referenced_index, CreateForeignKeyIndex( foreign_key, foreign_key->referenced_table(), index_column_names(ddl_foreign_key.referenced_column_name(), index_column_order), /*unique=*/true)); ZETASQL_RETURN_IF_ERROR( AlterNode<ForeignKey>(foreign_key, [&](ForeignKey::Editor* editor) { editor->set_referenced_index(referenced_index); return absl::OkStatus(); })); } if (!foreign_key->enforced()) { // Skip referencing index creation and fk data validation for unenforced // foreign keys. return absl::OkStatus(); } bool referencing_index_required = false; ZETASQL_RETURN_IF_ERROR(EvaluateForeignKeyReferencingPrimaryKey( referencing_table, ddl_foreign_key.constrained_column_name(), index_column_order, &referencing_index_required)); if (referencing_index_required) { ZETASQL_ASSIGN_OR_RETURN( const Index* referencing_index, CreateForeignKeyIndex( foreign_key, referencing_table, index_column_names(ddl_foreign_key.constrained_column_name(), index_column_order), /*unique=*/false)); ZETASQL_RETURN_IF_ERROR( AlterNode<ForeignKey>(foreign_key, [&](ForeignKey::Editor* editor) { editor->set_referencing_index(referencing_index); return absl::OkStatus(); })); } // Validate any existing data. Skip new tables which have no data yet. if (latest_schema_->FindTableCaseSensitive(referencing_table->Name()) != nullptr) { statement_context_->AddAction( [foreign_key](const SchemaValidationContext* context) { return VerifyForeignKeyData(foreign_key, context); }); } return absl::OkStatus(); } ForeignKey::Action GetForeignKeyOnDeleteAction(const ddl::ForeignKey& fk) { return fk.on_delete() == ddl::ForeignKey::CASCADE ? ForeignKey::Action::kCascade : ForeignKey::Action::kNoAction; } absl::StatusOr<const ForeignKey*> SchemaUpdaterImpl::BuildForeignKeyConstraint( const ddl::ForeignKey& ddl_foreign_key, const Table* referencing_table) { ForeignKey::Builder builder; std::optional<uint32_t> oid = pg_oid_assigner_->GetNextPostgresqlOid(); builder.set_postgresql_oid(oid); if (oid.has_value()) { ZETASQL_VLOG(2) << "Assigned oid " << oid.value() << " for FOREIGN KEY constraint " << ddl_foreign_key.constraint_name() << " on table " << referencing_table->Name(); } ZETASQL_RETURN_IF_ERROR( AlterNode<Table>(referencing_table, [&](Table::Editor* editor) { editor->add_foreign_key(builder.get()); return absl::OkStatus(); })); builder.set_referencing_table(referencing_table); const Table* referenced_table = latest_schema_->FindTableCaseSensitive( ddl_foreign_key.referenced_table_name()); if (referenced_table == nullptr) { if (ddl_foreign_key.referenced_table_name() != referencing_table->Name()) { return error::TableNotFound(ddl_foreign_key.referenced_table_name()); } // Self-referencing foreign key. referenced_table = referencing_table; } ZETASQL_RETURN_IF_ERROR( AlterNode<Table>(referenced_table, [&](Table::Editor* editor) { editor->add_referencing_foreign_key(builder.get()); return absl::OkStatus(); })); builder.set_referenced_table(referenced_table); std::string foreign_key_name; if (ddl_foreign_key.has_constraint_name()) { foreign_key_name = ddl_foreign_key.constraint_name(); ZETASQL_RETURN_IF_ERROR(global_names_.AddName("Foreign Key", foreign_key_name)); builder.set_constraint_name(foreign_key_name); } else { ZETASQL_ASSIGN_OR_RETURN(foreign_key_name, global_names_.GenerateForeignKeyName( referencing_table->Name(), referenced_table->Name())); builder.set_generated_name(foreign_key_name); } auto add_columns = [&](const Table* table, const google::protobuf::RepeatedPtrField<std::string>& column_names, std::function<void(const Column*)> add_column) { for (const std::string& column_name : column_names) { const Column* column = table->FindColumnCaseSensitive(column_name); if (column == nullptr) { return error::ForeignKeyColumnNotFound(column_name, table->Name(), foreign_key_name); } add_column(column); } return absl::OkStatus(); }; ZETASQL_RETURN_IF_ERROR(add_columns(referencing_table, ddl_foreign_key.constrained_column_name(), [&builder](const Column* column) { builder.add_referencing_column(column); })); ZETASQL_RETURN_IF_ERROR(add_columns(referenced_table, ddl_foreign_key.referenced_column_name(), [&builder](const Column* column) { builder.add_referenced_column(column); })); if (ddl_foreign_key.has_on_delete()) { if (ddl_foreign_key.on_delete() != ddl::ForeignKey::ACTION_UNSPECIFIED && ddl_foreign_key.on_delete() != ddl::ForeignKey::NO_ACTION && !EmulatorFeatureFlags::instance() .flags() .enable_fk_delete_cascade_action) { return error::ForeignKeyOnDeleteActionUnsupported( ForeignKey::ActionName(GetForeignKeyOnDeleteAction(ddl_foreign_key))); } builder.set_delete_action(GetForeignKeyOnDeleteAction(ddl_foreign_key)); } if (!ddl_foreign_key.enforced()) { if (!EmulatorFeatureFlags::instance() .flags() .enable_fk_enforcement_option) { return error::ForeignKeyEnforcementUnsupported(); } builder.set_enforced(ddl_foreign_key.enforced()); } const ForeignKey* foreign_key = builder.get(); ZETASQL_RETURN_IF_ERROR(AddNode(builder.build())); return foreign_key; } absl::Status SchemaUpdaterImpl::EvaluateForeignKeyReferencedPrimaryKey( const Table* table, const google::protobuf::RepeatedPtrField<std::string>& column_names, std::vector<int>* column_order, bool* index_required) const { column_order->reserve(column_names.size()); *index_required = true; // Build a map of foreign key column names to their index positions. CaseInsensitiveStringMap<int> column_positions; for (int i = 0; i < column_names.size(); ++i) { column_positions[column_names[i]] = i; } // Evaluate the primary key columns. auto primary_key = table->primary_key(); for (const KeyColumn* key_column : primary_key) { auto it = column_positions.find(key_column->column()->Name()); if (it == column_positions.end()) { break; // Foreign key doesn't use this key column. } column_order->push_back(it->second); if (column_order->size() == column_names.size()) { *index_required = primary_key.size() != column_names.size(); return absl::OkStatus(); } } // Match the remaining referenced columns in table column order. absl::flat_hash_set<int> assigned(column_order->begin(), column_order->end()); for (const Column* column : table->columns()) { auto it = column_positions.find(column->Name()); if (it != column_positions.end() && assigned.insert(it->second).second) { column_order->push_back(it->second); if (column_order->size() == column_names.size()) { break; // Done ordering foreign key columns. } } } // Use the primary key if not all columns match. The validator will generate // an appropriate error message. *index_required = column_order->size() == column_names.size(); return absl::OkStatus(); } absl::Status SchemaUpdaterImpl::EvaluateForeignKeyReferencingPrimaryKey( const Table* table, const google::protobuf::RepeatedPtrField<std::string>& column_names, const std::vector<int>& column_order, bool* index_required) const { *index_required = true; // Skip creating an index if there is an error the validator will flag later. if (column_order.size() != column_names.size()) { *index_required = false; return absl::OkStatus(); } // Foreign key columns must be a prefix of the primary key. auto primary_key = table->primary_key(); if (column_names.size() > primary_key.size()) { return absl::OkStatus(); } // Nullable columns require a null-filtered index. if (absl::c_any_of(column_names, [table](const std::string& column_name) { return table->FindColumn(column_name)->is_nullable(); })) { return absl::OkStatus(); } // Build a map of foreign key column names to their index positions. ZETASQL_RET_CHECK_EQ(column_order.size(), column_names.size()); CaseInsensitiveStringMap<int> column_positions; for (int i = 0; i < column_names.size(); ++i) { column_positions[column_names[i]] = column_order[i]; } // Evaluate the primary key columns to see if we can use the primary key. int matching_key_columns = 0; for (const KeyColumn* key_column : primary_key) { auto it = column_positions.find(key_column->column()->Name()); if (it == column_positions.end() || it->second != matching_key_columns) { break; } ++matching_key_columns; } *index_required = matching_key_columns != column_names.size(); return absl::OkStatus(); } absl::StatusOr<const Index*> SchemaUpdaterImpl::CreateForeignKeyIndex( const ForeignKey* foreign_key, const Table* table, const std::vector<std::string>& column_names, bool unique) { bool null_filtered = absl::c_any_of(column_names, [table](const std::string& column_name) { return table->FindColumn(column_name)->is_nullable(); }); ZETASQL_ASSIGN_OR_RETURN(std::string index_name, global_names_.GenerateManagedIndexName( table->Name(), column_names, null_filtered, unique)); const Index* index = latest_schema_->FindIndex(index_name); if (index == nullptr) { index = statement_context_->FindAddedNode<Index>(index_name); } if (index == nullptr) { ddl::CreateIndex ddl_index; ddl_index.set_index_name(index_name); ddl_index.set_index_base_name(table->Name()); ddl_index.set_unique(unique); ddl_index.set_null_filtered(null_filtered); for (const std::string& column_name : column_names) { auto* key_part = ddl_index.add_key(); key_part->set_key_name(column_name); const auto* key_column = table->FindKeyColumn(column_name); if (key_column != nullptr) { if (key_column->is_descending() && key_column->is_nulls_last()) { key_part->set_order(ddl::KeyPartClause::DESC); } else if (key_column->is_descending() && !key_column->is_nulls_last()) { key_part->set_order(ddl::KeyPartClause::DESC_NULLS_FIRST); } else if (!key_column->is_descending() && key_column->is_nulls_last()) { key_part->set_order(ddl::KeyPartClause::ASC_NULLS_LAST); } } } if (CanInterleaveForeignKeyIndex(table, column_names)) { ddl_index.set_interleave_in_table(table->Name()); } ZETASQL_ASSIGN_OR_RETURN(index, CreateIndex(ddl_index, table)); } ZETASQL_RETURN_IF_ERROR(AlterNode<Index>(index, [&](Index::Editor* index_editor) { index_editor->add_managing_node(foreign_key); return absl::OkStatus(); })); return index; } bool SchemaUpdaterImpl::CanInterleaveForeignKeyIndex( const Table* table, const std::vector<std::string>& column_names) const { // The index can be interleaved into the indexed table if table's primary key // columns match a prefix of the index's columns. auto primary_key = table->primary_key(); if (primary_key.size() > column_names.size()) { return false; // Too many primary key columns to be a prefix. } for (int i = 0; i < primary_key.size(); ++i) { if (!absl::EqualsIgnoreCase(primary_key[i]->column()->Name(), column_names[i])) { return false; // Different columns. } } return true; // Matching prefix. } absl::Status MapSpangresDDLErrorToSpannerError(const absl::Status& status) { if (status.ok()) return status; switch (status.code()) { // Used in DDL translator for unsupported pg sql feature. case absl::StatusCode::kFailedPrecondition: // Used in DDL direct printer for unimplemented pg sql printing. case absl::StatusCode::kUnimplemented: // Used in pg sql parser to indicate invalid syntax. case absl::StatusCode::kInvalidArgument: // Used in pg analyzer to indicate `NotFound` error, for example: table, // column not found. case absl::StatusCode::kNotFound: // Used in pg analyzer to indicate some kind of statement is not supported, // for example: too many arguments. case absl::StatusCode::kOutOfRange: { return error::DdlInvalidArgumentError(status.message()); } default: { return error::DdlUnavailableError(); } } } // Translates Spangres SQL expression stored as a string in // `original_expression` field and returns a translated ZetaSQL expression. // When we first translate a Spangres DDL statement which contains an // expression, like check constraints, generated column, and column default, // we do not translate the expression. We translate it later when we have more // table information. absl::StatusOr<ExpressionTranslateResult> SchemaUpdaterImpl::TranslatePostgreSqlExpression( const Table* table, const ddl::CreateTable* ddl_create_table, absl::string_view expression) { std::vector<zetasql::SimpleTable::NameAndType> name_and_types; ZETASQL_RETURN_IF_ERROR(InitColumnNameAndTypesFromTable(table, ddl_create_table, &name_and_types)); zetasql::SimpleTable simple_table(table->Name(), name_and_types); // Setup a catalog for PostgreSQL analyzer needed to resolve provided AST. zetasql::SimpleCatalog catalog("pg simple catalog"); catalog.AddTable(&simple_table); // Setup zetasql::AnalyzerOptions needed for translation from PostgreSQL to // ZetaSQL. zetasql::AnalyzerOptions analyzer_options = MakeGoogleSqlAnalyzerOptions(GetTimeZone()); analyzer_options.CreateDefaultArenasIfNotSet(); ZETASQL_ASSIGN_OR_RETURN( std::unique_ptr<postgres_translator::interfaces::PGArena> arena, postgres_translator::spangres::MemoryContextPGArena::Init(nullptr)); ZETASQL_ASSIGN_OR_RETURN( ExpressionTranslateResult result, postgres_translator::spangres::TranslateTableLevelExpression( expression, simple_table.Name(), catalog, analyzer_options, type_factory_, std::make_unique<FunctionCatalog>( type_factory_, /*catalog_name=*/kCloudSpannerEmulatorFunctionCatalogName, /*schema=*/latest_schema_)), _.With(MapSpangresDDLErrorToSpannerError)); return result; } absl::StatusOr<ExpressionTranslateResult> SchemaUpdaterImpl::TranslatePostgreSqlQueryInView(absl::string_view query) { zetasql::AnalyzerOptions analyzer_options = MakeGoogleSqlAnalyzerOptionsForViewsAndFunctions( GetTimeZone(), admin::database::v1::POSTGRESQL); analyzer_options.CreateDefaultArenasIfNotSet(); FunctionCatalog function_catalog(type_factory_); function_catalog.SetLatestSchema(latest_schema_); Catalog catalog(latest_schema_, &function_catalog, type_factory_, analyzer_options); ZETASQL_ASSIGN_OR_RETURN( std::unique_ptr<postgres_translator::interfaces::PGArena> arena, postgres_translator::spangres::MemoryContextPGArena::Init(nullptr)); ZETASQL_ASSIGN_OR_RETURN( ExpressionTranslateResult result, postgres_translator::spangres::TranslateQueryInView( query, catalog, analyzer_options, type_factory_, std::make_unique<FunctionCatalog>( type_factory_, /*catalog_name=*/kCloudSpannerEmulatorFunctionCatalogName, /*schema=*/latest_schema_)), _.With(MapSpangresDDLErrorToSpannerError)); return result; } absl::Status SchemaUpdaterImpl::CreateCheckConstraint( const ddl::CheckConstraint& ddl_check_constraint, const Table* table, const ddl::CreateTable* ddl_create_table) { CheckConstraint::Builder builder; std::optional<uint32_t> oid = pg_oid_assigner_->GetNextPostgresqlOid(); builder.set_postgresql_oid(oid); if (oid.has_value()) { ZETASQL_VLOG(2) << "Assigned oid " << oid.value() << " for CHECK CONSTRAINT " << ddl_check_constraint.name() << " on table " << table->Name(); } ZETASQL_RETURN_IF_ERROR(AlterNode<Table>(table, [&](Table::Editor* editor) { editor->add_check_constraint(builder.get()); return absl::OkStatus(); })); builder.set_table(table); std::string check_constraint_name; bool is_generated_name = false; if (ddl_check_constraint.has_name()) { check_constraint_name = ddl_check_constraint.name(); ZETASQL_RETURN_IF_ERROR( global_names_.AddName("Check Constraint", check_constraint_name)); } else { ZETASQL_ASSIGN_OR_RETURN(check_constraint_name, global_names_.GenerateCheckConstraintName(table->Name())); is_generated_name = true; } builder.set_constraint_name(check_constraint_name); builder.has_generated_name(is_generated_name); builder.set_expression(ddl_check_constraint.expression()); if (ddl_check_constraint.has_expression_origin() && !ddl_check_constraint.expression_origin().original_expression().empty()) { builder.set_original_expression(std::string(absl::StripAsciiWhitespace( ddl_check_constraint.expression_origin().original_expression()))); } absl::flat_hash_set<std::string> dependent_column_names; absl::flat_hash_set<const SchemaNode*> udf_dependencies; absl::Status s = AnalyzeCheckConstraint( ddl_check_constraint.expression(), table, ddl_create_table, &dependent_column_names, &builder, &udf_dependencies); if (!s.ok()) { const std::string display_name = is_generated_name ? "<unnamed>" : check_constraint_name; return error::CheckConstraintExpressionParseError( table->Name(), ddl_check_constraint.expression(), display_name, s.message()); } const CheckConstraint* check_constraint = builder.get(); statement_context_->AddAction( [check_constraint](const SchemaValidationContext* context) { return VerifyCheckConstraintData(check_constraint, context); }); ZETASQL_RETURN_IF_ERROR(AddNode(builder.build())); return absl::OkStatus(); } absl::Status SchemaUpdaterImpl::CreateRowDeletionPolicy( const ddl::RowDeletionPolicy& row_deletion_policy, Table::Builder* builder) { builder->set_row_deletion_policy(row_deletion_policy); return absl::OkStatus(); } absl::StatusOr<std::shared_ptr<const ProtoBundle>> SchemaUpdaterImpl::CreateProtoBundle( const ddl::CreateProtoBundle& ddl_proto_bundle, absl::string_view proto_descriptor_bytes) { ZETASQL_ASSIGN_OR_RETURN(auto proto_bundle_builder, ProtoBundle::Builder::New(proto_descriptor_bytes)); auto insert_types = ddl_proto_bundle.insert_type(); std::vector<std::string> insert_type_names; insert_type_names.reserve(insert_types.size()); for (int i = 0; i < insert_types.size(); ++i) { insert_type_names.push_back(insert_types.at(i).source_name()); } ZETASQL_RETURN_IF_ERROR(proto_bundle_builder->InsertTypes(insert_type_names)); return proto_bundle_builder->Build(); } absl::Status SchemaUpdaterImpl::CreateTable( const ddl::CreateTable& ddl_table, const database_api::DatabaseDialect& dialect) { if (latest_schema_->tables().size() >= limits::kMaxTablesPerDatabase) { return error::TooManyTablesPerDatabase(ddl_table.table_name(), limits::kMaxTablesPerDatabase); } if (global_names_.HasName(ddl_table.table_name()) && ddl_table.existence_modifier() == ddl::IF_NOT_EXISTS) { return absl::OkStatus(); } ZETASQL_RETURN_IF_ERROR(global_names_.AddName("Table", ddl_table.table_name())); Table::Builder builder; std::optional<uint32_t> oid = pg_oid_assigner_->GetNextPostgresqlOid(); builder.set_postgresql_oid(oid); if (oid.has_value()) { ZETASQL_VLOG(2) << "Assigned oid " << oid.value() << " to table " << ddl_table.table_name(); } builder.set_id(table_id_generator_->NextId(ddl_table.table_name())) .set_name(ddl_table.table_name()); for (const ddl::ColumnDefinition& ddl_column : ddl_table.column()) { ZETASQL_ASSIGN_OR_RETURN( const Column* column, CreateColumn(ddl_column, builder.get(), &ddl_table, dialect)); builder.add_column(column); } for (const Column* column : builder.get()->columns()) { if (column->is_generated()) { const_cast<Column*>(column)->PopulateDependentColumns(); } } // Some constraints have a dependency on the primary key, so create it first. for (const ddl::KeyPartClause& ddl_key_part : ddl_table.primary_key()) { ZETASQL_RETURN_IF_ERROR(CreatePrimaryKeyConstraint(ddl_key_part, &builder, /*with_oid=*/true)); } // Assign OID for PRIMARY KEY index. oid = pg_oid_assigner_->GetNextPostgresqlOid(); builder.set_primary_key_index_postgresql_oid(oid); if (oid.has_value()) { ZETASQL_VLOG(2) << "Assigned oid " << oid.value() << " for PRIMARY KEY index on table " << ddl_table.table_name(); } for (const ddl::ForeignKey& ddl_foreign_key : ddl_table.foreign_key()) { ZETASQL_RETURN_IF_ERROR(CreateForeignKeyConstraint(ddl_foreign_key, builder.get())); } if (ddl_table.has_interleave_clause()) { ZETASQL_RETURN_IF_ERROR( CreateInterleaveConstraint(ddl_table.interleave_clause(), &builder)); if (ddl_table.interleave_clause().type() == ddl::InterleaveClause::IN_PARENT) { oid = pg_oid_assigner_->GetNextPostgresqlOid(); builder.set_interleave_in_parent_postgresql_oid(oid); if (oid.has_value()) { ZETASQL_VLOG(2) << "Assigned oid " << oid.value() << " for IN_PARENT interleave on table " << ddl_table.table_name(); } } } for (const ddl::CheckConstraint& ddl_check_constraint : ddl_table.check_constraint()) { const Table* table = builder.get(); if (dialect == database_api::DatabaseDialect::POSTGRESQL) { ddl::CheckConstraint mutable_check_constraint = ddl_check_constraint; ZETASQL_RET_CHECK(mutable_check_constraint.has_expression_origin()); ZETASQL_ASSIGN_OR_RETURN(ExpressionTranslateResult result, TranslatePostgreSqlExpression( table, &ddl_table, mutable_check_constraint.expression_origin() .original_expression())); mutable_check_constraint.mutable_expression_origin() ->set_original_expression(result.original_postgresql_expression); mutable_check_constraint.set_expression( result.translated_googlesql_expression); ZETASQL_RETURN_IF_ERROR( CreateCheckConstraint(mutable_check_constraint, table, &ddl_table)); } else { ZETASQL_RETURN_IF_ERROR( CreateCheckConstraint(ddl_check_constraint, table, &ddl_table)); } } if (ddl_table.has_row_deletion_policy()) { ZETASQL_RETURN_IF_ERROR( CreateRowDeletionPolicy(ddl_table.row_deletion_policy(), &builder)); } if (ddl_table.has_synonym()) { ZETASQL_RETURN_IF_ERROR(global_names_.AddName("Table", ddl_table.synonym())); builder.set_synonym(ddl_table.synonym()); } if (builder.get()->is_trackable_by_change_stream()) { // If change streams implicitly tracking the entire database, newly added // tables should be automatically watched by those change streams. for (const ChangeStream* change_stream : latest_schema_->change_streams()) { if (change_stream->track_all()) { std::vector<std::string> columns = builder.get()->trackable_columns(); for (const Column* column : builder.get()->columns()) { if (column->is_trackable_by_change_stream() && !column->table()->FindKeyColumn(column->Name())) { // Register the trackable columns of the newly added table to the // list of change streams implicitly tracking the entire database ZETASQL_RETURN_IF_ERROR(AlterNode<Column>( column, [change_stream](Column::Editor* editor) -> absl::Status { editor->add_change_stream(change_stream); return absl::OkStatus(); })); } } // Register the newly added table to the list of change streams // implicitly tracking the entire database ZETASQL_RETURN_IF_ERROR(AlterNode<ChangeStream>( change_stream, [ddl_table, columns]( ChangeStream::Editor* change_stream_editor) -> absl::Status { change_stream_editor->add_tracked_tables_columns( ddl_table.table_name(), columns); return absl::OkStatus(); })); // Register the change stream tracking the entire database to the newly // added table builder.add_change_stream(change_stream); } } } if (!ddl_table.set_options().empty()) { ZETASQL_RETURN_IF_ERROR(AlterNode<Table>( builder.get(), [this, &ddl_table](Table::Editor* editor) -> absl::Status { return SetTableOptions(ddl_table.set_options(), editor); })); } if (SDLObjectName::IsFullyQualifiedName(ddl_table.table_name())) { const absl::string_view schema_name = SDLObjectName::GetSchemaName(ddl_table.table_name()); const NamedSchema* named_schema = latest_schema_->FindNamedSchema(std::string(schema_name)); if (named_schema == nullptr) { return error::NamedSchemaNotFound(schema_name); } ZETASQL_RETURN_IF_ERROR(AlterNode<NamedSchema>( named_schema, [&](NamedSchema::Editor* editor) -> absl::Status { editor->add_table(builder.get()); if (ddl_table.has_synonym()) { editor->add_synonym(builder.get()); } return absl::OkStatus(); })); } ZETASQL_RETURN_IF_ERROR(AddNode(builder.build())); return absl::OkStatus(); } absl::StatusOr<const Column*> SchemaUpdaterImpl::CreateIndexDataTableColumn( const Table* indexed_table, const std::string& source_column_name, const Table* index_data_table, bool is_null_filtered) { const Column* source_column = indexed_table->FindColumnCaseSensitive(source_column_name); if (source_column == nullptr) { return error::IndexRefsNonExistentColumn( index_data_table->owner_index()->Name(), source_column_name); } Column::Builder builder; builder.set_name(source_column->Name()) .set_id(column_id_generator_->NextId( absl::StrCat(index_data_table->Name(), ".", source_column->Name()))) .set_source_column(source_column) .set_table(index_data_table); if (is_null_filtered) { builder.set_nullable(false); } else { builder.set_nullable(source_column->is_nullable()); } const Column* column = builder.get(); ZETASQL_RETURN_IF_ERROR(AddNode(builder.build())); return column; } absl::Status SchemaUpdaterImpl::AddIndexColumnsByName( const std::string& column_name, const Table* indexed_table, bool is_null_filtered, std::vector<const Column*>& columns, Table::Builder& builder) { const Column* column = builder.get()->FindColumn(column_name); // Skip already added columns if (column == nullptr) { ZETASQL_ASSIGN_OR_RETURN( column, CreateIndexDataTableColumn(indexed_table, column_name, builder.get(), is_null_filtered)); builder.add_column(column); } columns.push_back(column); return absl::OkStatus(); } absl::Status SchemaUpdaterImpl::AddSearchIndexColumns( const ::google::protobuf::RepeatedPtrField<ddl::KeyPartClause>& key_parts, const Table* indexed_table, bool is_null_filtered, std::vector<const Column*>& columns, Table::Builder& builder) { for (const ddl::KeyPartClause& ddl_key_part : key_parts) { const std::string& column_name = ddl_key_part.key_name(); ZETASQL_RETURN_IF_ERROR(AddIndexColumnsByName(column_name, indexed_table, is_null_filtered, columns, builder)); } return absl::OkStatus(); } absl::StatusOr<std::unique_ptr<const Table>> SchemaUpdaterImpl::CreateChangeStreamPartitionTable( const ChangeStream* change_stream) { std::string partition_table_name = absl::StrCat(kChangeStreamPartitionTablePrefix, change_stream->Name()); Table::Builder builder; builder.set_name(partition_table_name) .set_id(table_id_generator_->NextId(partition_table_name)) .set_owner_change_stream(change_stream); // Create columns in table _ChangeStream_Partition_${ChangeStreamName} ZETASQL_ASSIGN_OR_RETURN(const Column* column, CreateChangeStreamTableColumn( "partition_token", builder.get(), type_factory_->get_string())); builder.add_column(column); ZETASQL_ASSIGN_OR_RETURN( column, CreateChangeStreamTableColumn("start_time", builder.get(), type_factory_->get_timestamp())); builder.add_column(column); ZETASQL_ASSIGN_OR_RETURN( column, CreateChangeStreamTableColumn("end_time", builder.get(), type_factory_->get_timestamp())); builder.add_column(column); const zetasql::Type* updated_string_array_type; ZETASQL_RETURN_IF_ERROR(type_factory_->MakeArrayType(type_factory_->get_string(), &updated_string_array_type)); ZETASQL_ASSIGN_OR_RETURN(column, CreateChangeStreamTableColumn("parents", builder.get(), updated_string_array_type)); builder.add_column(column); ZETASQL_ASSIGN_OR_RETURN(column, CreateChangeStreamTableColumn("children", builder.get(), updated_string_array_type)); builder.add_column(column); ZETASQL_ASSIGN_OR_RETURN(column, CreateChangeStreamTableColumn("next_churn", builder.get(), type_factory_->get_string())); builder.add_column(column); // Set the partition_token as primary key column ZETASQL_RETURN_IF_ERROR( CreateChangeStreamTablePKConstraint("partition_token", &builder)); return builder.build(); } absl::StatusOr<std::unique_ptr<const Table>> SchemaUpdaterImpl::CreateChangeStreamDataTable( const ChangeStream* change_stream, const Table* change_stream_partition_table) { std::string data_table_name = absl::StrCat(kChangeStreamDataTablePrefix, change_stream->Name()); Table::Builder builder; builder.set_name(data_table_name) .set_id(table_id_generator_->NextId(data_table_name)) .set_owner_change_stream(change_stream); // Create columns in table _ChangeStream_Data_${ChangeStreamName} ZETASQL_ASSIGN_OR_RETURN(const Column* column, CreateChangeStreamTableColumn( "partition_token", builder.get(), type_factory_->get_string())); builder.add_column(column); ZETASQL_ASSIGN_OR_RETURN( column, CreateChangeStreamTableColumn("commit_timestamp", builder.get(), type_factory_->get_timestamp())); builder.add_column(column); ZETASQL_ASSIGN_OR_RETURN(column, CreateChangeStreamTableColumn( "server_transaction_id", builder.get(), type_factory_->get_string())); builder.add_column(column); ZETASQL_ASSIGN_OR_RETURN( column, CreateChangeStreamTableColumn("record_sequence", builder.get(), type_factory_->get_string())); builder.add_column(column); ZETASQL_ASSIGN_OR_RETURN(column, CreateChangeStreamTableColumn( "is_last_record_in_transaction_in_partition", builder.get(), type_factory_->get_bool())); builder.add_column(column); ZETASQL_ASSIGN_OR_RETURN(column, CreateChangeStreamTableColumn("table_name", builder.get(), type_factory_->get_string())); builder.add_column(column); const zetasql::Type* updated_string_array_type; ZETASQL_RETURN_IF_ERROR(type_factory_->MakeArrayType(type_factory_->get_string(), &updated_string_array_type)); ZETASQL_ASSIGN_OR_RETURN( column, CreateChangeStreamTableColumn("column_types_name", builder.get(), updated_string_array_type)); builder.add_column(column); ZETASQL_ASSIGN_OR_RETURN( column, CreateChangeStreamTableColumn("column_types_type", builder.get(), updated_string_array_type)); builder.add_column(column); const zetasql::Type* updated_bool_array_type; ZETASQL_RETURN_IF_ERROR(type_factory_->MakeArrayType(type_factory_->get_bool(), &updated_bool_array_type)); ZETASQL_ASSIGN_OR_RETURN(column, CreateChangeStreamTableColumn( "column_types_is_primary_key", builder.get(), updated_bool_array_type)); builder.add_column(column); const zetasql::Type* updated_int64_array_type; ZETASQL_RETURN_IF_ERROR(type_factory_->MakeArrayType(type_factory_->get_int64(), &updated_int64_array_type)); ZETASQL_ASSIGN_OR_RETURN(column, CreateChangeStreamTableColumn( "column_types_ordinal_position", builder.get(), updated_int64_array_type)); builder.add_column(column); ZETASQL_ASSIGN_OR_RETURN(column, CreateChangeStreamTableColumn("mods_keys", builder.get(), updated_string_array_type)); builder.add_column(column); ZETASQL_ASSIGN_OR_RETURN( column, CreateChangeStreamTableColumn("mods_new_values", builder.get(), updated_string_array_type)); builder.add_column(column); ZETASQL_ASSIGN_OR_RETURN( column, CreateChangeStreamTableColumn("mods_old_values", builder.get(), updated_string_array_type)); builder.add_column(column); ZETASQL_ASSIGN_OR_RETURN(column, CreateChangeStreamTableColumn("mod_type", builder.get(), type_factory_->get_string())); builder.add_column(column); ZETASQL_ASSIGN_OR_RETURN( column, CreateChangeStreamTableColumn("value_capture_type", builder.get(), type_factory_->get_string())); builder.add_column(column); ZETASQL_ASSIGN_OR_RETURN(column, CreateChangeStreamTableColumn( "number_of_records_in_transaction", builder.get(), type_factory_->get_int64())); builder.add_column(column); ZETASQL_ASSIGN_OR_RETURN(column, CreateChangeStreamTableColumn( "number_of_partitions_in_transaction", builder.get(), type_factory_->get_int64())); builder.add_column(column); ZETASQL_ASSIGN_OR_RETURN( column, CreateChangeStreamTableColumn("transaction_tag", builder.get(), type_factory_->get_string())); builder.add_column(column); ZETASQL_ASSIGN_OR_RETURN(column, CreateChangeStreamTableColumn( "is_system_transaction", builder.get(), type_factory_->get_bool())); builder.add_column(column); // Set primary key columns ZETASQL_RETURN_IF_ERROR( CreateChangeStreamTablePKConstraint("partition_token", &builder)); ZETASQL_RETURN_IF_ERROR( CreateChangeStreamTablePKConstraint("commit_timestamp", &builder)); ZETASQL_RETURN_IF_ERROR( CreateChangeStreamTablePKConstraint("server_transaction_id", &builder)); ZETASQL_RETURN_IF_ERROR( CreateChangeStreamTablePKConstraint("record_sequence", &builder)); // Set _ChangeStream_Partition_${ChangeStreamName} as interleaved parent table Table::OnDeleteAction on_delete = Table::OnDeleteAction::kNoAction; ZETASQL_RETURN_IF_ERROR(CreateInterleaveConstraint(change_stream_partition_table, on_delete, &builder)); return builder.build(); } absl::StatusOr<const Column*> SchemaUpdaterImpl::CreateChangeStreamTableColumn( const std::string& column_name, const Table* change_stream_table, const zetasql::Type* type) { Column::Builder builder; builder.set_name(column_name) .set_id(column_id_generator_->NextId( absl::StrCat(change_stream_table->Name(), ".", column_name))) .set_table(change_stream_table) .set_type(type); if (column_name == "end_time" || column_name == "start_time" || column_name == "commit_timestamp") { builder.set_allow_commit_timestamp(true); } const Column* column = builder.get(); ZETASQL_RETURN_IF_ERROR(AddNode(builder.build())); return column; } absl::StatusOr<const KeyColumn*> SchemaUpdaterImpl::CreateChangeStreamTablePKColumn( const std::string& pk_column_name, const Table* change_stream_table) { KeyColumn::Builder builder; const bool is_descending = false; // References to columns in primary key clause are case-sensitive. const Column* column = change_stream_table->FindColumnCaseSensitive(pk_column_name); if (column == nullptr) { return error::NonExistentKeyColumn(OwningObjectType(change_stream_table), OwningObjectName(change_stream_table), pk_column_name); } builder.set_column(column).set_descending(is_descending); const KeyColumn* key_col = builder.get(); ZETASQL_RETURN_IF_ERROR(AddNode(builder.build())); return key_col; } absl::Status SchemaUpdaterImpl::CreateChangeStreamTablePKConstraint( const std::string& pk_column_name, Table::Builder* builder) { ZETASQL_ASSIGN_OR_RETURN( const KeyColumn* key_col, CreateChangeStreamTablePKColumn(pk_column_name, builder->get())); builder->add_key_column(key_col); return absl::OkStatus(); } absl::StatusOr<std::unique_ptr<const Table>> SchemaUpdaterImpl::CreateIndexDataTable( absl::string_view index_name, const std::vector<ddl::KeyPartClause>& index_pk, const std::string* interleave_in_table, const ::google::protobuf::RepeatedPtrField<ddl::StoredColumnDefinition>& stored_columns, const ::google::protobuf::RepeatedPtrField<ddl::KeyPartClause>* partition_by, const ::google::protobuf::RepeatedPtrField<ddl::KeyPartClause>* order_by, const ::google::protobuf::RepeatedPtrField<std::string>* null_filtered_columns, const Index* index, const Table* indexed_table, ColumnsUsedByIndex* columns_used_by_index) { std::string table_name = absl::StrCat(kIndexDataTablePrefix, index_name); Table::Builder builder; builder.set_name(table_name) .set_id(table_id_generator_->NextId(table_name)) .set_owner_index(index); absl::flat_hash_set<std::string> null_filtered_columns_set; // Add null filtered columns to index data table. if (null_filtered_columns != nullptr) { for (const std::string& column_name : *null_filtered_columns) { null_filtered_columns_set.insert(column_name); } } // Add indexed columns to the index_data_table's columns and primary key. if (!index_pk.empty()) { // The primary key is a combination of (index_keys,indexed_table_keys) std::vector<ddl::KeyPartClause> data_table_pk; data_table_pk.reserve(index_pk.size()); // First create columns for the specified primary key. for (const ddl::KeyPartClause& ddl_key_part : index_pk) { data_table_pk.push_back(ddl_key_part); const std::string& column_name = ddl_key_part.key_name(); ZETASQL_ASSIGN_OR_RETURN( const Column* column, CreateIndexDataTableColumn( indexed_table, column_name, builder.get(), index->is_null_filtered() || null_filtered_columns_set.contains(column_name))); builder.add_column(column); } // Next, create columns for the indexed table's primary key. for (const KeyColumn* key_col : indexed_table->primary_key()) { if (builder.get()->FindColumn(key_col->column()->Name()) != nullptr) { // Skip already added columns continue; } std::string key_col_name = key_col->column()->Name(); ZETASQL_ASSIGN_OR_RETURN( const Column* column, CreateIndexDataTableColumn( indexed_table, key_col_name, builder.get(), index->is_null_filtered() || null_filtered_columns_set.contains(key_col_name))); builder.add_column(column); // Add to the PK specification. ddl::KeyPartClause key_part; key_part.set_key_name(key_col_name); if (key_col->is_descending() && key_col->is_nulls_last()) { key_part.set_order(ddl::KeyPartClause::DESC); } else if (key_col->is_descending() && !key_col->is_nulls_last()) { key_part.set_order(ddl::KeyPartClause::DESC_NULLS_FIRST); } else if (!key_col->is_descending() && key_col->is_nulls_last()) { key_part.set_order(ddl::KeyPartClause::ASC_NULLS_LAST); } data_table_pk.push_back(key_part); } for (const ddl::KeyPartClause& ddl_key_part : data_table_pk) { // The data table is a hidden table so don't assign oids to to the PKs. ZETASQL_RETURN_IF_ERROR(CreatePrimaryKeyConstraint(ddl_key_part, &builder, /*with_oid=*/false)); } int num_declared_keys = index_pk.size(); auto data_table_key_cols = builder.get()->primary_key(); for (int i = 0; i < num_declared_keys; ++i) { columns_used_by_index->index_key_columns.push_back( data_table_key_cols[i]); } } if (interleave_in_table != nullptr) { const Table* parent_table = indexed_table; if (!absl::EqualsIgnoreCase(parent_table->Name(), *interleave_in_table)) { ZETASQL_ASSIGN_OR_RETURN(parent_table, GetInterleaveConstraintTable( *interleave_in_table, builder)); } ZETASQL_RETURN_IF_ERROR(CreateInterleaveConstraint( parent_table, Table::OnDeleteAction::kCascade, &builder)); } // Add stored columns to index data table. for (const ddl::StoredColumnDefinition& ddl_column : stored_columns) { const std::string& column_name = ddl_column.name(); ZETASQL_ASSIGN_OR_RETURN(const Column* column, CreateIndexDataTableColumn( indexed_table, column_name, builder.get(), /*is_null_filtered=*/ null_filtered_columns_set.contains(column_name))); builder.add_column(column); columns_used_by_index->stored_columns.push_back(column); } // Add null filtered columns to index data table. if (null_filtered_columns != nullptr) { // Add null filtered columns to index data table. for (const std::string& column_name : *null_filtered_columns) { ZETASQL_RETURN_IF_ERROR(AddIndexColumnsByName( column_name, indexed_table, /*is_null_filtered=*/true, columns_used_by_index->null_filtered_columns, builder)); } } if (partition_by != nullptr) { // Add partition by columns to index data table ZETASQL_RETURN_IF_ERROR(AddSearchIndexColumns( *partition_by, indexed_table, index->is_null_filtered(), columns_used_by_index->partition_by_columns, builder)); } if (order_by != nullptr) { // Add order by columns to index data table. ZETASQL_RETURN_IF_ERROR(AddSearchIndexColumns( *order_by, indexed_table, index->is_null_filtered(), columns_used_by_index->order_by_columns, builder)); } return builder.build(); } absl::StatusOr<const Index*> SchemaUpdaterImpl::CreateIndex( const ddl::CreateIndex& ddl_index, const Table* indexed_table) { const std::string* interleave_in_table = ddl_index.has_interleave_in_table() ? &ddl_index.interleave_in_table() : nullptr; const auto& index_pk = std::vector<ddl::KeyPartClause>( ddl_index.key().begin(), ddl_index.key().end()); bool is_unique = ddl_index.unique(); bool is_null_filtered = ddl_index.null_filtered(); const Index* index = latest_schema_->FindIndexCaseSensitive(ddl_index.index_name()); if (index != nullptr && ddl_index.existence_modifier() == ddl::IF_NOT_EXISTS) { return index; } const ::google::protobuf::RepeatedPtrField<ddl::SetOption>* set_options = nullptr; set_options = &ddl_index.set_options(); return CreateIndexHelper( ddl_index.index_name(), ddl_index.index_base_name(), is_unique, is_null_filtered, interleave_in_table, index_pk, ddl_index.stored_column_definition(), /*is_search_index=*/false, /*is_vector_index=*/false, /*partition_by=*/nullptr, /*order_by=*/nullptr, /*null_filtered_columns=*/&ddl_index.null_filtered_column(), /*set_options=*/set_options, indexed_table); } const google::protobuf::FieldDescriptor* GetFieldDescriptor(absl::string_view sdl_type, absl::string_view sdl_name, absl::string_view option_name, google::protobuf::Message* proto, std::string* error) { error->clear(); const google::protobuf::FieldDescriptor* field = proto->GetDescriptor()->FindFieldByName(option_name); if (field == nullptr) { *error = absl::StrCat("Unable to set ", option_name, " option on ", sdl_type, " ", sdl_name, ". Unknown option name."); return nullptr; } return field; } bool SetInt64Option(absl::string_view sdl_type, absl::string_view sdl_name, absl::string_view option_name, int64_t option_value, google::protobuf::Message* proto, std::string* error) { const google::protobuf::FieldDescriptor* field = GetFieldDescriptor(sdl_type, sdl_name, option_name, proto, error); if (field == nullptr) { return false; } const google::protobuf::Reflection* reflection = proto->GetReflection(); reflection->SetInt64(proto, field, option_value); return true; } bool SetStringOption(absl::string_view sdl_type, absl::string_view sdl_name, absl::string_view option_name, absl::string_view option_value, google::protobuf::Message* proto, std::string* error) { const google::protobuf::FieldDescriptor* field = GetFieldDescriptor(sdl_type, sdl_name, option_name, proto, error); if (field == nullptr) { return false; } const google::protobuf::Reflection* reflection = proto->GetReflection(); reflection->SetString(proto, field, std::string(option_value)); return true; } bool ApplyOptions(absl::string_view sdl_type, absl::string_view sdl_name, const OptionList& input, google::protobuf::Message* proto, std::string* error) { for (const ddl::SetOption& option : input) { if (option.has_int64_value()) { if (!SetInt64Option(sdl_type, sdl_name, option.option_name(), option.int64_value(), proto, error)) { return false; } } else if (option.has_string_value()) { if (!SetStringOption(sdl_type, sdl_name, option.option_name(), option.string_value(), proto, error)) { return false; } } else { *error = absl::StrCat("Unable to set ", option.option_name(), " option on ", sdl_type, " ", sdl_name, ". No value to set."); return false; } } return true; } absl::Status SetVectorIndexOptions( std::string_view sdl_name, const ::google::protobuf::RepeatedPtrField<ddl::SetOption>& set_options, Index::Builder* modifier) { std::string error; ddl::VectorIndexOptionsProto vector_index_options; if (!ApplyOptions("Vector Index", sdl_name, set_options, &vector_index_options, &error)) { return error::OptionsError(error); } int tree_depth = vector_index_options.tree_depth(); if (tree_depth != 2 && tree_depth != 3) { return error::OptionsError("vector index tree depth must be 2 or 3."); } if (vector_index_options.has_num_leaves() && vector_index_options.num_leaves() <= 0) { return error::OptionsError("vector index num_leaves must be > 0."); } if (vector_index_options.has_num_branches() && vector_index_options.num_branches() <= 0) { return error::OptionsError("vector index num_branches must be > 0."); } if (vector_index_options.has_num_branches() && vector_index_options.has_num_leaves()) { if (vector_index_options.num_branches() > vector_index_options.num_leaves()) { return error::OptionsError( "num_leaves cannot be fewer than num_branches."); } } if (vector_index_options.has_distance_type()) { ddl::VectorIndexOptionsProto::DistanceType distance_type; if (!ddl::VectorIndexOptionsProto::DistanceType_Parse( vector_index_options.distance_type(), &distance_type) || distance_type == ddl::VectorIndexOptionsProto::DISTANCE_TYPE_UNSPECIFIED) { return error::OptionsError( absl::StrCat("The distance_type of ", sdl_name, " is invalid.")); } } if (vector_index_options.has_leaf_scatter_factor()) { if (vector_index_options.leaf_scatter_factor() < 0) { return error::OptionsError( "vector index leaf_scatter_factor must be >= 0."); } if (vector_index_options.leaf_scatter_factor() > 32) { return error::OptionsError( "vector index leaf_scatter_factor must be <= 32."); } } if (vector_index_options.has_min_branch_splits()) { if (vector_index_options.min_branch_splits() <= 0) { return error::OptionsError("vector index min_branch_splits must be > 0."); } } if (vector_index_options.has_min_leaf_splits()) { if (vector_index_options.min_leaf_splits() <= 0) { return error::OptionsError("vector index min_leaf_splits must be > 0."); } } modifier->set_vector_index_options(vector_index_options); return absl::OkStatus(); } absl::StatusOr<const Index*> SchemaUpdaterImpl::CreateVectorIndex( const ddl::CreateVectorIndex& ddl_index, const Table* indexed_table) { if (ddl_index.partition_by_size() > 0) { return error::VectorIndexPartitionByUnsupported(ddl_index.index_name()); } std::vector<ddl::KeyPartClause> index_pk; index_pk.push_back(ddl_index.key()); const Index* index = latest_schema_->FindIndexCaseSensitive(ddl_index.index_name()); if (index != nullptr && ddl_index.existence_modifier() == ddl::IF_NOT_EXISTS) { return index; } return CreateIndexHelper( ddl_index.index_name(), ddl_index.index_base_name(), false, false, nullptr, index_pk, ddl_index.stored_column_definition(), /*is_search_index=*/false, /*is_vector_index=*/true, /*partition_by=*/nullptr, /*order_by=*/nullptr, /*null_filtered_columns=*/&ddl_index.null_filtered_column(), /*set_options=*/&ddl_index.set_options(), indexed_table); } absl::StatusOr<const Index*> SchemaUpdaterImpl::CreateSearchIndex( const ddl::CreateSearchIndex& ddl_index, const Table* indexed_table) { const std::string* interleave_in_table = ddl_index.has_interleave_in_table() ? &ddl_index.interleave_in_table() : nullptr; std::vector<ddl::KeyPartClause> table_pk; table_pk.reserve(ddl_index.token_column_definition().size()); for (const ddl::TokenColumnDefinition& token_column_def : ddl_index.token_column_definition()) { const ddl::KeyPartClause& key = token_column_def.token_column(); if (key.has_order()) { return error::SearchIndexTokenlistKeyOrderUnsupported( key.key_name(), ddl_index.index_name()); } table_pk.push_back(token_column_def.token_column()); } bool is_null_filtered = ddl_index.null_filtered(); return CreateIndexHelper( ddl_index.index_name(), ddl_index.index_base_name(), /*is_unique=*/false, is_null_filtered, interleave_in_table, table_pk, ddl_index.stored_column_definition(), /*is_search_index=*/true, false, &ddl_index.partition_by(), &ddl_index.order_by(), &ddl_index.null_filtered_column(), /*set_options=*/nullptr, indexed_table); } absl::StatusOr<const ChangeStream*> SchemaUpdaterImpl::CreateChangeStream( const ddl::CreateChangeStream& ddl_change_stream, const database_api::DatabaseDialect& dialect) { if (latest_schema_->change_streams().size() >= limits::kMaxChangeStreamsPerDatabase) { return error::TooManyChangeStreamsPerDatabase( ddl_change_stream.change_stream_name(), limits::kMaxChangeStreamsPerDatabase); } // Validate the change stream name in global_names_ ZETASQL_RETURN_IF_ERROR(global_names_.AddName( "Change Stream", ddl_change_stream.change_stream_name())); ChangeStream::Builder builder; std::optional<uint32_t> oid = pg_oid_assigner_->GetNextPostgresqlOid(); builder.set_postgresql_oid(oid); if (oid.has_value()) { ZETASQL_VLOG(2) << "Assigned oid " << oid.value() << " on change stream " << ddl_change_stream.change_stream_name(); } builder.set_name(ddl_change_stream.change_stream_name()); const ChangeStream* change_stream = builder.get(); const std::string tvf_name = MakeChangeStreamTvfName(ddl_change_stream.change_stream_name(), dialect); builder.set_tvf_name(tvf_name); std::optional<uint32_t> tvf_oid = pg_oid_assigner_->GetNextPostgresqlOid(); builder.set_tvf_postgresql_oid(tvf_oid); if (tvf_oid.has_value()) { ZETASQL_VLOG(2) << "Assigned oid " << tvf_oid.value() << " to TVF " << tvf_name << " on change stream " << ddl_change_stream.change_stream_name(); } // Validate the change stream tvf name in global_names_ ZETASQL_RETURN_IF_ERROR(global_names_.AddName("Change Stream", tvf_name)); // Set up _ChangeStream_Partition_${ChangeStreamName} ZETASQL_ASSIGN_OR_RETURN(std::unique_ptr<const Table> change_stream_partition_table, CreateChangeStreamPartitionTable(change_stream)); builder.set_change_stream_partition_table( change_stream_partition_table.get()); // Register a backfill action for the change stream partition table. statement_context_->AddAction( [change_stream](const SchemaValidationContext* context) { return BackfillChangeStream(change_stream, context); }); // Set up _ChangeStream_Data_${ChangeStreamName} ZETASQL_ASSIGN_OR_RETURN(std::unique_ptr<const Table> change_stream_data_table, CreateChangeStreamDataTable( change_stream, change_stream_partition_table.get())); builder.set_change_stream_data_table(change_stream_data_table.get()); // Set up for clause if (ddl_change_stream.has_for_clause()) { ZETASQL_RETURN_IF_ERROR( ValidateChangeStreamForClause(ddl_change_stream.for_clause(), ddl_change_stream.change_stream_name())); builder.set_for_clause(ddl_change_stream.for_clause()); builder.set_track_all(ddl_change_stream.for_clause().all()); ZETASQL_RETURN_IF_ERROR( ValidateChangeStreamLimits(ddl_change_stream.for_clause(), ddl_change_stream.change_stream_name())); } // Validate and set change stream options. const auto& set_options = ddl_change_stream.set_options(); if (!set_options.empty()) { ZETASQL_RETURN_IF_ERROR(ValidateChangeStreamOptions(set_options)); ZETASQL_RETURN_IF_ERROR(AlterNode<ChangeStream>( change_stream, [this, set_options](ChangeStream::Editor* editor) -> absl::Status { // Set change stream options return SetChangeStreamOptions(set_options, editor); })); } // Add the current change stream to the change stream lists of tracked table // and column. if (ddl_change_stream.has_for_clause()) { ZETASQL_RETURN_IF_ERROR( RegisterTrackedObjects(ddl_change_stream.for_clause(), change_stream)); ZETASQL_RETURN_IF_ERROR(BuildChangeStreamTrackedObjects( ddl_change_stream.for_clause(), change_stream, &builder)); } // Set the creation time to now builder.set_creation_time(absl::Now()); ZETASQL_RETURN_IF_ERROR(AddNode(builder.build())); ZETASQL_RETURN_IF_ERROR(AddNode(std::move(change_stream_partition_table))); ZETASQL_RETURN_IF_ERROR(AddNode(std::move(change_stream_data_table))); return change_stream; } absl::Status SchemaUpdaterImpl::BuildChangeStreamTrackedObjects( const ddl::ChangeStreamForClause& change_stream_for_clause, const ChangeStream* change_stream, ChangeStream::Builder* builder) { if (change_stream_for_clause.has_tracked_tables()) { for (const ddl::ChangeStreamForClause::TrackedTables::Entry& entry : change_stream_for_clause.tracked_tables().table_entry()) { if (!latest_schema_->FindTable(entry.table_name()) ->is_trackable_by_change_stream()) { return error::TrackUntrackableTables(entry.table_name()); } std::vector<std::string> columns; if (entry.has_tracked_columns()) { for (const std::string& column_name : entry.tracked_columns().column_name()) { columns.push_back(column_name); } builder->add_tracked_tables_columns(entry.table_name(), columns); } else if (entry.has_all_columns()) { const Table* table = latest_schema_->FindTable(entry.table_name()); for (const Column* column : latest_schema_->FindTable(entry.table_name())->columns()) { if (!column->is_trackable_by_change_stream() && !table->FindKeyColumn(column->Name())) { continue; } columns.push_back(column->Name()); } builder->add_tracked_tables_columns(entry.table_name(), columns); } else { return error::UnsetTrackedObject(change_stream->Name(), entry.table_name()); } } } else if (change_stream_for_clause.all()) { for (const Table* table : latest_schema_->tables()) { if (!table->is_trackable_by_change_stream()) { continue; } builder->add_tracked_tables_columns(table->Name(), table->trackable_columns()); } } return absl::OkStatus(); } absl::Status SchemaUpdaterImpl::EditChangeStreamTrackedObjects( const ddl::ChangeStreamForClause& change_stream_for_clause, const ChangeStream* change_stream) { if (change_stream_for_clause.all()) { for (const auto& table : latest_schema_->tables()) { if (!table->is_trackable_by_change_stream()) { continue; } ZETASQL_RETURN_IF_ERROR(AlterNode<ChangeStream>( change_stream, [table](ChangeStream::Editor* editor) -> absl::Status { editor->add_tracked_tables_columns(table->Name(), table->trackable_columns()); return absl::OkStatus(); })); } return absl::OkStatus(); } if (!change_stream_for_clause.has_tracked_tables()) { return absl::OkStatus(); } for (const ddl::ChangeStreamForClause::TrackedTables::Entry& entry : change_stream_for_clause.tracked_tables().table_entry()) { std::vector<std::string> columns; if (!entry.has_all_columns()) { for (const std::string& column_name : entry.tracked_columns().column_name()) { if (latest_schema_->FindTable(entry.table_name()) ->is_trackable_by_change_stream()) { columns.push_back(column_name); } else { return error::TrackUntrackableColumns(column_name); } } } else { for (const std::string& column_name : latest_schema_->FindTable(entry.table_name())->trackable_columns()) { columns.push_back(column_name); } } ZETASQL_RETURN_IF_ERROR(AlterNode<ChangeStream>( change_stream, [&, entry, columns](ChangeStream::Editor* editor) -> absl::Status { editor->clear_tracked_tables_columns(); editor->add_tracked_tables_columns(entry.table_name(), columns); return absl::OkStatus(); })); } return absl::OkStatus(); } absl::StatusOr<const Index*> SchemaUpdaterImpl::CreateIndexHelper( const std::string& index_name, const std::string& index_base_name, bool is_unique, bool is_null_filtered, const std::string* interleave_in_table, const std::vector<ddl::KeyPartClause>& table_pk, const ::google::protobuf::RepeatedPtrField<ddl::StoredColumnDefinition>& stored_columns, bool is_search_index, bool is_vector_index, const ::google::protobuf::RepeatedPtrField<ddl::KeyPartClause>* partition_by, const ::google::protobuf::RepeatedPtrField<ddl::KeyPartClause>* order_by, const ::google::protobuf::RepeatedPtrField<std::string>* null_filtered_columns, const ::google::protobuf::RepeatedPtrField<ddl::SetOption>* set_options, const Table* indexed_table) { if (indexed_table == nullptr) { indexed_table = latest_schema_->FindTableCaseSensitive(index_base_name); if (indexed_table == nullptr) { return error::TableNotFound(index_base_name); } } if (latest_schema_->num_index() >= limits::kMaxIndexesPerDatabase) { return error::TooManyIndicesPerDatabase(index_name, limits::kMaxIndexesPerDatabase); } // Tables and indexes share a namespace. ZETASQL_RETURN_IF_ERROR(global_names_.AddName("Index", index_name)); Index::Builder builder; std::optional<uint32_t> oid = pg_oid_assigner_->GetNextPostgresqlOid(); builder.set_postgresql_oid(oid); if (oid.has_value()) { ZETASQL_VLOG(2) << "Assigned oid " << oid.value() << " to index " << index_name; } builder.set_name(index_name); builder.set_unique(is_unique); builder.set_null_filtered(is_null_filtered); ColumnsUsedByIndex columns_used_by_index; ZETASQL_ASSIGN_OR_RETURN( std::unique_ptr<const Table> data_table, CreateIndexDataTable(index_name, table_pk, interleave_in_table, stored_columns, partition_by, order_by, null_filtered_columns, builder.get(), indexed_table, &columns_used_by_index)); builder.set_index_data_table(data_table.get()); for (const KeyColumn* key_col : columns_used_by_index.index_key_columns) { builder.add_key_column(key_col); } for (const Column* col : columns_used_by_index.stored_columns) { builder.add_stored_column(col); } for (const Column* col : columns_used_by_index.null_filtered_columns) { builder.add_null_filtered_column(col); } if (is_search_index) { builder.set_index_type(is_search_index); for (const Column* col : columns_used_by_index.partition_by_columns) { builder.add_partition_by_column(col); } for (const Column* col : columns_used_by_index.order_by_columns) { builder.add_order_by_column(col); } } if (is_vector_index) { builder.set_vector_index_type(is_vector_index); ZETASQL_RETURN_IF_ERROR(SetVectorIndexOptions(index_name, *set_options, &builder)); } ZETASQL_RETURN_IF_ERROR(AlterNode<Table>( indexed_table, [&builder](Table::Editor* table_editor) -> absl::Status { table_editor->add_index(builder.get()); builder.set_indexed_table(table_editor->get()); return absl::OkStatus(); })); // Register a backfill action for the index. const Index* index = builder.get(); if (!is_search_index && !is_vector_index) { statement_context_->AddAction( [index](const SchemaValidationContext* context) { return BackfillIndex(index, context); }); } if (SDLObjectName::IsFullyQualifiedName(index_name)) { ZETASQL_RETURN_IF_ERROR(AlterInNamedSchema( index_name, [&builder](NamedSchema::Editor* editor) -> absl::Status { editor->add_index(builder.get()); return absl::OkStatus(); })); } if (set_options != nullptr && !set_options->empty()) { ZETASQL_RETURN_IF_ERROR(SetIndexOptions(*set_options, &builder, is_vector_index)); } // The data table must be added after the index for correct order of // validation. ZETASQL_RETURN_IF_ERROR(AddNode(builder.build())); ZETASQL_RETURN_IF_ERROR(AddNode(std::move(data_table))); return index; } absl::flat_hash_set<const SchemaNode*> SchemaUpdaterImpl::GatherTransitiveDependenciesForSchemaNode( const absl::flat_hash_set<const SchemaNode*>& initial_set) { absl::flat_hash_set<const SchemaNode*> transitive; std::stack<const SchemaNode*> explore; for (const auto& dep : initial_set) { transitive.insert(dep); explore.push(dep); } while (!explore.empty()) { auto dep = explore.top(); explore.pop(); // Only explore dependencies that are views, columns, or udfs. (Leave out // sequences). if (auto view = dynamic_cast<const View*>(dep); view != nullptr) { for (const auto& dependency : view->dependencies()) { if (transitive.insert(dependency).second) explore.push(dependency); } } if (auto udf = dynamic_cast<const Udf*>(dep); udf != nullptr) { for (const auto& dependency : udf->dependencies()) { if (transitive.insert(dependency).second) explore.push(dependency); } } if (auto column = dynamic_cast<const Column*>(dep); column != nullptr) { for (const auto& dependency : column->udf_dependencies()) { if (transitive.insert(dependency).second) explore.push(dependency); } } } return transitive; } bool SchemaUpdaterImpl::IsBuiltInFunction(const std::string& function_name) { // Do not include the schema to avoid populating the schema's Udfs. FunctionCatalog function_catalog(type_factory_); const zetasql::Function* function = nullptr; function_catalog.GetFunction({function_name}, &function); return function != nullptr; } bool SchemaUpdaterImpl::CanFunctionReplaceTakenName( const ddl::CreateFunction& ddl_function) { const View* existing_view = latest_schema_->FindView(ddl_function.function_name()); const Udf* existing_udf = latest_schema_->FindUdf(ddl_function.function_name()); if (existing_view != nullptr && ddl_function.function_kind() == ddl::Function::VIEW && ddl_function.is_or_replace()) { return true; } if (existing_udf != nullptr && ddl_function.function_kind() == ddl::Function::FUNCTION && ddl_function.is_or_replace()) { return true; } return false; } std::string SchemaUpdaterImpl::GetFunctionKindAsString( const ddl::CreateFunction& ddl_function) { if (ddl_function.function_kind() == ddl::Function::FUNCTION) { return "Function"; } if (ddl_function.function_kind() == ddl::Function::VIEW) { return "View"; } return "Unknown"; } absl::Status SchemaUpdaterImpl::AnalyzeFunctionDefinition( const ddl::CreateFunction& ddl_function, bool replace, absl::flat_hash_set<const SchemaNode*>* dependencies, std::unique_ptr<zetasql::FunctionSignature>* function_signature, Udf::Determinism* determinism_level) { std::string param_list = ""; for (int i = 0; i < ddl_function.param_size(); i++) { param_list += ddl_function.param(i).name() + " " + ddl_function.param(i).param_typename(); if (ddl_function.param(i).has_default_value()) { param_list += " DEFAULT " + ddl_function.param(i).default_value(); } if (i < ddl_function.param_size() - 1) { param_list += ", "; } } auto status = AnalyzeUdfDefinition(ddl_function.function_name(), param_list, ddl_function.sql_body(), latest_schema_, type_factory_, dependencies, function_signature, determinism_level); if (!status.ok()) { return replace ? error::FunctionReplaceError(ddl_function.function_name(), status.message()) : error::FunctionBodyAnalysisError( ddl_function.function_name(), status.message()); } return absl::OkStatus(); } absl::Status SchemaUpdaterImpl::AnalyzeFunctionDefinition( const ddl::CreateFunction& ddl_function, bool replace, std::vector<View::Column>* output_columns, absl::flat_hash_set<const SchemaNode*>* dependencies) { auto status = AnalyzeViewDefinition( ddl_function.function_name(), ddl_function.sql_body(), latest_schema_, type_factory_, output_columns, dependencies); if (!status.ok()) { return replace ? error::ViewReplaceError(ddl_function.function_name(), status.message()) : error::ViewBodyAnalysisError(ddl_function.function_name(), status.message()); } return absl::OkStatus(); } absl::StatusOr<Udf::Builder> SchemaUpdaterImpl::CreateFunctionBuilder( const ddl::CreateFunction& ddl_function, std::unique_ptr<zetasql::FunctionSignature> function_signature, Udf::Determinism determinism_level, absl::flat_hash_set<const SchemaNode*> dependencies) { Udf::Builder builder; std::optional<uint32_t> oid = pg_oid_assigner_->GetNextPostgresqlOid(); builder.set_postgresql_oid(oid); if (oid.has_value()) { ZETASQL_VLOG(2) << "Assigned oid " << oid.value() << " to function " << ddl_function.function_name(); } builder.set_name(ddl_function.function_name()) .set_sql_body(ddl_function.sql_body()) .set_sql_security([&]() { switch (ddl_function.sql_security()) { case ddl::Function::UNSPECIFIED_SQL_SECURITY: return Udf::SqlSecurity::SQL_SECURITY_UNSPECIFIED; case ddl::Function::INVOKER: return Udf::SqlSecurity::INVOKER; } }()); if (ddl_function.has_sql_body_origin() && !ddl_function.sql_body_origin().original_expression().empty()) { builder.set_body_origin(absl::StripAsciiWhitespace( ddl_function.sql_body_origin().original_expression())); } if (ddl_function.has_return_typename() && ddl_function.return_typename() != function_signature->result_type().type()->TypeName( zetasql::PRODUCT_EXTERNAL)) { return error::FunctionTypeMismatch( ddl_function.function_name(), ddl_function.return_typename(), function_signature->result_type().type()->TypeName( zetasql::PRODUCT_EXTERNAL)); } builder.set_signature(std::move(function_signature)); builder.set_determinism_level(determinism_level); for (auto dependency : dependencies) { builder.add_dependency(dependency); } return builder; } absl::StatusOr<View::Builder> SchemaUpdaterImpl::CreateFunctionBuilder( const ddl::CreateFunction& ddl_function, std::vector<View::Column> output_columns, absl::flat_hash_set<const SchemaNode*> dependencies) { View::Builder builder; std::optional<uint32_t> oid = pg_oid_assigner_->GetNextPostgresqlOid(); builder.set_postgresql_oid(oid); if (oid.has_value()) { ZETASQL_VLOG(2) << "Assigned oid " << oid.value() << " to view " << ddl_function.function_name(); } builder.set_name(ddl_function.function_name()) .set_sql_security([&]() { switch (ddl_function.sql_security()) { case ddl::Function::UNSPECIFIED_SQL_SECURITY: return View::SqlSecurity::UNSPECIFIED; case ddl::Function::INVOKER: return View::SqlSecurity::INVOKER; } }()) .set_sql_body(ddl_function.sql_body()); if (ddl_function.has_sql_body_origin() && !ddl_function.sql_body_origin().original_expression().empty()) { builder.set_sql_body_origin(absl::StripAsciiWhitespace( ddl_function.sql_body_origin().original_expression())); } for (auto output_column : output_columns) { builder.add_column(std::move(output_column)); } for (const auto& dependency : dependencies) { builder.add_dependency(dependency); } return builder; } absl::Status SchemaUpdaterImpl::CreateFunction( const ddl::CreateFunction& ddl_function) { if (ddl_function.function_kind() == ddl::Function::VIEW && latest_schema_->views().size() >= limits::kMaxViewsPerDatabase) { return error::TooManyViewsPerDatabase(ddl_function.function_name(), limits::kMaxViewsPerDatabase); } if (!ddl_function.has_sql_security() || ddl_function.sql_security() != ddl::Function::INVOKER) { return ddl_function.function_kind() == ddl::Function::FUNCTION ? error::FunctionRequiresInvokerSecurity( ddl_function.function_name()) : error::ViewRequiresInvokerSecurity( ddl_function.function_name()); } bool replace = false; if (global_names_.HasName(ddl_function.function_name())) { replace = CanFunctionReplaceTakenName(ddl_function); if (!replace) { return error::SchemaObjectAlreadyExists( GetFunctionKindAsString(ddl_function), ddl_function.function_name()); } } if (!replace) { ZETASQL_RETURN_IF_ERROR(global_names_.AddName(GetFunctionKindAsString(ddl_function), ddl_function.function_name())); } if (IsBuiltInFunction(ddl_function.function_name())) { return error::ReplacingBuiltInFunction( ddl_function.is_or_replace() ? "create or replace" : "create", GetFunctionKindAsString(ddl_function), ddl_function.function_name()); } absl::Status error; absl::flat_hash_set<const SchemaNode*> dependencies; if (ddl_function.function_kind() == ddl::Function::FUNCTION) { std::unique_ptr<zetasql::FunctionSignature> function_signature; Udf::Determinism determinism_level = Udf::Determinism::DETERMINISTIC; ZETASQL_RETURN_IF_ERROR( AnalyzeFunctionDefinition(ddl_function, replace, &dependencies, &function_signature, &determinism_level)); ZETASQL_ASSIGN_OR_RETURN( Udf::Builder builder, CreateFunctionBuilder(ddl_function, std::move(function_signature), determinism_level, dependencies)); if (replace) { const Udf* existing_udf = latest_schema_->FindUdf(ddl_function.function_name()); // Check for a recursive view by analyzing the transitive set of // dependencies, i.e., if the view is a dependency of itself. auto transitive_deps = GatherTransitiveDependenciesForSchemaNode(dependencies); if (std::find_if(transitive_deps.begin(), transitive_deps.end(), [existing_udf](const SchemaNode* dep) { return (dep->As<const Udf>() != nullptr && dep->As<const Udf>()->Name() == existing_udf->Name()); }) != transitive_deps.end()) { return error::ViewReplaceRecursive(existing_udf->Name()); } return AlterNode<Udf>(existing_udf, [&](Udf::Editor* editor) -> absl::Status { // Just replace the udf definition completely. // The temp instance inside builder will be // cleaned up when the builder goes out of scope. editor->copy_from(builder.get()); return absl::OkStatus(); }); } if (SDLObjectName::IsFullyQualifiedName(ddl_function.function_name())) { ZETASQL_RETURN_IF_ERROR(AlterInNamedSchema( ddl_function.function_name(), [&builder](NamedSchema::Editor* editor) -> absl::Status { editor->add_udf(builder.get()); return absl::OkStatus(); })); } return AddNode(builder.build()); } else if (ddl_function.function_kind() == ddl::Function::VIEW) { std::vector<View::Column> output_columns; ZETASQL_RETURN_IF_ERROR(AnalyzeFunctionDefinition(ddl_function, replace, &output_columns, &dependencies)); ZETASQL_ASSIGN_OR_RETURN( View::Builder builder, CreateFunctionBuilder(ddl_function, std::move(output_columns), dependencies)); if (replace) { const View* existing_view = latest_schema_->FindView(ddl_function.function_name()); // Check for a recursive view by analyzing the transitive set of // dependencies, i.e., if the view is a dependency of itself. auto transitive_deps = GatherTransitiveDependenciesForSchemaNode(dependencies); if (std::find_if(transitive_deps.begin(), transitive_deps.end(), [existing_view](const SchemaNode* dep) { return (dep->As<const View>() != nullptr && dep->As<const View>()->Name() == existing_view->Name()); }) != transitive_deps.end()) { return error::ViewReplaceRecursive(existing_view->Name()); } return AlterNode<View>(existing_view, [&](View::Editor* editor) -> absl::Status { // Just replace the view definition completely. // The temp instance inside builder will be // cleaned up when the builder goes out of scope. editor->copy_from(builder.get()); return absl::OkStatus(); }); } // No need to account for the named schema in the replace case. if (SDLObjectName::IsFullyQualifiedName(ddl_function.function_name())) { ZETASQL_RETURN_IF_ERROR(AlterInNamedSchema( ddl_function.function_name(), [&builder](NamedSchema::Editor* editor) -> absl::Status { editor->add_view(builder.get()); return absl::OkStatus(); })); } return AddNode(builder.build()); } return absl::OkStatus(); // ERROR FOR UNSUPPORTED VIEW TYPE } template <typename SequenceModifier> absl::Status SchemaUpdaterImpl::SetSequenceOptions( const ::google::protobuf::RepeatedPtrField<ddl::SetOption>& set_options, SequenceModifier* modifier) { // We are only setting the sequence options as we see them here. We validate // the options in the function `ValidateSequenceOptions`. for (const ddl::SetOption& option : set_options) { if (option.option_name() == kSequenceKindOptionName) { modifier->set_sequence_kind(Sequence::BIT_REVERSED_POSITIVE); } else if (option.option_name() == kSequenceStartWithCounterOptionName) { if (option.has_int64_value()) { modifier->set_start_with_counter(option.int64_value()); } else { modifier->clear_start_with_counter(); } } else if (option.option_name() == kSequenceSkipRangeMinOptionName) { if (option.has_int64_value()) { modifier->set_skip_range_min(option.int64_value()); } else { modifier->clear_skip_range_min(); } } else if (option.option_name() == kSequenceSkipRangeMaxOptionName) { if (option.has_int64_value()) { modifier->set_skip_range_max(option.int64_value()); } else { modifier->clear_skip_range_max(); } } else { return error::UnsupportedSequenceOption(option.option_name()); } } return absl::OkStatus(); } absl::Status SchemaUpdaterImpl::ValidateSequenceOptions( const ::google::protobuf::RepeatedPtrField<ddl::SetOption>& set_options, const Sequence* current_sequence) { absl::flat_hash_map<std::string, std::optional<int64_t>> int64_options; int64_options.reserve(3); for (const ddl::SetOption& option : set_options) { if (option.option_name() == kSequenceKindOptionName) { if (!option.has_string_value()) { return error::InvalidSequenceOptionValue(kSequenceKindOptionName, "string"); } if (option.string_value() != kSequenceKindBitReversedPositive) { return error::UnsupportedSequenceKind(option.string_value()); } } else if (option.option_name() == kSequenceStartWithCounterOptionName || option.option_name() == kSequenceSkipRangeMinOptionName || option.option_name() == kSequenceSkipRangeMaxOptionName) { // In this block, we're only assigning values to the temporary // `int64_options` map as we see them. We check for their validity // after this for loop. // Checking for null value here is necessary, since a sequence can be // altered to clear the option. if (option.has_null_value()) { int64_options[option.option_name()] = std::nullopt; } else { if (!option.has_int64_value()) { return error::InvalidSequenceOptionValue(option.option_name(), "integer"); } int64_options[option.option_name()] = option.int64_value(); } } else { return error::UnsupportedSequenceOption(option.option_name()); } } if (int64_options.contains(kSequenceStartWithCounterOptionName)) { if (int64_options[kSequenceStartWithCounterOptionName].has_value() && int64_options[kSequenceStartWithCounterOptionName].value() < 1) { return error::InvalidSequenceStartWithCounterValue(); } } std::optional<int64_t> skip_range_min, skip_range_max; if (current_sequence != nullptr) { // We are altering the sequence to set options. So we're loading the // current values of these options to compare against the new values we're // seeing. skip_range_min = current_sequence->skip_range_min(); skip_range_max = current_sequence->skip_range_max(); } // In this condition and below, if skip_range_min (or skip_range_max) is // explicitly set to null, we are overwriting its current value here. if (int64_options.contains(kSequenceSkipRangeMinOptionName)) { skip_range_min = int64_options[kSequenceSkipRangeMinOptionName]; } if (int64_options.contains(kSequenceSkipRangeMaxOptionName)) { skip_range_max = int64_options[kSequenceSkipRangeMaxOptionName]; } if (skip_range_min.has_value() != skip_range_max.has_value()) { return error::SequenceSkipRangeMinMaxNotSetTogether(); } if (!skip_range_min.has_value()) { // Skipped range is not set, safely return. return absl::OkStatus(); } if (skip_range_min.value() > skip_range_max.value()) { return error::SequenceSkipRangeMinLargerThanMax(); } if (skip_range_max.value() < 1) { return error::SequenceSkippedRangeHasAtleastOnePositiveNumber(); } return absl::OkStatus(); } bool SchemaUpdaterImpl::IsDefaultSequenceKindSet() { const DatabaseOptions* db_options = latest_schema_->options(); return db_options != nullptr && db_options->default_sequence_kind().has_value() && db_options->default_sequence_kind().value() == kSequenceKindBitReversedPositive; } void SchemaUpdaterImpl::SetOptionsForSequenceClauses( const ddl::CreateSequence& create_sequence, ::google::protobuf::RepeatedPtrField<ddl::SetOption>* set_options) { ddl::SetOption* option; if (create_sequence.has_type() && create_sequence.type() == ddl::CreateSequence::BIT_REVERSED_POSITIVE) { option = set_options->Add(); option->set_option_name(kSequenceKindOptionName); option->set_string_value(kSequenceKindBitReversedPositive); } if (create_sequence.has_start_with_counter()) { option = set_options->Add(); option->set_option_name(kSequenceStartWithCounterOptionName); option->set_int64_value(create_sequence.start_with_counter()); } if (create_sequence.has_skip_range_min()) { option = set_options->Add(); option->set_option_name(kSequenceSkipRangeMinOptionName); option->set_int64_value(create_sequence.skip_range_min()); option = set_options->Add(); option->set_option_name(kSequenceSkipRangeMaxOptionName); option->set_int64_value(create_sequence.skip_range_max()); } } absl::StatusOr<const Sequence*> SchemaUpdaterImpl::CreateSequence( const ddl::CreateSequence& create_sequence, const database_api::DatabaseDialect& dialect, bool is_internal_use) { // Validate the sequence name in global_names_ ZETASQL_RETURN_IF_ERROR( global_names_.AddName("Sequence", create_sequence.sequence_name())); Sequence::Builder builder; std::optional<uint32_t> oid = pg_oid_assigner_->GetNextPostgresqlOid(); builder.set_postgresql_oid(oid); if (oid.has_value()) { ZETASQL_VLOG(2) << "Assigned oid " << oid.value() << " to sequence " << create_sequence.sequence_name(); } builder.set_name(create_sequence.sequence_name()); absl::BitGen bitgen; builder.set_id(absl::StrCat( "seq_", zetasql::functions::GenerateUuid(bitgen) )); ::google::protobuf::RepeatedPtrField<ddl::SetOption> clause_options; if (dialect == database_api::DatabaseDialect::GOOGLE_STANDARD_SQL) { bool created_from_syntax = create_sequence.has_type() || create_sequence.has_start_with_counter() || create_sequence.has_skip_range_min(); if (created_from_syntax && !create_sequence.set_options().empty()) { return error::CannotSetSequenceClauseAndOptionTogether( create_sequence.sequence_name()); } if (created_from_syntax) { SetOptionsForSequenceClauses(create_sequence, &clause_options); builder.set_created_from_syntax(); } } if (!create_sequence.set_options().empty()) { builder.set_created_from_options(); } if (is_internal_use) { builder.set_internal_use(); } const Sequence* sequence = builder.get(); // Validate and set sequence options. const auto& set_options = clause_options.empty() ? create_sequence.set_options() : clause_options; ZETASQL_RETURN_IF_ERROR(ValidateSequenceOptions(set_options, /*current_sequence=*/nullptr)); ZETASQL_RETURN_IF_ERROR(AlterNode<Sequence>( sequence, [this, set_options](Sequence::Editor* editor) -> absl::Status { bool has_sequence_kind_option = false; for (const ddl::SetOption& option : set_options) { if (option.option_name() == kSequenceKindOptionName) { has_sequence_kind_option = true; break; } } if (!has_sequence_kind_option) { if (IsDefaultSequenceKindSet()) { editor->set_sequence_kind(Sequence::BIT_REVERSED_POSITIVE); editor->set_use_default_sequence_kind_option(); } else { return error::UnspecifiedSequenceKind(); } } // Set sequence options return SetSequenceOptions(set_options, editor); })); if (SDLObjectName::IsFullyQualifiedName(create_sequence.sequence_name())) { ZETASQL_RETURN_IF_ERROR(AlterInNamedSchema( create_sequence.sequence_name(), [&sequence](NamedSchema::Editor* editor) -> absl::Status { editor->add_sequence(sequence); return absl::OkStatus(); })); } ZETASQL_RETURN_IF_ERROR(AddNode(builder.build())); return sequence; } absl::Status SchemaUpdaterImpl::CreateNamedSchema( const ddl::CreateSchema& create_schema) { if (latest_schema_->FindNamedSchema(create_schema.schema_name()) != nullptr && create_schema.existence_modifier() == ddl::IF_NOT_EXISTS) { return absl::OkStatus(); } // Validate the named schema name in global_names_ ZETASQL_RETURN_IF_ERROR(global_names_.AddName("Schema", create_schema.schema_name())); NamedSchema::Builder builder; builder.set_name(create_schema.schema_name()); std::optional<uint32_t> oid = pg_oid_assigner_->GetNextPostgresqlOid(); builder.set_postgresql_oid(oid); if (oid.has_value()) { ZETASQL_VLOG(2) << "Assigned oid " << oid.value() << " to named schema " << create_schema.schema_name(); } ZETASQL_RETURN_IF_ERROR(AddNode(builder.build())); return absl::OkStatus(); } absl::Status SchemaUpdaterImpl::CreateLocalityGroup( const ddl::CreateLocalityGroup& create_locality_group) { if (global_names_.HasName(create_locality_group.locality_group_name()) && create_locality_group.existence_modifier() == ddl::IF_NOT_EXISTS) { return absl::OkStatus(); } if (create_locality_group.locality_group_name() == ddl::kDefaultLocalityGroupName) { return error::CreatingDefaultLocalityGroup(); } if (absl::StartsWith(create_locality_group.locality_group_name(), "_")) { return error::InvalidLocalityGroupName( create_locality_group.locality_group_name()); } // Validate the locality group name in global_names_ ZETASQL_RETURN_IF_ERROR(global_names_.AddName( "LocalityGroup", create_locality_group.locality_group_name())); LocalityGroup::Builder builder; builder.set_name(create_locality_group.locality_group_name()); const LocalityGroup* locality_group = builder.get(); // Validate and set locality group options. const auto& set_options = create_locality_group.set_options(); if (!set_options.empty()) { ZETASQL_RETURN_IF_ERROR(AlterNode<LocalityGroup>( locality_group, [this, set_options](LocalityGroup::Editor* editor) -> absl::Status { // Set locality group options return SetLocalityGroupOptions(set_options, editor); })); } // Add the locality group to the schema. ZETASQL_RETURN_IF_ERROR(AddNode(builder.build())); return absl::OkStatus(); } absl::Status SchemaUpdaterImpl::AlterRowDeletionPolicy( std::optional<ddl::RowDeletionPolicy> row_deletion_policy, const Table* table) { return AlterNode(table, [&](Table::Editor* editor) { editor->set_row_deletion_policy(row_deletion_policy); return absl::OkStatus(); }); } absl::Status SchemaUpdaterImpl::ValidateAlterDatabaseOptions( const ::google::protobuf::RepeatedPtrField<ddl::SetOption>& set_options) { for (const ddl::SetOption& option : set_options) { absl::string_view option_name = absl::StripPrefix(option.option_name(), "spanner.internal.cloud_"); if (option_name == ddl::kWitnessLocationOptionName) { if (option.has_string_value()) { continue; } else if (option.has_null_value()) { return error::NullValueAlterDatabaseOption(); } } else if (option_name == ddl::kDefaultLeaderOptionName) { if (option.has_string_value()) { continue; } else if (option.has_null_value()) { return error::NullValueAlterDatabaseOption(); } } else if (option_name == ddl::kDefaultSequenceKindOptionName) { if (option.has_string_value() || option.has_null_value()) { if (latest_schema_->options() != nullptr && latest_schema_->options()->default_sequence_kind().has_value()) { return error::DefaultSequenceKindAlreadySet(); } if (option.has_string_value() && option.string_value() != kSequenceKindBitReversedPositive) { return error::UnsupportedSequenceKind(option.string_value()); } } else { return error::UnsupportedDefaultSequenceKindOptionValues(); } } else if (option_name == ddl::kDefaultTimeZoneOptionName) { if (option.has_string_value() || option.has_null_value()) { if (!latest_schema_->tables().empty()) { return error::ChangeDefaultTimeZoneOnNonEmptyDatabase(); } absl::TimeZone time_zone; if (option.has_string_value() && !absl::LoadTimeZone(option.string_value(), &time_zone)) { return error::InvalidDefaultTimeZoneOption(option.string_value()); } } else { return error::UnsupportedDefaultTimeZoneOptionValues(); } } else if (option_name == ddl::kVersionRetentionPeriodOptionName) { if (option.has_string_value() || option.has_null_value()) { continue; } return error::UnsupportedVersionRetentionPeriodOptionValues(); } else { return error::UnsupportedAlterDatabaseOption(option_name); } } return absl::OkStatus(); } absl::Status SchemaUpdaterImpl::RenameTo( const ddl::AlterTable::RenameTo& rename_to, const Table* table) { ZETASQL_RETURN_IF_ERROR(global_names_.AddName("Table", rename_to.name())); global_names_.RemoveName(table->Name()); const Table* updated_table = nullptr; ZETASQL_RETURN_IF_ERROR(AlterNode(table, [&](Table::Editor* editor) { editor->set_name(rename_to.name()); updated_table = editor->get(); return absl::OkStatus(); })); // Handle synonyms. if (!rename_to.synonym().empty()) { ZETASQL_RETURN_IF_ERROR(AddSynonym(rename_to.synonym(), updated_table)); } return absl::OkStatus(); } absl::Status SchemaUpdaterImpl::AddSynonym(const std::string& synonym, const Table* table) { ZETASQL_RETURN_IF_ERROR(global_names_.AddName("Table", synonym)); const Table* updated_table = nullptr; ZETASQL_RETURN_IF_ERROR(AlterNode(table, [&](Table::Editor* editor) { editor->set_synonym(synonym); updated_table = editor->get(); return absl::OkStatus(); })); if (SDLObjectName::IsFullyQualifiedName(synonym)) { ZETASQL_RETURN_IF_ERROR(AlterInNamedSchema( synonym, [&updated_table](NamedSchema::Editor* editor) -> absl::Status { editor->add_synonym(updated_table); return absl::OkStatus(); })); } return absl::OkStatus(); } absl::Status SchemaUpdaterImpl::DropSynonym( const ddl::AlterTable::DropSynonym& drop_synonym, const Table* table) { if (SDLObjectName::IsFullyQualifiedName(drop_synonym.synonym())) { ZETASQL_RETURN_IF_ERROR(AlterInNamedSchema( drop_synonym.synonym(), [&table](NamedSchema::Editor* editor) -> absl::Status { editor->drop_synonym(table); return absl::OkStatus(); })); } global_names_.RemoveName(drop_synonym.synonym()); return AlterNode(table, [&](Table::Editor* editor) { editor->drop_synonym(drop_synonym.synonym()); return absl::OkStatus(); }); } absl::Status SchemaUpdaterImpl::AlterDatabase( const ddl::AlterDatabase& alter_database, const database_api::DatabaseDialect& dialect) { const auto& set_options = alter_database.set_options(); if (!set_options.options().empty()) { ZETASQL_RETURN_IF_ERROR(ValidateAlterDatabaseOptions(set_options.options())); const DatabaseOptions* database_options = latest_schema_->options(); if (database_options == nullptr) { DatabaseOptions::Builder builder; builder.set_db_name(alter_database.db_name()); ZETASQL_RETURN_IF_ERROR( SetDatabaseOptions(set_options.options(), dialect, &builder)); ZETASQL_RETURN_IF_ERROR(AddNode(builder.build())); } else { const ::google::protobuf::RepeatedPtrField<ddl::SetOption>& repeated_set_options = set_options.options(); ZETASQL_RETURN_IF_ERROR(AlterNode<DatabaseOptions>( database_options, [this, repeated_set_options, dialect](DatabaseOptions::Editor* editor) -> absl::Status { // Set database options return SetDatabaseOptions(repeated_set_options, dialect, editor); })); } } return absl::OkStatus(); } absl::Status SchemaUpdaterImpl::AlterChangeStream( const ddl::AlterChangeStream& alter_change_stream) { const ChangeStream* change_stream = latest_schema_->FindChangeStream( alter_change_stream.change_stream_name()); if (change_stream == nullptr) { return error::ChangeStreamNotFound( alter_change_stream.change_stream_name()); } std::string change_stream_name = change_stream->Name(); switch (alter_change_stream.alter_type_case()) { case ddl::AlterChangeStream::kSetForClause: { ZETASQL_RETURN_IF_ERROR(AlterChangeStreamForClause( alter_change_stream.set_for_clause(), change_stream)); return absl::OkStatus(); } case ddl::AlterChangeStream::kSetOptions: { const auto& set_options = alter_change_stream.set_options(); const ::google::protobuf::RepeatedPtrField<ddl::SetOption>& repeated_set_options = set_options.options(); ZETASQL_RETURN_IF_ERROR(ValidateChangeStreamOptions(repeated_set_options)); ZETASQL_RETURN_IF_ERROR(AlterNode<ChangeStream>( change_stream, [this, repeated_set_options](ChangeStream::Editor* editor) -> absl::Status { // Set change stream options return SetChangeStreamOptions(repeated_set_options, editor); })); return absl::OkStatus(); } case ddl::AlterChangeStream::kDropForClause: { ZETASQL_RET_CHECK(alter_change_stream.drop_for_clause().all()); if (!change_stream->for_clause()) { return error::AlterChangeStreamDropNonexistentForClause( change_stream->Name()); } ZETASQL_RETURN_IF_ERROR(UnregisterChangeStreamFromTrackedObjects(change_stream)); ZETASQL_RETURN_IF_ERROR(AlterNode<ChangeStream>( change_stream, [](ChangeStream::Editor* editor) -> absl::Status { editor->clear_for_clause(); editor->clear_tracked_tables_columns(); return absl::OkStatus(); })); return absl::OkStatus(); } default: return error::Internal( absl::StrCat("Unsupported alter change stream type: ", alter_change_stream.DebugString())); } return absl::OkStatus(); } absl::Status SchemaUpdaterImpl::AlterChangeStreamForClause( const ddl::ChangeStreamForClause& ddl_change_stream_for_clause, const ChangeStream* change_stream) { ZETASQL_RETURN_IF_ERROR(ValidateChangeStreamForClause(ddl_change_stream_for_clause, change_stream->Name())); ZETASQL_RETURN_IF_ERROR(ValidateChangeStreamLimits(ddl_change_stream_for_clause, change_stream->Name())); ZETASQL_RETURN_IF_ERROR(UnregisterChangeStreamFromTrackedObjects(change_stream)); ZETASQL_RETURN_IF_ERROR(AlterNode<ChangeStream>( change_stream, [&](ChangeStream::Editor* editor) -> absl::Status { editor->set_for_clause(ddl_change_stream_for_clause); editor->set_track_all(ddl_change_stream_for_clause.all()); return absl::OkStatus(); })); ZETASQL_RETURN_IF_ERROR( RegisterTrackedObjects(ddl_change_stream_for_clause, change_stream)); ZETASQL_RETURN_IF_ERROR(EditChangeStreamTrackedObjects(ddl_change_stream_for_clause, change_stream)); return absl::OkStatus(); } absl::Status SchemaUpdaterImpl::AlterSequence( const ddl::AlterSequence& alter_sequence, const Sequence* current_sequence) { std::string sequence_name = current_sequence->Name(); if (alter_sequence.alter_type_case() != ddl::AlterSequence::kSetOptions && alter_sequence.alter_type_case() != ddl::AlterSequence::kSetSkipRange && alter_sequence.alter_type_case() != ddl::AlterSequence::kSetStartWithCounter) { return error::Internal( absl::StrCat("Unsupported alter sequence statement: ", alter_sequence.DebugString())); } ::google::protobuf::RepeatedPtrField<ddl::SetOption> repeated_set_options; bool set_from_syntax = false; if (alter_sequence.alter_type_case() == ddl::AlterSequence::kSetOptions) { if (current_sequence->created_from_syntax()) { return error::CannotSetSequenceClauseAndOptionTogether(sequence_name); } repeated_set_options = alter_sequence.set_options().options(); } else if (alter_sequence.alter_type_case() == ddl::AlterSequence::kSetSkipRange) { if (current_sequence->created_from_options()) { return error::CannotSetSequenceClauseAndOptionTogether(sequence_name); } ddl::SetOption* skip_range_min = repeated_set_options.Add(); ddl::SetOption* skip_range_max = repeated_set_options.Add(); skip_range_min->set_option_name(kSequenceSkipRangeMinOptionName); skip_range_max->set_option_name(kSequenceSkipRangeMaxOptionName); if (alter_sequence.set_skip_range().has_min_value()) { skip_range_min->set_int64_value( alter_sequence.set_skip_range().min_value()); skip_range_max->set_int64_value( alter_sequence.set_skip_range().max_value()); } else { skip_range_min->set_null_value(true); skip_range_max->set_null_value(true); } set_from_syntax = true; } else if (alter_sequence.alter_type_case() == ddl::AlterSequence::kSetStartWithCounter) { if (current_sequence->created_from_options()) { return error::CannotSetSequenceClauseAndOptionTogether(sequence_name); } ddl::SetOption* start_with_counter = repeated_set_options.Add(); start_with_counter->set_option_name(kSequenceStartWithCounterOptionName); start_with_counter->set_int64_value( alter_sequence.set_start_with_counter().counter_value()); set_from_syntax = true; } ZETASQL_RETURN_IF_ERROR( ValidateSequenceOptions(repeated_set_options, current_sequence)); ZETASQL_RETURN_IF_ERROR(AlterNode<Sequence>( current_sequence, [this, set_from_syntax, repeated_set_options](Sequence::Editor* editor) -> absl::Status { if (set_from_syntax) { editor->set_created_from_syntax(); } else { editor->set_created_from_options(); } // Set sequence options return SetSequenceOptions(repeated_set_options, editor); })); return absl::OkStatus(); } absl::Status SchemaUpdaterImpl::AlterNamedSchema( const ddl::AlterSchema& alter_schema) { const NamedSchema* current_named_schema = latest_schema_->FindNamedSchema(alter_schema.schema_name()); if (current_named_schema == nullptr) { if (alter_schema.if_exists() == true) { return absl::OkStatus(); } return error::NamedSchemaNotFound(alter_schema.schema_name()); } return error::AlterNamedSchemaNotSupported(); } absl::Status SchemaUpdaterImpl::AlterTable( const ddl::AlterTable& alter_table, const database_api::DatabaseDialect& dialect) { const Table* table = latest_schema_->FindTableCaseSensitive(alter_table.table_name()); if (table == nullptr) { return error::TableNotFound(alter_table.table_name()); } switch (alter_table.alter_type_case()) { case ddl::AlterTable::kSetInterleaveClause: { return AlterInterleaveAction( alter_table.set_interleave_clause().interleave_clause().on_delete(), table); } case ddl::AlterTable::kSetOnDelete: { return AlterInterleaveAction(alter_table.set_on_delete().action(), table); } case ddl::AlterTable::kAddCheckConstraint: { if (dialect == database_api::DatabaseDialect::POSTGRESQL) { ddl::CheckConstraint mutable_check_constraint = alter_table.add_check_constraint().check_constraint(); ZETASQL_RET_CHECK(mutable_check_constraint.has_expression_origin()); ZETASQL_ASSIGN_OR_RETURN(ExpressionTranslateResult result, TranslatePostgreSqlExpression( table, /*ddl_create_table=*/nullptr, mutable_check_constraint.expression_origin() .original_expression())); mutable_check_constraint.mutable_expression_origin() ->set_original_expression(result.original_postgresql_expression); mutable_check_constraint.set_expression( result.translated_googlesql_expression); return AddCheckConstraint(mutable_check_constraint, table); } return AddCheckConstraint( alter_table.add_check_constraint().check_constraint(), table); } case ddl::AlterTable::kAddForeignKey: { return AddForeignKey(alter_table.add_foreign_key().foreign_key(), table); } case ddl::AlterTable::kDropConstraint: { return DropConstraint(alter_table.drop_constraint().name(), table); } case ddl::AlterTable::kAddColumn: { const auto& column_def = alter_table.add_column().column(); // If the column exists but IF_NOT_EXISTS is set then we're fine. if (table->FindColumn(column_def.column_name()) != nullptr && alter_table.add_column().existence_modifier() == ddl::IF_NOT_EXISTS) { return absl::OkStatus(); } ZETASQL_ASSIGN_OR_RETURN(const Column* new_column, CreateColumn(column_def, table, /*ddl_table=*/ nullptr, dialect)); if (new_column->is_generated()) { const_cast<Column*>(new_column)->PopulateDependentColumns(); } if (new_column->is_trackable_by_change_stream()) { // Add the newly added column to tracking objects map for each change // stream implicitly/explicitly tracking the entire table this column // belongs to for (const ChangeStream* change_stream : table->change_streams()) { ZETASQL_RETURN_IF_ERROR(AlterNode<ChangeStream>( change_stream, [table, new_column]( ChangeStream::Editor* change_stream_editor) -> absl::Status { change_stream_editor->add_tracked_table_column( table->Name(), new_column->Name()); return absl::OkStatus(); })); } // Populate the list of change streams tracking this column ZETASQL_RETURN_IF_ERROR(AlterNode<Column>( new_column, [table](Column::Editor* column_editor) -> absl::Status { for (const ChangeStream* change_stream : table->change_streams()) { column_editor->add_change_stream(change_stream); } return absl::OkStatus(); })); } ZETASQL_RETURN_IF_ERROR(AlterNode<Table>( table, [new_column](Table::Editor* editor) -> absl::Status { editor->add_column(new_column); return absl::OkStatus(); })); return absl::OkStatus(); } case ddl::AlterTable::kAlterColumn: { const std::string& column_name = alter_table.alter_column().column().column_name(); const Column* column = table->FindColumn(column_name); if (column == nullptr) { return error::ColumnNotFound(table->Name(), column_name); } const auto& alter_column = alter_table.alter_column(); if (alter_column.has_operation()) { if (alter_column.operation() == ddl::AlterTable::AlterColumn::ALTER_IDENTITY) { if (!column->is_identity_column()) { return error::ColumnIsNotIdentityColumn(table->Name(), column_name); } ZETASQL_RET_CHECK(column->sequences_used().size() == 1); const Sequence* sequence = static_cast<const Sequence*>(column->sequences_used().at(0)); const auto& column_def = alter_column.column(); ddl::AlterSequence alter_sequence; alter_sequence.set_sequence_name(sequence->Name()); if (alter_column.identity_alter_start_with_counter()) { ddl::SetOption* start_with_counter = alter_sequence.mutable_set_options()->add_options(); start_with_counter->set_option_name("start_with_counter"); start_with_counter->set_int64_value( column_def.identity_column().start_with_counter()); } if (alter_column.identity_alter_skip_range()) { ddl::SetOption* skip_range_min = alter_sequence.mutable_set_options()->add_options(); ddl::SetOption* skip_range_max = alter_sequence.mutable_set_options()->add_options(); skip_range_min->set_option_name("skip_range_min"); skip_range_max->set_option_name("skip_range_max"); if (column_def.identity_column().has_skip_range_min()) { skip_range_min->set_int64_value( column_def.identity_column().skip_range_min()); skip_range_max->set_int64_value( column_def.identity_column().skip_range_max()); } else { skip_range_min->set_null_value(true); skip_range_max->set_null_value(true); } } ZETASQL_RETURN_IF_ERROR(AlterSequence(alter_sequence, sequence)); } else { ZETASQL_RETURN_IF_ERROR(AlterNode<Column>( column, [this, &alter_column, &column, &table, &dialect](Column::Editor* editor) -> absl::Status { return AlterColumnSetDropDefault(alter_column, table, column, dialect, editor); })); } } else { const auto& column_def = alter_column.column(); ZETASQL_RETURN_IF_ERROR(AlterNode<Column>( column, [this, &column_def, &table, dialect](Column::Editor* editor) -> absl::Status { return AlterColumnDefinition(column_def, table, dialect, editor); })); } return absl::OkStatus(); } case ddl::AlterTable::kDropColumn: { const Column* column = table->FindColumn(alter_table.drop_column()); if (column == nullptr) { return error::ColumnNotFound(table->Name(), alter_table.drop_column()); } if (!column->change_streams_explicitly_tracking_column().empty()) { std::vector<std::string> change_stream_names_list; for (const ChangeStream* const change_stream : column->change_streams()) { change_stream_names_list.push_back(change_stream->Name()); } std::string change_stream_names = absl::StrJoin(change_stream_names_list, ","); return error::DropColumnWithChangeStream( table->Name(), column->Name(), change_stream_names_list.size(), change_stream_names); } ZETASQL_RETURN_IF_ERROR(DropNode(column)); if (column->is_identity_column()) { ZETASQL_RET_CHECK(column->sequences_used().size() == 1); const Sequence* sequence = static_cast<const Sequence*>(column->sequences_used().at(0)); ZETASQL_RETURN_IF_ERROR(DropSequence(sequence)); } if (column->locality_group() != nullptr) { ZETASQL_RETURN_IF_ERROR(AlterNode<LocalityGroup>( column->locality_group(), [](LocalityGroup::Editor* editor) -> absl::Status { editor->decrement_use_count(); return absl::OkStatus(); })); } return absl::OkStatus(); } case ddl::AlterTable::kAddRowDeletionPolicy: { const auto& policy = alter_table.add_row_deletion_policy(); if (!table->row_deletion_policy().has_value()) { return AlterRowDeletionPolicy(policy, table); } else { return error::RowDeletionPolicyAlreadyExists(policy.column_name(), table->Name()); } } case ddl::AlterTable::kAlterRowDeletionPolicy: { const auto& policy = alter_table.alter_row_deletion_policy(); if (table->row_deletion_policy().has_value()) { return AlterRowDeletionPolicy(policy, table); } else { return error::RowDeletionPolicyDoesNotExist(table->Name()); } } case ddl::AlterTable::kDropRowDeletionPolicy: { if (table->row_deletion_policy().has_value()) { return AlterRowDeletionPolicy(std::nullopt, table); } else { return error::RowDeletionPolicyDoesNotExist(table->Name()); } } case ddl::AlterTable::kRenameTo: { const auto& rename_to = alter_table.rename_to(); return RenameTo(rename_to, table); } case ddl::AlterTable::kAddSynonym: { const auto& add_synonym = alter_table.add_synonym(); if (table->synonym().empty()) { return AddSynonym(add_synonym.synonym(), table); } else { return error::SynonymAlreadyExists(table->synonym(), table->Name()); } } case ddl::AlterTable::kDropSynonym: { const auto& drop_synonym = alter_table.drop_synonym(); if (drop_synonym.synonym() == table->synonym()) { return DropSynonym(drop_synonym, table); } else { return error::SynonymDoesNotExist(drop_synonym.synonym(), table->Name()); } } case ddl::AlterTable::kSetOptions: { ZETASQL_RETURN_IF_ERROR(AlterNode<Table>( table, [this, &alter_table](Table::Editor* editor) -> absl::Status { return SetTableOptions(alter_table.set_options().options(), editor); })); return absl::OkStatus(); } default: return error::Internal(absl::StrCat("Unsupported alter table type: ", alter_table.DebugString())); } return absl::OkStatus(); } absl::Status SchemaUpdaterImpl::AlterIndex(const ddl::AlterIndex& alter_index) { const Index* index = latest_schema_->FindIndex(alter_index.index_name()); if (index == nullptr) { return error::IndexNotFound(alter_index.index_name()); } const Table* indexed_table = index->indexed_table(); const Table* indexed_data_table = index->index_data_table(); ZETASQL_RET_CHECK(indexed_table != nullptr) << "No indexed table found for the index "; ZETASQL_RET_CHECK(indexed_data_table != nullptr) << "No index data table found for the index "; switch (alter_index.alter_type_case()) { case ddl::AlterIndex::kAddStoredColumn: { const std::string& column_name = alter_index.add_stored_column().column_name(); if (indexed_table->FindColumn(column_name) == nullptr) { return error::ColumnNotFound(indexed_table->Name(), column_name); } if (indexed_data_table->FindColumn(column_name) != nullptr) { return error::ColumnInIndexAlreadyExists(index->Name(), column_name); } ZETASQL_ASSIGN_OR_RETURN(const Column* new_index_data_table_column, CreateIndexDataTableColumn(indexed_table, column_name, indexed_data_table, false)); ZETASQL_RETURN_IF_ERROR(AlterNode<Table>( indexed_data_table, [new_index_data_table_column](Table::Editor* editor) -> absl::Status { editor->add_column(new_index_data_table_column); return absl::OkStatus(); })); ZETASQL_RETURN_IF_ERROR(AlterNode<Index>( index, [new_index_data_table_column](Index::Editor* editor) -> absl::Status { editor->add_stored_column(new_index_data_table_column); return absl::OkStatus(); })); statement_context_->AddAction( [index, new_index_data_table_column]( const SchemaValidationContext* context) { return BackfillIndexAddedColumn(index, new_index_data_table_column, context); }); break; } case ddl::AlterIndex::kDropStoredColumn: { const std::string& column_name = alter_index.drop_stored_column(); auto stored_columns = index->stored_columns(); if (!absl::c_any_of(stored_columns, [column_name](const Column* c) { return c->Name() == column_name; })) { return error::ColumnNotFoundInIndex(alter_index.index_name(), column_name); } const Column* drop_column = indexed_data_table->FindColumn(column_name); return DropNode(drop_column); break; } case ddl::AlterIndex::kSetOptions: { ZETASQL_RETURN_IF_ERROR(AlterNode<Index>( index, [this, &alter_index](Index::Editor* editor) -> absl::Status { return SetIndexOptions(alter_index.set_options().options(), editor); })); return absl::OkStatus(); } default: ZETASQL_RET_CHECK_FAIL() << "Invalid alter index type: " << absl::StrCat(alter_index); } return absl::OkStatus(); } absl::Status SchemaUpdaterImpl::AlterVectorIndex( const ddl::AlterVectorIndex& alter_index) { const Index* index = latest_schema_->FindIndex(alter_index.index_name()); if (index == nullptr || !index->is_vector_index()) { return error::IndexNotFound(alter_index.index_name()); } const Table* indexed_table = index->indexed_table(); const Table* indexed_data_table = index->index_data_table(); ZETASQL_RET_CHECK(indexed_table != nullptr) << "No indexed table found for the index "; ZETASQL_RET_CHECK(indexed_data_table != nullptr) << "No index data table found for the index "; switch (alter_index.alter_type_case()) { case ddl::AlterVectorIndex::kAddStoredColumn: { const std::string& column_name = alter_index.add_stored_column().column().name(); if (indexed_table->FindColumn(column_name) == nullptr) { return error::VectorIndexStoredColumnNotFound(alter_index.index_name(), column_name); } for (const Column* column : index->stored_columns()) { if (column->Name() == column_name) { return error::VectorIndexStoredColumnAlreadyExists( alter_index.index_name(), column_name); } } if (indexed_table->FindKeyColumn(column_name) != nullptr) { return error::VectorIndexStoredColumnIsKey( alter_index.index_name(), column_name, indexed_table->Name()); } if (indexed_data_table->FindColumn(column_name) != nullptr) { return error::VectorIndexStoredColumnAlreadyPrimaryKey( alter_index.index_name(), column_name); } ZETASQL_ASSIGN_OR_RETURN(const Column* new_index_data_table_column, CreateIndexDataTableColumn(indexed_table, column_name, indexed_data_table, false)); ZETASQL_RETURN_IF_ERROR(AlterNode<Table>( indexed_data_table, [new_index_data_table_column](Table::Editor* editor) -> absl::Status { editor->add_column(new_index_data_table_column); return absl::OkStatus(); })); ZETASQL_RETURN_IF_ERROR(AlterNode<Index>( index, [new_index_data_table_column](Index::Editor* editor) -> absl::Status { editor->add_stored_column(new_index_data_table_column); return absl::OkStatus(); })); break; } case ddl::AlterVectorIndex::kDropStoredColumn: { const std::string& column_name = alter_index.drop_stored_column(); if (indexed_data_table->FindColumn(column_name) == nullptr) { return error::ColumnNotFound(index->Name(), column_name); } auto stored_columns = index->stored_columns(); bool is_stored_column = absl::c_any_of( stored_columns, [column_name](const Column* c) { return c->Name() == column_name; }); if (!is_stored_column) { if (!absl::c_any_of(index->key_columns(), [column_name](const KeyColumn* c) { return c->column()->Name() == column_name; })) { return error::ColumnNotFound(index->Name(), column_name); } return error::VectorIndexNotStoredColumn(alter_index.index_name(), column_name); } const Column* drop_column = indexed_data_table->FindColumn(column_name); return DropNode(drop_column); break; } case ddl::AlterVectorIndex::kSetOptions: { return error::AlterVectorIndexSetOptionsUnsupported(); } case ddl::AlterVectorIndex::kAlterStoredColumn: { return error::AlterVectorIndexStoredColumnUnsupported(); } default: ZETASQL_RET_CHECK_FAIL() << "Invalid alter index type: " << absl::StrCat(alter_index); } return absl::OkStatus(); } absl::Status SchemaUpdaterImpl::AlterInterleaveAction( const ddl::InterleaveClause::Action& ddl_interleave_action, const Table* table) { return AlterNode<Table>(table, [&](Table::Editor* editor) { if (ddl_interleave_action == ddl::InterleaveClause::CASCADE) { editor->set_on_delete(Table::OnDeleteAction::kCascade); } else { editor->set_on_delete(Table::OnDeleteAction::kNoAction); } return absl::OkStatus(); }); } absl::StatusOr<const zetasql::Type*> SchemaUpdaterImpl::GetProtoTypeFromBundle(const zetasql::Type* type, const ProtoBundle* proto_bundle) { ZETASQL_RET_CHECK(type->IsProto() || type->IsEnum()); auto ddl_type = GoogleSqlTypeToDDLColumnType(type); return DDLColumnTypeToGoogleSqlType(ddl_type, type_factory_, proto_bundle); } absl::Status SchemaUpdaterImpl::AlterProtoColumnType( const Column* column, const ProtoBundle* proto_bundle, Column::Editor* editor) { const zetasql::Type* type = column->GetType(); ZETASQL_RET_CHECK(proto_bundle != nullptr && (type->IsProto() || type->IsEnum() || type->IsArray())); if (type->IsArray()) { const zetasql::Type* element_type = type->AsArray()->element_type(); ZETASQL_RET_CHECK(element_type->IsProto() || element_type->IsEnum()); ZETASQL_ASSIGN_OR_RETURN(auto updated_element_type, GetProtoTypeFromBundle(element_type, proto_bundle)); const zetasql::Type* updated_array_type; ZETASQL_RETURN_IF_ERROR(type_factory_->MakeArrayType(updated_element_type, &updated_array_type)); editor->set_type(updated_array_type); return absl::OkStatus(); } ZETASQL_ASSIGN_OR_RETURN(auto updated_type, GetProtoTypeFromBundle(type, proto_bundle)); editor->set_type(updated_type); return absl::OkStatus(); } absl::Status SchemaUpdaterImpl::AlterProtoColumnTypes( const ProtoBundle* proto_bundle, const ddl::AlterProtoBundle& ddl_alter_proto_bundle) { ZETASQL_RET_CHECK_NE(proto_bundle, nullptr); // Return if no types are being updated if (ddl_alter_proto_bundle.update_type().empty()) return absl::OkStatus(); absl::flat_hash_set<std::string> update_types; for (const auto& type : ddl_alter_proto_bundle.update_type()) { update_types.insert(type.source_name()); } const auto& tables = latest_schema_->tables(); for (const auto& table : tables) { auto columns = table->columns(); for (const Column* column : columns) { const zetasql::Type* column_type = column->GetType(); if (column_type->IsArray()) { column_type = column_type->AsArray()->element_type(); } if (!column_type->IsProto() && !column_type->IsEnum()) continue; // Types not in alter proto bundle do not require edits absl::string_view type_name = column_type->IsProto() ? column_type->AsProto()->descriptor()->full_name() : column_type->AsEnum()->enum_descriptor()->full_name(); if (!update_types.contains(type_name)) { continue; } ZETASQL_RETURN_IF_ERROR(AlterNode<Column>( column, [this, &column, &proto_bundle](Column::Editor* editor) -> absl::Status { return AlterProtoColumnType(column, proto_bundle, editor); })); } } return absl::OkStatus(); } absl::StatusOr<std::shared_ptr<const ProtoBundle>> SchemaUpdaterImpl::AlterProtoBundle( const ddl::AlterProtoBundle& ddl_alter_proto_bundle, absl::string_view proto_descriptor_bytes) { if (latest_schema_->proto_bundle()->empty()) { return absl::FailedPreconditionError( "Proto bundle does not yet exist; cannot alter it"); } ZETASQL_ASSIGN_OR_RETURN( auto proto_bundle_builder, ProtoBundle::Builder::New(proto_descriptor_bytes, latest_schema_->proto_bundle().get())); auto insert_types = ddl_alter_proto_bundle.insert_type(); std::vector<std::string> insert_type_names; insert_type_names.reserve(insert_types.size()); for (int i = 0; i < insert_types.size(); ++i) { insert_type_names.push_back(insert_types.at(i).source_name()); } ZETASQL_RETURN_IF_ERROR(proto_bundle_builder->InsertTypes(insert_type_names)); auto update_types = ddl_alter_proto_bundle.update_type(); std::vector<std::string> update_type_names; update_type_names.reserve(update_types.size()); for (int i = 0; i < update_types.size(); ++i) { update_type_names.push_back(update_types.at(i).source_name()); } ZETASQL_RETURN_IF_ERROR(proto_bundle_builder->UpdateTypes(update_type_names)); auto delete_types = ddl_alter_proto_bundle.delete_type(); ZETASQL_RETURN_IF_ERROR(proto_bundle_builder->DeleteTypes( std::vector(delete_types.begin(), delete_types.end()))); ZETASQL_ASSIGN_OR_RETURN(auto proto_bundle, proto_bundle_builder->Build()); ZETASQL_RETURN_IF_ERROR( AlterProtoColumnTypes(proto_bundle.get(), ddl_alter_proto_bundle)); return proto_bundle; } absl::Status SchemaUpdaterImpl::AlterLocalityGroup( const ddl::AlterLocalityGroup& alter_locality_group) { const LocalityGroup* locality_group = latest_schema_->FindLocalityGroup( alter_locality_group.locality_group_name()); LocalityGroup::Builder builder; if (locality_group == nullptr) { // Create default locality group if it does not exist. if (alter_locality_group.locality_group_name() == ddl::kDefaultLocalityGroupName) { builder.set_name(ddl::kDefaultLocalityGroupName); locality_group = builder.get(); ZETASQL_RETURN_IF_ERROR(AddNode(builder.build())); } else if (alter_locality_group.existence_modifier() == ddl::IF_EXISTS) { return absl::OkStatus(); } else { return error::LocalityGroupNotFound( alter_locality_group.locality_group_name()); } } // Validate and set locality group options. const auto& set_options = alter_locality_group.set_options(); if (!set_options.options().empty()) { ZETASQL_RETURN_IF_ERROR(AlterNode<LocalityGroup>( locality_group, [this, &set_options](LocalityGroup::Editor* editor) -> absl::Status { // Set locality group options return SetLocalityGroupOptions(set_options.options(), editor); })); } else { return error::AlterLocalityGroupWithoutOptions(); } return absl::OkStatus(); } absl::Status SchemaUpdaterImpl::AddCheckConstraint( const ddl::CheckConstraint& ddl_check_constraint, const Table* table) { return AlterNode<Table>(table, [&](Table::Editor* editor) -> absl::Status { return CreateCheckConstraint(ddl_check_constraint, table, /*ddl_create_table=*/nullptr); }); } absl::Status SchemaUpdaterImpl::AddForeignKey( const ddl::ForeignKey& ddl_foreign_key, const Table* table) { return AlterNode<Table>(table, [&](Table::Editor* editor) -> absl::Status { return CreateForeignKeyConstraint(ddl_foreign_key, table); }); } absl::Status SchemaUpdaterImpl::DropConstraint( const std::string& constraint_name, const Table* table) { // Try each type of constraint supported by ALTER TABLE DROP CONSTRAINT. const ForeignKey* foreign_key = table->FindForeignKey(constraint_name); if (foreign_key != nullptr) { return DropNode(foreign_key); } const CheckConstraint* check_constraint = table->FindCheckConstraint(constraint_name); if (check_constraint != nullptr) { return DropNode(check_constraint); } return error::ConstraintNotFound(constraint_name, table->Name()); } absl::Status SchemaUpdaterImpl::DropTable(const ddl::DropTable& drop_table) { const Table* table = latest_schema_->FindTableCaseSensitive(drop_table.table_name()); if (table == nullptr) { if (drop_table.existence_modifier() == ddl::IF_EXISTS) { return absl::OkStatus(); } return error::TableNotFound(drop_table.table_name()); } if (!table->change_streams_explicitly_tracking_table().empty()) { absl::Span<const ChangeStream* const> change_streams_explicitly_tracking_table = table->change_streams_explicitly_tracking_table(); std::string change_stream_names; for (auto change_stream = change_streams_explicitly_tracking_table.begin(); change_stream != change_streams_explicitly_tracking_table.end(); ++change_stream) { if (change_stream != change_streams_explicitly_tracking_table.begin()) { change_stream_names += ","; } change_stream_names += (*change_stream)->Name(); } return error::DropTableWithChangeStream( table->Name(), change_streams_explicitly_tracking_table.size(), change_stream_names); } if (table->locality_group() != nullptr) { ZETASQL_RETURN_IF_ERROR(AlterNode<LocalityGroup>( table->locality_group(), [](LocalityGroup::Editor* editor) -> absl::Status { editor->decrement_use_count(); return absl::OkStatus(); })); } // TODO : Error if any view depends on this table. absl::Status status = DropNode(table); if (status.ok()) { // Drop dependent sequences. for (const Column* column : table->columns()) { if (column->is_identity_column()) { ZETASQL_RET_CHECK(column->sequences_used().size() == 1); const Sequence* sequence = static_cast<const Sequence*>(column->sequences_used().at(0)); ZETASQL_RETURN_IF_ERROR(DropSequence(sequence)); } if (column->locality_group() != nullptr) { ZETASQL_RETURN_IF_ERROR(AlterNode<LocalityGroup>( column->locality_group(), [](LocalityGroup::Editor* editor) -> absl::Status { editor->decrement_use_count(); return absl::OkStatus(); })); } } } return status; } absl::Status SchemaUpdaterImpl::DropIndex(const ddl::DropIndex& drop_index) { const Index* index = latest_schema_->FindIndexCaseSensitive(drop_index.index_name()); if (index == nullptr) { if (drop_index.existence_modifier() == ddl::IF_EXISTS) { return absl::OkStatus(); } return error::IndexNotFound(drop_index.index_name()); } if (index->locality_group() != nullptr) { ZETASQL_RETURN_IF_ERROR(AlterNode<LocalityGroup>( index->locality_group(), [](LocalityGroup::Editor* editor) -> absl::Status { editor->decrement_use_count(); return absl::OkStatus(); })); } return DropNode(index); } absl::Status SchemaUpdaterImpl::DropSearchIndex( const ddl::DropSearchIndex& drop_search_index) { const Index* index = latest_schema_->FindIndexCaseSensitive(drop_search_index.index_name()); if (index == nullptr) { if (drop_search_index.existence_modifier() == ddl::IF_EXISTS) { return absl::OkStatus(); } return error::IndexNotFound(drop_search_index.index_name()); } if (!index->is_search_index()) { return error::IndexNotFound(drop_search_index.index_name()); } return DropNode(index); } absl::Status SchemaUpdaterImpl::DropVectorIndex( const ddl::DropVectorIndex& drop_vector_index) { const Index* index = latest_schema_->FindIndexCaseSensitive(drop_vector_index.index_name()); if (index == nullptr) { if (drop_vector_index.existence_modifier() == ddl::IF_EXISTS) { return absl::OkStatus(); } return error::IndexNotFound(drop_vector_index.index_name()); } if (!index->is_vector_index()) { return error::IndexNotFound(drop_vector_index.index_name()); } return DropNode(index); } absl::Status SchemaUpdaterImpl::DropChangeStream( const ddl::DropChangeStream& drop_change_stream) { const ChangeStream* change_stream = latest_schema_->FindChangeStream(drop_change_stream.change_stream_name()); if (change_stream == nullptr) { return error::ChangeStreamNotFound(drop_change_stream.change_stream_name()); } global_names_.RemoveName(change_stream->tvf_name()); return DropNode(change_stream); } absl::Status SchemaUpdaterImpl::DropSequence(const Sequence* drop_sequence) { global_names_.RemoveName(drop_sequence->Name()); drop_sequence->RemoveSequenceFromLastValuesMap(); ZETASQL_RETURN_IF_ERROR(DropNode(drop_sequence)); return absl::OkStatus(); } absl::Status SchemaUpdaterImpl::DropNamedSchema( const ddl::DropSchema& drop_schema) { const NamedSchema* current_named_schema = latest_schema_->FindNamedSchema(drop_schema.schema_name()); if (current_named_schema == nullptr) { if (drop_schema.if_exists() == true) { return absl::OkStatus(); } return error::NamedSchemaNotFound(drop_schema.schema_name()); } global_names_.RemoveName(drop_schema.schema_name()); return DropNode(current_named_schema); } absl::Status SchemaUpdaterImpl::DropLocalityGroup( const ddl::DropLocalityGroup& drop_locality_group) { if (drop_locality_group.locality_group_name() == ddl::kDefaultLocalityGroupName) { return error::DroppingDefaultLocalityGroup(); } const LocalityGroup* locality_group = latest_schema_->FindLocalityGroup( drop_locality_group.locality_group_name()); if (locality_group == nullptr) { if (drop_locality_group.existence_modifier() == ddl::IF_EXISTS) { return absl::OkStatus(); } return error::LocalityGroupNotFound( drop_locality_group.locality_group_name()); } if (locality_group->use_count() > 0) { return error::DroppingLocalityGroupWithAssignedTableColumn( locality_group->Name()); } return DropNode(locality_group); } absl::Status SchemaUpdaterImpl::ApplyImplSetColumnOptions( const ddl::SetColumnOptions& set_column_options, const database_api::DatabaseDialect& dialect) { for (const ddl::SetColumnOptions::ColumnPath& path : set_column_options.column_path()) { const Table* table = latest_schema_->FindTableCaseSensitive(path.table_name()); if (table == nullptr) { return error::TableNotFound(path.table_name()); } const Column* column = table->FindColumn(path.column_name()); if (column == nullptr) { return error::ColumnNotFound(path.table_name(), path.column_name()); } ZETASQL_RETURN_IF_ERROR( AlterNode<Column>(column, [this, &set_column_options, dialect](Column::Editor* editor) -> absl::Status { return SetColumnOptions( set_column_options.options(), dialect, editor); })); } return absl::OkStatus(); } absl::Status SchemaUpdaterImpl::DropFunction( const ddl::DropFunction& drop_function) { const SchemaNode* node = nullptr; if (drop_function.function_kind() == ddl::Function::VIEW) { node = latest_schema_->FindViewCaseSensitive(drop_function.function_name()); if (node == nullptr) { if (drop_function.has_existence_modifier() && drop_function.existence_modifier() == ddl::IF_EXISTS) { return absl::OkStatus(); } return error::ViewNotFound(drop_function.function_name()); } } else if (drop_function.function_kind() == ddl::Function::FUNCTION) { node = latest_schema_->FindUdfCaseSensitive(drop_function.function_name()); if (node == nullptr) { if (drop_function.has_existence_modifier() && drop_function.existence_modifier() == ddl::IF_EXISTS) { return absl::OkStatus(); } return error::FunctionNotFound(drop_function.function_name()); } } return DropNode(node); } absl::StatusOr<Model::ModelColumn> SchemaUpdaterImpl::CreateModelColumn( const ddl::ColumnDefinition& ddl_column, const Model* model, const ddl::CreateModel* ddl_model, const database_api::DatabaseDialect& dialect) { const std::string& column_name = ddl_column.column_name(); ZETASQL_ASSIGN_OR_RETURN( const zetasql::Type* column_type, DDLColumnTypeToGoogleSqlType(ddl_column, type_factory_, latest_schema_->proto_bundle().get())); if (ddl_column.not_null()) { return error::ModelColumnNotNull(model->Name(), column_name); } if (ddl_column.hidden()) { return error::ModelColumnHidden(model->Name(), column_name); } if (ddl_column.has_length()) { return error::ModelColumnLength(model->Name(), column_name); } if (ddl_column.has_generated_column()) { return error::ModelColumnGenerated(model->Name(), column_name); } if (ddl_column.has_column_default()) { return error::ModelColumnDefault(model->Name(), column_name); } std::optional<bool> is_required; for (const auto& option : ddl_column.set_options()) { if (option.option_name() == ddl::kModelColumnRequiredOptionName) { if (option.has_null_value()) { is_required = std::nullopt; } else { ZETASQL_RET_CHECK(option.has_bool_value()) << "Option " << ddl::kModelColumnRequiredOptionName << " can only take bool value"; is_required = option.bool_value(); } } else { ZETASQL_RET_CHECK_FAIL() << "Invalid column option: " << option.option_name(); } } return Model::ModelColumn{ .name = column_name, .type = column_type, // Auto discovery doesn't work in Emulator. .is_explicit = true, .is_required = is_required, }; } absl::StatusOr<std::unique_ptr<const zetasql::AnalyzerOutput>> SchemaUpdaterImpl::AnalyzeCreatePropertyGraph( const ddl::CreatePropertyGraph& ddl_create_property_graph, const zetasql::AnalyzerOptions& analyzer_options, Catalog* catalog) { std::unique_ptr<const zetasql::AnalyzerOutput> analyzer_output; // Delegate to ZetaSQL for DDL parsing and analysis. ZETASQL_RETURN_IF_ERROR(zetasql::AnalyzeStatement( ddl_create_property_graph.ddl_body(), analyzer_options, catalog, type_factory_, &analyzer_output)); // Confirm that the analyzed statement is a valid CREATE PROPERTY GRAPH // statement. const zetasql::ResolvedStatement* stmt = analyzer_output->resolved_statement(); ZETASQL_RET_CHECK(stmt->Is<zetasql::ResolvedCreatePropertyGraphStmt>()) << "Failed to analyze DDL. Expects a CREATE [OR REPLACE] PROPERTY " "GRAPH statement, actual is: " << stmt->DebugString(); return analyzer_output; } std::string SchemaUpdaterImpl::GetTimeZone() const { std::string time_zone = kDefaultTimeZone; if (latest_schema_ != nullptr) { time_zone = latest_schema_->default_time_zone(); } return time_zone; } absl::Status SchemaUpdaterImpl::CreatePropertyGraph( const ddl::CreatePropertyGraph& ddl_create_property_graph, const database_api::DatabaseDialect& dialect) { const PropertyGraph* existing_graph = latest_schema_->FindPropertyGraph(ddl_create_property_graph.name()); bool replace = false; if (existing_graph && ddl_create_property_graph.existence_modifier() == ddl::IF_NOT_EXISTS) { return absl::OkStatus(); } else if (existing_graph && ddl_create_property_graph.existence_modifier() == ddl::OR_REPLACE) { replace = true; } else { if (latest_schema_->property_graphs().size() >= limits::kMaxPropertyGraphsPerDatabase) { return error::TooManyPropertyGraphsPerDatabase( ddl_create_property_graph.name(), limits::kMaxPropertyGraphsPerDatabase); } ZETASQL_RETURN_IF_ERROR(global_names_.AddName("PropertyGraph", ddl_create_property_graph.name())); } zetasql::AnalyzerOptions analyzer_options = MakeGoogleSqlAnalyzerOptions(GetTimeZone()); analyzer_options.mutable_language()->set_name_resolution_mode( zetasql::NAME_RESOLUTION_DEFAULT); analyzer_options.mutable_language()->EnableLanguageFeature( zetasql::FEATURE_V_1_4_SQL_GRAPH); analyzer_options.mutable_language()->AddSupportedStatementKind( zetasql::RESOLVED_CREATE_PROPERTY_GRAPH_STMT); FunctionCatalog function_catalog(type_factory_); function_catalog.SetLatestSchema(latest_schema_); // Create a catalog based on 'latest_schema_'for property graph DDL parsing // and analysis. PreparePropertyGraphCatalog catalog(latest_schema_, &function_catalog, type_factory_, analyzer_options); ZETASQL_ASSIGN_OR_RETURN( std::unique_ptr<const zetasql::AnalyzerOutput> analyzer_output, AnalyzeCreatePropertyGraph(ddl_create_property_graph, analyzer_options, &catalog)); const zetasql::ResolvedCreatePropertyGraphStmt* graph_stmt = analyzer_output->resolved_statement() ->GetAs<zetasql::ResolvedCreatePropertyGraphStmt>(); PropertyGraph::Builder property_graph_builder; ZETASQL_RETURN_IF_ERROR(PopulatePropertyGraph(ddl_create_property_graph, graph_stmt, &property_graph_builder)); const PropertyGraph* graph = property_graph_builder.get(); if (replace) { return AlterNode<PropertyGraph>( existing_graph, [&](PropertyGraph::Editor* editor) -> absl::Status { editor->copy_from(graph); return absl::OkStatus(); }); } else { return AddNode(property_graph_builder.build()); } } absl::StatusOr<std::string> GetColumnNameFromExpr( const zetasql::ResolvedExpr* expr) { if (expr->Is<zetasql::ResolvedColumnRef>()) { return expr->GetAs<zetasql::ResolvedColumnRef>()->column().name(); } if (expr->Is<zetasql::ResolvedCatalogColumnRef>()) { return expr->GetAs<zetasql::ResolvedCatalogColumnRef>()->column()->Name(); } return absl::InvalidArgumentError( absl::StrCat("Expected ResolvedColumnRef or ResolvedCatalogColumnRef." " Found ", expr->DebugString())); } absl::Status SetNodeReference( const zetasql::ResolvedGraphNodeTableReference* node_reference, bool is_source, PropertyGraph::GraphElementTable* new_element_table) { std::vector<std::string> node_table_column_names; std::vector<std::string> edge_table_column_names; for (int i = 0; i < node_reference->edge_table_column_list_size(); ++i) { ZETASQL_ASSIGN_OR_RETURN( std::string edge_column_name, GetColumnNameFromExpr(node_reference->edge_table_column_list(i))); ZETASQL_ASSIGN_OR_RETURN( std::string node_column_name, GetColumnNameFromExpr(node_reference->node_table_column_list(i))); edge_table_column_names.push_back(std::move(edge_column_name)); node_table_column_names.push_back(std::move(node_column_name)); } if (is_source) { new_element_table->set_source_node_reference( node_reference->node_table_identifier(), node_table_column_names, edge_table_column_names); } else { new_element_table->set_target_node_reference( node_reference->node_table_identifier(), node_table_column_names, edge_table_column_names); } return absl::OkStatus(); } absl::Status SchemaUpdaterImpl::AddGraphElementTable( const zetasql::ResolvedGraphElementTable* element, bool is_node, PropertyGraph::Builder* graph_builder) { const zetasql::ResolvedTableScan* table_scan = element->input_scan()->GetAs<zetasql::ResolvedTableScan>(); PropertyGraph::GraphElementTable new_element_table; new_element_table.set_name(table_scan->table()->Name()); new_element_table.set_alias(element->alias()); for (const auto& key : element->key_list()) { ZETASQL_ASSIGN_OR_RETURN(std::string key_column_name, GetColumnNameFromExpr(key.get())); new_element_table.add_key_clause_column(std::move(key_column_name)); } for (const auto& label : element->label_name_list()) { new_element_table.add_label_name(label); } for (const auto& property_definition : element->property_definition_list()) { new_element_table.add_property_definition( property_definition->property_declaration_name(), property_definition->sql()); } if (is_node) { new_element_table.set_element_kind(PropertyGraph::GraphElementKind::NODE); graph_builder->add_node_table(new_element_table); return absl::OkStatus(); } new_element_table.set_element_kind(PropertyGraph::GraphElementKind::EDGE); ZETASQL_RETURN_IF_ERROR(SetNodeReference(element->source_node_reference(), /*is_source=*/true, &new_element_table)); ZETASQL_RETURN_IF_ERROR(SetNodeReference(element->dest_node_reference(), /*is_source=*/false, &new_element_table)); graph_builder->add_edge_table(new_element_table); return absl::OkStatus(); } absl::Status SchemaUpdaterImpl::PopulatePropertyGraph( const ddl::CreatePropertyGraph& ddl_create_property_graph, const zetasql::ResolvedCreatePropertyGraphStmt* const graph_stmt, PropertyGraph::Builder* graph_builder) { // Property graph name and ddl body. graph_builder->set_name(ddl_create_property_graph.name()); graph_builder->set_ddl_body(ddl_create_property_graph.ddl_body()); // Property declarations. for (const auto& property : graph_stmt->property_declaration_list()) { PropertyGraph::PropertyDeclaration property_declaration; property_declaration.name = property->name(); property_declaration.type = property->type()->TypeName( zetasql::PRODUCT_EXTERNAL, /*use_external_float32_unused=*/true); graph_builder->add_property_declaration(property_declaration); } // Labels. for (const auto& label : graph_stmt->label_list()) { PropertyGraph::Label label_declaration; label_declaration.name = label->name(); for (const auto& property_declaration_name : label->property_declaration_name_list()) { label_declaration.property_names.insert(property_declaration_name); } graph_builder->add_label(label_declaration); } // Node tables. for (const auto& node_table : graph_stmt->node_table_list()) { ZETASQL_RETURN_IF_ERROR(AddGraphElementTable(node_table.get(), /*is_node=*/true, graph_builder)); } // Edge tables. for (const auto& edge_table : graph_stmt->edge_table_list()) { ZETASQL_RETURN_IF_ERROR(AddGraphElementTable(edge_table.get(), /*is_node=*/false, graph_builder)); } return absl::OkStatus(); } absl::Status SchemaUpdaterImpl::CreateModel( const ddl::CreateModel& ddl_model, const database_api::DatabaseDialect& dialect) { const Model* existing_model = latest_schema_->FindModel(ddl_model.model_name()); bool replace = false; if (existing_model && ddl_model.existence_modifier() == ddl::IF_NOT_EXISTS) { return absl::OkStatus(); } else if (existing_model && ddl_model.existence_modifier() == ddl::OR_REPLACE) { replace = true; } else { if (latest_schema_->models().size() >= limits::kMaxModelsPerDatabase) { return error::TooManyModelsPerDatabase(ddl_model.model_name(), limits::kMaxModelsPerDatabase); } ZETASQL_RETURN_IF_ERROR(global_names_.AddName("Model", ddl_model.model_name())); } Model::Builder builder; builder.set_name(ddl_model.model_name()); for (const ddl::ColumnDefinition& input_column : ddl_model.input()) { ZETASQL_ASSIGN_OR_RETURN( Model::ModelColumn column, CreateModelColumn(input_column, builder.get(), &ddl_model, dialect)); builder.add_input_column(column); } for (const ddl::ColumnDefinition& output_column : ddl_model.output()) { ZETASQL_ASSIGN_OR_RETURN( Model::ModelColumn column, CreateModelColumn(output_column, builder.get(), &ddl_model, dialect)); builder.add_output_column(column); } builder.set_remote(ddl_model.remote()); ZETASQL_RETURN_IF_ERROR(SetModelOptions(ddl_model.set_options(), &builder)); const Model* model = builder.get(); if (replace) { return AlterNode<Model>(existing_model, [&](Model::Editor* editor) -> absl::Status { editor->copy_from(model); return absl::OkStatus(); }); } else { return AddNode(builder.build()); } } absl::Status SchemaUpdaterImpl::AlterModel(const ddl::AlterModel& alter_model) { const Model* model = latest_schema_->FindModel(alter_model.model_name()); if (model == nullptr) { if (alter_model.if_exists()) { return absl::OkStatus(); } return error::ModelNotFound(alter_model.model_name()); } const ::google::protobuf::RepeatedPtrField<ddl::SetOption>& options = alter_model.set_options().options(); ZETASQL_RETURN_IF_ERROR(AlterNode<Model>( model, [this, options](Model::Editor* editor) -> absl::Status { return SetModelOptions(options, editor); })); return absl::OkStatus(); } template <typename ModelModifier> absl::Status SchemaUpdaterImpl::SetModelOptions( const ::google::protobuf::RepeatedPtrField<ddl::SetOption>& set_options, ModelModifier* modifier) { for (const ddl::SetOption& option : set_options) { if (option.option_name() == ddl::kModelDefaultBatchSizeOptionName) { if (option.has_null_value()) { modifier->set_default_batch_size(std::nullopt); } else { ZETASQL_RET_CHECK(option.has_int64_value()) << "Option " << ddl::kModelDefaultBatchSizeOptionName << " can only take int64_t value"; modifier->set_default_batch_size(option.int64_value()); } } else if (option.option_name() == ddl::kModelEndpointOptionName) { if (option.has_null_value()) { modifier->set_endpoint(std::nullopt); } else { ZETASQL_RET_CHECK(option.has_string_value()) << "Option " << ddl::kModelEndpointOptionName << " can only take string value"; modifier->set_endpoint(option.string_value()); } } else if (option.option_name() == ddl::kModelEndpointsOptionName) { if (option.has_null_value()) { modifier->set_endpoints({}); } else { ZETASQL_RET_CHECK(!option.string_list_value().empty()) << "Option " << ddl::kModelEndpointsOptionName << " can only take string list value"; modifier->set_endpoints({option.string_list_value().begin(), option.string_list_value().end()}); } } else { ZETASQL_RET_CHECK_FAIL() << "Invalid column option: " << option.option_name(); } } return absl::OkStatus(); } absl::Status SchemaUpdaterImpl::DropModel(const ddl::DropModel& drop_model) { const Model* model = latest_schema_->FindModel(drop_model.model_name()); if (model == nullptr) { if (drop_model.if_exists()) { return absl::OkStatus(); } return error::ModelNotFound(drop_model.model_name()); } return DropNode(model); } absl::Status SchemaUpdaterImpl::DropPropertyGraph( const ddl::DropPropertyGraph& ddl_drop_property_graph) { const PropertyGraph* graph = latest_schema_->FindPropertyGraph(ddl_drop_property_graph.name()); if (graph == nullptr) { if (ddl_drop_property_graph.existence_modifier() == ddl::IF_EXISTS) { return absl::OkStatus(); } return error::PropertyGraphNotFound(ddl_drop_property_graph.name()); } return DropNode(graph); } absl::StatusOr<std::shared_ptr<const ProtoBundle>> SchemaUpdaterImpl::DropProtoBundle() { if (latest_schema_->proto_bundle()->empty()) { return absl::FailedPreconditionError( "Proto bundle does not yet exist; cannot drop it"); } return ProtoBundle::CreateEmpty(); } const Schema* EmptySchema(std::string_view database_id, database_api::DatabaseDialect dialect) { if (dialect == database_api::DatabaseDialect::POSTGRESQL) { static const Schema* empty_pg_schema = new Schema(SchemaGraph::CreateEmpty(), ProtoBundle::CreateEmpty(), dialect, database_id); return empty_pg_schema; } static const Schema* empty_gsql_schema = new Schema; return empty_gsql_schema; } } // namespace absl::StatusOr<std::unique_ptr<const Schema>> SchemaUpdater::ValidateSchemaFromDDL( const SchemaChangeOperation& schema_change_operation, const SchemaChangeContext& context, const Schema* existing_schema) { if (existing_schema == nullptr) { existing_schema = EmptySchema(context.database_id, schema_change_operation.database_dialect); } ZETASQL_ASSIGN_OR_RETURN(SchemaUpdaterImpl updater, SchemaUpdaterImpl::Build( context.type_factory, context.table_id_generator, context.column_id_generator, context.storage, context.schema_change_timestamp, context.pg_oid_assigner, existing_schema, context.database_id)); context.pg_oid_assigner->BeginAssignment(); ZETASQL_ASSIGN_OR_RETURN(pending_work_, updater.ApplyDDLStatements(schema_change_operation)); intermediate_schemas_ = updater.GetIntermediateSchemas(); std::unique_ptr<const Schema> new_schema = nullptr; if (!intermediate_schemas_.empty()) { new_schema = std::move(*intermediate_schemas_.rbegin()); ZETASQL_RETURN_IF_ERROR(context.pg_oid_assigner->EndAssignment()); } pending_work_.clear(); intermediate_schemas_.clear(); return new_schema; } // TODO : These should run in a ReadWriteTransaction with rollback // capability so that changes to the database can be reversed. absl::Status SchemaUpdater::RunPendingActions(int* num_succesful) { for (const auto& pending_statement : pending_work_) { ZETASQL_RETURN_IF_ERROR(pending_statement.RunSchemaChangeActions()); ++(*num_succesful); } return absl::OkStatus(); } absl::StatusOr<SchemaChangeResult> SchemaUpdater::UpdateSchemaFromDDL( const Schema* existing_schema, const SchemaChangeOperation& schema_change_operation, const SchemaChangeContext& context) { ZETASQL_ASSIGN_OR_RETURN(SchemaUpdaterImpl updater, SchemaUpdaterImpl::Build( context.type_factory, context.table_id_generator, context.column_id_generator, context.storage, context.schema_change_timestamp, context.pg_oid_assigner, existing_schema, context.database_id)); context.pg_oid_assigner->BeginAssignment(); ZETASQL_ASSIGN_OR_RETURN(pending_work_, updater.ApplyDDLStatements(schema_change_operation)); intermediate_schemas_ = updater.GetIntermediateSchemas(); // Use the schema snapshot for the last succesful statement. int num_successful = 0; std::unique_ptr<const Schema> new_schema = nullptr; absl::Status backfill_status = RunPendingActions(&num_successful); if (num_successful > 0) { new_schema = std::move(intermediate_schemas_[num_successful - 1]); ZETASQL_RETURN_IF_ERROR(context.pg_oid_assigner->EndAssignmentAtIntermediateSchema( num_successful - 1)); } ZETASQL_RET_CHECK_LE(num_successful, intermediate_schemas_.size()); return SchemaChangeResult{ .num_successful_statements = num_successful, .updated_schema = std::move(new_schema), .backfill_status = backfill_status, }; } absl::StatusOr<std::unique_ptr<const Schema>> SchemaUpdater::CreateSchemaFromDDL( const SchemaChangeOperation& schema_change_operation, const SchemaChangeContext& context) { ZETASQL_ASSIGN_OR_RETURN( SchemaChangeResult result, UpdateSchemaFromDDL(EmptySchema(context.database_id, schema_change_operation.database_dialect), schema_change_operation, context)); ZETASQL_RETURN_IF_ERROR(result.backfill_status); return std::move(result.updated_schema); } absl::StatusOr<std::unique_ptr<ddl::DDLStatement>> ParseDDLByDialect( absl::string_view statement, database_api::DatabaseDialect dialect) { if (dialect == database_api::DatabaseDialect::POSTGRESQL) { ZETASQL_ASSIGN_OR_RETURN( std::unique_ptr<postgres_translator::interfaces::PGArena> arena, postgres_translator::spangres::MemoryContextPGArena::Init(nullptr)); ZETASQL_ASSIGN_OR_RETURN( postgres_translator::interfaces::ParserOutput parser_output, postgres_translator::CheckedPgRawParserFullOutput( std::string(statement).c_str()), _ << "failed to parse the DDL statements."); ZETASQL_ASSIGN_OR_RETURN( std::unique_ptr< postgres_translator::spangres::PostgreSQLToSpannerDDLTranslator> translator, postgres_translator::spangres:: CreatePostgreSQLToSpannerDDLTranslator()); const postgres_translator::spangres::TranslationOptions options{ .enable_nulls_ordering = true, .enable_generated_column = true, .enable_column_default = true, .enable_identity_column = EmulatorFeatureFlags::instance().flags().enable_identity_columns, .enable_default_sequence_kind = EmulatorFeatureFlags::instance().flags().enable_identity_columns, .enable_default_time_zone = EmulatorFeatureFlags::instance().flags().enable_default_time_zone, .enable_jsonb_type = true, .enable_array_jsonb_type = true, .enable_create_view = true, // This enables Spangres ddl translator to record the original // expression in PG. .enable_expression_string = true, .enable_if_not_exists = true, .enable_change_streams = true, .enable_change_streams_mod_type_filter_options = true, .enable_change_streams_ttl_deletes_filter_option = true, .enable_search_index = EmulatorFeatureFlags::instance().flags().enable_search_index, .enable_sequence = true, .enable_virtual_generated_column = true, .enable_hidden_column = EmulatorFeatureFlags::instance().flags().enable_hidden_column, .enable_serial_types = EmulatorFeatureFlags::instance() .flags() .enable_serial_auto_increment, }; ZETASQL_ASSIGN_OR_RETURN(ddl::DDLStatementList ddl_statement_list, translator->TranslateForEmulator(parser_output, options)); ZETASQL_RET_CHECK_EQ(ddl_statement_list.statement_size(), 1); return std::make_unique<ddl::DDLStatement>(ddl_statement_list.statement(0)); } else { auto ddl_statement = std::make_unique<ddl::DDLStatement>(); ZETASQL_RETURN_IF_ERROR(ddl::ParseDDLStatement(statement, ddl_statement.get())); return std::move(ddl_statement); } } } // namespace backend } // namespace emulator } // namespace spanner } // namespace google