absl::Status ExecuteBatchDml()

in frontend/handlers/queries.cc [544:648]


// Handles an ExecuteBatchDml request: runs the statements of `request` in
// order inside the transaction chosen by `request->transaction()` and appends
// one ResultSet (carrying the exact modified-row count) to `response` for each
// statement that succeeds.
//
// Returned as this function's status: an empty statement list, session or
// transaction-selector failures, a partitioned-DML transaction, a replay
// mismatch, a transaction that is in an error/invalid/finished state, and an
// ABORTED statement. Every other per-statement failure ends the batch, is
// stored in `response->status()` alongside the result sets collected so far,
// and the function itself returns OK.
absl::Status ExecuteBatchDml(RequestContext* ctx,
                             const spanner_api::ExecuteBatchDmlRequest* request,
                             spanner_api::ExecuteBatchDmlResponse* response) {
  // A batch that carries no statement at all is rejected up front.
  if (request->statements().empty()) {
    return error::InvalidBatchDmlRequest();
  }

  // Hold shared ownership of the session (and, below, of the transaction) so
  // that both stay valid for the whole duration of this call.
  ZETASQL_ASSIGN_OR_RETURN(std::shared_ptr<Session> session,
                   GetSession(ctx, request->session()));

  // Check the selector under DML rules, then resolve the transaction it names.
  ZETASQL_RETURN_IF_ERROR(ValidateTransactionSelectorForQuery(
      request->transaction(), /*is_dml=*/true));
  ZETASQL_ASSIGN_OR_RETURN(std::shared_ptr<Transaction> txn,
                   session->FindOrInitTransaction(request->transaction()));

  if (txn->IsPartitionedDml()) {
    return error::BatchDmlOnlySupportsReadWriteTransaction();
  }

  // The response starts out OK; a failing statement overwrites this below.
  *response->mutable_status() = StatusToProto(absl::OkStatus());

  // All work on the transaction lives in this closure, which is handed to
  // GuardedCall so the operations are applied atomically.
  const auto run_batch = [&]() -> absl::Status {
    const auto& first_sql = request->statements(0).sql();

    // Register the DML request. A populated result means there is an earlier
    // state to replay instead of executing the statements.
    const auto replay = txn->LookupOrRegisterDmlRequest(
        request->seqno(), HashRequest(request), first_sql);
    if (replay.has_value()) {
      const bool registration_failed =
          !replay->status.ok() &&
          txn->DMLErrorType() ==
              Transaction::DMLErrorHandlingMode::kDmlRegistrationError;
      if (registration_failed) {
        return replay->status;
      }

      // The stored outcome must be a batch-DML response; anything else is
      // reported as a mismatch for this sequence number.
      const auto* cached =
          std::get_if<spanner_api::ExecuteBatchDmlResponse>(&replay->outcome);
      if (cached == nullptr) {
        return error::ReplayRequestMismatch(request->seqno(), first_sql);
      }

      // A replayed batch reports OK here; any statement error is already
      // part of the stored response.
      *response = *cached;
      return absl::OkStatus();
    }

    // The transaction status is checked only now, after the replay lookup, so
    // that a sequence-number replay wins over a previously recorded error.
    ZETASQL_RETURN_IF_ERROR(txn->Status());

    // No further statements after a non-recoverable error, commit or rollback.
    if (txn->IsInvalid()) {
      return error::CannotUseTransactionAfterConstraintError();
    }
    if (txn->IsCommitted() || txn->IsRolledback()) {
      return error::CannotReadOrQueryAfterCommitOrRollback();
    }

    const int statement_count = request->statements_size();
    for (int i = 0; i < statement_count; ++i) {
      const auto& stmt = request->statements(i);

      // A non-DML statement stops the batch; the error goes into the response.
      if (!backend::IsDMLQuery(stmt.sql())) {
        absl::Status not_dml =
            error::ExecuteBatchDmlOnlySupportsDmlStatements(i, stmt.sql());
        *response->mutable_status() = StatusToProto(not_dml);
        txn->SetDmlReplayOutcome(*response);
        return absl::OkStatus();
      }

      const auto outcome = ExecuteQuery(stmt, txn);
      if (!outcome.ok()) {
        // ABORTED is handed back as the call status and no replay outcome is
        // recorded for it.
        if (outcome.status().code() == absl::StatusCode::kAborted) {
          return outcome.status();
        }

        // Any other failure ends the batch: store it in the response, record
        // the outcome for replay, and give the transaction the chance to
        // invalidate itself.
        absl::Status failure = outcome.status();
        *response->mutable_status() = StatusToProto(failure);
        txn->SetDmlReplayOutcome(*response);
        txn->MaybeInvalidate(failure);
        return absl::OkStatus();
      }

      const auto& dml_result = outcome.value();
      spanner_api::ResultSet* result_set = response->add_result_sets();
      result_set->mutable_stats()->set_row_count_exact(
          dml_result.modified_row_count);

      // Metadata is attached to the first result set only.
      if (i == 0) {
        auto* metadata = result_set->mutable_metadata();
        metadata->mutable_row_type();
        if (ShouldReturnTransaction(request->transaction())) {
          ZETASQL_ASSIGN_OR_RETURN(*metadata->mutable_transaction(),
                           txn->ToProto());
        }
      }
    }

    // Every statement succeeded: remember the full response for replay.
    txn->SetDmlReplayOutcome(*response);
    return absl::OkStatus();
  };

  return txn->GuardedCall(Transaction::OpType::kDml, run_batch);
}