func pipelineCommandAction()

in cmd/benchmark.go [109:223]


func pipelineCommandAction(cmd *cobra.Command, args []string) error {
	cmd.Println("Run pipeline benchmarks for the package")

	failOnMissing, err := cmd.Flags().GetBool(cobraext.FailOnMissingFlagName)
	if err != nil {
		return cobraext.FlagParsingError(err, cobraext.FailOnMissingFlagName)
	}

	reportFormat, err := cmd.Flags().GetString(cobraext.ReportFormatFlagName)
	if err != nil {
		return cobraext.FlagParsingError(err, cobraext.ReportFormatFlagName)
	}

	reportOutput, err := cmd.Flags().GetString(cobraext.ReportOutputFlagName)
	if err != nil {
		return cobraext.FlagParsingError(err, cobraext.ReportOutputFlagName)
	}

	useTestSamples, err := cmd.Flags().GetBool(cobraext.BenchWithTestSamplesFlagName)
	if err != nil {
		return cobraext.FlagParsingError(err, cobraext.BenchWithTestSamplesFlagName)
	}

	numTopProcs, err := cmd.Flags().GetInt(cobraext.BenchNumTopProcsFlagName)
	if err != nil {
		return cobraext.FlagParsingError(err, cobraext.BenchNumTopProcsFlagName)
	}

	packageRootPath, found, err := packages.FindPackageRoot()
	if !found {
		return errors.New("package root not found")
	}
	if err != nil {
		return fmt.Errorf("locating package root failed: %w", err)
	}

	dataStreams, err := cmd.Flags().GetStringSlice(cobraext.DataStreamsFlagName)
	if err != nil {
		return cobraext.FlagParsingError(err, cobraext.DataStreamsFlagName)
	}

	if len(dataStreams) > 0 {
		common.TrimStringSlice(dataStreams)

		if err := validateDataStreamsFlag(packageRootPath, dataStreams); err != nil {
			return cobraext.FlagParsingError(err, cobraext.DataStreamsFlagName)
		}
	}

	ctx, stop := signal.Enable(cmd.Context(), logger.Info)
	defer stop()

	benchFolders, err := pipeline.FindBenchmarkFolders(packageRootPath, dataStreams)
	if err != nil {
		return fmt.Errorf("unable to determine benchmark folder paths: %w", err)
	}

	if useTestSamples {
		testFolders, err := testrunner.FindTestFolders(packageRootPath, dataStreams, testrunner.TestType(pipeline.BenchType))
		if err != nil {
			return fmt.Errorf("unable to determine test folder paths: %w", err)
		}
		benchFolders = append(benchFolders, testFolders...)
	}

	if failOnMissing && len(benchFolders) == 0 {
		if len(dataStreams) > 0 {
			return fmt.Errorf("no pipeline benchmarks found for %s data stream(s)", strings.Join(dataStreams, ","))
		}
		return errors.New("no pipeline benchmarks found")
	}

	profile, err := cobraext.GetProfileFlag(cmd)
	if err != nil {
		return err
	}

	esClient, err := stack.NewElasticsearchClientFromProfile(profile)
	if err != nil {
		return fmt.Errorf("can't create Elasticsearch client: %w", err)
	}
	err = esClient.CheckHealth(ctx)
	if err != nil {
		return err
	}

	var results []reporters.Reportable
	for idx, folder := range benchFolders {
		opts := pipeline.NewOptions(
			pipeline.WithBenchmarkName(fmt.Sprintf("%s-%d", folder.Package, idx+1)),
			pipeline.WithFolder(folder),
			pipeline.WithPackageRootPath(packageRootPath),
			pipeline.WithESAPI(esClient.API),
			pipeline.WithNumTopProcs(numTopProcs),
			pipeline.WithFormat(reportFormat),
		)
		runner := pipeline.NewPipelineBenchmark(opts)

		r, err := benchrunner.Run(ctx, runner)

		if err != nil {
			return fmt.Errorf("error running package pipeline benchmarks: %w", err)
		}

		results = append(results, r)
	}

	for _, report := range results {
		if err := reporters.WriteReportable(reporters.Output(reportOutput), report); err != nil {
			return fmt.Errorf("error writing benchmark report: %w", err)
		}
	}

	return nil
}