backend/locking/manager.cc (95 lines of code) (raw):

// // Copyright 2020 Google LLC // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. // #include "backend/locking/manager.h" #include <functional> #include <memory> #include "absl/memory/memory.h" #include "absl/random/random.h" #include "absl/random/uniform_int_distribution.h" #include "absl/status/status.h" #include "absl/status/statusor.h" #include "absl/strings/substitute.h" #include "absl/synchronization/mutex.h" #include "absl/time/time.h" #include "backend/common/ids.h" #include "common/config.h" #include "common/errors.h" #include "zetasql/base/ret_check.h" namespace google { namespace spanner { namespace emulator { namespace backend { std::unique_ptr<LockHandle> LockManager::CreateHandle( TransactionID tid, const std::function<absl::Status()>& abort_fn, TransactionPriority priority) { return absl::WrapUnique(new LockHandle(this, tid, abort_fn, priority)); } void LockManager::EnqueueLock(LockHandle* handle, const LockRequest& request) { absl::MutexLock lock(&mu_); // Don't hand out locks to aborted handles. if (handle->IsAborted()) { return; } // If there is no transaction holding the lock, we grant it. if (active_handle_ == nullptr) { active_handle_ = handle; return; } // If the requesting transaction is already holding the lock, we grant it. if (active_handle_->tid() == handle->tid()) { return; } // If we reached here, another transaction is already holding the lock. // Randomly abort the current transaction to ensure that starting a new // transaction is not blocked by the current transaction if this is waiting // for a new transaction to finish. absl::BitGen gen; if (absl::uniform_int_distribution<int>(1, 100)(gen) <= config::abort_current_transaction_probability()) { auto could_be_aborted = active_handle_->TryAbortTransaction( error::AbortCurrentTransaction(active_handle_->tid(), handle->tid())); if (could_be_aborted.ok()) { active_handle_ = handle; return; } } // Couldn't abort the transaction currently holding the lock, so abort the // new transaction. handle->Abort( error::AbortConcurrentTransaction(handle->tid(), active_handle_->tid())); } void LockManager::UnlockAll(LockHandle* handle) { absl::MutexLock lock(&mu_); // If the transaction does not hold the lock, there is nothing to do. if (active_handle_ == nullptr || active_handle_->tid() != handle->tid()) { handle->Reset(); return; } // Clear the active transaction if it holds the lock. active_handle_ = nullptr; handle->Reset(); } absl::StatusOr<absl::Time> LockManager::ReserveCommitTimestamp( LockHandle* handle) { absl::MutexLock lock(&mu_); // If there is no transaction holding the lock, we grant it to the transaction // requesting commit timestamp. This can happen if transaction has empty // mutations and write locks weren't thus acquired yet. if (active_handle_ == nullptr) { active_handle_ = handle; } else if (active_handle_->tid() != handle->tid()) { // There is another active transaction, abort this transaction. return error::AbortConcurrentTransaction(handle->tid(), active_handle_->tid()); } pending_commit_timestamp_ = clock_->Now(); return pending_commit_timestamp_; } absl::Status LockManager::MarkCommitted(LockHandle* handle) { absl::MutexLock lock(&mu_); // This transaction should have been set as the active transaction. ZETASQL_RET_CHECK_EQ(active_handle_->tid(), handle->tid()) << absl::Substitute("Transaction $0 is not active.", handle->tid()); last_commit_timestamp_ = pending_commit_timestamp_; pending_commit_timestamp_ = absl::InfiniteFuture(); pending_commit_cvar_.SignalAll(); return absl::OkStatus(); } void LockManager::WaitForSafeRead(absl::Time read_time) { absl::MutexLock lock(&mu_); // Wait for read time to become current if passed a future timestamp for the // case of exact timestamp bound for snapshot read. // https://cloud.google.com/spanner/docs/timestamp-bounds#introduction bool f = false; mu_.AwaitWithDeadline(absl::Condition(&f), read_time); while (pending_commit_timestamp_ < read_time) { pending_commit_cvar_.Wait(&mu_); } } absl::Time LockManager::LastCommitTimestamp() { absl::ReaderMutexLock lock(&mu_); return last_commit_timestamp_; } } // namespace backend } // namespace emulator } // namespace spanner } // namespace google