contrib/cmd/runkperf/commands/bench/utils.go

// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

package bench

import (
	"context"
	"encoding/json"
	"fmt"
	"os"
	"path/filepath"

	"github.com/Azure/kperf/api/types"
	kperfcmdutils "github.com/Azure/kperf/cmd/kperf/commands/utils"
	internaltypes "github.com/Azure/kperf/contrib/internal/types"
	"github.com/Azure/kperf/contrib/log"
	"github.com/Azure/kperf/contrib/utils"

	"github.com/urfave/cli"
	"gopkg.in/yaml.v2"
)

// subcmdActionFunc unifies the interface of each bench subcommand. Every
// action runs a benchmark and returns its report as the result.
type subcmdActionFunc func(*cli.Context) (*internaltypes.BenchmarkReport, error)

// addAPIServerCoresInfoInterceptor samples the apiserver's core counts before
// and after the wrapped action and records them in the benchmark report.
func addAPIServerCoresInfoInterceptor(handler subcmdActionFunc) subcmdActionFunc {
	return func(cliCtx *cli.Context) (*internaltypes.BenchmarkReport, error) {
		ctx := context.Background()
		kubeCfgPath := cliCtx.GlobalString("kubeconfig")

		beforeCores, ferr := utils.FetchAPIServerCores(ctx, kubeCfgPath)
		if ferr != nil {
			log.GetLogger(ctx).
				WithKeyValues("level", "warn").
				LogKV("msg", "failed to fetch apiserver cores", "error", ferr)
		}

		report, err := handler(cliCtx)
		if err != nil {
			return nil, err
		}

		afterCores, ferr := utils.FetchAPIServerCores(ctx, kubeCfgPath)
		if ferr != nil {
			log.GetLogger(ctx).
				WithKeyValues("level", "warn").
				LogKV("msg", "failed to fetch apiserver cores", "error", ferr)
		}

		report.Info["apiserver"] = map[string]interface{}{
			"cores": map[string]interface{}{
				"before": beforeCores,
				"after":  afterCores,
			},
		}
		return report, nil
	}
}

// renderBenchmarkReportInterceptor renders the benchmark report to the file
// given by the global --result flag, or to stdout when the flag is empty.
func renderBenchmarkReportInterceptor(handler subcmdActionFunc) subcmdActionFunc {
	return func(cliCtx *cli.Context) (*internaltypes.BenchmarkReport, error) {
		report, err := handler(cliCtx)
		if err != nil {
			return nil, err
		}

		outF := os.Stdout
		if targetFile := cliCtx.GlobalString("result"); targetFile != "" {
			// Ensure the parent directory exists before creating the file.
			targetFileDir := filepath.Dir(targetFile)
			_, err = os.Stat(targetFileDir)
			if err != nil && os.IsNotExist(err) {
				err = os.MkdirAll(targetFileDir, 0750)
			}
			if err != nil {
				return nil, fmt.Errorf("failed to ensure output's dir %s: %w", targetFileDir, err)
			}

			outF, err = os.Create(targetFile)
			if err != nil {
				return nil, err
			}
			defer outF.Close()
		}

		encoder := json.NewEncoder(outF)
		encoder.SetIndent("", "  ")
		if err := encoder.Encode(report); err != nil {
			return nil, fmt.Errorf("failed to encode json: %w", err)
		}
		return report, nil
	}
}
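// Illustrative sketch (not part of this file): a subcommand wires the two
// interceptors above around its action so apiserver cores are sampled around
// the run and the report is rendered on the way out. "runBench" is a
// hypothetical subcmdActionFunc.
//
//	var benchCmd = cli.Command{
//		Name: "bench-case",
//		Action: func(cliCtx *cli.Context) error {
//			_, err := renderBenchmarkReportInterceptor(
//				addAPIServerCoresInfoInterceptor(runBench),
//			)(cliCtx)
//			return err
//		},
//	}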
LogKV("msg", "failed to delete nodepool", "name", target, "error", err) } err = kr.NewNodepool(ctx, 0, target, nodes, cpu, memory, maxPods, virtualNodeAffinity, sharedProviderID) if err != nil { return nil, fmt.Errorf("failed to create nodepool %s: %w", target, err) } return func() error { return kr.DeleteNodepool(ctx, 0, target) }, nil } func NewRunnerGroupSpecFromYamlFile() {} // newLoadProfileFromEmbed loads load profile from embed and tweaks that load // profile. func newLoadProfileFromEmbed(cliCtx *cli.Context, name string) (_name string, _spec *types.RunnerGroupSpec, _cleanup func() error, _err error) { var rgSpec types.RunnerGroupSpec rgCfgFile, rgCfgFileDone, err := utils.NewRunnerGroupSpecFileFromEmbed( name, func(spec *types.RunnerGroupSpec) error { reqs := cliCtx.Int("total") if reqs < 0 { return fmt.Errorf("invalid total-requests value: %v", reqs) } rgAffinity := cliCtx.GlobalString("rg-affinity") affinityLabels, err := kperfcmdutils.KeyValuesMap([]string{rgAffinity}) if err != nil { return fmt.Errorf("failed to parse %s affinity: %w", rgAffinity, err) } if reqs != 0 { spec.Profile.Spec.Total = reqs } spec.NodeAffinity = affinityLabels spec.Profile.Spec.ContentType = types.ContentType(cliCtx.String("content-type")) data, _ := yaml.Marshal(spec) log.GetLogger(context.TODO()). WithKeyValues("level", "info"). LogKV("msg", "dump load profile", "config", string(data)) rgSpec = *spec return nil }, ) if err != nil { return "", nil, nil, err } return rgCfgFile, &rgSpec, rgCfgFileDone, nil }