in cmd/benchmark.go [416:518]
func streamCommandAction(cmd *cobra.Command, args []string) error {
cmd.Println("Run stream benchmarks for the package")
variant, err := cmd.Flags().GetString(cobraext.VariantFlagName)
if err != nil {
return cobraext.FlagParsingError(err, cobraext.VariantFlagName)
}
benchName, err := cmd.Flags().GetString(cobraext.BenchNameFlagName)
if err != nil {
return cobraext.FlagParsingError(err, cobraext.BenchNameFlagName)
}
backFill, err := cmd.Flags().GetDuration(cobraext.BenchStreamBackFillFlagName)
if err != nil {
return cobraext.FlagParsingError(err, cobraext.BenchStreamBackFillFlagName)
}
if backFill < 0 {
return cobraext.FlagParsingError(errors.New("cannot be a negative duration"), cobraext.BenchStreamBackFillFlagName)
}
eventsPerPeriod, err := cmd.Flags().GetUint64(cobraext.BenchStreamEventsPerPeriodFlagName)
if err != nil {
return cobraext.FlagParsingError(err, cobraext.BenchStreamEventsPerPeriodFlagName)
}
if eventsPerPeriod <= 0 {
return cobraext.FlagParsingError(errors.New("cannot be zero or negative"), cobraext.BenchStreamEventsPerPeriodFlagName)
}
periodDuration, err := cmd.Flags().GetDuration(cobraext.BenchStreamPeriodDurationFlagName)
if err != nil {
return cobraext.FlagParsingError(err, cobraext.BenchStreamPeriodDurationFlagName)
}
if periodDuration < time.Nanosecond {
return cobraext.FlagParsingError(errors.New("cannot be a negative duration"), cobraext.BenchStreamPeriodDurationFlagName)
}
performCleanup, err := cmd.Flags().GetBool(cobraext.BenchStreamPerformCleanupFlagName)
if err != nil {
return cobraext.FlagParsingError(err, cobraext.BenchStreamPerformCleanupFlagName)
}
timestampField, err := cmd.Flags().GetString(cobraext.BenchStreamTimestampFieldFlagName)
if err != nil {
return cobraext.FlagParsingError(err, cobraext.BenchStreamTimestampFieldFlagName)
}
packageRootPath, found, err := packages.FindPackageRoot()
if !found {
return errors.New("package root not found")
}
if err != nil {
return fmt.Errorf("locating package root failed: %w", err)
}
profile, err := cobraext.GetProfileFlag(cmd)
if err != nil {
return err
}
ctx, stop := signal.Enable(cmd.Context(), logger.Info)
defer stop()
esClient, err := stack.NewElasticsearchClientFromProfile(profile)
if err != nil {
return fmt.Errorf("can't create Elasticsearch client: %w", err)
}
err = esClient.CheckHealth(ctx)
if err != nil {
return err
}
kc, err := stack.NewKibanaClientFromProfile(profile)
if err != nil {
return fmt.Errorf("can't create Kibana client: %w", err)
}
withOpts := []stream.OptionFunc{
stream.WithVariant(variant),
stream.WithBenchmarkName(benchName),
stream.WithBackFill(backFill),
stream.WithEventsPerPeriod(eventsPerPeriod),
stream.WithPeriodDuration(periodDuration),
stream.WithPerformCleanup(performCleanup),
stream.WithTimestampField(timestampField),
stream.WithPackageRootPath(packageRootPath),
stream.WithESAPI(esClient.API),
stream.WithKibanaClient(kc),
stream.WithProfile(profile),
}
runner := stream.NewStreamBenchmark(stream.NewOptions(withOpts...))
_, err = benchrunner.Run(ctx, runner)
if err != nil {
return fmt.Errorf("error running package stream benchmarks: %w", err)
}
return nil
}