in internal/benchrunner/runners/rally/runner.go [240:344]
// setUp prepares the runner state needed before a rally benchmark can run.
//
// In order, it:
//   - creates a location manager and records the rally corpus folders and a
//     fresh run ID on r.svcInfo;
//   - creates the output directory and the local rally track directory;
//   - reads the package manifest and the benchmark scenario configuration and
//     stores the scenario on r.scenario;
//   - calls installPackage;
//   - initializes r.generator when the scenario declares a corpus generator
//     and no CorpusAtPath option was given;
//   - reads the data stream manifest and derives r.runtimeDataStream and
//     r.pipelinePrefix from it;
//   - for data streams whose index mode is "time_series", stores the result of
//     extractSimulatedTemplate in r.indexTemplateBody;
//   - wipes the runtime data stream and waits until it reports zero documents.
//
// It returns the first error encountered; every step after a failure is skipped.
func (r *runner) setUp(ctx context.Context) error {
	locationManager, err := locations.NewLocationManager()
	if err != nil {
		return fmt.Errorf("reading service logs directory failed: %w", err)
	}

	r.svcInfo.Logs.Folder.Local = locationManager.RallyCorpusDir()
	r.svcInfo.Logs.Folder.Agent = RallyCorpusAgentDir
	r.svcInfo.Test.RunID = common.NewRunID()

	outputDir, err := servicedeployer.CreateOutputDir(locationManager, r.svcInfo.Test.RunID)
	if err != nil {
		return fmt.Errorf("could not create output dir for terraform deployer: %w", err)
	}
	r.svcInfo.OutputDir = outputDir

	if err := r.createRallyTrackDir(locationManager); err != nil {
		return fmt.Errorf("could not create local rally track dir: %w", err)
	}

	pkgManifest, err := packages.ReadPackageManifestFromPackageRoot(r.options.PackageRootPath)
	if err != nil {
		return fmt.Errorf("reading package manifest failed: %w", err)
	}

	scenario, err := readConfig(r.options.PackageRootPath, r.options.BenchName, pkgManifest.Name, pkgManifest.Version)
	if err != nil {
		return err
	}
	r.scenario = scenario

	if err := r.installPackage(ctx); err != nil {
		return fmt.Errorf("error installing package: %w", err)
	}

	// A generator is only set up when the scenario declares one and the
	// CorpusAtPath option is empty.
	if r.scenario.Corpora.Generator != nil && len(r.options.CorpusAtPath) == 0 {
		r.generator, err = r.initializeGenerator(ctx)
		if err != nil {
			return fmt.Errorf("can't initialize generator: %w", err)
		}
	}

	manifestPath := filepath.Join(
		common.DataStreamPath(r.options.PackageRootPath, r.scenario.DataStream.Name),
		packages.DataStreamManifestFile,
	)
	dataStreamManifest, err := packages.ReadDataStreamManifest(manifestPath)
	if err != nil {
		return fmt.Errorf("reading data stream manifest failed: %w", err)
	}

	// "<type>-<package>.<data stream>" is the common stem of the runtime data
	// stream name, the pipeline prefix and the index template name.
	baseName := fmt.Sprintf(
		"%s-%s.%s",
		dataStreamManifest.Type,
		pkgManifest.Name,
		dataStreamManifest.Name,
	)
	r.runtimeDataStream = fmt.Sprintf("%s-ep", baseName)
	r.pipelinePrefix = fmt.Sprintf("%s-%s", baseName, r.scenario.Version)

	if dataStreamManifest.Elasticsearch != nil {
		r.isTSDB = dataStreamManifest.Elasticsearch.IndexMode == "time_series"
	}

	if r.isTSDB {
		indexTemplate := baseName
		r.indexTemplateBody, err = r.extractSimulatedTemplate(ctx, indexTemplate)
		if err != nil {
			return fmt.Errorf("error extracting routing path: %s: %w", indexTemplate, err)
		}
	}

	if err := r.wipeDataStreamOnSetup(ctx); err != nil {
		return fmt.Errorf("error deleting old data in data stream: %s: %w", r.runtimeDataStream, err)
	}

	// Wait for the wipe to be observable: the data stream must report zero
	// documents. NOTE(review): the two durations are presumably the polling
	// period (5s) and the overall timeout (2m) — confirm against wait.UntilTrue.
	cleared, err := wait.UntilTrue(ctx, func(ctx context.Context) (bool, error) {
		hits, err := common.CountDocsInDataStream(ctx, r.options.ESAPI, r.runtimeDataStream)
		return hits == 0, err
	}, 5*time.Second, 2*time.Minute)
	if err != nil {
		return err
	}
	if !cleared {
		return errors.New("unable to clear previous data")
	}
	return nil
}