in benchmarks/rnnt/ootb/inference/loadgen/loadgen.cc [351:512]
PerformanceResult IssueQueries(SystemUnderTest* sut,
const TestSettingsInternal& settings,
const LoadableSampleSet& loaded_sample_set,
SequenceGen* sequence_gen) {
  // Create response handler.
ResponseDelegateDetailed<scenario, mode> response_logger;
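  // Randomize which responses get sampled into the accuracy log: draw a
  // fixed offset in [0, 1) and use the configured logging probability.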
std::uniform_real_distribution<double> accuracy_log_offset_dist =
std::uniform_real_distribution<double>(0.0, 1.0);
std::mt19937 accuracy_log_offset_rng(settings.accuracy_log_rng_seed);
response_logger.accuracy_log_offset =
accuracy_log_offset_dist(accuracy_log_offset_rng);
response_logger.accuracy_log_prob = settings.accuracy_log_probability;
// Generate queries.
auto sequence_id_start = sequence_gen->CurrentSampleId();
std::vector<QueryMetadata> queries = GenerateQueries<scenario, mode>(
settings, loaded_sample_set, sequence_gen, &response_logger);
  // Calculate the expected number of queries.
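  // min_duration is in milliseconds, so the division by 1000 converts
  // target_qps (queries per second) into an expected query count.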
uint64_t expected_queries =
settings.target_qps * settings.min_duration.count() / 1000;
uint64_t minimum_queries =
settings.min_query_count * settings.samples_per_query;
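  // For the Offline scenario, the floor is expressed in samples
  // (min_sample_count) rather than in queries.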
if (scenario != TestScenario::Offline) {
expected_queries *= settings.samples_per_query;
} else {
minimum_queries = settings.min_sample_count;
}
expected_queries =
expected_queries < minimum_queries ? minimum_queries : expected_queries;
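  // If a fixed sampling target was requested, scale the logging probability
  // so that roughly accuracy_log_sampling_target responses are logged.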
if (settings.accuracy_log_sampling_target > 0) {
response_logger.accuracy_log_prob =
(double)settings.accuracy_log_sampling_target / expected_queries;
}
auto sequence_id_end = sequence_gen->CurrentSampleId();
size_t max_latencies_to_record = sequence_id_end - sequence_id_start;
// Initialize logger for latency recording.
GlobalLogger().RestartLatencyRecording(sequence_id_start,
max_latencies_to_record);
// Create and initialize an IssueQueryState.
IssueQueryState state{
sut, &queries, &response_logger, &settings, mode, {}, {}, false, 0,
0, {}};
auto& controller = IssueQueryController::GetInstance();
// Set number of IssueQueryThreads and wait for the threads to register.
controller.SetNumThreads(settings.requested.server_num_issue_query_threads);
// Start issuing the queries.
controller.StartIssueQueries<scenario>(&state);
// Gather query issuing statistics.
const auto start_for_power = state.start_for_power;
const auto start = state.start_time;
const auto ran_out_of_generated_queries = state.ran_out_of_generated_queries;
const auto queries_issued = state.queries_issued;
const auto expected_latencies = state.expected_latencies;
// Let the SUT know it should not expect any more queries.
sut->FlushQueries();
if (mode == TestMode::PerformanceOnly && ran_out_of_generated_queries) {
LogDetail([](AsyncDetail& detail) {
#if USE_NEW_LOGGING_FORMAT
MLPERF_LOG_ERROR(
detail, "error_runtime",
"Ending early: Ran out of generated queries to issue before the "
"minimum query count and test duration were reached. "
"Please update the relevant expected latency or target qps in the "
"TestSettings so they are more accurate.");
#else
detail.Error(
"Ending early: Ran out of generated queries to issue before the "
"minimum query count and test duration were reached.");
detail(
"Please update the relevant expected latency or target qps in the "
"TestSettings so they are more accurate.");
#endif
});
}
// Wait for tail queries to complete and collect all the latencies.
// We have to keep the synchronization primitives alive until the SUT
// is done with them.
auto& final_query = queries[queries_issued - 1];
std::vector<QuerySampleLatency> sample_latencies(
GlobalLogger().GetLatenciesBlocking(expected_latencies));
// Log contention counters after every test as a sanity check.
GlobalLogger().LogContentionAndAllocations();
// This properly accounts for the fact that the max completion time may not
// belong to the final query. It also excludes any time spent postprocessing
// in the loadgen itself after final completion, which may be significant
// in the offline scenario.
PerfClock::time_point max_completion_time =
GlobalLogger().GetMaxCompletionTime();
auto sut_active_duration = max_completion_time - start;
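  // Report the power measurement window, which runs from the start of query
  // issuing until the latest completion observed above.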
LogDetail([start_for_power, sut_active_duration](AsyncDetail& detail) {
auto end_for_power =
start_for_power +
std::chrono::duration_cast<std::chrono::system_clock::duration>(
sut_active_duration);
#if USE_NEW_LOGGING_FORMAT
MLPERF_LOG_INTERVAL_START(detail, "power_begin",
DateTimeStringForPower(start_for_power));
MLPERF_LOG_INTERVAL_END(detail, "power_end",
DateTimeStringForPower(end_for_power));
#else
detail("POWER_BEGIN: ", "mode", ToString(mode), "time",
DateTimeStringForPower(start_for_power));
detail("POWER_END: ", "mode", ToString(mode), "time",
DateTimeStringForPower(end_for_power));
#endif
});
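  // Summarize the worst-case sample latency and the final query's scheduled,
  // issued, and completion times.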
double max_latency =
QuerySampleLatencyToSeconds(GlobalLogger().GetMaxLatencySoFar());
double final_query_scheduled_time =
DurationToSeconds(final_query.scheduled_delta);
double final_query_issued_time =
DurationToSeconds(final_query.issued_start_time - start);
double final_query_all_samples_done_time =
DurationToSeconds(final_query.all_samples_done_time - start);
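  // The multi-stream scenarios additionally report per-query latencies and
  // the number of scheduled intervals each query spanned.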
std::vector<QuerySampleLatency> query_latencies;
std::vector<size_t> query_intervals;
if (scenario == TestScenario::MultiStream ||
scenario == TestScenario::MultiStreamFree) {
query_latencies.resize(queries_issued);
query_intervals.resize(queries_issued);
for (size_t i = 0; i < queries_issued; i++) {
query_latencies[i] = DurationGeneratorNs{queries[i].scheduled_time}.delta(
queries[i].all_samples_done_time);
if (i < queries_issued - settings.max_async_queries) {
        // For all queries except the last few, use the actual number of
        // intervals skipped before the next query.
query_intervals[i] =
queries[i + settings.max_async_queries].scheduled_intervals;
} else {
        // For the last queries, use the query latency to estimate whether
        // imaginary queries issued at the end would have skipped intervals.
query_intervals[i] =
std::ceil(settings.target_qps *
QuerySampleLatencyToSeconds(query_latencies[i]));
}
}
}
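  // Hand all collected measurements back to the caller as a PerformanceResult.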
return PerformanceResult{std::move(sample_latencies),
std::move(query_latencies),
std::move(query_intervals),
queries_issued,
max_latency,
final_query_scheduled_time,
final_query_issued_time,
final_query_all_samples_done_time};
}