in internal/benchrunner/runners/system/runner.go [231:284]
func (r *runner) run(ctx context.Context) (report reporters.Reportable, err error) {
var service servicedeployer.DeployedService
if r.scenario.Corpora.InputService != nil {
s, err := r.setupService(ctx)
if errors.Is(err, os.ErrNotExist) {
logger.Debugf("No service deployer defined for this benchmark")
} else if err != nil {
return nil, err
}
service = s
}
r.startMetricsColletion(ctx)
defer r.mcollector.stop()
// if there is a generator config, generate the data
if r.generator != nil {
logger.Debugf("generating corpus data to %s...", r.svcInfo.Logs.Folder.Local)
if err := r.runGenerator(r.svcInfo.Logs.Folder.Local); err != nil {
return nil, fmt.Errorf("can't generate benchmarks data corpus for data stream: %w", err)
}
}
// once data is generated, enroll agents and assign policy
if err := r.enrollAgents(ctx); err != nil {
return nil, err
}
// Signal to the service that the agent is ready (policy is assigned).
if service != nil && r.scenario.Corpora.InputService != nil && r.scenario.Corpora.InputService.Signal != "" {
if err = service.Signal(ctx, r.scenario.Corpora.InputService.Signal); err != nil {
return nil, fmt.Errorf("failed to notify benchmark service: %w", err)
}
}
finishedOnTime, err := r.waitUntilBenchmarkFinishes(ctx)
if err != nil {
return nil, err
}
if !finishedOnTime {
return nil, errors.New("timeout exceeded")
}
msum, err := r.collectAndSummarizeMetrics()
if err != nil {
return nil, fmt.Errorf("can't summarize metrics: %w", err)
}
if err := r.reindexData(ctx); err != nil {
return nil, fmt.Errorf("can't reindex data: %w", err)
}
return createReport(r.options.BenchName, r.corporaFile, r.scenario, msum)
}