cmd/kperf/commands/runner/runner.go (195 lines of code) (raw):

// Copyright (c) Microsoft Corporation. // Licensed under the MIT License. package runner import ( "context" "encoding/json" "fmt" "os" "path/filepath" "github.com/Azure/kperf/api/types" "github.com/Azure/kperf/cmd/kperf/commands/utils" "github.com/Azure/kperf/metrics" "github.com/Azure/kperf/request" "github.com/urfave/cli" "gopkg.in/yaml.v2" ) // Command represents runner subcommand. var Command = cli.Command{ Name: "runner", Usage: "Setup benchmark to kube-apiserver from one endpoint", Subcommands: []cli.Command{ runCommand, }, } var runCommand = cli.Command{ Name: "run", Usage: "run a benchmark test to kube-apiserver", Flags: []cli.Flag{ cli.StringFlag{ Name: "kubeconfig", Usage: "Path to the kubeconfig file", Value: utils.DefaultKubeConfigPath, }, cli.IntFlag{ Name: "client", Usage: "Total number of HTTP clients", Value: 1, }, cli.StringFlag{ Name: "config", Usage: "Path to the configuration file", Required: true, }, cli.IntFlag{ Name: "conns", Usage: "Total number of connections. It can override corresponding value defined by --config", Value: 1, }, cli.StringFlag{ Name: "content-type", Usage: fmt.Sprintf("Content type (%v or %v)", types.ContentTypeJSON, types.ContentTypeProtobuffer), Value: string(types.ContentTypeJSON), }, cli.Float64Flag{ Name: "rate", Usage: "Maximum requests per second (Zero means no limitation). It can override corresponding value defined by --config", }, cli.IntFlag{ Name: "total", Usage: "Total number of requests. It can override corresponding value defined by --config", Value: 1000, }, cli.StringFlag{ Name: "user-agent", Usage: "User Agent", }, cli.BoolFlag{ Name: "disable-http2", Usage: "Disable HTTP2 protocol", }, cli.IntFlag{ Name: "max-retries", Usage: "Retry request after receiving 429 http code (<=0 means no retry)", Value: 0, }, cli.StringFlag{ Name: "result", Usage: "Path to the file which stores results", }, cli.BoolFlag{ Name: "raw-data", Usage: "show raw letencies data in result", }, }, Action: func(cliCtx *cli.Context) error { kubeCfgPath := cliCtx.String("kubeconfig") profileCfg, err := loadConfig(cliCtx) if err != nil { return err } clientNum := profileCfg.Spec.Conns restClis, err := request.NewClients(kubeCfgPath, clientNum, request.WithClientUserAgentOpt(cliCtx.String("user-agent")), request.WithClientQPSOpt(profileCfg.Spec.Rate), request.WithClientContentTypeOpt(profileCfg.Spec.ContentType), request.WithClientDisableHTTP2Opt(profileCfg.Spec.DisableHTTP2), ) if err != nil { return err } stats, err := request.Schedule(context.TODO(), &profileCfg.Spec, restClis) if err != nil { return err } var f *os.File = os.Stdout outputFilePath := cliCtx.String("result") if outputFilePath != "" { outputFileDir := filepath.Dir(outputFilePath) _, err = os.Stat(outputFileDir) if err != nil && os.IsNotExist(err) { err = os.MkdirAll(outputFileDir, 0750) } if err != nil { return fmt.Errorf("failed to ensure output's dir %s: %w", outputFileDir, err) } f, err = os.Create(outputFilePath) if err != nil { return err } defer f.Close() } rawDataFlagIncluded := cliCtx.Bool("raw-data") err = printResponseStats(f, rawDataFlagIncluded, stats) if err != nil { return fmt.Errorf("error while printing response stats: %w", err) } return nil }, } // loadConfig loads and validates the config. func loadConfig(cliCtx *cli.Context) (*types.LoadProfile, error) { var profileCfg types.LoadProfile cfgPath := cliCtx.String("config") cfgInRaw, err := os.ReadFile(cfgPath) if err != nil { return nil, fmt.Errorf("failed to read file %s: %w", cfgPath, err) } if err := yaml.Unmarshal(cfgInRaw, &profileCfg); err != nil { return nil, fmt.Errorf("failed to unmarshal %s from yaml format: %w", cfgPath, err) } // override value by flags if v := "rate"; cliCtx.IsSet(v) { profileCfg.Spec.Rate = cliCtx.Float64(v) } if v := "conns"; cliCtx.IsSet(v) || profileCfg.Spec.Conns == 0 { profileCfg.Spec.Conns = cliCtx.Int(v) } if v := "client"; cliCtx.IsSet(v) || profileCfg.Spec.Client == 0 { profileCfg.Spec.Client = cliCtx.Int(v) } if v := "total"; cliCtx.IsSet(v) || profileCfg.Spec.Total == 0 { profileCfg.Spec.Total = cliCtx.Int(v) } if v := "content-type"; cliCtx.IsSet(v) || profileCfg.Spec.ContentType == "" { profileCfg.Spec.ContentType = types.ContentType(cliCtx.String(v)) } if v := "disable-http2"; cliCtx.IsSet(v) { profileCfg.Spec.DisableHTTP2 = cliCtx.Bool(v) } if v := "max-retries"; cliCtx.IsSet(v) { profileCfg.Spec.MaxRetries = cliCtx.Int(v) } if err := profileCfg.Validate(); err != nil { return nil, err } return &profileCfg, nil } // printResponseStats prints types.RunnerMetricReport into underlying file. func printResponseStats(f *os.File, rawDataFlagIncluded bool, stats *request.Result) error { output := types.RunnerMetricReport{ Total: stats.Total, ErrorStats: metrics.BuildErrorStatsGroupByType(stats.Errors), Duration: stats.Duration.String(), TotalReceivedBytes: stats.TotalReceivedBytes, PercentileLatenciesByURL: map[string][][2]float64{}, } total := 0 for _, latencies := range stats.LatenciesByURL { total += len(latencies) } latencies := make([]float64, 0, total) for _, l := range stats.LatenciesByURL { latencies = append(latencies, l...) } output.PercentileLatencies = metrics.BuildPercentileLatencies(latencies) for u, l := range stats.LatenciesByURL { output.PercentileLatenciesByURL[u] = metrics.BuildPercentileLatencies(l) } if rawDataFlagIncluded { output.LatenciesByURL = stats.LatenciesByURL output.Errors = stats.Errors } encoder := json.NewEncoder(f) encoder.SetIndent("", " ") err := encoder.Encode(output) if err != nil { return fmt.Errorf("failed to encode json: %w", err) } return nil }