func streamCommandAction()

in cmd/benchmark.go [416:518]


// streamCommandAction handles the stream benchmark command: it reads and
// validates the stream-related flags, locates the package root, builds the
// Elasticsearch and Kibana clients from the selected profile, and runs a
// stream benchmark configured with those values.
//
// It returns a flag parsing error when a flag cannot be read or holds an
// invalid value, and an error when the package root cannot be located, a
// client cannot be created, the Elasticsearch health check fails, or the
// benchmark run itself fails.
func streamCommandAction(cmd *cobra.Command, args []string) error {
	cmd.Println("Run stream benchmarks for the package")

	variant, err := cmd.Flags().GetString(cobraext.VariantFlagName)
	if err != nil {
		return cobraext.FlagParsingError(err, cobraext.VariantFlagName)
	}

	benchName, err := cmd.Flags().GetString(cobraext.BenchNameFlagName)
	if err != nil {
		return cobraext.FlagParsingError(err, cobraext.BenchNameFlagName)
	}

	backFill, err := cmd.Flags().GetDuration(cobraext.BenchStreamBackFillFlagName)
	if err != nil {
		return cobraext.FlagParsingError(err, cobraext.BenchStreamBackFillFlagName)
	}
	// A zero back-fill is accepted; only negative values are rejected.
	if backFill < 0 {
		return cobraext.FlagParsingError(errors.New("cannot be a negative duration"), cobraext.BenchStreamBackFillFlagName)
	}

	eventsPerPeriod, err := cmd.Flags().GetUint64(cobraext.BenchStreamEventsPerPeriodFlagName)
	if err != nil {
		return cobraext.FlagParsingError(err, cobraext.BenchStreamEventsPerPeriodFlagName)
	}
	// The value is unsigned, so zero is the only invalid value left to reject here.
	if eventsPerPeriod == 0 {
		return cobraext.FlagParsingError(errors.New("cannot be zero or negative"), cobraext.BenchStreamEventsPerPeriodFlagName)
	}

	periodDuration, err := cmd.Flags().GetDuration(cobraext.BenchStreamPeriodDurationFlagName)
	if err != nil {
		return cobraext.FlagParsingError(err, cobraext.BenchStreamPeriodDurationFlagName)
	}
	// Anything shorter than one nanosecond (i.e. zero or negative) is rejected,
	// and the message says so explicitly.
	if periodDuration < time.Nanosecond {
		return cobraext.FlagParsingError(errors.New("cannot be zero or a negative duration"), cobraext.BenchStreamPeriodDurationFlagName)
	}

	performCleanup, err := cmd.Flags().GetBool(cobraext.BenchStreamPerformCleanupFlagName)
	if err != nil {
		return cobraext.FlagParsingError(err, cobraext.BenchStreamPerformCleanupFlagName)
	}

	timestampField, err := cmd.Flags().GetString(cobraext.BenchStreamTimestampFieldFlagName)
	if err != nil {
		return cobraext.FlagParsingError(err, cobraext.BenchStreamTimestampFieldFlagName)
	}

	// Check err before found so that a real lookup failure is reported with its
	// cause instead of being masked by the generic "not found" message.
	packageRootPath, found, err := packages.FindPackageRoot()
	if err != nil {
		return fmt.Errorf("locating package root failed: %w", err)
	}
	if !found {
		return errors.New("package root not found")
	}

	profile, err := cobraext.GetProfileFlag(cmd)
	if err != nil {
		return err
	}

	// ctx is derived from the command context and is the one passed to the
	// health check and the benchmark run below.
	ctx, stop := signal.Enable(cmd.Context(), logger.Info)
	defer stop()

	esClient, err := stack.NewElasticsearchClientFromProfile(profile)
	if err != nil {
		return fmt.Errorf("can't create Elasticsearch client: %w", err)
	}
	// Verify Elasticsearch health before doing any further setup.
	err = esClient.CheckHealth(ctx)
	if err != nil {
		return err
	}

	kc, err := stack.NewKibanaClientFromProfile(profile)
	if err != nil {
		return fmt.Errorf("can't create Kibana client: %w", err)
	}

	withOpts := []stream.OptionFunc{
		stream.WithVariant(variant),
		stream.WithBenchmarkName(benchName),
		stream.WithBackFill(backFill),
		stream.WithEventsPerPeriod(eventsPerPeriod),
		stream.WithPeriodDuration(periodDuration),
		stream.WithPerformCleanup(performCleanup),
		stream.WithTimestampField(timestampField),
		stream.WithPackageRootPath(packageRootPath),
		stream.WithESAPI(esClient.API),
		stream.WithKibanaClient(kc),
		stream.WithProfile(profile),
	}

	runner := stream.NewStreamBenchmark(stream.NewOptions(withOpts...))

	// The first result of benchrunner.Run is intentionally discarded; only the
	// error is used.
	_, err = benchrunner.Run(ctx, runner)
	if err != nil {
		return fmt.Errorf("error running package stream benchmarks: %w", err)
	}

	return nil
}