func()

in internal/benchrunner/runners/stream/runner.go [155:225]


func (r *runner) setUp(ctx context.Context) error {
	err := r.initialize()
	if err != nil {
		return err
	}

	err = r.collectGenerators(ctx)
	if err != nil {
		return fmt.Errorf("can't initialize generator: %w", err)
	}

	r.runtimeDataStreams = make(map[string]string)

	r.svcInfo.Test.RunID = common.NewRunID()

	pkgManifest, err := packages.ReadPackageManifestFromPackageRoot(r.options.PackageRootPath)
	if err != nil {
		return fmt.Errorf("reading package manifest failed: %w", err)
	}

	if err = r.installPackage(ctx); err != nil {
		return fmt.Errorf("error installing package: %w", err)
	}

	for scenarioName, scenario := range r.scenarios {
		var err error
		dataStreamManifest, err := packages.ReadDataStreamManifest(
			filepath.Join(
				common.DataStreamPath(r.options.PackageRootPath, scenario.DataStream.Name),
				packages.DataStreamManifestFile,
			),
		)
		if err != nil {
			return fmt.Errorf("reading data stream manifest failed: %w", err)
		}
		r.runtimeDataStreams[scenarioName] = fmt.Sprintf(
			"%s-%s.%s-ep",
			dataStreamManifest.Type,
			pkgManifest.Name,
			dataStreamManifest.Name,
		)
	}

	if !r.options.PerformCleanup {
		return nil
	}

	if err := r.wipeDataStreamsOnSetup(ctx); err != nil {
		return fmt.Errorf("error cleaning up old data in data streams: %w", err)
	}

	cleared, err := wait.UntilTrue(ctx, func(ctx context.Context) (bool, error) {
		totalHits := 0
		for _, runtimeDataStream := range r.runtimeDataStreams {
			hits, err := common.CountDocsInDataStream(ctx, r.options.ESAPI, runtimeDataStream)
			if err != nil {
				return false, err
			}
			totalHits += hits
		}
		return totalHits == 0, nil
	}, 5*time.Second, 2*time.Minute)
	if err != nil || !cleared {
		if err == nil {
			err = errors.New("unable to clear previous data")
		}
		return err
	}

	return nil
}