frontend/entities/session.cc (229 lines of code) (raw):

// // Copyright 2020 Google LLC // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. // #include "frontend/entities/session.h" #include <algorithm> #include <memory> #include <string> #include <utility> #include "google/spanner/v1/spanner.pb.h" #include "google/spanner/v1/transaction.pb.h" #include "absl/status/statusor.h" #include "absl/strings/str_cat.h" #include "absl/synchronization/mutex.h" #include "absl/time/time.h" #include "backend/common/ids.h" #include "backend/transaction/options.h" #include "backend/transaction/read_only_transaction.h" #include "backend/transaction/read_write_transaction.h" #include "common/errors.h" #include "common/limits.h" #include "frontend/common/protos.h" #include "frontend/converters/reads.h" #include "frontend/converters/time.h" #include "frontend/entities/transaction.h" #include "absl/status/status.h" #include "zetasql/base/status_macros.h" namespace google { namespace spanner { namespace emulator { namespace frontend { namespace spanner_api = ::google::spanner::v1; namespace { // Validate that read only options specify a concurrency mode that can be used // with multi use transactions. absl::Status ValidateReadOptionsForMultiUseTransaction( const spanner_api::TransactionOptions::ReadOnly& read_options) { using ReadOnly = spanner_api::TransactionOptions::ReadOnly; switch (read_options.timestamp_bound_case()) { case ReadOnly::kMinReadTimestamp: return error::InvalidReadOptionForMultiUseTransaction( "min_read_timestamp"); case ReadOnly::kMaxStaleness: return error::InvalidReadOptionForMultiUseTransaction("max_staleness"); default: return absl::OkStatus(); } } absl::Status ValidateMultiUseTransactionOptions( const spanner_api::TransactionOptions& options) { switch (options.mode_case()) { case v1::TransactionOptions::kReadOnly: return ValidateReadOptionsForMultiUseTransaction(options.read_only()); case v1::TransactionOptions::kReadWrite: case v1::TransactionOptions::kPartitionedDml: return absl::OkStatus(); case v1::TransactionOptions::MODE_NOT_SET: return error::MissingRequiredFieldError("TransactionOptions.mode"); } } absl::Status ValidateSingleUseTransactionOptions( const spanner_api::TransactionOptions& options) { switch (options.mode_case()) { case v1::TransactionOptions::kReadOnly: case v1::TransactionOptions::kReadWrite: return absl::OkStatus(); case v1::TransactionOptions::kPartitionedDml: return error::DmlDoesNotSupportSingleUseTransaction(); case v1::TransactionOptions::MODE_NOT_SET: return error::MissingRequiredFieldError("TransactionOptions.mode"); } } } // namespace absl::Status Session::ToProto(spanner_api::Session* session, bool include_labels) { absl::ReaderMutexLock lock(&mu_); session->set_name(session_uri_); if (include_labels) { session->mutable_labels()->insert(labels_.begin(), labels_.end()); } ZETASQL_ASSIGN_OR_RETURN(*session->mutable_create_time(), TimestampToProto(create_time_)); ZETASQL_ASSIGN_OR_RETURN(*session->mutable_approximate_last_use_time(), TimestampToProto(approximate_last_use_time_)); session->set_multiplexed(multiplexed_); return absl::OkStatus(); } backend::RetryState Session::MakeRetryState( const spanner_api::TransactionOptions& options, bool is_single_use_txn) { mu_.AssertHeld(); backend::RetryState retry_state; // Client libraries start a new transaction on the same session after // encountering an ABORT exception. Documentation: // https://cloud.google.com/spanner/docs/reference/rest/v1/TransactionOptions#retrying-aborted-transactions // Find if there was an ABORT status returned on the previous read-write // transaction for this session. Re-use the properties of backend read // write transaction to maintain transaction priority and abort retry // counts. if (options.has_read_write() && !is_single_use_txn && active_transaction_ != nullptr && active_transaction_->IsReadWrite() && active_transaction_->IsAborted()) { retry_state = active_transaction_->read_write()->retry_state(); } return retry_state; } absl::StatusOr<std::shared_ptr<Transaction>> Session::CreateMultiUseTransaction( const spanner_api::TransactionOptions& options, const TransactionActivation& activation) { ZETASQL_RETURN_IF_ERROR(ValidateMultiUseTransactionOptions(options)); absl::MutexLock lock(&mu_); // Move-convert unique pointer returned by CreateTransaction to shared pointer // since session will also hold a reference to multi-use transaction object // for future uses. ZETASQL_ASSIGN_OR_RETURN( std::shared_ptr<Transaction> txn, CreateTransaction(options, Transaction::Usage::kMultiUse, MakeRetryState(options, /*is_single_use_txn=*/false))); // Insert shared transaction object into transaction map. transaction_map_.emplace(txn->id(), txn); // Clear older transactions if too many transactions are tracked by session. while (transaction_map_.size() > limits::kMaxTransactionsPerSession) { transaction_map_.begin()->second->Close(); transaction_map_.erase(transaction_map_.begin()); } if (activation == TransactionActivation::kInitializeAndActivate) { // Assign this as the current active transaction. active_transaction_ = txn; // Remove transactions that came before this one. auto it = transaction_map_.find(txn->id()); for (auto prev_txn = transaction_map_.begin(); prev_txn != it; ++prev_txn) { prev_txn->second->Close(); } transaction_map_.erase(transaction_map_.begin(), it); } return txn; } absl::StatusOr<std::unique_ptr<Transaction>> Session::CreateSingleUseTransaction( const spanner_api::TransactionOptions& options) { ZETASQL_RETURN_IF_ERROR(ValidateSingleUseTransactionOptions(options)); absl::MutexLock lock(&mu_); return CreateTransaction(options, Transaction::Usage::kSingleUse, MakeRetryState(options, /*is_single_use_txn=*/true)); } absl::StatusOr<std::unique_ptr<Transaction>> Session::CreateTransaction( const spanner_api::TransactionOptions& options, const Transaction::Usage& usage, const backend::RetryState& retry_state) { if (options.mode_case() != v1::TransactionOptions::kReadWrite && options.isolation_level() == spanner_api::TransactionOptions::REPEATABLE_READ) { return error::RepeatableReadOnlySupportedInReadWriteTransactions(); } if (options.mode_case() == v1::TransactionOptions::kReadWrite && options.isolation_level() == spanner_api::TransactionOptions::REPEATABLE_READ && options.read_write().read_lock_mode() != spanner_api::TransactionOptions::ReadWrite:: READ_LOCK_MODE_UNSPECIFIED) { return error::ReadLockModeInRepeatableReadMustBeUnspecified(); } switch (options.mode_case()) { case v1::TransactionOptions::kReadOnly: return CreateReadOnly(options, usage); case v1::TransactionOptions::kReadWrite: case v1::TransactionOptions::kPartitionedDml: return CreateReadWrite(options, usage, retry_state); default: return error::Internal( "Unexpected TransactionOptions.mode for create transaction."); } } absl::StatusOr<std::unique_ptr<Transaction>> Session::CreateReadOnly( const spanner_api::TransactionOptions& options, const Transaction::Usage& usage) { // Populate read options. ZETASQL_ASSIGN_OR_RETURN(backend::ReadOnlyOptions read_only_options, ReadOnlyOptionsFromProto(options.read_only())); // Create a new backend read only transaction. ZETASQL_ASSIGN_OR_RETURN( std::unique_ptr<backend::ReadOnlyTransaction> read_only_transaction, database_->backend()->CreateReadOnlyTransaction(read_only_options)); return std::make_unique<Transaction>(std::move(read_only_transaction), database_->backend()->query_engine(), options, usage); } absl::StatusOr<std::unique_ptr<Transaction>> Session::CreateReadWrite( const spanner_api::TransactionOptions& options, const Transaction::Usage& usage, const backend::RetryState& retry_state) { // Create a new backend read write transaction. ZETASQL_ASSIGN_OR_RETURN( std::unique_ptr<backend::ReadWriteTransaction> read_write_transaction, database_->backend()->CreateReadWriteTransaction( backend::ReadWriteOptions(), retry_state)); return std::make_unique<Transaction>(std::move(read_write_transaction), database_->backend()->query_engine(), options, usage); } absl::StatusOr<std::shared_ptr<Transaction>> Session::FindAndUseTransaction( const std::string& bytes) { const backend::TransactionID& id = TransactionIDFromProto(bytes); absl::MutexLock lock(&mu_); if (id == backend::kInvalidTransactionID) { return error::InvalidTransactionID(backend::kInvalidTransactionID); } if (id < min_valid_id_) { return error::InvalidTransactionID(min_valid_id_); } min_valid_id_ = std::max(id, min_valid_id_); auto it = transaction_map_.find(id); if (it == transaction_map_.end()) { return error::TransactionNotFound(id); } if (it->second->IsClosed()) { return error::TransactionClosed(id); } active_transaction_ = it->second; // Remove transactions that came before this one. for (auto prev_txn = transaction_map_.begin(); prev_txn != it; ++prev_txn) { prev_txn->second->Close(); } transaction_map_.erase(transaction_map_.begin(), it); return active_transaction_; } absl::StatusOr<std::shared_ptr<Transaction>> Session::FindOrInitTransaction( const spanner_api::TransactionSelector& selector) { std::shared_ptr<Transaction> txn; switch (selector.selector_case()) { case spanner_api::TransactionSelector::SelectorCase::kBegin: { ZETASQL_ASSIGN_OR_RETURN(txn, CreateMultiUseTransaction( selector.begin(), TransactionActivation::kInitializeAndActivate)); break; } case spanner_api::TransactionSelector::SelectorCase::kId: { ZETASQL_ASSIGN_OR_RETURN(txn, FindAndUseTransaction(selector.id())); break; } case spanner_api::TransactionSelector::SelectorCase::kSingleUse: { ZETASQL_ASSIGN_OR_RETURN(txn, CreateSingleUseTransaction(selector.single_use())); break; } default: // If no transaction selector is provided, the default is a // temporary read-only transaction with strong concurrency. spanner_api::TransactionOptions options; options.mutable_read_only()->set_strong(true); options.mutable_read_only()->set_return_read_timestamp(false); ZETASQL_ASSIGN_OR_RETURN(txn, CreateSingleUseTransaction(options)); } return txn; } } // namespace frontend } // namespace emulator } // namespace spanner } // namespace google