contrib/cmd/runkperf/commands/warmup/command.go (219 lines of code) (raw):

// Copyright (c) Microsoft Corporation. // Licensed under the MIT License. package warmup import ( "context" "fmt" "sync" "time" "github.com/Azure/kperf/api/types" kperfcmdutils "github.com/Azure/kperf/cmd/kperf/commands/utils" "github.com/Azure/kperf/contrib/log" "github.com/Azure/kperf/contrib/utils" "github.com/urfave/cli" "gopkg.in/yaml.v2" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/clientcmd" ) // Command represents warmup subcommand. var Command = cli.Command{ Name: "warmup", Usage: "Warmup cluster and try best to scale it to 8 cores at least", Flags: []cli.Flag{ cli.StringFlag{ Name: "kubeconfig", Usage: "Path to the kubeconfig file", }, cli.StringFlag{ Name: "runner-image", Usage: "The runner's conainer image", // TODO(weifu): // // We should build release pipeline so that we can // build with fixed public release image as default value. // Right now, we need to set image manually. Required: true, }, cli.StringFlag{ Name: "runner-flowcontrol", Usage: "Apply flowcontrol to runner group. (FORMAT: PriorityLevel:MatchingPrecedence)", Value: "workload-low:1000", }, cli.Float64Flag{ Name: "rate", Usage: "The maximum requests per second per runner (There are 10 runners totally)", Value: 20, }, cli.IntFlag{ Name: "total", Usage: "Total requests per runner (There are 10 runners totally and runner's rate is 20)", Value: 10000, }, cli.StringFlag{ Name: "vc-affinity", Usage: "Deploy virtualnode's controller with a specific labels (FORMAT: KEY=VALUE[,VALUE])", Value: "node.kubernetes.io/instance-type=Standard_D8s_v3,m4.2xlarge,n1-standard-8", }, cli.StringFlag{ Name: "rg-affinity", Usage: "Deploy runner group with a specific labels (FORMAT: KEY=VALUE[,VALUE])", Value: "node.kubernetes.io/instance-type=Standard_D16s_v3,m4.4xlarge,n1-standard-16", }, cli.IntFlag{ Name: "core-warmup-ready-threshold", Usage: "Indicates the threshold for core during warm-up", Value: 8, }, cli.BoolFlag{ Name: "eks", Usage: "Indicates the target kubernetes cluster is EKS", Hidden: true, }, }, Action: func(cliCtx *cli.Context) (retErr error) { ctx := context.Background() infoLogger := log.GetLogger(ctx).WithKeyValues("level", "info") warnLogger := log.GetLogger(ctx).WithKeyValues("level", "warn") rgCfgFile, rgCfgFileDone, err := utils.NewRunnerGroupSpecFileFromEmbed( "loadprofile/warmup.yaml", func(spec *types.RunnerGroupSpec) error { reqs := cliCtx.Int("total") if reqs < 0 { return fmt.Errorf("invalid total value: %v", reqs) } rate := cliCtx.Float64("rate") if rate <= 0 { return fmt.Errorf("invalid rate value: %v", rate) } rgAffinity := cliCtx.String("rg-affinity") affinityLabels, err := kperfcmdutils.KeyValuesMap([]string{rgAffinity}) if err != nil { return fmt.Errorf("failed to parse %s affinity: %w", rgAffinity, err) } spec.Profile.Spec.Total = reqs spec.Profile.Spec.Rate = rate spec.NodeAffinity = affinityLabels data, _ := yaml.Marshal(spec) infoLogger.LogKV("msg", "dump load profile", "config", string(data)) return nil }, ) if err != nil { return err } defer func() { _ = rgCfgFileDone() }() kubeCfgPath := cliCtx.String("kubeconfig") isEKS := cliCtx.Bool("eks") virtualNodeAffinity := cliCtx.String("vc-affinity") if isEKS { perr := patchEKSDaemonsetWithoutToleration(ctx, kubeCfgPath) if perr != nil { return perr } } cores, ferr := utils.FetchAPIServerCores(ctx, kubeCfgPath) if ferr == nil { if isReady(cliCtx, cores) { infoLogger.LogKV("msg", fmt.Sprintf("apiserver resource is ready: %v", cores)) return nil } } else { warnLogger.LogKV("msg", "failed to fetch apiserver cores", "error", ferr) } delNP, err := deployWarmupVirtualNodepool(ctx, kubeCfgPath, isEKS, virtualNodeAffinity) if err != nil { return err } defer func() { derr := delNP() if retErr == nil { retErr = derr } }() var wg sync.WaitGroup wg.Add(1) jobCtx, jobCancel := context.WithCancel(ctx) go func() { defer wg.Done() utils.RepeatJobWithPod(jobCtx, kubeCfgPath, "warmupjob", "workload/3kpod.job.yaml", 5*time.Second) }() _, derr := utils.DeployRunnerGroup(ctx, kubeCfgPath, cliCtx.String("runner-image"), rgCfgFile, cliCtx.String("runner-flowcontrol"), "", ) jobCancel() wg.Wait() cores, ferr = utils.FetchAPIServerCores(ctx, kubeCfgPath) if ferr == nil { if isReady(cliCtx, cores) { infoLogger.LogKV("msg", fmt.Sprintf("apiserver resource is ready: %v", cores)) return nil } } return derr }, } // isReady returns true if there are more than two instances using 8 cores. func isReady(cliCtx *cli.Context, cores map[string]int) bool { target := cliCtx.Int("core-warmup-ready-threshold") isEKS := cliCtx.Bool("eks") n := 0 for _, c := range cores { if c >= target { n++ } } return (isEKS && n >= 2) || (!isEKS && n >= 1) } // deployWarmupVirtualNodepool deploys virtual nodepool. func deployWarmupVirtualNodepool(ctx context.Context, kubeCfgPath string, isEKS bool, nodeAffinity string) (func() error, error) { target := "warmup" infoLogger := log.GetLogger(ctx).WithKeyValues("level", "info") warnLogger := log.GetLogger(ctx).WithKeyValues("level", "warn") infoLogger.LogKV("msg", "deploying virtual nodepool", "name", target) kr := utils.NewKperfRunner(kubeCfgPath, "") sharedProviderID := "" var err error if isEKS { sharedProviderID, err = utils.FetchNodeProviderIDByType(ctx, kubeCfgPath, utils.EKSIdleNodepoolInstanceType) if err != nil { return nil, fmt.Errorf("failed to get placeholder providerID: %w", err) } } infoLogger.LogKV("msg", "trying to delete", "nodepool", target) if err = kr.DeleteNodepool(ctx, 0, target); err != nil { warnLogger.LogKV("msg", "failed to delete", "nodepool", target, "error", err) } err = kr.NewNodepool(ctx, 0, target, 100, 32, 96, 110, nodeAffinity, sharedProviderID) if err != nil { return nil, fmt.Errorf("failed to create nodepool %s: %w", target, err) } return func() error { return kr.DeleteNodepool(ctx, 0, target) }, nil } // patchEKSDaemonsetWithoutToleration removes tolerations to avoid pod scheduled // to virtual nodes. func patchEKSDaemonsetWithoutToleration(ctx context.Context, kubeCfgPath string) error { log.GetLogger(ctx).WithKeyValues("level", "info"). LogKV("msg", "trying to removes EKS Daemonset's tolerations to avoid pod scheduled to virtual nodes") clientset := mustClientset(kubeCfgPath) ds := clientset.AppsV1().DaemonSets("kube-system") for _, dn := range []string{"aws-node", "kube-proxy"} { d, err := ds.Get(ctx, dn, metav1.GetOptions{}) if err != nil { return fmt.Errorf("failed to get daemonset %s: %w", dn, err) } d.Spec.Template.Spec.Tolerations = []corev1.Toleration{} d.ResourceVersion = "" _, err = ds.Update(ctx, d, metav1.UpdateOptions{}) if err != nil { return fmt.Errorf("failed to delete toleration for daemonset %s: %w", dn, err) } } return nil } // mustClientset returns kubernetes clientset without error. func mustClientset(kubeCfgPath string) *kubernetes.Clientset { config, err := clientcmd.BuildConfigFromFlags("", kubeCfgPath) if err != nil { panic(fmt.Errorf("failed to build client-go config: %w", err)) } clientset, err := kubernetes.NewForConfig(config) if err != nil { panic(fmt.Errorf("failed to build client-go rest client: %w", err)) } return clientset }