backend/transaction/commit_timestamp.cc (237 lines of code) (raw):

// // Copyright 2020 Google LLC // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. // #include "backend/transaction/commit_timestamp.h" #include <queue> #include <variant> #include "absl/log/check.h" #include "absl/status/status.h" #include "absl/status/statusor.h" #include "absl/strings/str_cat.h" #include "absl/synchronization/mutex.h" #include "absl/types/span.h" #include "backend/actions/ops.h" #include "backend/datamodel/key.h" #include "backend/datamodel/value.h" #include "backend/schema/catalog/column.h" #include "backend/schema/catalog/table.h" #include "common/constants.h" #include "common/errors.h" namespace google { namespace spanner { namespace emulator { namespace backend { namespace { bool IsPendingCommitTimestampStringValue(zetasql::Value column_value) { return (!column_value.is_null() && column_value.type()->IsString() && column_value.string_value() == kCommitTimestampIdentifier); } bool IsPendingCommitTimestampSentinelValue(zetasql::Value column_value) { return (!column_value.is_null() && column_value.type()->IsTimestamp() && column_value.ToTime() == kCommitTimestampValueSentinel); } absl::Status ValidateCommitTimestampKeyForDeleteOp(const Table* table, const Key& key, absl::Time now) { const absl::Span<const KeyColumn* const> primary_key = table->primary_key(); for (int i = 0; i < key.NumColumns(); ++i) { const Column* column = primary_key.at(i)->column(); if (column->GetType()->IsTimestamp() && column->allows_commit_timestamp()) { ZETASQL_RETURN_IF_ERROR( ValidateCommitTimestampValueNotInFuture(key.ColumnValue(i), now)); } } return absl::OkStatus(); } absl::Status ValidateCommitTimestampEnabledInHeirarchy(const Column* column) { const Table* table = column->table(); // Validate that each of the parent table which contains this column as part // of a primary key has commit timestamp enabled. while (table->parent() != nullptr) { table = table->parent(); const KeyColumn* key_column = table->FindKeyColumn(column->Name()); if (key_column == nullptr) { break; } if (!key_column->column()->allows_commit_timestamp()) { return error::CommitTimestampOptionNotEnabled(column->FullName()); } } // If the given column is a key_column, check that all the children which // must contain the same key as part of their primary keys, also have // commit timestamp enabled. Perform the validation in a breadth first order. std::queue<const Table*> children_tables; children_tables.push(table); while (!children_tables.empty()) { const Table* child_table = children_tables.front(); children_tables.pop(); const KeyColumn* key_column = child_table->FindKeyColumn(column->Name()); if (key_column == nullptr) { // This shouldn't really happen, but would be caught by later action // framework. continue; } if (!key_column->column()->allows_commit_timestamp()) { return error::CommitTimestampOptionNotEnabled(column->FullName()); } for (const Table* child : child_table->children()) { children_tables.push(child); } } return absl::OkStatus(); } absl::StatusOr<zetasql::Value> MaybeSetCommitTimestampSentinel( const Column* column, zetasql::Value column_value) { if (!column->GetType()->IsTimestamp()) return column_value; if (column->allows_commit_timestamp()) { if (IsPendingCommitTimestampStringValue(column_value)) { ZETASQL_RETURN_IF_ERROR(ValidateCommitTimestampEnabledInHeirarchy(column)); return zetasql::values::Timestamp(kCommitTimestampValueSentinel); } else if (IsPendingCommitTimestampSentinelValue(column_value)) { return error::CommitTimestampInFuture(column_value.ToTime()); } } else if (IsPendingCommitTimestampStringValue(column_value)) { return error::CommitTimestampOptionNotEnabled(column->FullName()); } return column_value; } absl::StatusOr<Key> MaybeSetCommitTimestampSentinel( absl::Span<const KeyColumn* const> primary_key, Key key) { for (int i = 0; i < key.NumColumns(); i++) { ZETASQL_ASSIGN_OR_RETURN(zetasql::Value value, MaybeSetCommitTimestampSentinel(primary_key[i]->column(), key.ColumnValue(i))); key.SetColumnValue(i, value); } return key; } } // namespace absl::Status ValidateCommitTimestampValueNotInFuture( const zetasql::Value& value, absl::Time now) { if (!value.is_null() && value.type()->IsTimestamp() && value.ToTime() > now) { return error::CommitTimestampInFuture(value.ToTime()); } return absl::OkStatus(); } absl::Status ValidateCommitTimestampKeySetForDeleteOp(const Table* table, const KeySet& set, absl::Time now) { for (const Key& key : set.keys()) { ZETASQL_RETURN_IF_ERROR(ValidateCommitTimestampKeyForDeleteOp(table, key, now)); } for (const KeyRange& key_range : set.ranges()) { auto closed_open = key_range.ToClosedOpen(); if (closed_open.start_key() >= closed_open.limit_key()) { // No-op empty key ranges are ignored. continue; } ZETASQL_RETURN_IF_ERROR(ValidateCommitTimestampKeyForDeleteOp( table, key_range.start_key(), now)); ZETASQL_RETURN_IF_ERROR(ValidateCommitTimestampKeyForDeleteOp( table, key_range.limit_key(), now)); } return absl::OkStatus(); } absl::StatusOr<ValueList> MaybeSetCommitTimestampSentinel( absl::Span<const Column* const> columns, const ValueList& row) { if (row.empty()) return row; ValueList ret_val; for (int i = 0; i < row.size(); i++) { ZETASQL_ASSIGN_OR_RETURN(ret_val.emplace_back(), MaybeSetCommitTimestampSentinel(columns[i], row[i])); } return ret_val; } absl::StatusOr<KeyRange> MaybeSetCommitTimestampSentinel( absl::Span<const KeyColumn* const> primary_key, const KeyRange& key_range) { ZETASQL_RET_CHECK(key_range.IsClosedOpen()); if (key_range.start_key() >= key_range.limit_key()) { // Nothing to be done for empty key range. return key_range; } ZETASQL_ASSIGN_OR_RETURN(Key start_key, MaybeSetCommitTimestampSentinel( primary_key, key_range.start_key())); ZETASQL_ASSIGN_OR_RETURN(Key limit_key, MaybeSetCommitTimestampSentinel( primary_key, key_range.limit_key())); return KeyRange(key_range.start_type(), start_key, key_range.limit_type(), limit_key); } // Returns true if the given column value contains sentinel timestamp value // and column allows commit timestamp to be set automatically. Signals that // value should be replaced with commit timestamp of transaction during flush. bool IsPendingCommitTimestamp(const Column* column, const zetasql::Value& column_value) { if ((column->allows_commit_timestamp() || (column->source_column() && column->source_column()->allows_commit_timestamp())) && IsPendingCommitTimestampSentinelValue(column_value)) { return true; } return false; } bool HasPendingCommitTimestampInKey(const Table* table, const Key& key) { for (int i = 0; i < key.NumColumns(); i++) { if (IsPendingCommitTimestamp(table->primary_key()[i]->column(), key.ColumnValue(i))) { return true; } } return false; } zetasql::Value MaybeSetCommitTimestamp(const Column* column, const zetasql::Value& column_value, absl::Time commit_timestamp) { if (IsPendingCommitTimestamp(column, column_value)) { return zetasql::values::Timestamp(commit_timestamp); } return column_value; } Key MaybeSetCommitTimestamp(absl::Span<const KeyColumn* const> primary_key, Key key, absl::Time commit_timestamp) { for (int i = 0; i < key.NumColumns(); i++) { key.SetColumnValue( i, MaybeSetCommitTimestamp(primary_key[i]->column(), key.ColumnValue(i), commit_timestamp)); } return key; } absl::Status CommitTimestampTracker::CheckRead( const Table* table, absl::Span<const Column* const> columns) const { absl::MutexLock lock(&mu_); if (commit_ts_tables_.contains(table)) { return error::CannotReadPendingCommitTimestamp( absl::StrCat("Table ", table->Name())); } for (const auto column : columns) { if (commit_ts_columns_.contains(column) || (column->source_column() != nullptr && commit_ts_columns_.contains(column->source_column()))) { return error::CannotReadPendingCommitTimestamp( absl::StrCat("Column ", column->Name())); } } return absl::OkStatus(); } void CommitTimestampTracker::TrackColumns( absl::Span<const Column* const> columns, const ValueList& values) { ABSL_DCHECK_EQ(columns.size(), values.size()); for (int i = 0; i < columns.size(); ++i) { if (IsPendingCommitTimestamp(columns[i], values[i])) { commit_ts_columns_.insert(columns[i]); } } } void CommitTimestampTracker::TrackTable(const Table* table, const Key& key) { if (HasPendingCommitTimestampInKey(table, key)) { commit_ts_tables_.insert(table); // Any time a table has a pending commit-ts in key, include all its indexes. for (const Index* index : table->indexes()) { commit_ts_tables_.insert(index->index_data_table()); } } } void CommitTimestampTracker::Track(absl::Span<const WriteOp> write_ops) { absl::MutexLock lock(&mu_); for (auto& op : write_ops) { if (std::holds_alternative<InsertOp>(op)) { const InsertOp& insert = std::get<InsertOp>(op); TrackColumns(insert.columns, insert.values); TrackTable(insert.table, insert.key); } else if (std::holds_alternative<UpdateOp>(op)) { const UpdateOp& update = std::get<UpdateOp>(op); TrackColumns(update.columns, update.values); TrackTable(update.table, update.key); } else { const DeleteOp& delete_op = std::get<DeleteOp>(op); TrackTable(delete_op.table, delete_op.key); } } } } // namespace backend } // namespace emulator } // namespace spanner } // namespace google