in benchmarks/rnnt/ootb/inference/loadgen/issue_query_controller.cc [410:615]
// Issues the queries assigned to this issue thread, one loop iteration per
// (possibly coalesced) query, until the generated queries run out or one of
// the stopping conditions fires: too many outstanding async queries (Server
// scenario), min query count + target duration met (non-Server scenarios),
// max query count reached, or max duration exceeded.
//
// query_stride: total number of issue threads. The main loop walks
//   queries[thread_idx], queries[thread_idx + query_stride], ... so each
//   generated query is handled by exactly one thread.
// thread_idx: this thread's index, in [0, query_stride).
//
// NOTE(review): `scenario` and `multi_thread` are used as compile-time
// constants but are declared outside this excerpt — presumably template
// parameters of this method; confirm against the full file.
void IssueQueryController::IssueQueriesInternal(size_t query_stride,
size_t thread_idx) {
// Get all the needed information.
auto sut = state->sut;
auto& queries = *state->queries;
auto& response_logger = *state->response_delegate;
// Some book-keeping about the number of queries issued.
size_t queries_issued = 0;
size_t queries_issued_per_iter = 0;
size_t queries_count = queries.size();
// Calculate the min/max queries per issue thread. Queries are distributed
// round-robin, so the first (count % query_stride) threads get one extra.
const auto& settings = *state->settings;
const size_t min_query_count = settings.min_query_count;
const size_t min_query_count_for_thread =
(thread_idx < (min_query_count % query_stride))
? (min_query_count / query_stride + 1)
: (min_query_count / query_stride);
const size_t max_query_count = settings.max_query_count;
const size_t max_query_count_for_thread =
(thread_idx < (max_query_count % query_stride))
? (max_query_count / query_stride + 1)
: (max_query_count / query_stride);
// Create query scheduler.
const auto start = state->start_time;
QueryScheduler<scenario> query_scheduler(settings, start);
auto last_now = start;
// We can never run out of generated queries in the server scenario,
// since the duration depends on the scheduled query time and not
// the actual issue time.
bool ran_out_of_generated_queries = scenario != TestScenario::Server;
// This is equal to the sum of numbers of samples issued.
size_t expected_latencies = 0;
// Round-robin over the generated queries: this thread handles every
// query_stride-th query starting at thread_idx. Note the coalescing
// branch below may advance queries_idx by additional strides.
for (size_t queries_idx = thread_idx; queries_idx < queries_count;
queries_idx += query_stride) {
queries_issued_per_iter = 0;
auto& query = queries[queries_idx];
auto tracer1 =
MakeScopedTracer([](AsyncTrace& trace) { trace("SampleLoop"); });
// Blocks until this query's scheduled issue time, then returns the
// current time.
last_now = query_scheduler.Wait(&query);
// If in Server scenario and server_coalesce_queries is enabled, multiple
// queries are coalesced into one big query if the current time has already
// passed the scheduled time of multiple queries.
if (scenario == TestScenario::Server &&
settings.requested.server_coalesce_queries) {
auto current_query_idx = queries_idx;
// Advance queries_idx past every subsequent query (for this thread)
// whose scheduled time has already elapsed, counting each one.
for (; queries_idx + query_stride < queries_count;
queries_idx += query_stride) {
auto next_scheduled_time =
start + queries[queries_idx + query_stride].scheduled_delta;
// If current time hasn't reached the next query's scheduled time yet,
// don't include next query.
if (last_now < next_scheduled_time) {
break;
}
queries_issued_per_iter++;
}
if (queries_idx > current_query_idx) {
// Coalesce all the past-due queries into `query`.
query.CoalesceQueries(queries.data(), current_query_idx + query_stride,
queries_idx, query_stride);
}
}
// Issue the query to the SUT.
{
auto tracer3 =
MakeScopedTracer([](AsyncTrace& trace) { trace("IssueQuery"); });
sut->IssueQuery(query.query_to_send);
}
// Account for the samples of this (possibly coalesced) query, plus the
// query itself.
expected_latencies += query.query_to_send.size();
queries_issued_per_iter++;
queries_issued += queries_issued_per_iter;
if (scenario == TestScenario::Server &&
settings.requested.server_coalesce_queries) {
// Set the query back to its clean state.
query.Decoalesce();
}
if (state->mode == TestMode::AccuracyOnly) {
// In accuracy mode, skip all the performance-related stop conditions.
// TODO: Rate limit in accuracy mode so accuracy mode works even
// if the expected/target performance is way off.
continue;
}
auto duration = (last_now - start);
if (scenario == TestScenario::Server) {
// max_async_queries == 0 disables the outstanding-queries limit.
if (settings.max_async_queries != 0) {
// Checks if there are too many outstanding queries.
size_t queries_issued_total{0};
if (multi_thread) {
// To check actual number of async queries in multi-thread case,
// we would have to combine the number of queries_issued from all
// issue threads.
{
std::lock_guard<std::mutex> lock(state->mtx);
state->queries_issued += queries_issued_per_iter;
queries_issued_total = state->queries_issued;
}
} else {
queries_issued_total = queries_issued;
}
// NOTE(review): relaxed load — this is only an approximate threshold
// check against the completion counter; confirm no stricter ordering
// is required by the completion side.
size_t queries_outstanding =
queries_issued_total -
response_logger.queries_completed.load(std::memory_order_relaxed);
if (queries_outstanding > settings.max_async_queries) {
LogDetail([thread_idx, queries_issued_total,
queries_outstanding](AsyncDetail& detail) {
#if USE_NEW_LOGGING_FORMAT
std::stringstream ss;
ss << "IssueQueryThread " << thread_idx
<< " Ending early: Too many outstanding queries."
<< " issued " << queries_issued_total << " outstanding "
<< queries_outstanding;
MLPERF_LOG_ERROR(detail, "error_runtime", ss.str());
#else
detail.Error("IssueQueryThread ", std::to_string(thread_idx),
" Ending early: Too many outstanding queries.",
"issued", std::to_string(queries_issued_total),
"outstanding", std::to_string(queries_outstanding));
#endif
});
break;
}
}
} else {
// Checks if we end normally.
if (queries_issued >= min_query_count_for_thread &&
duration >= settings.target_duration) {
LogDetail([thread_idx](AsyncDetail& detail) {
#if USE_NEW_LOGGING_FORMAT
MLPERF_LOG(
detail, "generic_message",
"Ending naturally: Minimum query count and test duration met.");
#else
detail(
" Ending naturally: Minimum query count and test duration met.");
#endif
});
// We stopped by choice, not because generated queries were exhausted.
ran_out_of_generated_queries = false;
break;
}
}
// Checks if we have exceeded max_query_count for this thread.
// (max_query_count == 0 disables this limit.)
if (settings.max_query_count != 0 &&
queries_issued >= max_query_count_for_thread) {
LogDetail([thread_idx, queries_issued](AsyncDetail& detail) {
#if USE_NEW_LOGGING_FORMAT
std::stringstream ss;
ss << "IssueQueryThread " << thread_idx
<< " Ending early: Max query count reached."
<< " query_count " << queries_issued;
MLPERF_LOG_ERROR(detail, "error_runtime", ss.str());
#else
detail.Error("IssueQueryThread ", std::to_string(thread_idx),
" Ending early: Max query count reached.", "query_count",
std::to_string(queries_issued));
#endif
});
// We stopped early by hitting a limit, not by exhausting queries.
ran_out_of_generated_queries = false;
break;
}
// Checks if we have exceeded max_duration.
// (max_duration == 0 disables this limit.)
if (settings.max_duration.count() != 0 &&
duration > settings.max_duration) {
LogDetail([thread_idx, duration](AsyncDetail& detail) {
#if USE_NEW_LOGGING_FORMAT
std::stringstream ss;
ss << "IssueQueryThread " << thread_idx
<< " Ending early: Max test duration reached."
<< " duration_ns " << duration.count();
MLPERF_LOG_ERROR(detail, "error_runtime", ss.str());
#else
detail.Error("IssueQueryThread ", std::to_string(thread_idx),
" Ending early: Max test duration reached.", "duration_ns",
std::to_string(duration.count()));
#endif
});
// We stopped early by hitting a limit, not by exhausting queries.
ran_out_of_generated_queries = false;
break;
}
}
// Combine the issuing statistics from multiple issue threads.
{
std::lock_guard<std::mutex> lock(state->mtx);
state->ran_out_of_generated_queries |= ran_out_of_generated_queries;
// In Server scenario and when max_async_queries != 0, we would have set
// state->queries_issued when we check max_async_queries in the loop,
// so don't double count here.
if (!(scenario == TestScenario::Server && settings.max_async_queries != 0 &&
multi_thread)) {
state->queries_issued += queries_issued;
}
state->expected_latencies += expected_latencies;
}
}