PerformanceResult IssueQueries()

in benchmarks/rnnt/ootb/inference/loadgen/loadgen.cc [351:512]


template <TestScenario scenario, TestMode mode>
PerformanceResult IssueQueries(SystemUnderTest* sut,
                               const TestSettingsInternal& settings,
                               const LoadableSampleSet& loaded_sample_set,
                               SequenceGen* sequence_gen) {
  // Create response handler.
  ResponseDelegateDetailed<scenario, mode> response_logger;
  std::uniform_real_distribution<double> accuracy_log_offset_dist =
      std::uniform_real_distribution<double>(0.0, 1.0);
  std::mt19937 accuracy_log_offset_rng(settings.accuracy_log_rng_seed);
  response_logger.accuracy_log_offset =
      accuracy_log_offset_dist(accuracy_log_offset_rng);
  response_logger.accuracy_log_prob = settings.accuracy_log_probability;

  // Generate queries.
  auto sequence_id_start = sequence_gen->CurrentSampleId();
  std::vector<QueryMetadata> queries = GenerateQueries<scenario, mode>(
      settings, loaded_sample_set, sequence_gen, &response_logger);

  // Calculate the expected number of queries.
  uint64_t expected_queries =
      settings.target_qps * settings.min_duration.count() / 1000;
  uint64_t minimum_queries =
      settings.min_query_count * settings.samples_per_query;
  if (scenario != TestScenario::Offline) {
    expected_queries *= settings.samples_per_query;
  } else {
    minimum_queries = settings.min_sample_count;
  }

  expected_queries =
      expected_queries < minimum_queries ? minimum_queries : expected_queries;

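  // If a fixed accuracy-log sampling target was requested, rescale the
  // per-response logging probability so that roughly that many responses are
  // sampled over the expected number of queries.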
  if (settings.accuracy_log_sampling_target > 0) {
    response_logger.accuracy_log_prob =
        (double)settings.accuracy_log_sampling_target / expected_queries;
  }
  auto sequence_id_end = sequence_gen->CurrentSampleId();
  size_t max_latencies_to_record = sequence_id_end - sequence_id_start;

  // Initialize logger for latency recording.
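  // Latencies are recorded per sample sequence id, so the recorder only needs
  // room for the ids generated above.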
  GlobalLogger().RestartLatencyRecording(sequence_id_start,
                                         max_latencies_to_record);

  // Create and initialize an IssueQueryState.
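  // The defaulted fields (start timestamps, the ran-out-of-queries flag, and
  // the issue counters) are populated while queries are issued and read back
  // below.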
  IssueQueryState state{
      sut, &queries, &response_logger, &settings, mode, {}, {}, false, 0,
      0,   {}};
  auto& controller = IssueQueryController::GetInstance();

  // Set number of IssueQueryThreads and wait for the threads to register.
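  // A value of zero presumably means queries are issued from this thread
  // rather than from dedicated IssueQueryThreads.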
  controller.SetNumThreads(settings.requested.server_num_issue_query_threads);

  // Start issuing the queries.
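  // This call blocks until the scenario's duration and query-count targets
  // are met or the generated queries are exhausted.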
  controller.StartIssueQueries<scenario>(&state);

  // Gather query issuing statistics.
  const auto start_for_power = state.start_for_power;
  const auto start = state.start_time;
  const auto ran_out_of_generated_queries = state.ran_out_of_generated_queries;
  const auto queries_issued = state.queries_issued;
  const auto expected_latencies = state.expected_latencies;

  // Let the SUT know it should not expect any more queries.
  sut->FlushQueries();

  if (mode == TestMode::PerformanceOnly && ran_out_of_generated_queries) {
    LogDetail([](AsyncDetail& detail) {
#if USE_NEW_LOGGING_FORMAT
      MLPERF_LOG_ERROR(
          detail, "error_runtime",
          "Ending early: Ran out of generated queries to issue before the "
          "minimum query count and test duration were reached. "
          "Please update the relevant expected latency or target qps in the "
          "TestSettings so they are more accurate.");
#else
      detail.Error(
          "Ending early: Ran out of generated queries to issue before the "
          "minimum query count and test duration were reached.");
      detail(
          "Please update the relevant expected latency or target qps in the "
          "TestSettings so they are more accurate.");
#endif
    });
  }

  // Wait for tail queries to complete and collect all the latencies.
  // We have to keep the synchronization primitives alive until the SUT
  // is done with them.
  auto& final_query = queries[queries_issued - 1];
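  // GetLatenciesBlocking() does not return until every expected sample
  // completion has been recorded.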
  std::vector<QuerySampleLatency> sample_latencies(
      GlobalLogger().GetLatenciesBlocking(expected_latencies));

  // Log contention counters after every test as a sanity check.
  GlobalLogger().LogContentionAndAllocations();

  // This properly accounts for the fact that the max completion time may not
  // belong to the final query. It also excludes any time spent postprocessing
  // in the loadgen itself after final completion, which may be significant
  // in the offline scenario.
  PerfClock::time_point max_completion_time =
      GlobalLogger().GetMaxCompletionTime();
  auto sut_active_duration = max_completion_time - start;
  LogDetail([start_for_power, sut_active_duration](AsyncDetail& detail) {
    auto end_for_power =
        start_for_power +
        std::chrono::duration_cast<std::chrono::system_clock::duration>(
            sut_active_duration);
#if USE_NEW_LOGGING_FORMAT
    MLPERF_LOG_INTERVAL_START(detail, "power_begin",
                              DateTimeStringForPower(start_for_power));
    MLPERF_LOG_INTERVAL_END(detail, "power_end",
                            DateTimeStringForPower(end_for_power));
#else
    detail("POWER_BEGIN: ", "mode", ToString(mode), "time",
           DateTimeStringForPower(start_for_power));
    detail("POWER_END: ", "mode", ToString(mode), "time",
           DateTimeStringForPower(end_for_power));
#endif
  });

  double max_latency =
      QuerySampleLatencyToSeconds(GlobalLogger().GetMaxLatencySoFar());
  double final_query_scheduled_time =
      DurationToSeconds(final_query.scheduled_delta);
  double final_query_issued_time =
      DurationToSeconds(final_query.issued_start_time - start);
  double final_query_all_samples_done_time =
      DurationToSeconds(final_query.all_samples_done_time - start);

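  // Per-query latencies and scheduled-interval counts are only collected for
  // the multi-stream scenarios.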
  std::vector<QuerySampleLatency> query_latencies;
  std::vector<size_t> query_intervals;
  if (scenario == TestScenario::MultiStream ||
      scenario == TestScenario::MultiStreamFree) {
    query_latencies.resize(queries_issued);
    query_intervals.resize(queries_issued);
    for (size_t i = 0; i < queries_issued; i++) {
      query_latencies[i] = DurationGeneratorNs{queries[i].scheduled_time}.delta(
          queries[i].all_samples_done_time);
      if (i < queries_issued - settings.max_async_queries) {
        // For all queries except the last few, take into account actual
        // skipped intervals to the next query.
        query_intervals[i] =
            queries[i + settings.max_async_queries].scheduled_intervals;
      } else {
        // For the last queries, use query latency to guess if imaginary
        // queries issued at the end would have skipped intervals.
        query_intervals[i] =
            std::ceil(settings.target_qps *
                      QuerySampleLatencyToSeconds(query_latencies[i]));
      }
    }
  }

  return PerformanceResult{std::move(sample_latencies),
                           std::move(query_latencies),
                           std::move(query_intervals),
                           queries_issued,
                           max_latency,
                           final_query_scheduled_time,
                           final_query_issued_time,
                           final_query_all_samples_done_time};
}
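
For reference, the following is a minimal standalone sketch (not part of loadgen) of the expected-query and accuracy-log arithmetic used above for the non-Offline scenarios. The settings values are hypothetical, and min_duration is assumed to be in milliseconds, matching the division by 1000 in the code.

#include <algorithm>
#include <cstdint>
#include <iostream>

int main() {
  // Hypothetical settings for a Server-style run.
  double target_qps = 100.0;
  uint64_t min_duration_ms = 60000;  // 60 s minimum test duration
  uint64_t min_query_count = 270336;
  uint64_t samples_per_query = 1;
  uint64_t accuracy_log_sampling_target = 4096;

  // Duration-based estimate, clamped below by the configured minimum,
  // mirroring the non-Offline branch of IssueQueries().
  uint64_t expected_queries =
      static_cast<uint64_t>(target_qps * min_duration_ms / 1000) *
      samples_per_query;
  uint64_t minimum_queries = min_query_count * samples_per_query;
  expected_queries = std::max(expected_queries, minimum_queries);

  // Rescale the accuracy-log probability so that roughly
  // accuracy_log_sampling_target responses end up in the accuracy log.
  double accuracy_log_prob =
      static_cast<double>(accuracy_log_sampling_target) / expected_queries;

  std::cout << "expected_queries=" << expected_queries
            << " accuracy_log_prob=" << accuracy_log_prob << std::endl;
  return 0;
}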