void IssueQueryController::IssueQueriesInternal()

in benchmarks/rnnt/ootb/inference/loadgen/issue_query_controller.cc [410:615]


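// Note: in the full source, `scenario` and `multi_thread` are compile-time
// template parameters of this function (the template header sits just above
// this excerpt), so branches such as `scenario == TestScenario::Server` are
// resolved at compile time.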
void IssueQueryController::IssueQueriesInternal(size_t query_stride,
                                                size_t thread_idx) {
  // Get all the needed information.
  auto sut = state->sut;
  auto& queries = *state->queries;
  auto& response_logger = *state->response_delegate;

  // Some book-keeping about the number of queries issued.
  size_t queries_issued = 0;
  size_t queries_issued_per_iter = 0;
  size_t queries_count = queries.size();

  // Calculate the min/max queries per issue thread.
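  // Queries are distributed round-robin: this thread handles queries
  // thread_idx, thread_idx + query_stride, thread_idx + 2 * query_stride,
  // and so on, so the first (count % query_stride) threads each own one
  // extra query.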
  const auto& settings = *state->settings;
  const size_t min_query_count = settings.min_query_count;
  const size_t min_query_count_for_thread =
      (thread_idx < (min_query_count % query_stride))
          ? (min_query_count / query_stride + 1)
          : (min_query_count / query_stride);
  const size_t max_query_count = settings.max_query_count;
  const size_t max_query_count_for_thread =
      (thread_idx < (max_query_count % query_stride))
          ? (max_query_count / query_stride + 1)
          : (max_query_count / query_stride);

  // Create query scheduler.
  const auto start = state->start_time;
  QueryScheduler<scenario> query_scheduler(settings, start);
  auto last_now = start;

  // We can never run out of generated queries in the server scenario,
  // since the duration depends on the scheduled query time and not
  // the actual issue time.
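  // For the other scenarios, assume we ran out of generated queries unless
  // one of the exit conditions below fires first (each such path clears
  // this flag).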
  bool ran_out_of_generated_queries = scenario != TestScenario::Server;
  // Running total of samples issued; one latency measurement is expected
  // per sample.
  size_t expected_latencies = 0;

  for (size_t queries_idx = thread_idx; queries_idx < queries_count;
       queries_idx += query_stride) {
    queries_issued_per_iter = 0;
    auto& query = queries[queries_idx];
    auto tracer1 =
        MakeScopedTracer([](AsyncTrace& trace) { trace("SampleLoop"); });
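    // Wait() blocks according to the scenario's pacing policy (e.g., until
    // the query's scheduled arrival time in the Server scenario) and returns
    // the time at which the thread woke up.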
    last_now = query_scheduler.Wait(&query);

    // If in the Server scenario and server_coalesce_queries is enabled,
    // multiple queries are coalesced into one big query when the current
    // time has already passed the scheduled time of several queries.
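    // For example, with query_stride 2 and thread_idx 0: if this thread
    // wakes up after the scheduled deltas of queries 0, 2, and 4 have all
    // elapsed, those three queries are merged and issued as one below.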
    if (scenario == TestScenario::Server &&
        settings.requested.server_coalesce_queries) {
      auto current_query_idx = queries_idx;
      for (; queries_idx + query_stride < queries_count;
           queries_idx += query_stride) {
        auto next_scheduled_time =
            start + queries[queries_idx + query_stride].scheduled_delta;
        // If the current time hasn't reached the next query's scheduled
        // time yet, don't include the next query.
        if (last_now < next_scheduled_time) {
          break;
        }
        queries_issued_per_iter++;
      }
      if (queries_idx > current_query_idx) {
        // Coalesce all the past-due queries into the current one.
        query.CoalesceQueries(queries.data(), current_query_idx + query_stride,
                              queries_idx, query_stride);
      }
    }

    // Issue the query to the SUT.
    {
      auto tracer3 =
          MakeScopedTracer([](AsyncTrace& trace) { trace("IssueQuery"); });
      sut->IssueQuery(query.query_to_send);
    }

    // Update the book-keeping: one latency is expected per issued sample.
    expected_latencies += query.query_to_send.size();
    queries_issued_per_iter++;
    queries_issued += queries_issued_per_iter;

    if (scenario == TestScenario::Server &&
        settings.requested.server_coalesce_queries) {
      // Set the query back to its clean state.
      query.Decoalesce();
    }

    if (state->mode == TestMode::AccuracyOnly) {
      // TODO: Rate limit in accuracy mode so accuracy mode works even
      //       if the expected/target performance is way off.
      continue;
    }

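    // Elapsed test time, measured at the most recent scheduler wake-up.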
    auto duration = (last_now - start);
    if (scenario == TestScenario::Server) {
      if (settings.max_async_queries != 0) {
        // Checks if there are too many outstanding queries.
        size_t queries_issued_total{0};
        if (multi_thread) {
          // To get the actual number of outstanding async queries in the
          // multi-thread case, we have to combine the queries_issued counts
          // from all issue threads.
          {
            std::lock_guard<std::mutex> lock(state->mtx);
            state->queries_issued += queries_issued_per_iter;
            queries_issued_total = state->queries_issued;
          }
        } else {
          queries_issued_total = queries_issued;
        }
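        // Outstanding = issued so far minus completed so far; a slightly
        // stale (relaxed) completed count is acceptable for this overload
        // check.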
        size_t queries_outstanding =
            queries_issued_total -
            response_logger.queries_completed.load(std::memory_order_relaxed);
        if (queries_outstanding > settings.max_async_queries) {
          LogDetail([thread_idx, queries_issued_total,
                     queries_outstanding](AsyncDetail& detail) {
#if USE_NEW_LOGGING_FORMAT
            std::stringstream ss;
            ss << "IssueQueryThread " << thread_idx
               << " Ending early: Too many outstanding queries."
               << " issued " << queries_issued_total << " outstanding "
               << queries_outstanding;
            MLPERF_LOG_ERROR(detail, "error_runtime", ss.str());
#else
            detail.Error("IssueQueryThread ", std::to_string(thread_idx),
                         " Ending early: Too many outstanding queries.",
                         "issued", std::to_string(queries_issued_total),
                         "outstanding", std::to_string(queries_outstanding));
#endif
          });
          break;
        }
      }
    } else {
      // Checks if the test can end normally: both the minimum query count
      // and the target duration have been met.
      if (queries_issued >= min_query_count_for_thread &&
          duration >= settings.target_duration) {
        LogDetail([thread_idx](AsyncDetail& detail) {
#if USE_NEW_LOGGING_FORMAT
          MLPERF_LOG(
              detail, "generic_message",
              "Ending naturally: Minimum query count and test duration met.");
#else
          detail(
              " Ending naturally: Minimum query count and test duration met.");
#endif
        });
        ran_out_of_generated_queries = false;
        break;
      }
    }

    // Checks if we have exceeded max_query_count for this thread.
    if (settings.max_query_count != 0 &&
        queries_issued >= max_query_count_for_thread) {
      LogDetail([thread_idx, queries_issued](AsyncDetail& detail) {
#if USE_NEW_LOGGING_FORMAT
        std::stringstream ss;
        ss << "IssueQueryThread " << thread_idx
           << " Ending early: Max query count reached."
           << " query_count " << queries_issued;
        MLPERF_LOG_ERROR(detail, "error_runtime", ss.str());
#else
        detail.Error("IssueQueryThread ", std::to_string(thread_idx),
                     " Ending early: Max query count reached.", "query_count",
                     std::to_string(queries_issued));
#endif
      });
      ran_out_of_generated_queries = false;
      break;
    }

    // Checks if we have exceeded max_duration.
    if (settings.max_duration.count() != 0 &&
        duration > settings.max_duration) {
      LogDetail([thread_idx, duration](AsyncDetail& detail) {
#if USE_NEW_LOGGING_FORMAT
        std::stringstream ss;
        ss << "IssueQueryThread " << thread_idx
           << " Ending early: Max test duration reached."
           << " duration_ns " << duration.count();
        MLPERF_LOG_ERROR(detail, "error_runtime", ss.str());
#else
        detail.Error("IssueQueryThread ", std::to_string(thread_idx),
                     " Ending early: Max test duration reached.", "duration_ns",
                     std::to_string(duration.count()));
#endif
      });
      ran_out_of_generated_queries = false;
      break;
    }
  }

  // Combine the issuing statistics from multiple issue threads.
  {
    std::lock_guard<std::mutex> lock(state->mtx);
    state->ran_out_of_generated_queries |= ran_out_of_generated_queries;
    // In the Server scenario with max_async_queries != 0 and multiple issue
    // threads, state->queries_issued was already updated inside the loop
    // while checking max_async_queries, so don't double-count it here.
    if (!(scenario == TestScenario::Server && settings.max_async_queries != 0 &&
          multi_thread)) {
      state->queries_issued += queries_issued;
    }
    state->expected_latencies += expected_latencies;
  }
}
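
A minimal sketch (hypothetical names, not the actual LoadGen driver) of how
this striding is typically driven: each of N issue threads runs the same loop
with a distinct thread_idx and a shared stride equal to the thread count,
which produces exactly the round-robin interleaving IssueQueriesInternal()
expects.

#include <cstddef>
#include <thread>
#include <vector>

// Hypothetical driver, assuming a callable `issue_fn(query_stride, thread_idx)`
// that wraps IssueQueriesInternal() with the desired template arguments.
template <typename IssueFn>
void RunIssueThreads(IssueFn issue_fn, size_t num_threads) {
  std::vector<std::thread> threads;
  threads.reserve(num_threads);
  for (size_t thread_idx = 0; thread_idx < num_threads; thread_idx++) {
    // Every thread shares the same stride (the thread count) and gets a
    // unique index, so together the threads cover each query exactly once.
    threads.emplace_back(issue_fn, /*query_stride=*/num_threads, thread_idx);
  }
  for (auto& t : threads) {
    t.join();
  }
}

Striding by thread count, rather than splitting the query list into
contiguous shards, keeps consecutively scheduled queries interleaved across
threads, which matters in the Server scenario where queries are ordered by
scheduled arrival time.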