func()

in internal/benchrunner/runners/system/runner.go [136:229]


// setUp prepares the runner state required before a benchmark is executed.
//
// In order, it:
//   - resolves the service logs directory and assigns a fresh run ID,
//   - creates the output directory for the run,
//   - reads the benchmark scenario configuration,
//   - initializes the corpus generator when the scenario defines one,
//   - reads the package manifest and creates the benchmark policy,
//   - derives the runtime data stream name and the pipeline prefix,
//   - registers wipeDataStreamHandler, deletes the documents already present
//     in the data stream and waits until its document count reaches zero.
//
// It returns the first error encountered. Fields of r assigned before the
// failing step keep the values they were given.
func (r *runner) setUp(ctx context.Context) error {
	locationManager, err := locations.NewLocationManager()
	if err != nil {
		return fmt.Errorf("reading service logs directory failed: %w", err)
	}

	serviceLogsDir := locationManager.ServiceLogDir()
	r.svcInfo.Logs.Folder.Local = serviceLogsDir
	r.svcInfo.Logs.Folder.Agent = ServiceLogsAgentDir
	r.svcInfo.Test.RunID = common.NewRunID()

	outputDir, err := servicedeployer.CreateOutputDir(locationManager, r.svcInfo.Test.RunID)
	if err != nil {
		return fmt.Errorf("could not create output dir for terraform deployer: %w", err)
	}
	r.svcInfo.OutputDir = outputDir

	scenario, err := readConfig(r.options.BenchPath, r.options.BenchName, r.svcInfo)
	if err != nil {
		return err
	}
	r.scenario = scenario

	// The generator is optional: it is only built when the scenario
	// configures one under its corpora settings.
	if r.scenario.Corpora.Generator != nil {
		r.generator, err = r.initializeGenerator(ctx)
		if err != nil {
			return fmt.Errorf("can't initialize generator: %w", err)
		}
	}

	pkgManifest, err := packages.ReadPackageManifestFromPackageRoot(r.options.PackageRootPath)
	if err != nil {
		return fmt.Errorf("reading package manifest failed: %w", err)
	}

	policy, err := r.createBenchmarkPolicy(ctx, pkgManifest)
	if err != nil {
		return err
	}
	r.benchPolicy = policy

	// The data stream manifest is needed to compute the name of the data
	// stream whose leftover documents are removed below.
	logger.Debug("deleting old data in data stream...")
	dataStreamManifest, err := packages.ReadDataStreamManifest(
		filepath.Join(
			common.DataStreamPath(r.options.PackageRootPath, r.scenario.DataStream.Name),
			packages.DataStreamManifestFile,
		),
	)
	if err != nil {
		return fmt.Errorf("reading data stream manifest failed: %w", err)
	}

	// Both names share the "<type>-<package>.<data stream>" stem; the runtime
	// data stream ends with the policy namespace, the pipeline prefix with the
	// scenario version.
	r.runtimeDataStream = fmt.Sprintf(
		"%s-%s.%s-%s",
		dataStreamManifest.Type,
		pkgManifest.Name,
		dataStreamManifest.Name,
		policy.Namespace,
	)
	r.pipelinePrefix = fmt.Sprintf(
		"%s-%s.%s-%s",
		dataStreamManifest.Type,
		pkgManifest.Name,
		dataStreamManifest.Name,
		r.scenario.Version,
	)

	// Stored on the runner so the same cleanup can be invoked again later.
	r.wipeDataStreamHandler = func(ctx context.Context) error {
		logger.Debugf("deleting data in data stream...")
		if err := r.deleteDataStreamDocs(ctx, r.runtimeDataStream); err != nil {
			return fmt.Errorf("error deleting data in data stream: %w", err)
		}
		return nil
	}

	if err := r.deleteDataStreamDocs(ctx, r.runtimeDataStream); err != nil {
		return fmt.Errorf("error deleting old data in data stream: %s: %w", r.runtimeDataStream, err)
	}

	// Poll until the data stream reports no documents. The two durations are
	// presumably the poll period and the overall timeout of wait.UntilTrue
	// (TODO confirm against its signature).
	cleared, err := wait.UntilTrue(ctx, func(ctx context.Context) (bool, error) {
		hits, err := common.CountDocsInDataStream(ctx, r.options.ESAPI, r.runtimeDataStream)
		if err != nil {
			// Never report success together with an error, whatever
			// value hits carries on failure.
			return false, err
		}
		return hits == 0, nil
	}, 5*time.Second, 2*time.Minute)
	if err != nil {
		return fmt.Errorf("waiting for data stream %s to be cleared: %w", r.runtimeDataStream, err)
	}
	if !cleared {
		return errors.New("unable to clear previous data")
	}

	return nil
}