backend/transaction/read_write_transaction.h (118 lines of code) (raw):
//
// Copyright 2020 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
#ifndef THIRD_PARTY_CLOUD_SPANNER_EMULATOR_BACKEND_TRANSACTION_READ_WRITE_TRANSACTION_H_
#define THIRD_PARTY_CLOUD_SPANNER_EMULATOR_BACKEND_TRANSACTION_READ_WRITE_TRANSACTION_H_
#include <memory>
#include <queue>
#include "absl/base/thread_annotations.h"
#include "absl/status/status.h"
#include "absl/status/statusor.h"
#include "absl/synchronization/mutex.h"
#include "absl/time/time.h"
#include "backend/access/read.h"
#include "backend/access/write.h"
#include "backend/actions/context.h"
#include "backend/actions/manager.h"
#include "backend/common/case.h"
#include "backend/common/ids.h"
#include "backend/datamodel/key.h"
#include "backend/datamodel/key_range.h"
#include "backend/locking/handle.h"
#include "backend/locking/manager.h"
#include "backend/schema/catalog/schema.h"
#include "backend/schema/catalog/table.h"
#include "backend/schema/catalog/versioned_catalog.h"
#include "backend/storage/storage.h"
#include "backend/transaction/actions.h"
#include "backend/transaction/commit_timestamp.h"
#include "backend/transaction/options.h"
#include "backend/transaction/resolve.h"
#include "backend/transaction/transaction_store.h"
#include "common/clock.h"
namespace google {
namespace spanner {
namespace emulator {
namespace backend {
// ReadWriteTransaction is a transaction that can modify the database. All the
// reads and writes in the transaction are logically performed at the same
// timestamp (commit timestamp).
class ReadWriteTransaction : public RowReader, public RowWriter {
public:
enum class State {
// Uninitialized transaction.
kUninitialized,
// Active transaction.
kActive,
// Committed transaction.
kCommitted,
// Rolledback transaction (initiated by user), cannot be retried.
kRolledback,
// Transaction was aborted by another transaction.
kAborted,
// Transaction has been invalidated due to non-recoverable constraint
// errors. It cannot be retried. This is not the same as the transaction
// returning kAborted status error, in that case the transaction can be
// retried.
kInvalid,
};
ReadWriteTransaction(const ReadWriteOptions& options,
const RetryState& retry_state,
TransactionID transaction_id, Clock* clock,
Storage* storage, LockManager* lock_manager,
const VersionedCatalog* const versioned_catalog,
ActionManager* action_manager);
absl::Status Read(const ReadArg& read_arg,
std::unique_ptr<RowCursor>* cursor) override
ABSL_LOCKS_EXCLUDED(mu_);
absl::Status Write(const Mutation& mutation) override
ABSL_LOCKS_EXCLUDED(mu_);
absl::Status Commit() ABSL_LOCKS_EXCLUDED(mu_);
absl::Status Rollback() ABSL_LOCKS_EXCLUDED(mu_);
// Tries to abort the transaction. This is a best effort attempt and returns
// OK only if the transaction could successfully be aborted.
absl::Status TryAbort() ABSL_LOCKS_EXCLUDED(mu_);
absl::Status Invalidate() ABSL_LOCKS_EXCLUDED(mu_);
absl::StatusOr<absl::Time> GetCommitTimestamp() ABSL_LOCKS_EXCLUDED(mu_);
const State state() const ABSL_LOCKS_EXCLUDED(mu_) {
absl::MutexLock lock(&mu_);
return state_;
}
// Returns the schema used by this transaction.
const Schema* schema() const ABSL_LOCKS_EXCLUDED(mu_);
// Returns the ID of this transaction.
const TransactionID id() const { return id_; }
// Returns the options for this transaction.
const ReadWriteOptions& options() const { return options_; }
// Returns the retry state for this transaction.
const RetryState retry_state() const ABSL_LOCKS_EXCLUDED(mu_) {
absl::MutexLock lock(&mu_);
return retry_state_;
}
// Returns the commit timestamp tracker for this transaction.
const CommitTimestampTracker* commit_timestamp_tracker() const {
return commit_timestamp_tracker_.get();
}
private:
friend class TransactionOpsProcessor;
enum class OpType {
kRead,
kWrite,
kCommit,
kRollback,
kInvalidate,
};
absl::Status GuardedCall(OpType op, const std::function<absl::Status()>& fn)
ABSL_LOCKS_EXCLUDED(mu_);
absl::Status ProcessWriteOps(const std::vector<WriteOp>& write_ops);
absl::Status ProcessChangeStreamWriteOps();
// Resets the transaction and marks it Active.
void Reset();
// Apply the constraint checks and effects to the writes.
absl::Status ApplyValidators(const WriteOp& op);
absl::Status ApplyEffectors(const WriteOp& op);
absl::Status ApplyStatementVerifiers();
// Updates commit timestamp tracking to reflect currently buffered ops.
void UpdateTrackedCommitTimestamps();
// Converts input non-delete MutationOp into ResolvedMutationOp after
// validating that input table, columns and rows are valid schema objects.
absl::StatusOr<ResolvedMutationOp> ResolveNonDeleteMutationOp(
const MutationOp& mutation_op, const Schema* schema);
// Returns true if the given key exists within the table.
bool KeyExists(const Table* table, const Key& key) const;
// Mutex that guards the state of this transaction.
mutable absl::Mutex mu_;
// Options with which the transaction was created.
ReadWriteOptions options_;
// Initial state of the transaction between retry attempts.
RetryState retry_state_ ABSL_GUARDED_BY(mu_);
// ID for this transaction.
const TransactionID id_;
// System-wide monotonic clock.
Clock* clock_;
// Underlying storage of the database.
Storage* base_storage_;
// Catalog of schemas.
const VersionedCatalog* const versioned_catalog_;
// Transaction lock management.
std::unique_ptr<LockHandle> lock_handle_;
// Tracks tables/columns containing pending commit timestamp.
std::unique_ptr<CommitTimestampTracker> commit_timestamp_tracker_;
// The overlay storage layer that handles mutations and read-your-write
// semantics.
std::unique_ptr<TransactionStore> transaction_store_;
// Action Manager for the transaction.
ActionManager* action_manager_;
ActionRegistry* action_registry_;
std::unique_ptr<ActionContext> action_context_;
// The commit timestamp chosen for this transaction.
absl::Time commit_timestamp_ ABSL_GUARDED_BY(mu_);
// Queue of mutations being processed by this transaction.
std::queue<WriteOp> write_ops_queue_ ABSL_GUARDED_BY(mu_);
// The state of this transaction.
State state_ ABSL_GUARDED_BY(mu_) = State::kUninitialized;
// The schema that is in effect at the timestamp picked for this transaction.
const Schema* schema_ ABSL_GUARDED_BY(mu_);
CaseInsensitiveStringMap<std::vector<KeyRange>> deleted_key_ranges_by_table_;
};
} // namespace backend
} // namespace emulator
} // namespace spanner
} // namespace google
#endif // THIRD_PARTY_CLOUD_SPANNER_EMULATOR_BACKEND_TRANSACTION_READ_WRITE_TRANSACTION_H_