backend/transaction/read_write_transaction.cc (518 lines of code) (raw):

// // Copyright 2020 Google LLC // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. // #include "backend/transaction/read_write_transaction.h" #include <functional> #include <memory> #include <optional> #include <queue> #include <string> #include <utility> #include <vector> #include "zetasql/public/value.h" #include "absl/container/flat_hash_map.h" #include "absl/log/check.h" #include "absl/log/log.h" #include "absl/random/random.h" #include "absl/random/uniform_int_distribution.h" #include "absl/status/status.h" #include "absl/status/statusor.h" #include "absl/strings/cord.h" #include "absl/strings/str_cat.h" #include "absl/synchronization/mutex.h" #include "absl/time/time.h" #include "absl/types/span.h" #include "backend/access/read.h" #include "backend/access/write.h" #include "backend/actions/change_stream.h" #include "backend/actions/context.h" #include "backend/actions/manager.h" #include "backend/actions/ops.h" #include "backend/common/ids.h" #include "backend/common/rows.h" #include "backend/datamodel/key.h" #include "backend/datamodel/key_range.h" #include "backend/datamodel/value.h" #include "backend/locking/manager.h" #include "backend/schema/catalog/column.h" #include "backend/schema/catalog/foreign_key.h" #include "backend/schema/catalog/schema.h" #include "backend/schema/catalog/table.h" #include "backend/schema/catalog/versioned_catalog.h" #include "backend/storage/iterator.h" #include "backend/storage/storage.h" #include "backend/transaction/actions.h" 
#include "backend/transaction/commit_timestamp.h"
#include "backend/transaction/flush.h"
#include "backend/transaction/foreign_key_restrictions.h"
#include "backend/transaction/options.h"
#include "backend/transaction/resolve.h"
#include "backend/transaction/row_cursor.h"
#include "backend/transaction/transaction_store.h"
#include "common/change_stream.h"
#include "common/clock.h"
#include "common/config.h"
#include "common/constants.h"
#include "common/errors.h"
#include "third_party/spanner_pg/interface/pg_arena.h"
#include "third_party/spanner_pg/shims/memory_context_pg_arena.h"
#include "zetasql/base/ret_check.h"
#include "zetasql/base/status_macros.h"

namespace google {
namespace spanner {
namespace emulator {
namespace backend {

namespace {

// Expands a delete mutation into one DeleteOp per key that a read of the
// transaction store yields for the given key ranges. Ranges are visited in the
// order given; a failing read is returned to the caller unchanged.
absl::StatusOr<std::vector<WriteOp>> FlattenDeleteOp(
    const Table* table, const std::vector<KeyRange>& key_ranges,
    const TransactionStore* transaction_store) {
  std::vector<WriteOp> delete_ops;
  for (const KeyRange& range : key_ranges) {
    // No columns are requested: only the keys of the rows are needed.
    std::unique_ptr<StorageIterator> row_iterator;
    ZETASQL_RETURN_IF_ERROR(transaction_store->Read(table, range,
                                            /*columns= */ {}, &row_iterator));
    while (row_iterator->Next()) {
      delete_ops.push_back(DeleteOp{table, row_iterator->Key()});
    }
  }
  return std::move(delete_ops);
}

// Converts each MutationOp row to a WriteOp based on the MutationOpType:
//   - MutationOpType::kReplace: converts to DeleteOp followed by InsertOp.
//   - MutationOpType::kInsertOrUpdate: if the row already exists, converts
//     to UpdateOp. Otherwise converts to InsertOp.
//   - MutationOpType::kInsert | kDelete | kUpdate: converts to
//     corresponding WriteOp of the same type.
absl::StatusOr<std::vector<WriteOp>> FlattenNonDeleteOpRow( MutationOpType type, const Table* table, const std::vector<const Column*>& columns, const Key& key, const ValueList& row, const TransactionStore* transaction_store) { std::vector<WriteOp> write_ops; switch (type) { case MutationOpType::kInsert: { write_ops.push_back(InsertOp{table, key, columns, row}); break; } case MutationOpType::kUpdate: { write_ops.push_back(UpdateOp{table, key, columns, row}); break; } case MutationOpType::kInsertOrUpdate: { absl::StatusOr<ValueList> maybe_row = transaction_store->Lookup(table, key, /*columns= */ {}); if (maybe_row.ok()) { // Row exists and therefore we should only update. write_ops.push_back(UpdateOp{table, key, columns, row}); } else if (maybe_row.status().code() == absl::StatusCode::kNotFound) { write_ops.push_back(InsertOp{table, key, columns, row}); } else { return maybe_row.status(); } break; } case MutationOpType::kReplace: { write_ops.push_back(DeleteOp{table, key}); write_ops.push_back(InsertOp{table, key, columns, row}); break; } case MutationOpType::kDelete: { break; } } return std::move(write_ops); } bool ShouldAbortOnFirstCommit() { absl::BitGen gen; return config::fault_injection_enabled() && absl::uniform_int_distribution<int>(1, 100)(gen) <= 5; } RetryState MakeRetryState(const RetryState& retry_state, Clock* clock) { RetryState state = retry_state; state.priority = (retry_state.priority == 0 ? absl::ToUnixMicros(clock->Now()) : retry_state.priority); return state; } // Splits key_range into 2 parts by key with the key itself being removed. The // resulting key ranges are appended to range_list. Returns true if the key was // found within the given range, false otherwise. bool SplitKeyRangeAndAppend(const KeyRange& key_range, const Key& key, std::vector<KeyRange>* range_list) { if (!key_range.Contains(key)) { return false; } if (key == key_range.start_key() && key == key_range.limit_key()) { // Range only contains the one key. 
return true; } else if (key == key_range.start_key()) { range_list->push_back(KeyRange(EndpointType::kOpen, key, key_range.limit_type(), key_range.limit_key())); } else if (key == key_range.limit_key()) { range_list->push_back(KeyRange(key_range.start_type(), key_range.start_key(), EndpointType::kOpen, key)); } else { range_list->push_back(KeyRange(key_range.start_type(), key_range.start_key(), EndpointType::kOpen, key)); range_list->push_back(KeyRange(EndpointType::kOpen, key, key_range.limit_type(), key_range.limit_key())); } return true; } absl::StatusOr<bool> IsMutationInvolvingForeignKeyAction( const MutationOp& mutation_op, const Schema* schema) { const Table* table = schema->FindTable(mutation_op.table); if (IsChangeStreamPartitionTable(mutation_op.table)) { ZETASQL_ASSIGN_OR_RETURN(table, FindChangeStreamPartitionTable(schema, mutation_op.table)); } if (table == nullptr) { return error::TableNotFound(mutation_op.table); } for (const ForeignKey* foreign_key : table->referencing_foreign_keys()) { if (foreign_key->on_delete_action() == ForeignKey::Action::kCascade) { return true; } } return false; } } // namespace ReadWriteTransaction::ReadWriteTransaction( const ReadWriteOptions& options, const RetryState& retry_state, TransactionID transaction_id, Clock* clock, Storage* storage, LockManager* lock_manager, const VersionedCatalog* const versioned_catalog, ActionManager* action_manager) : options_(options), retry_state_(MakeRetryState(retry_state, clock)), id_(transaction_id), clock_(clock), base_storage_(storage), versioned_catalog_(versioned_catalog), lock_handle_(lock_manager->CreateHandle( transaction_id, [&]() -> absl::Status { return TryAbort(); }, retry_state_.priority)), commit_timestamp_tracker_(std::make_unique<CommitTimestampTracker>()), transaction_store_(std::make_unique<TransactionStore>( base_storage_, lock_handle_.get(), commit_timestamp_tracker_.get())), action_manager_(action_manager), action_context_(std::make_unique<ActionContext>( 
std::make_unique<TransactionReadOnlyStore>(transaction_store_.get()), std::make_unique<TransactionEffectsBuffer>(&write_ops_queue_), clock)), schema_(versioned_catalog_->GetLatestSchema()) {} absl::StatusOr<absl::Time> ReadWriteTransaction::GetCommitTimestamp() { absl::MutexLock lock(&mu_); if (state_ != State::kCommitted) { return error::Internal( absl::StrCat("Commit timestamp is only available after call to " "Transaction Commit. Transaction: ", id(), " is in state: ", state_)); } return commit_timestamp_; } absl::Status ReadWriteTransaction::Read(const ReadArg& read_arg, std::unique_ptr<RowCursor>* cursor) { return GuardedCall(OpType::kRead, [&]() -> absl::Status { mu_.AssertHeld(); ZETASQL_ASSIGN_OR_RETURN(const ResolvedReadArg& resolved_read_arg, ResolveReadArg(read_arg, schema_)); std::vector<std::unique_ptr<StorageIterator>> iterators; for (const auto& key_range : resolved_read_arg.key_ranges) { std::unique_ptr<StorageIterator> itr; ZETASQL_RETURN_IF_ERROR(transaction_store_->Read( resolved_read_arg.table, key_range, resolved_read_arg.columns, &itr, read_arg.allow_pending_commit_timestamps)); iterators.push_back(std::move(itr)); } *cursor = std::make_unique<StorageIteratorRowCursor>( std::move(iterators), resolved_read_arg.columns); return absl::OkStatus(); }); } absl::Status ReadWriteTransaction::ApplyValidators(const WriteOp& op) { return action_registry_->ExecuteValidators(action_context_.get(), op); } absl::Status ReadWriteTransaction::ApplyEffectors(const WriteOp& op) { return action_registry_->ExecuteEffectors(action_context_.get(), op); } absl::Status ReadWriteTransaction::ApplyStatementVerifiers() { for (const auto& write_op : transaction_store_->GetBufferedOps()) { ZETASQL_RETURN_IF_ERROR( action_registry_->ExecuteVerifiers(action_context_.get(), write_op)); } return absl::OkStatus(); } void ReadWriteTransaction::UpdateTrackedCommitTimestamps() { commit_timestamp_tracker_->Track(transaction_store_->GetBufferedOps()); } const Schema* 
ReadWriteTransaction::schema() const { absl::MutexLock lock(&mu_); if (state_ == State::kUninitialized) { return versioned_catalog_->GetLatestSchema(); } return schema_; } void ReadWriteTransaction::Reset() { mu_.AssertHeld(); lock_handle_->UnlockAll(); transaction_store_->Clear(); std::queue<WriteOp> empty; write_ops_queue_.swap(empty); state_ = State::kUninitialized; } absl::Status ReadWriteTransaction::GuardedCall( OpType op, const std::function<absl::Status()>& fn) { absl::MutexLock lock(&mu_); switch (state_) { case State::kRolledback: { return error::Internal(absl::StrCat( "Invalid call to Rolledback transaction. Transaction: ", id())); break; } case State::kInvalid: { if (op != OpType::kRollback) { return error::Internal(absl::StrCat( "Invalid call to Aborted transaction. Transaction: ", id())); } break; } case State::kCommitted: return error::Internal(absl::StrCat( "Invalid call to Committed transaction. Transaction: ", id())); case State::kAborted: { if (op != OpType::kRollback) { return error::WoundedTransaction(id_); } break; } case State::kUninitialized: { schema_ = versioned_catalog_->GetLatestSchema(); auto maybe_action_registry = action_manager_->GetActionsForSchema(schema_); if (!maybe_action_registry.ok()) { Reset(); return maybe_action_registry.status(); } action_registry_ = maybe_action_registry.value(); state_ = State::kActive; break; } case State::kActive: { if (schema_ != versioned_catalog_->GetLatestSchema()) { Reset(); ++retry_state_.abort_retry_count; return error::AbortDueToConcurrentSchemaChange(id_); } break; } } absl::Status status = fn(); if (!status.ok()) { if (status.code() == absl::StatusCode::kAborted) { // Reset the transaction and release the lock handle. Always reset the // transaction in the case of an abort error. Aborts do not invalidate the // transaction. 
Reset(); ++retry_state_.abort_retry_count; } else if (op != OpType::kRead) { // A failing read should never invalidate a transaction, but constraint // errors on other operation types will invalidate it. Reset(); status.SetPayload(kConstraintError, absl::Cord("")); } } return status; } absl::Status ReadWriteTransaction::ProcessWriteOps( const std::vector<WriteOp>& write_ops) { mu_.AssertHeld(); for (const auto& write_op : write_ops) { write_ops_queue_.push(write_op); } while (!write_ops_queue_.empty()) { WriteOp write_op = write_ops_queue_.front(); write_ops_queue_.pop(); // Process the operation. ZETASQL_RETURN_IF_ERROR(ApplyValidators(write_op)); ZETASQL_RETURN_IF_ERROR(ApplyEffectors(write_op)); // Apply to transaction store. ZETASQL_RETURN_IF_ERROR(transaction_store_->BufferWriteOp(write_op)); } return absl::OkStatus(); } absl::Status ReadWriteTransaction::ProcessChangeStreamWriteOps() { mu_.AssertHeld(); ZETASQL_ASSIGN_OR_RETURN( auto write_ops, BuildChangeStreamWriteOps(schema_, transaction_store_->GetBufferedOps(), action_context_->store(), id_)); for (const WriteOp& writeop : write_ops) { ZETASQL_RETURN_IF_ERROR(transaction_store_->BufferWriteOp(writeop)); } return absl::OkStatus(); } absl::StatusOr<ResolvedMutationOp> ReadWriteTransaction::ResolveNonDeleteMutationOp(const MutationOp& mutation_op, const Schema* schema) { ZETASQL_RET_CHECK(mutation_op.type != MutationOpType::kDelete); const Table* table = schema->FindTable(mutation_op.table); if (IsChangeStreamPartitionTable(mutation_op.table)) { ZETASQL_ASSIGN_OR_RETURN(table, FindChangeStreamPartitionTable(schema, mutation_op.table)); } if (table == nullptr) { return error::TableNotFound(mutation_op.table); } ResolvedMutationOp resolved_mutation_op; resolved_mutation_op.table = table; resolved_mutation_op.type = mutation_op.type; // Vector of generated values for each row of operation. 
std::vector<std::vector<zetasql::Value>> generated_values( mutation_op.rows.size(), std::vector<zetasql::Value>()); std::vector<const Column*> columns_with_generated_values; // Compute values for default primary keys that don't appear in this // mutation op: ZETASQL_RETURN_IF_ERROR(action_registry_->ExecuteGeneratedKeyEffectors( mutation_op, &generated_values, &columns_with_generated_values)); ZETASQL_ASSIGN_OR_RETURN(std::vector<const Column*> columns, GetColumnsByName(table, mutation_op.columns)); // If we have key columns with generated default values, append them here: if (!columns_with_generated_values.empty()) { columns.insert(columns.end(), columns_with_generated_values.begin(), columns_with_generated_values.end()); } ZETASQL_ASSIGN_OR_RETURN(std::vector<std::optional<int>> key_indices, ExtractPrimaryKeyIndices(columns, table->primary_key())); for (int i = 0; i < mutation_op.rows.size(); i++) { const ValueList& row = mutation_op.rows[i]; ValueList new_row = row; // If we have key columns with generated/default values, append them here: new_row.insert(new_row.end(), generated_values[i].begin(), generated_values[i].end()); ZETASQL_RET_CHECK_EQ(new_row.size(), columns.size()) << "MutationOp has difference in size of column and value vectors, " "mutation op: " << mutation_op.DebugString(); ZETASQL_ASSIGN_OR_RETURN(resolved_mutation_op.rows.emplace_back(), MaybeSetCommitTimestampSentinel(columns, new_row)); resolved_mutation_op.keys.push_back(ComputeKey( resolved_mutation_op.rows.back(), table->primary_key(), key_indices)); } resolved_mutation_op.columns = std::move(columns); return resolved_mutation_op; } absl::Status ReadWriteTransaction::Write(const Mutation& mutation) { return GuardedCall(OpType::kWrite, [&]() -> absl::Status { mu_.AssertHeld(); ForeignKeyRestrictions fk_restrictions; // When writing, comparison may be required in the process. 
Comparison of // PG.NUMERIC calls PG to get comparison result (see function // ValueContentLess in datatypes/extended/pg_numeric_type.cc) and therefore // requires a PG arena. ZETASQL_VLOG(1) << "Creating memory context and Processing Write mutations"; ZETASQL_ASSIGN_OR_RETURN( std::unique_ptr<postgres_translator::interfaces::PGArena> arena, postgres_translator::spangres::MemoryContextPGArena::Init(nullptr)); for (const MutationOp& mutation_op : mutation.ops()) { ZETASQL_ASSIGN_OR_RETURN( bool has_delete_cascade_foreign_key, IsMutationInvolvingForeignKeyAction(mutation_op, schema_)); if (mutation_op.type == MutationOpType::kDelete) { // Process Delete. ZETASQL_ASSIGN_OR_RETURN( ResolvedMutationOp resolved_mutation_op, ResolveDeleteMutationOp(mutation_op, schema_, clock_->Now())); const std::string& table_name = resolved_mutation_op.table->Name(); if (has_delete_cascade_foreign_key) { ZETASQL_RETURN_IF_ERROR(fk_restrictions.ValidateReferencedDeleteMods( table_name, resolved_mutation_op.key_ranges)); } std::vector<KeyRange>& key_ranges = deleted_key_ranges_by_table_[table_name]; key_ranges.insert(key_ranges.end(), resolved_mutation_op.key_ranges.begin(), resolved_mutation_op.key_ranges.end()); ZETASQL_ASSIGN_OR_RETURN(std::vector<WriteOp> write_ops, FlattenDeleteOp(resolved_mutation_op.table, resolved_mutation_op.key_ranges, transaction_store_.get())); ZETASQL_RETURN_IF_ERROR(ProcessWriteOps(write_ops)); } else { // Process non-delete Mutation ops. ZETASQL_RETURN_IF_ERROR(ValidateNonDeleteMutationOp(mutation_op, schema_)); ZETASQL_ASSIGN_OR_RETURN(ResolvedMutationOp resolved_mutation_op, ResolveNonDeleteMutationOp(mutation_op, schema_)); const std::string& table_name = resolved_mutation_op.table->Name(); // Process Insert, Update, Replace and InsertOrUpdate. for (int i = 0; i < resolved_mutation_op.rows.size(); i++) { // Spanner allows deleted entries to be reinserted within the same // transaction, so we must update the deleted ranges list in this // case. 
if (resolved_mutation_op.type == MutationOpType::kInsert || resolved_mutation_op.type == MutationOpType::kInsertOrUpdate) { std::vector<KeyRange> split_key_ranges; auto& deleted_key_ranges = deleted_key_ranges_by_table_[table_name]; for (auto it = deleted_key_ranges.begin(); it != deleted_key_ranges.end();) { if (SplitKeyRangeAndAppend(*it, resolved_mutation_op.keys[i], &split_key_ranges)) { it = deleted_key_ranges.erase(it); } else { ++it; } } // Insert split key ranges back into the list of deleted key ranges. for (const KeyRange& key_range : split_key_ranges) { deleted_key_ranges.push_back(key_range); } } if (resolved_mutation_op.type == MutationOpType::kUpdate) { for (const KeyRange& key_range : deleted_key_ranges_by_table_[table_name]) { if (key_range.Contains(resolved_mutation_op.keys[i])) { return error::UpdateDeletedRowInTransaction( table_name, resolved_mutation_op.keys[i].DebugString()); } } } ZETASQL_ASSIGN_OR_RETURN( std::vector<WriteOp> write_ops, FlattenNonDeleteOpRow( resolved_mutation_op.type, resolved_mutation_op.table, resolved_mutation_op.columns, resolved_mutation_op.keys[i], resolved_mutation_op.rows[i], transaction_store_.get())); if (has_delete_cascade_foreign_key) { ZETASQL_RETURN_IF_ERROR(fk_restrictions.ValidateReferencedMods( write_ops, table_name, schema_)); } ZETASQL_RETURN_IF_ERROR(ProcessWriteOps(write_ops)); } } } ZETASQL_RETURN_IF_ERROR(ApplyStatementVerifiers()); // We defer all commit timestamp tracking to the end of the write to avoid // effector reads being rejected because of a previously written op in the // same call to ReadWriteTransaction::Write (e.g. updates to two rows in the // same table with an indexed commit timestamp column can be rejected due // to effector reads if the updates are split into separate Write calls, but // should succeed if written together). 
UpdateTrackedCommitTimestamps(); return absl::OkStatus(); }); } absl::Status ReadWriteTransaction::Commit() { return GuardedCall(OpType::kCommit, [&]() -> absl::Status { mu_.AssertHeld(); if (retry_state_.abort_retry_count == 0 && ShouldAbortOnFirstCommit()) { return error::AbortReadWriteTransactionOnFirstCommit(id_); } ZETASQL_RETURN_IF_ERROR(ProcessChangeStreamWriteOps()); // When committing, comparison may be required in the process. Comparison of // PG.NUMERIC calls PG to get comparison result (see function // ValueContentLess in datatypes/extended/pg_numeric_type.cc) and therefore // requires a PG arena. ZETASQL_VLOG(1) << "Creating memory context and Committing transaction"; ZETASQL_ASSIGN_OR_RETURN( std::unique_ptr<postgres_translator::interfaces::PGArena> arena, postgres_translator::spangres::MemoryContextPGArena::Init(nullptr)); // Pick a commit timestamp. ZETASQL_ASSIGN_OR_RETURN(commit_timestamp_, lock_handle_->ReserveCommitTimestamp()); // Write the mutations to the base storage. absl::Status flush_status = FlushWriteOpsToStorage( transaction_store_->GetBufferedOps(), base_storage_, commit_timestamp_); ZETASQL_RETURN_IF_ERROR(lock_handle_->MarkCommitted()); if (!flush_status.ok()) { return flush_status; } // Mark the transaction as committed. state_ = State::kCommitted; // Unlock all locks. lock_handle_->UnlockAll(); return absl::OkStatus(); }); } absl::Status ReadWriteTransaction::Rollback() { return GuardedCall(OpType::kRollback, [&]() -> absl::Status { mu_.AssertHeld(); // Reset the transaction state. This is done to release the locks. The // transaction cannot be reused. Reset(); // Mark the transaction as rolledback. state_ = State::kRolledback; return absl::OkStatus(); }); } absl::Status ReadWriteTransaction::TryAbort() { if (!mu_.TryLock()) { return error::CouldNotObtainTransactionMutex(id()); } if (state_ != State::kActive) { mu_.Unlock(); return error::TransactionClosed(id()); } // Reset the transaction and release the lock handle. 
// Aborts do not invalidate the transaction. // Reset(); ++retry_state_.abort_retry_count; // Mark the transaction as aborted. state_ = State::kAborted; mu_.Unlock(); return absl::OkStatus(); } absl::Status ReadWriteTransaction::Invalidate() { return GuardedCall(OpType::kInvalidate, [&]() -> absl::Status { mu_.AssertHeld(); // Reset the transaction state. This is done to release the locks. The // transaction cannot be reused. Reset(); // Mark the transaction as invalidated. state_ = State::kInvalid; return absl::OkStatus(); }); } } // namespace backend } // namespace emulator } // namespace spanner } // namespace google