frontend/entities/transaction.cc (358 lines of code) (raw):
//
// Copyright 2020 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
#include "frontend/entities/transaction.h"
#include <cstddef>
#include <functional>
#include <memory>
#include <optional>
#include <string>
#include <utility>
#include <variant>
#include "google/spanner/v1/spanner.pb.h"
#include "zetasql/public/value.h"
#include "absl/status/status.h"
#include "absl/status/statusor.h"
#include "absl/synchronization/mutex.h"
#include "absl/time/time.h"
#include "absl/types/variant.h"
#include "backend/access/read.h"
#include "backend/access/write.h"
#include "backend/common/ids.h"
#include "backend/common/variant.h"
#include "backend/database/database.h"
#include "backend/query/query_context.h"
#include "backend/query/query_engine.h"
#include "backend/transaction/options.h"
#include "backend/transaction/read_write_transaction.h"
#include "common/constants.h"
#include "common/errors.h"
#include "frontend/converters/time.h"
#include "frontend/converters/types.h"
#include "frontend/converters/values.h"
#include "frontend/entities/database.h"
#include "zetasql/base/ret_check.h"
#include "absl/status/status.h"
#include "zetasql/base/status_macros.h"
#include "zetasql/base/time_proto_util.h"
namespace google {
namespace spanner {
namespace emulator {
namespace frontend {
namespace spanner_api = ::google::spanner::v1;
namespace {
// Maps the mode selected in `options` to the frontend transaction type.
// Options with no mode set are treated as read-only.
Transaction::Type TypeFromTransactionOptions(
    const spanner_api::TransactionOptions& options) {
  using Options = spanner_api::TransactionOptions;
  switch (options.mode_case()) {
    case Options::kReadWrite:
      return Transaction::Type::kReadWrite;
    case Options::kPartitionedDml:
      return Transaction::Type::kPartitionedDml;
    case Options::kReadOnly:
    case Options::MODE_NOT_SET:
      return Transaction::Type::kReadOnly;
  }
}
// Returns true when `status` carries a payload stored under the key `url`.
bool HasPayload(const absl::Status& status, const std::string& url) {
  const auto payload = status.GetPayload(url);
  return payload.has_value();
}
} // namespace
// Shorthand names for the owning pointers to the two backend transaction
// classes.
using ReadWriteTransactionPtr = std::unique_ptr<backend::ReadWriteTransaction>;
using ReadOnlyTransactionPtr = std::unique_ptr<backend::ReadOnlyTransaction>;
Transaction::Transaction(
std::variant<std::unique_ptr<backend::ReadWriteTransaction>,
std::unique_ptr<backend::ReadOnlyTransaction>>
backend_transaction,
const backend::QueryEngine* query_engine,
const spanner_api::TransactionOptions& options, const Usage& usage)
: transaction_(std::move(backend_transaction)),
query_engine_(query_engine),
usage_type_(usage),
type_(TypeFromTransactionOptions(options)),
options_(options) {}
// Marks the transaction as closed. For read-write and partitioned DML
// transactions a rollback is also issued; its result is deliberately ignored.
// Acquires mu_ for the duration of the call.
void Transaction::Close() {
  absl::MutexLock lock(&mu_);
  closed_ = true;
  switch (type_) {
    case kReadWrite:
    case kPartitionedDml:
      read_write()->Rollback().IgnoreError();
      break;
    default:
      // No rollback is issued for any other transaction type.
      break;
  }
}
// Serializes this transaction into its API proto.
//
// The id is populated unless the transaction is single-use. The read timestamp
// is populated only when the options are read-only and request it; an error
// from fetching or converting the timestamp is returned to the caller.
absl::StatusOr<spanner_api::Transaction> Transaction::ToProto() {
  spanner_api::Transaction proto;
  if (usage_type_ != kSingleUse) {
    proto.set_id(std::to_string(id()));
  }

  const bool wants_read_timestamp =
      options_.has_read_only() && options_.read_only().return_read_timestamp();
  if (!wants_read_timestamp) {
    return proto;
  }

  ZETASQL_ASSIGN_OR_RETURN(absl::Time read_timestamp, GetReadTimestamp());
  ZETASQL_ASSIGN_OR_RETURN(*proto.mutable_read_timestamp(),
                           TimestampToProto(read_timestamp));
  return proto;
}
bool Transaction::IsClosed() const {
absl::MutexLock lock(&mu_);
return closed_;
}
// Returns true when the transaction reached through read_write() is currently
// in `state`. Read-only transactions never match any state.
bool Transaction::HasState(
    const backend::ReadWriteTransaction::State& state) const {
  if (type_ == kReadOnly) {
    return false;
  }
  // Read-write and partitioned DML transactions both go through read_write().
  return read_write()->state() == state;
}
bool Transaction::IsRolledback() const {
mu_.AssertHeld();
return HasState(backend::ReadWriteTransaction::State::kRolledback);
}
bool Transaction::IsInvalid() const {
mu_.AssertHeld();
return HasState(backend::ReadWriteTransaction::State::kInvalid);
}
// Returns true when this is a read-write transaction whose recorded status has
// code kAborted. Acquires mu_ for the check.
bool Transaction::IsAborted() const {
  absl::MutexLock lock(&mu_);
  if (type_ != kReadWrite) {
    return false;
  }
  return status_.code() == absl::StatusCode::kAborted;
}
bool Transaction::IsCommitted() const {
mu_.AssertHeld();
return HasState(backend::ReadWriteTransaction::State::kCommitted);
}
// Returns the schema reported by the underlying transaction: read_only() for
// read-only transactions, read_write() for the other types.
const backend::Schema* Transaction::schema() const {
  if (type_ == kReadOnly) {
    return read_only()->schema();
  }
  return read_write()->schema();
}
// Returns the id reported by the underlying transaction: read_only() for
// read-only transactions, read_write() for the other types.
backend::TransactionID Transaction::id() const {
  if (type_ == kReadOnly) {
    return read_only()->id();
  }
  return read_write()->id();
}
// Performs a read described by `read_arg`, handing the resulting rows back
// through `cursor`. Partitioned DML transactions do not support reads and get
// an error instead. Requires mu_ to be held by the caller.
absl::Status Transaction::Read(const backend::ReadArg& read_arg,
                               std::unique_ptr<backend::RowCursor>* cursor) {
  mu_.AssertHeld();
  if (type_ == kPartitionedDml) {
    return error::InvalidOperationUsingPartitionedDmlTransaction();
  }
  if (type_ == kReadOnly) {
    return read_only()->Read(read_arg, cursor);
  }
  return read_write()->Read(read_arg, cursor);
}
// Convenience overload: executes `query` using the NORMAL query mode.
absl::StatusOr<backend::QueryResult> Transaction::ExecuteSql(
    const backend::Query& query) {
  const auto default_mode = v1::ExecuteSqlRequest::NORMAL;
  return ExecuteSql(query, default_mode);
}
// Executes `query` with the given `query_mode` through the query engine.
//
// Read-only transactions run with a reader only. Read-write and partitioned
// DML transactions run with read_write() as both reader and writer. A
// partitioned DML statement is additionally validated before execution and
// committed right after it. Requires mu_ to be held by the caller.
absl::StatusOr<backend::QueryResult> Transaction::ExecuteSql(
    const backend::Query& query,
    const v1::ExecuteSqlRequest_QueryMode query_mode) {
  mu_.AssertHeld();

  if (type_ == kReadOnly) {
    backend::QueryContext read_only_context{.schema = schema(),
                                            .reader = read_only(),
                                            .writer = nullptr,
                                            .is_read_only_txn = true};
    return query_engine_->ExecuteSql(query, read_only_context, query_mode);
  }

  // Read-write and partitioned DML transactions use the same query context.
  backend::QueryContext context{
      .schema = schema(),
      .reader = read_write(),
      .writer = read_write(),
      .commit_timestamp_tracker = read_write()->commit_timestamp_tracker(),
      .allow_read_write_only_functions = true,
      .is_read_only_txn = false};

  if (type_ == kReadWrite) {
    return query_engine_->ExecuteSql(query, context, query_mode);
  }

  // Partitioned DML: reject statements that are not valid partitioned DML.
  ZETASQL_RETURN_IF_ERROR(query_engine_->IsValidPartitionedDML(query, context));
  // PartitionedDml will auto-commit transactions and cannot be reused.
  ZETASQL_ASSIGN_OR_RETURN(
      backend::QueryResult result,
      query_engine_->ExecuteSql(query, context, query_mode));
  ZETASQL_RETURN_IF_ERROR(read_write()->Commit());
  return result;
}
// Forwards `mutation` to read_write()->Write(). Only read-write transactions
// accept writes; every other type gets an error. Requires mu_ to be held by
// the caller.
absl::Status Transaction::Write(const backend::Mutation& mutation) {
  mu_.AssertHeld();
  if (type_ != kReadWrite) {
    return error::CannotCommitRollbackReadOnlyOrPartitionedDmlTransaction();
  }
  return read_write()->Write(mutation);
}
// Forwards to read_write()->Commit(). Only read-write transactions can be
// committed through this call; every other type gets an error. Requires mu_
// to be held by the caller.
absl::Status Transaction::Commit() {
  mu_.AssertHeld();
  if (type_ != kReadWrite) {
    return error::CannotCommitRollbackReadOnlyOrPartitionedDmlTransaction();
  }
  return read_write()->Commit();
}
// Forwards to read_write()->Invalidate(). Any transaction that is not
// read-write gets an internal error instead. Requires mu_ to be held by the
// caller.
absl::Status Transaction::Invalidate() {
  mu_.AssertHeld();
  if (type_ != kReadWrite) {
    return error::Internal("Read only transaction cannot be invalidated.");
  }
  return read_write()->Invalidate();
}
// Forwards to read_write()->Rollback(). Only read-write transactions can be
// rolled back through this call; every other type gets an error. Requires mu_
// to be held by the caller.
absl::Status Transaction::Rollback() {
  mu_.AssertHeld();
  if (type_ != kReadWrite) {
    return error::CannotCommitRollbackReadOnlyOrPartitionedDmlTransaction();
  }
  return read_write()->Rollback();
}
// Returns the read timestamp reported by read_only(). Only read-only
// transactions have one; every other type gets an error.
absl::StatusOr<absl::Time> Transaction::GetReadTimestamp() const {
  if (type_ != kReadOnly) {
    return error::CannotReturnReadTimestampForReadWriteTransaction();
  }
  return read_only()->read_timestamp();
}
// Returns the result of read_write()->GetCommitTimestamp(). Only read-write
// transactions are supported; every other type gets an error. Requires mu_ to
// be held by the caller.
absl::StatusOr<absl::Time> Transaction::GetCommitTimestamp() const {
  mu_.AssertHeld();
  if (type_ != kReadWrite) {
    return error::CannotCommitRollbackReadOnlyOrPartitionedDmlTransaction();
  }
  return read_write()->GetCommitTimestamp();
}
// Returns a copy of the status currently recorded for this transaction.
// Requires mu_ to be held by the caller.
absl::Status Transaction::Status() const {
  mu_.AssertHeld();
  absl::Status recorded = status_;
  return recorded;
}
// If `status` carries a constraint-error payload, records a payload-free copy
// of it as the transaction status and invalidates the transaction (the result
// of the invalidation is ignored). Otherwise does nothing. Requires mu_ to be
// held by the caller.
void Transaction::MaybeInvalidate(const absl::Status& status) {
  mu_.AssertHeld();
  if (!HasPayload(status, kConstraintError)) {
    return;
  }
  status_ = absl::Status(status.code(), status.message());
  Invalidate().IgnoreError();
}
std::optional<Transaction::RequestReplayState>
Transaction::LookupOrRegisterDmlRequest(int64_t seqno, int64_t request_hash,
const std::string& sql_statement) {
mu_.AssertHeld();
current_dml_seqno_ = seqno;
const auto request = dml_requests_.find(seqno);
if (request == dml_requests_.end()) {
// If the request was not found, then it is a new request. Check to see that
// it isn't out of order.
if (!dml_requests_.empty() && seqno < dml_requests_.rbegin()->first) {
Transaction::RequestReplayState state;
state.status = error::DmlSequenceOutOfOrder(
seqno, dml_requests_.rbegin()->first, sql_statement);
// This is marked as a dml replay for error handling purposes. We do not
// want this status to be recorded within SetDmlRequestReplayStatus.
dml_error_mode_ = DMLErrorHandlingMode::kDmlRegistrationError;
return state;
}
// Order was valid, so we record the new sequence number.
dml_requests_.emplace(
seqno, Transaction::RequestReplayState{.status = absl::OkStatus(),
.request_hash = request_hash});
dml_error_mode_ = DMLErrorHandlingMode::kDmlRequest;
return std::nullopt;
}
// Request was found, check to see that the request hash matches.
if (request_hash != request->second.request_hash) {
Transaction::RequestReplayState state = request->second;
state.status = error::ReplayRequestMismatch(seqno, sql_statement);
dml_error_mode_ = DMLErrorHandlingMode::kDmlRegistrationError;
return state;
}
// Return the saved status for this sequence.
dml_error_mode_ = DMLErrorHandlingMode::kDmlReplay;
return request->second;
}
// Saves `status` on the DML request registered under the current DML sequence
// number, so it can be returned on replay. Nothing is saved when the current
// request is itself a replay or failed registration. Requires mu_ to be held
// by the caller.
void Transaction::SetDmlRequestReplayStatus(const absl::Status& status) {
  mu_.AssertHeld();
  switch (dml_error_mode_) {
    case DMLErrorHandlingMode::kDmlReplay:
    case DMLErrorHandlingMode::kDmlRegistrationError:
      // Ignore replays and registration errors.
      return;
    default:
      break;
  }
  const auto request = dml_requests_.find(current_dml_seqno_);
  ABSL_DCHECK(request != dml_requests_.end());
  if (request == dml_requests_.end()) {
    return;
  }
  request->second.status = status;
}
// Stores `outcome` as the replay outcome of the DML request registered under
// the current DML sequence number. Does nothing when the transaction is
// invalid or when no request is registered under that sequence number (the
// latter is a DCHECK failure in debug builds). Requires mu_ to be held by the
// caller.
void Transaction::SetDmlReplayOutcome(
    std::variant<spanner_api::ResultSet, spanner_api::ExecuteBatchDmlResponse>
        outcome) {
  mu_.AssertHeld();
  // Ignore invalid transactions.
  if (IsInvalid()) {
    return;
  }
  const auto request = dml_requests_.find(current_dml_seqno_);
  ABSL_DCHECK(request != dml_requests_.end())
      << "DML sequence number was not registered.";
  if (request != dml_requests_.end()) {
    // `outcome` is a by-value sink parameter that is not used afterwards, so
    // move it into place instead of copying the response proto.
    request->second.outcome = std::move(outcome);
  }
}
// Returns the DML error handling mode most recently stored in
// dml_error_mode_.
Transaction::DMLErrorHandlingMode Transaction::DMLErrorType() const {
  const Transaction::DMLErrorHandlingMode mode = dml_error_mode_;
  return mode;
}
// Runs `fn` while holding mu_ and maintains the transaction's recorded status
// around it.
//
// For operations other than DML and rollback, a previously recorded error is
// returned immediately and `fn` is not run. The status returned to the caller
// has the code and message of `fn`'s result, without any payload.
absl::Status Transaction::GuardedCall(OpType op,
                                      const std::function<absl::Status()>& fn) {
  absl::MutexLock lock(&mu_);
  const bool is_dml = op == OpType::kDml;
  const bool is_rollback = op == OpType::kRollback;

  // Cannot reuse a transaction that previously encountered an error, so the
  // last error status is replayed. Rollback is exempt. DML/BatchDML handlers
  // perform this check themselves, because the dml sequence number replay must
  // be checked first.
  if (!is_dml && !is_rollback) {
    ZETASQL_RETURN_IF_ERROR(status_);
  }

  const absl::Status outcome = fn();
  // Copy of the outcome with the payload stripped.
  absl::Status stripped(outcome.code(), outcome.message());

  if (!outcome.ok()) {
    // Only failures of non-read operations are recorded: read-only operations
    // can never cause the transaction to be aborted and never repeat status
    // errors. Non-DML SQL statements are read-only.
    const bool record_failure =
        op == OpType::kCommit || HasPayload(outcome, kConstraintError) ||
        outcome.code() == absl::StatusCode::kAborted;
    if (record_failure) {
      status_ = stripped;
      Invalidate().IgnoreError();
    }
    if (is_dml) {
      SetDmlRequestReplayStatus(outcome);
    }
  }

  // A rollback always overwrites the recorded status with its own result.
  if (is_rollback) {
    status_ = outcome;
  }
  return stripped;
}
// Decides whether a transaction should be returned for `selector`: always for
// a `begin` selector, and for a `single_use` selector only when it is
// read-only and asks for the read timestamp. Any other selector yields false.
bool ShouldReturnTransaction(
    const google::spanner::v1::TransactionSelector& selector) {
  using SelectorCase = spanner_api::TransactionSelector::SelectorCase;
  switch (selector.selector_case()) {
    case SelectorCase::kBegin:
      return true;
    case SelectorCase::kSingleUse: {
      const auto& single_use = selector.single_use();
      return single_use.has_read_only() &&
             single_use.read_only().return_read_timestamp();
    }
    default:
      return false;
  }
}
} // namespace frontend
} // namespace emulator
} // namespace spanner
} // namespace google