absl::Status ExecuteStreamingSql()

in frontend/handlers/queries.cc [368:540]


// Handles a streaming ExecuteSql request.
//
// Looks up the session named by `request->session()`, finds or initializes the
// transaction selected by `request->transaction()`, executes `request->sql()`
// inside the transaction's guarded section and sends the results to `stream`
// as a sequence of PartialResultSet messages.
//
// DML statements are registered with the transaction under
// `request->seqno()`; if an outcome is already recorded for the request, that
// outcome is replayed to `stream` instead of executing the statement again.
//
// Change stream queries are only detected inside the guarded section; they
// are executed afterwards through ChangeStreamsHandler.
//
// Returns the first validation or execution error encountered, the status of
// the change stream handler for change stream queries, and OK otherwise.
absl::Status ExecuteStreamingSql(
    RequestContext* ctx, const spanner_api::ExecuteSqlRequest* request,
    ServerStream<spanner_api::PartialResultSet>* stream) {
  // Take shared ownership of the session and transaction so that they stay
  // valid throughout this function.
  ZETASQL_ASSIGN_OR_RETURN(std::shared_ptr<Session> session,
                   GetSession(ctx, request->session()));

  // Filled in by the guarded lambda below and inspected after it returns.
  backend::ChangeStreamQueryValidator::ChangeStreamMetadata
      change_stream_metadata;

  // Get underlying transaction.
  bool is_dml_query = backend::IsDMLQuery(request->sql());

  ZETASQL_RETURN_IF_ERROR(ValidateTransactionSelectorForQuery(request->transaction(),
                                                      is_dml_query));
  ZETASQL_ASSIGN_OR_RETURN(std::shared_ptr<Transaction> txn,
                   session->FindOrInitTransaction(request->transaction()));
  ZETASQL_RETURN_IF_ERROR(
      ValidateDirectedReadsOption(request->directed_read_options(), txn));

  // Wrap all operations on this transaction so they are atomic.
  absl::Status status = txn->GuardedCall(
      is_dml_query ? Transaction::OpType::kDml : Transaction::OpType::kSql,
      [&]() -> absl::Status {
        // Register DML request and check for status replay.
        if (is_dml_query) {
          const auto state = txn->LookupOrRegisterDmlRequest(
              request->seqno(), HashRequest(request), request->sql());
          if (state.has_value()) {
            // An outcome is already recorded for this request: replay it.
            if (!state->status.ok()) {
              return state->status;
            }
            if (!std::holds_alternative<spanner_api::ResultSet>(
                    state->outcome)) {
              return error::ReplayRequestMismatch(request->seqno(),
                                                  request->sql());
            }
            // Bind by reference: `state` outlives this use, so copying the
            // stored ResultSet is unnecessary.
            const spanner_api::ResultSet& replay_result =
                std::get<spanner_api::ResultSet>(state->outcome);
            spanner_api::PartialResultSet response;
            *response.mutable_stats() = replay_result.stats();
            *response.mutable_metadata() = replay_result.metadata();
            stream->Send(response);
            return state->status;
          }

          // DML needs to explicitly check the transaction status since
          // the DML sequence number replay should take priority over returning
          // a previously encountered error status.
          ZETASQL_RETURN_IF_ERROR(txn->Status());
        }

        // Cannot query after commit, rollback, or non-recoverable error.
        if (txn->IsInvalid()) {
          return error::CannotUseTransactionAfterConstraintError();
        }
        if (txn->IsCommitted() || txn->IsRolledback()) {
          if (txn->IsPartitionedDml()) {
            return error::CannotReusePartitionedDmlTransaction();
          }
          return error::CannotReadOrQueryAfterCommitOrRollback();
        }
        if (txn->IsReadOnly()) {
          if (is_dml_query) {
            return error::ReadOnlyTransactionDoesNotSupportDml("ReadOnly");
          }
          ZETASQL_ASSIGN_OR_RETURN(absl::Time read_timestamp, txn->GetReadTimestamp());
          ZETASQL_RETURN_IF_ERROR(ValidateReadTimestampNotTooFarInFuture(
              read_timestamp, ctx->env()->clock()->Now()));
        }
        // Convert and execute provided SQL statement.
        ZETASQL_ASSIGN_OR_RETURN(const backend::Query query,
                         QueryFromProto(request->sql(), request->params(),
                                        request->param_types(),
                                        txn->query_engine()->type_factory(),
                                        txn->schema()->proto_bundle()));
        bool in_read_write_txn = txn->IsReadWrite() || txn->IsPartitionedDml();
        ZETASQL_ASSIGN_OR_RETURN(change_stream_metadata,
                         backend::QueryEngine::TryGetChangeStreamMetadata(
                             query, txn->schema(), in_read_write_txn));
        // If the current query is a change stream query, return and exit the
        // current transaction lambda to avoid a nested transaction call.
        if (change_stream_metadata.is_change_stream_query) {
          return absl::OkStatus();
        }
        auto maybe_result = txn->ExecuteSql(query, request->query_mode());
        if (!maybe_result.ok()) {
          absl::Status error = maybe_result.status();
          if (txn->IsPartitionedDml()) {
            // A Partitioned DML transaction will become invalidated on any
            // error.
            error.SetPayload(kConstraintError, absl::Cord(""));
          }
          return error;
        }
        backend::QueryResult& result = maybe_result.value();

        std::vector<spanner_api::PartialResultSet> responses;
        if (is_dml_query) {
          responses.emplace_back();
          if (result.rows == nullptr) {
            // Set empty row type.
            responses.back().mutable_metadata()->mutable_row_type();
          } else {
            // It contains DML THEN RETURN row results.
            ZETASQL_ASSIGN_OR_RETURN(responses, RowCursorToPartialResultSetProtos(
                                            result.rows.get(), /*limit=*/0));
          }
          // The modified row count is attached to the last partial response.
          if (txn->IsPartitionedDml()) {
            responses.back().mutable_stats()->set_row_count_lower_bound(
                result.modified_row_count);
          } else {
            responses.back().mutable_stats()->set_row_count_exact(
                result.modified_row_count);
          }
        } else {
          ZETASQL_ASSIGN_OR_RETURN(responses, RowCursorToPartialResultSetProtos(
                                          result.rows.get(), /*limit=*/0));
        }

        if (!request->partition_token().empty()) {
          ZETASQL_ASSIGN_OR_RETURN(
              auto partition_token,
              PartitionTokenFromString(request->partition_token()));
          ZETASQL_RETURN_IF_ERROR(ValidatePartitionToken(partition_token, request));
          if (partition_token.empty_query_partition()) {
            // Clear all partial responses except the first one. Return only
            // metadata in the first partial response.
            responses.resize(1);
            responses.front().clear_values();
            responses.front().clear_chunked_value();
          }
        }

        // Populate transaction metadata.
        if (ShouldReturnTransaction(request->transaction())) {
          ZETASQL_ASSIGN_OR_RETURN(
              *responses.front().mutable_metadata()->mutable_transaction(),
              txn->ToProto());
        }
        // Return query parameter types.
        ZETASQL_RETURN_IF_ERROR(AddUndeclaredParametersFromQueryResult(
            &result.parameter_types, responses.front().mutable_metadata()));

        // Add basic stats for PROFILE mode. We do this to interoperate with
        // REPL applications written for Cloud Spanner. The profile will not
        // contain statistics for plan nodes.
        if (request->query_mode() == spanner_api::ExecuteSqlRequest::PROFILE) {
          AddQueryStatsFromQueryResult(
              result, responses.front().mutable_stats()->mutable_query_stats());
        }

        // Send results back to client.
        for (const auto& response : responses) {
          stream->Send(response);
        }

        if (is_dml_query) {
          // Record the outcome so a retry with the same seqno can be replayed.
          // Metadata and PROFILE query stats are written to the first
          // response, while the modified row count is written to the last
          // one; when the result spans several partial responses the two
          // differ, so the stats of both are combined to keep the row count
          // in the recorded outcome.
          spanner_api::ResultSet replay_result;
          *replay_result.mutable_stats() = responses.front().stats();
          if (responses.size() > 1) {
            replay_result.mutable_stats()->MergeFrom(responses.back().stats());
          }
          *replay_result.mutable_metadata() = responses.front().metadata();
          txn->SetDmlReplayOutcome(replay_result);
        }
        return absl::OkStatus();
      });
  // Change stream queries are executed outside of the guarded transaction
  // call above; the lambda only detects them and returns early.
  if (change_stream_metadata.is_change_stream_query) {
    ChangeStreamsHandler change_streams_handler{change_stream_metadata};
    return change_streams_handler.ExecuteChangeStreamQuery(request, stream,
                                                           session);
  }
  return status;
}