in frontend/handlers/queries.cc [544:648]
// Handles an ExecuteBatchDml request: runs the request's statements in order
// against the transaction selected by the request and appends one result set
// (carrying the exact modified-row count) to `response` per executed
// statement.
//
// Parameters:
//   ctx      - request context, used to look up the session.
//   request  - the batch DML request; must contain at least one statement.
//   response - receives the result sets and the batch status.
//
// Returns:
//   A non-OK status when the request is rejected up front (no statements,
//   session lookup or transaction selector failure, partitioned DML
//   transaction), when the transaction is unusable, when a replayed request
//   does not match, or when a statement is aborted. A non-DML statement or
//   any other statement failure stops the batch, is written into
//   `response->status`, and the function itself returns OK.
absl::Status ExecuteBatchDml(RequestContext* ctx,
                             const spanner_api::ExecuteBatchDmlRequest* request,
                             spanner_api::ExecuteBatchDmlResponse* response) {
  // A batch without any statement is malformed.
  if (request->statements_size() == 0) {
    return error::InvalidBatchDmlRequest();
  }

  // Hold shared ownership of the session (and, below, the transaction) so
  // both stay valid until this function returns.
  ZETASQL_ASSIGN_OR_RETURN(std::shared_ptr<Session> session,
                   GetSession(ctx, request->session()));

  // Validate the selector first, then resolve it to a transaction.
  ZETASQL_RETURN_IF_ERROR(ValidateTransactionSelectorForQuery(request->transaction(),
                                                      /*is_dml=*/true));
  ZETASQL_ASSIGN_OR_RETURN(std::shared_ptr<Transaction> txn,
                   session->FindOrInitTransaction(request->transaction()));

  // Batch DML is rejected on partitioned DML transactions.
  if (txn->IsPartitionedDml()) {
    return error::BatchDmlOnlySupportsReadWriteTransaction();
  }

  // The response starts out OK; a failing statement overwrites this below.
  *response->mutable_status() = StatusToProto(absl::OkStatus());

  // Executes the statement at position `i` and appends its result set to the
  // response. Sets `*finished` when the batch must stop, in which case the
  // returned status is the status of the whole call.
  auto run_statement = [&](int i, bool* finished) -> absl::Status {
    *finished = true;
    const auto& stmt = request->statements(i);

    // Only DML is accepted; report the offending statement in the response.
    if (!backend::IsDMLQuery(stmt.sql())) {
      absl::Status not_dml =
          error::ExecuteBatchDmlOnlySupportsDmlStatements(i, stmt.sql());
      *response->mutable_status() = StatusToProto(not_dml);
      txn->SetDmlReplayOutcome(*response);
      return absl::OkStatus();
    }

    const auto query_outcome = ExecuteQuery(stmt, txn);
    if (!query_outcome.ok()) {
      // An aborted statement is returned as the status of the call itself
      // and no replay outcome is recorded for it.
      if (query_outcome.status().code() == absl::StatusCode::kAborted) {
        return query_outcome.status();
      }
      // Every other failure is recorded in the response, stored as the
      // replay outcome, and handed to the transaction.
      absl::Status failure = query_outcome.status();
      *response->mutable_status() = StatusToProto(failure);
      txn->SetDmlReplayOutcome(*response);
      txn->MaybeInvalidate(failure);
      return absl::OkStatus();
    }

    spanner_api::ResultSet* current_set = response->add_result_sets();
    current_set->mutable_stats()->set_row_count_exact(
        query_outcome.value().modified_row_count);

    // Metadata is attached to the first result set only.
    if (i == 0) {
      auto* metadata = current_set->mutable_metadata();
      metadata->mutable_row_type();
      if (ShouldReturnTransaction(request->transaction())) {
        ZETASQL_ASSIGN_OR_RETURN(*metadata->mutable_transaction(), txn->ToProto());
      }
    }

    *finished = false;
    return absl::OkStatus();
  };

  // Everything that touches the transaction runs inside one guarded call so
  // that it is atomic with respect to other operations on the transaction.
  return txn->GuardedCall(Transaction::OpType::kDml, [&]() -> absl::Status {
    const auto& first_sql = request->statements(0).sql();

    // Register this request under its sequence number; a populated result
    // means the same sequence number was seen before and must be replayed.
    const auto replay = txn->LookupOrRegisterDmlRequest(
        request->seqno(), HashRequest(request), first_sql);
    if (replay.has_value()) {
      const bool registration_failed =
          !replay->status.ok() &&
          txn->DMLErrorType() ==
              Transaction::DMLErrorHandlingMode::kDmlRegistrationError;
      if (registration_failed) {
        return replay->status;
      }
      // The stored outcome must come from a batch DML request.
      const auto* stored_response =
          std::get_if<spanner_api::ExecuteBatchDmlResponse>(&replay->outcome);
      if (stored_response == nullptr) {
        return error::ReplayRequestMismatch(request->seqno(), first_sql);
      }
      // Batch DML reports statement errors inside the response, so the
      // replayed call itself is always OK.
      *response = *stored_response;
      return absl::OkStatus();
    }

    // The transaction status is checked only now, after the replay lookup,
    // because replaying by sequence number takes priority over returning a
    // previously encountered error.
    ZETASQL_RETURN_IF_ERROR(txn->Status());

    // No further work after a non-recoverable error, commit, or rollback.
    if (txn->IsInvalid()) {
      return error::CannotUseTransactionAfterConstraintError();
    }
    if (txn->IsCommitted() || txn->IsRolledback()) {
      return error::CannotReadOrQueryAfterCommitOrRollback();
    }

    const int statement_count = request->statements_size();
    for (int i = 0; i < statement_count; ++i) {
      bool finished = false;
      absl::Status call_status = run_statement(i, &finished);
      if (finished) {
        return call_status;
      }
    }

    // All statements succeeded: remember the full response for replays.
    txn->SetDmlReplayOutcome(*response);
    return absl::OkStatus();
  });
}