in internal/benchrunner/runners/system/runner.go [136:229]
// setUp prepares the runner state needed before a benchmark is executed.
//
// In order, it:
//   - records the local and agent service log folders and a fresh run ID in r.svcInfo,
//   - creates the output directory for the run,
//   - reads the benchmark scenario configuration,
//   - initializes the corpus generator when the scenario defines one,
//   - reads the package manifest and creates the benchmark policy,
//   - derives the runtime data stream name and the pipeline prefix,
//   - installs r.wipeDataStreamHandler, and
//   - deletes the documents already present in the runtime data stream and
//     waits until the data stream reports zero documents.
//
// It returns the first error encountered, or an error if the data stream
// could not be observed as empty by the time the wait gives up.
func (r *runner) setUp(ctx context.Context) error {
	locationManager, err := locations.NewLocationManager()
	if err != nil {
		return fmt.Errorf("reading service logs directory failed: %w", err)
	}

	r.svcInfo.Logs.Folder.Local = locationManager.ServiceLogDir()
	r.svcInfo.Logs.Folder.Agent = ServiceLogsAgentDir
	r.svcInfo.Test.RunID = common.NewRunID()

	outputDir, err := servicedeployer.CreateOutputDir(locationManager, r.svcInfo.Test.RunID)
	if err != nil {
		return fmt.Errorf("could not create output dir for terraform deployer: %w", err)
	}
	r.svcInfo.OutputDir = outputDir

	scenario, err := readConfig(r.options.BenchPath, r.options.BenchName, r.svcInfo)
	if err != nil {
		return err
	}
	r.scenario = scenario

	// The generator is optional: it is only set up when the scenario asks for one.
	if r.scenario.Corpora.Generator != nil {
		r.generator, err = r.initializeGenerator(ctx)
		if err != nil {
			return fmt.Errorf("can't initialize generator: %w", err)
		}
	}

	pkgManifest, err := packages.ReadPackageManifestFromPackageRoot(r.options.PackageRootPath)
	if err != nil {
		return fmt.Errorf("reading package manifest failed: %w", err)
	}

	policy, err := r.createBenchmarkPolicy(ctx, pkgManifest)
	if err != nil {
		return err
	}
	r.benchPolicy = policy

	// The data stream manifest supplies the type and name used to build the
	// runtime data stream name whose old documents are deleted below.
	logger.Debug("deleting old data in data stream...")
	dataStreamManifest, err := packages.ReadDataStreamManifest(
		filepath.Join(
			common.DataStreamPath(r.options.PackageRootPath, r.scenario.DataStream.Name),
			packages.DataStreamManifestFile,
		),
	)
	if err != nil {
		return fmt.Errorf("reading data stream manifest failed: %w", err)
	}

	// Both names share the "<type>-<package>.<data stream>-" prefix; the
	// runtime data stream ends with the policy namespace, the pipeline prefix
	// with the scenario version.
	r.runtimeDataStream = fmt.Sprintf(
		"%s-%s.%s-%s",
		dataStreamManifest.Type,
		pkgManifest.Name,
		dataStreamManifest.Name,
		policy.Namespace,
	)
	r.pipelinePrefix = fmt.Sprintf(
		"%s-%s.%s-%s",
		dataStreamManifest.Type,
		pkgManifest.Name,
		dataStreamManifest.Name,
		r.scenario.Version,
	)

	// Stored on the runner so the same deletion can be triggered again later.
	r.wipeDataStreamHandler = func(ctx context.Context) error {
		logger.Debugf("deleting data in data stream...")
		if err := r.deleteDataStreamDocs(ctx, r.runtimeDataStream); err != nil {
			return fmt.Errorf("error deleting data in data stream: %w", err)
		}
		return nil
	}

	if err := r.deleteDataStreamDocs(ctx, r.runtimeDataStream); err != nil {
		return fmt.Errorf("error deleting old data in data stream: %s: %w", r.runtimeDataStream, err)
	}

	// Do not continue until the data stream is observed with zero documents.
	// NOTE(review): the two durations are presumably the polling period and
	// the overall timeout of wait.UntilTrue — confirm against its signature.
	cleared, err := wait.UntilTrue(ctx, func(ctx context.Context) (bool, error) {
		hits, err := common.CountDocsInDataStream(ctx, r.options.ESAPI, r.runtimeDataStream)
		return hits == 0, err
	}, 5*time.Second, 2*time.Minute)
	if err != nil {
		return err
	}
	if !cleared {
		return errors.New("unable to clear previous data")
	}
	return nil
}