func()

in internal/benchrunner/runners/rally/runner.go [240:344]


// setUp prepares the runner state and the target data stream for a rally
// benchmark run.
//
// In order, it:
//   - points the service log folders at the rally corpus directory and
//     assigns a fresh run ID;
//   - creates the output directory and the local rally track directory;
//   - reads the package manifest and the benchmark scenario, then installs
//     the package;
//   - initializes the corpus generator when the scenario defines one and no
//     corpus path was supplied in the options;
//   - derives the runtime data stream name and the pipeline prefix from the
//     package and data stream manifests;
//   - for data streams whose index mode is "time_series", extracts the
//     simulated index template;
//   - calls wipeDataStreamOnSetup and then polls until the data stream
//     reports zero documents.
//
// The first failing step aborts the setup and its error is returned; later
// steps are not attempted.
func (r *runner) setUp(ctx context.Context) error {
	locationManager, err := locations.NewLocationManager()
	if err != nil {
		// NOTE(review): the message mentions the service logs directory although
		// the failing call is NewLocationManager — confirm the intended wording.
		return fmt.Errorf("reading service logs directory failed: %w", err)
	}

	rallyCorpusDir := locationManager.RallyCorpusDir()
	r.svcInfo.Logs.Folder.Local = rallyCorpusDir
	r.svcInfo.Logs.Folder.Agent = RallyCorpusAgentDir
	r.svcInfo.Test.RunID = common.NewRunID()

	outputDir, err := servicedeployer.CreateOutputDir(locationManager, r.svcInfo.Test.RunID)
	if err != nil {
		return fmt.Errorf("could not create output dir for terraform deployer: %w", err)
	}
	r.svcInfo.OutputDir = outputDir

	if err := r.createRallyTrackDir(locationManager); err != nil {
		return fmt.Errorf("could not create local rally track dir: %w", err)
	}

	pkgManifest, err := packages.ReadPackageManifestFromPackageRoot(r.options.PackageRootPath)
	if err != nil {
		return fmt.Errorf("reading package manifest failed: %w", err)
	}

	scenario, err := readConfig(r.options.PackageRootPath, r.options.BenchName, pkgManifest.Name, pkgManifest.Version)
	if err != nil {
		return err
	}
	r.scenario = scenario

	if err := r.installPackage(ctx); err != nil {
		return fmt.Errorf("error installing package: %w", err)
	}

	// A generator is only needed when the scenario defines one and the caller
	// did not supply a corpus path in the options.
	if r.scenario.Corpora.Generator != nil && len(r.options.CorpusAtPath) == 0 {
		r.generator, err = r.initializeGenerator(ctx)
		if err != nil {
			return fmt.Errorf("can't initialize generator: %w", err)
		}
	}

	dataStreamManifest, err := packages.ReadDataStreamManifest(
		filepath.Join(
			common.DataStreamPath(r.options.PackageRootPath, r.scenario.DataStream.Name),
			packages.DataStreamManifestFile,
		),
	)
	if err != nil {
		return fmt.Errorf("reading data stream manifest failed: %w", err)
	}

	// Data stream the benchmark operates on: <type>-<package>.<data stream>-ep.
	r.runtimeDataStream = fmt.Sprintf(
		"%s-%s.%s-ep",
		dataStreamManifest.Type,
		pkgManifest.Name,
		dataStreamManifest.Name,
	)

	// Pipeline prefix: <type>-<package>.<data stream>-<scenario version>.
	r.pipelinePrefix = fmt.Sprintf(
		"%s-%s.%s-%s",
		dataStreamManifest.Type,
		pkgManifest.Name,
		dataStreamManifest.Name,
		r.scenario.Version,
	)

	// The Elasticsearch section of the manifest is optional; without it
	// r.isTSDB keeps its current value.
	if dataStreamManifest.Elasticsearch != nil {
		r.isTSDB = dataStreamManifest.Elasticsearch.IndexMode == "time_series"
	}

	if r.isTSDB {
		indexTemplate := fmt.Sprintf(
			"%s-%s.%s",
			dataStreamManifest.Type,
			pkgManifest.Name,
			dataStreamManifest.Name,
		)

		r.indexTemplateBody, err = r.extractSimulatedTemplate(ctx, indexTemplate)
		if err != nil {
			return fmt.Errorf("error extracting routing path: %s: %w", indexTemplate, err)
		}
	}

	if err := r.wipeDataStreamOnSetup(ctx); err != nil {
		return fmt.Errorf("error deleting old data in data stream: %s: %w", r.runtimeDataStream, err)
	}

	// Poll the document count until the data stream is empty. The two durations
	// are presumably the polling period and the overall timeout of
	// wait.UntilTrue — confirm against its signature.
	cleared, err := wait.UntilTrue(ctx, func(ctx context.Context) (bool, error) {
		hits, err := common.CountDocsInDataStream(ctx, r.options.ESAPI, r.runtimeDataStream)
		return hits == 0, err
	}, 5*time.Second, 2*time.Minute)
	if err != nil {
		return fmt.Errorf("waiting for data stream %s to be cleared: %w", r.runtimeDataStream, err)
	}
	if !cleared {
		return errors.New("unable to clear previous data")
	}

	return nil
}