absl::Status ExecuteSql()

in frontend/handlers/queries.cc [219:360]


// Executes the SQL statement carried by `request` inside the transaction
// selected by the request and fills `response` with the outcome.
//
// Args:
//   ctx:      request context; used to look up the session and to read the
//             current time from the environment clock.
//   request:  the ExecuteSql request (session, transaction selector, SQL text,
//             parameters, query mode, optional partition token and seqno).
//   response: result set that receives metadata, rows and stats.
//
// Returns OkStatus on success. Otherwise returns the first error produced by
// validation, by the transaction state checks, or by statement execution. For
// a DML statement for which a recorded state is found, the recorded status
// (and, when it is OK, the recorded result set) is returned instead of
// executing the statement.
absl::Status ExecuteSql(RequestContext* ctx,
                        const spanner_api::ExecuteSqlRequest* request,
                        spanner_api::ResultSet* response) {
  // Hold shared references to the session and the transaction for the whole
  // call so that both stay valid until this function returns.
  ZETASQL_ASSIGN_OR_RETURN(std::shared_ptr<Session> session,
                   GetSession(ctx, request->session()));

  // Resolve the transaction the statement runs in.
  const bool is_dml = backend::IsDMLQuery(request->sql());
  ZETASQL_RETURN_IF_ERROR(
      ValidateTransactionSelectorForQuery(request->transaction(), is_dml));
  ZETASQL_ASSIGN_OR_RETURN(std::shared_ptr<Transaction> txn,
                   session->FindOrInitTransaction(request->transaction()));
  ZETASQL_RETURN_IF_ERROR(
      ValidateDirectedReadsOption(request->directed_read_options(), txn));

  const auto op_type =
      is_dml ? Transaction::OpType::kDml : Transaction::OpType::kSql;

  // Everything that touches the transaction runs inside GuardedCall so that
  // the operations on it are atomic.
  return txn->GuardedCall(
      op_type,
      [&]() -> absl::Status {
        if (is_dml) {
          // Register the DML request. When the lookup yields a recorded
          // state, answer from that state rather than executing again.
          const auto replay = txn->LookupOrRegisterDmlRequest(
              request->seqno(), HashRequest(request), request->sql());
          if (replay.has_value()) {
            if (!replay->status.ok()) {
              return replay->status;
            }
            // The recorded outcome must be a ResultSet to be replayed here.
            const auto* recorded =
                std::get_if<spanner_api::ResultSet>(&replay->outcome);
            if (recorded == nullptr) {
              return error::ReplayRequestMismatch(request->seqno(),
                                                  request->sql());
            }
            *response = *recorded;
            return replay->status;
          }

          // The transaction status is checked explicitly for DML, and only
          // after the replay lookup, because a sequence number replay takes
          // priority over returning a previously encountered error status.
          ZETASQL_RETURN_IF_ERROR(txn->Status());
        }

        // Statements are rejected after a non-recoverable error, a commit or
        // a rollback.
        if (txn->IsInvalid()) {
          return error::CannotUseTransactionAfterConstraintError();
        }
        if (txn->IsCommitted() || txn->IsRolledback()) {
          return txn->IsPartitionedDml()
                     ? error::CannotReusePartitionedDmlTransaction()
                     : error::CannotReadOrQueryAfterCommitOrRollback();
        }
        if (txn->IsReadOnly()) {
          if (is_dml) {
            return error::ReadOnlyTransactionDoesNotSupportDml("ReadOnly");
          }
          ZETASQL_ASSIGN_OR_RETURN(const absl::Time read_ts,
                           txn->GetReadTimestamp());
          ZETASQL_RETURN_IF_ERROR(ValidateReadTimestampNotTooFarInFuture(
              read_ts, ctx->env()->clock()->Now()));
        }

        // Translate the request into a backend query and run it.
        ZETASQL_ASSIGN_OR_RETURN(const backend::Query query,
                         QueryFromProto(request->sql(), request->params(),
                                        request->param_types(),
                                        txn->query_engine()->type_factory(),
                                        txn->schema()->proto_bundle()));
        const auto query_mode = request->query_mode();
        auto result_or = txn->ExecuteSql(query, query_mode);
        if (!result_or.ok()) {
          absl::Status failure = result_or.status();
          if (txn->IsPartitionedDml()) {
            // Any error invalidates a Partitioned DML transaction, so tag the
            // status with the constraint-error payload.
            failure.SetPayload(kConstraintError, absl::Cord(""));
          }
          return failure;
        }
        backend::QueryResult& query_result = result_or.value();

        // Transaction metadata, when the request asks for it.
        if (ShouldReturnTransaction(request->transaction())) {
          ZETASQL_ASSIGN_OR_RETURN(*response->mutable_metadata()->mutable_transaction(),
                           txn->ToProto());
        }
        // Types of the query parameters reported by the query result.
        ZETASQL_RETURN_IF_ERROR(AddUndeclaredParametersFromQueryResult(
            &query_result.parameter_types, response->mutable_metadata()));

        // DML reports a modified-row count: a lower bound for Partitioned
        // DML, an exact count otherwise.
        if (is_dml) {
          auto* stats = response->mutable_stats();
          if (txn->IsPartitionedDml()) {
            stats->set_row_count_lower_bound(query_result.modified_row_count);
          } else {
            stats->set_row_count_exact(query_result.modified_row_count);
          }
        }

        if (is_dml && query_result.rows == nullptr) {
          // DML without returned rows: only materialize an empty row type.
          response->mutable_metadata()->mutable_row_type();
        } else {
          // Query rows, or the rows produced by DML THEN RETURN.
          ZETASQL_RETURN_IF_ERROR(RowCursorToResultSetProto(query_result.rows.get(),
                                                    /*limit=*/0, response));
        }

        // A partition token, if supplied, is decoded and validated; a token
        // marked as an empty query partition yields no rows.
        const auto& encoded_token = request->partition_token();
        if (!encoded_token.empty()) {
          ZETASQL_ASSIGN_OR_RETURN(auto token,
                           PartitionTokenFromString(encoded_token));
          ZETASQL_RETURN_IF_ERROR(ValidatePartitionToken(token, request));
          if (token.empty_query_partition()) {
            response->clear_rows();
          }
        }

        // PROFILE mode gets basic query stats so that REPL applications
        // written for Cloud Spanner keep working; the profile does not
        // contain statistics for plan nodes.
        const bool profile_requested =
            query_mode == spanner_api::ExecuteSqlRequest::PROFILE;
        if (profile_requested) {
          AddQueryStatsFromQueryResult(
              query_result, response->mutable_stats()->mutable_query_stats());
        }
        // Both PLAN and PROFILE modes receive an empty query plan.
        if (profile_requested ||
            query_mode == spanner_api::ExecuteSqlRequest::PLAN) {
          AddEmptyQueryPlan(response->mutable_stats());
        }

        // Record the finished response as the replay outcome for this DML.
        if (is_dml) {
          txn->SetDmlReplayOutcome(*response);
        }
        return absl::OkStatus();
      });
}