backend/database/database.cc (152 lines of code) (raw):

// // Copyright 2020 Google LLC // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. // #include "backend/database/database.h" #include <memory> #include <thread> // NOLINT #include <utility> #include "google/spanner/admin/database/v1/common.pb.h" #include "zetasql/public/types/type_factory.h" #include "absl/functional/bind_front.h" #include "absl/memory/memory.h" #include "absl/status/status.h" #include "absl/status/statusor.h" #include "absl/strings/string_view.h" #include "absl/time/time.h" #include "absl/types/variant.h" #include "backend/actions/manager.h" #include "backend/common/ids.h" #include "backend/database/change_stream/change_stream_partition_churner.h" #include "backend/database/pg_oid_assigner/pg_oid_assigner.h" #include "backend/locking/manager.h" #include "backend/query/query_engine.h" #include "backend/schema/catalog/proto_bundle.h" #include "backend/schema/catalog/schema.h" #include "backend/schema/catalog/versioned_catalog.h" #include "backend/schema/graph/schema_graph.h" #include "backend/schema/updater/schema_updater.h" #include "backend/schema/updater/scoped_schema_change_lock.h" #include "backend/storage/in_memory_storage.h" #include "backend/transaction/options.h" #include "backend/transaction/read_only_transaction.h" #include "backend/transaction/read_write_transaction.h" #include "common/clock.h" #include "common/errors.h" #include "absl/status/status.h" #include "zetasql/base/status_macros.h" namespace google { namespace spanner { namespace emulator { namespace backend { // TransactionIDGenerator is initialized to 1 because 0 is used as a sentinel // value for an invalid transaction. Database::Database() : transaction_id_generator_(1) {} absl::StatusOr<std::unique_ptr<Database>> Database::Create( Clock* clock, std::string_view database_id, const SchemaChangeOperation& schema_change_operation) { auto database = absl::WrapUnique(new Database()); database->clock_ = clock; database->database_id_ = database_id; database->storage_ = std::make_unique<InMemoryStorage>(); database->lock_manager_ = std::make_unique<LockManager>(clock); database->type_factory_ = std::make_unique<zetasql::TypeFactory>(); database->query_engine_ = std::make_unique<QueryEngine>(database->type_factory_.get()); database->action_manager_ = std::make_unique<ActionManager>(); database->dialect_ = schema_change_operation.database_dialect; database->pg_oid_assigner_ = std::make_unique<PgOidAssigner>( schema_change_operation.database_dialect == database_api::DatabaseDialect::POSTGRESQL); if (schema_change_operation.statements.empty()) { if (database->dialect_ == database_api::DatabaseDialect::POSTGRESQL) { // Create an empty schema with the dialect set. database->versioned_catalog_ = std::make_unique<VersionedCatalog>(std::make_unique<const Schema>( SchemaGraph::CreateEmpty(), ProtoBundle::CreateEmpty(), database->dialect_, database_id)); } else { database->versioned_catalog_ = std::make_unique<VersionedCatalog>(); } } else { SchemaUpdater updater; ZETASQL_ASSIGN_OR_RETURN( std::unique_ptr<const Schema> schema, updater.CreateSchemaFromDDL(schema_change_operation, database->GetSchemaChangeContext())); database->versioned_catalog_ = std::make_unique<VersionedCatalog>(std::move(schema)); } database->action_manager_->AddActionsForSchema( database->versioned_catalog_->GetLatestSchema(), database->query_engine_->function_catalog(), database->query_engine_->type_factory()); database->change_stream_partition_churner_ = std::make_unique<ChangeStreamPartitionChurner>( absl::bind_front(&Database::CreateReadWriteTransaction, database.get()), database->clock_); database->change_stream_partition_churner_->Update( database->versioned_catalog_->GetLatestSchema()); // Some functions need to access the schema (e.g. sequence functions), so // set the latest schema to the function catalog here. database->query_engine_->SetLatestSchemaForFunctionCatalog( database->versioned_catalog_->GetLatestSchema()); return database; } absl::StatusOr<std::unique_ptr<ReadOnlyTransaction>> Database::CreateReadOnlyTransaction(const ReadOnlyOptions& options) { return std::make_unique<ReadOnlyTransaction>( options, transaction_id_generator_.NextId(), clock_, storage_.get(), lock_manager_.get(), versioned_catalog_.get()); } absl::StatusOr<std::unique_ptr<ReadWriteTransaction>> Database::CreateReadWriteTransaction(const ReadWriteOptions& options, const RetryState& retry_state) { return std::make_unique<ReadWriteTransaction>( options, retry_state, transaction_id_generator_.NextId(), clock_, storage_.get(), lock_manager_.get(), versioned_catalog_.get(), action_manager_.get()); } SchemaChangeContext Database::GetSchemaChangeContext() { return SchemaChangeContext{ .type_factory = type_factory_.get(), .table_id_generator = &table_id_generator_, .column_id_generator = &column_id_generator_, .storage = storage_.get(), .pg_oid_assigner = pg_oid_assigner_.get(), .database_id = database_id_, }; } absl::Status Database::UpdateSchema( const SchemaChangeOperation& schema_change_operation, int* num_succesful_statements, absl::Time* commit_timestamp, absl::Status* backfill_status) { if (schema_change_operation.statements.empty()) { return error::UpdateDatabaseMissingStatements(); } // Make an exclusive lock request for the database. If there are any // concurrent transactions it will be denied and the operation aborted. ScopedSchemaChangeLock lock{transaction_id_generator_.NextId(), lock_manager_.get()}; ZETASQL_RETURN_IF_ERROR(lock.Wait()); // Reserve a commit timestamp for the schema changes. Even if the // schema change fails, it will result in a no-op commit that will // be invisible to other read-only/read-write transactions. ZETASQL_ASSIGN_OR_RETURN(auto update_timestamp, lock.ReserveCommitTimestamp()); auto context = GetSchemaChangeContext(); context.schema_change_timestamp = update_timestamp; const Schema* existing_schema = versioned_catalog_->GetLatestSchema(); SchemaUpdater updater; ZETASQL_ASSIGN_OR_RETURN(auto result, updater.UpdateSchemaFromDDL( existing_schema, schema_change_operation, context)); *commit_timestamp = update_timestamp; *num_succesful_statements = result.num_successful_statements; *backfill_status = result.backfill_status; // We update the schema even if the backfill status was not OK, the returned // schema will be the schema for the last valid statement before the statement // for which the backfill/verification failed. if (result.updated_schema != nullptr) { ZETASQL_RETURN_IF_ERROR(versioned_catalog_->AddSchema( update_timestamp, std::move(result.updated_schema))); action_manager_->AddActionsForSchema(versioned_catalog_->GetLatestSchema(), query_engine_->function_catalog(), query_engine_->type_factory()); } change_stream_partition_churner_->Update( versioned_catalog_->GetLatestSchema()); // Some functions need to access the schema (e.g. sequence functions), so // set the latest schema to the function catalog here. query_engine_->SetLatestSchemaForFunctionCatalog( versioned_catalog_->GetLatestSchema()); return absl::OkStatus(); } const Schema* Database::GetLatestSchema() const { return versioned_catalog_->GetLatestSchema(); } } // namespace backend } // namespace emulator } // namespace spanner } // namespace google