in internal/benchrunner/runners/stream/runner.go [155:225]
func (r *runner) setUp(ctx context.Context) error {
err := r.initialize()
if err != nil {
return err
}
err = r.collectGenerators(ctx)
if err != nil {
return fmt.Errorf("can't initialize generator: %w", err)
}
r.runtimeDataStreams = make(map[string]string)
r.svcInfo.Test.RunID = common.NewRunID()
pkgManifest, err := packages.ReadPackageManifestFromPackageRoot(r.options.PackageRootPath)
if err != nil {
return fmt.Errorf("reading package manifest failed: %w", err)
}
if err = r.installPackage(ctx); err != nil {
return fmt.Errorf("error installing package: %w", err)
}
for scenarioName, scenario := range r.scenarios {
var err error
dataStreamManifest, err := packages.ReadDataStreamManifest(
filepath.Join(
common.DataStreamPath(r.options.PackageRootPath, scenario.DataStream.Name),
packages.DataStreamManifestFile,
),
)
if err != nil {
return fmt.Errorf("reading data stream manifest failed: %w", err)
}
r.runtimeDataStreams[scenarioName] = fmt.Sprintf(
"%s-%s.%s-ep",
dataStreamManifest.Type,
pkgManifest.Name,
dataStreamManifest.Name,
)
}
if !r.options.PerformCleanup {
return nil
}
if err := r.wipeDataStreamsOnSetup(ctx); err != nil {
return fmt.Errorf("error cleaning up old data in data streams: %w", err)
}
cleared, err := wait.UntilTrue(ctx, func(ctx context.Context) (bool, error) {
totalHits := 0
for _, runtimeDataStream := range r.runtimeDataStreams {
hits, err := common.CountDocsInDataStream(ctx, r.options.ESAPI, runtimeDataStream)
if err != nil {
return false, err
}
totalHits += hits
}
return totalHits == 0, nil
}, 5*time.Second, 2*time.Minute)
if err != nil || !cleared {
if err == nil {
err = errors.New("unable to clear previous data")
}
return err
}
return nil
}