contrib/cmd/runkperf/commands/bench/node100_pod10k.go:

// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

package bench

import (
	"context"
	"encoding/json"
	"fmt"
	"sync"
	"time"

	internaltypes "github.com/Azure/kperf/contrib/internal/types"
	"github.com/Azure/kperf/contrib/log"
	"github.com/Azure/kperf/contrib/utils"

	"github.com/urfave/cli"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

var benchNode100DeploymentNPod10KCase = cli.Command{
	Name: "node100_pod10k",
	Usage: `

The test suite sets up 100 virtual nodes and deploys N deployments totaling
10k pods on those nodes. It repeatedly rolling-updates the deployments one by
one during the benchmark.
	`,
	Flags: append(
		[]cli.Flag{
			cli.IntFlag{
				Name:  "deployments",
				Usage: "The total number of deployments for the 10k pods",
				Value: 20,
			},
			cli.IntFlag{
				Name:  "total",
				Usage: "Total number of requests per runner (there are 10 runners in total and each runner's rate is 10)",
				Value: 36000,
			},
			cli.IntFlag{
				Name:  "padding-bytes",
				Usage: "Add <key=data, value=randomStringByLen(padding-bytes)> to each pod's annotations to increase the pod size",
				Value: 0,
			},
			cli.DurationFlag{
				Name:  "interval",
				Usage: "Interval between deployment restarts",
				Value: time.Second * 10,
			},
		},
		commonFlags...,
	),
	Action: func(cliCtx *cli.Context) error {
		_, err := renderBenchmarkReportInterceptor(
			addAPIServerCoresInfoInterceptor(benchNode100DeploymentNPod10KRun),
		)(cliCtx)
		return err
	},
}

// benchNode100DeploymentNPod10KRun implements the node100_pod10k subcommand.
func benchNode100DeploymentNPod10KRun(cliCtx *cli.Context) (*internaltypes.BenchmarkReport, error) {
	ctx := context.Background()
	kubeCfgPath := cliCtx.GlobalString("kubeconfig")

	rgCfgFile, rgSpec, rgCfgFileDone, err := newLoadProfileFromEmbed(cliCtx,
		"loadprofile/node100_pod10k.yaml")
	if err != nil {
		return nil, err
	}
	defer func() { _ = rgCfgFileDone() }()

	// NOTE: The nodepool name should be aligned with
	// ../../../../internal/manifests/loadprofile/node100_pod10k.yaml.
	vcDone, err := deployVirtualNodepool(ctx, cliCtx, "node100pod10k",
		100,
		cliCtx.Int("cpu"),
		cliCtx.Int("memory"),
		cliCtx.Int("max-pods"),
	)
	if err != nil {
		return nil, fmt.Errorf("failed to deploy virtual node: %w", err)
	}
	defer func() { _ = vcDone() }()

	dpCtx, dpCancel := context.WithCancel(ctx)
	defer dpCancel()

	var wg sync.WaitGroup
	wg.Add(1)

	restartInterval := cliCtx.Duration("interval")
	log.GetLogger(dpCtx).
		WithKeyValues("level", "info").
		LogKV("msg", fmt.Sprintf("the interval is %v for restarting deployments", restartInterval))

	paddingBytes := cliCtx.Int("padding-bytes")
	total := cliCtx.Int("deployments")
	// Guard against a non-positive flag value before computing replicas.
	if total <= 0 {
		return nil, fmt.Errorf("invalid deployments flag %d: must be positive", total)
	}
	// NOTE: If total does not divide 10000 evenly, the actual pod count is
	// total*replica, which is slightly below 10k.
	replica := 10000 / total

	// NOTE: The name pattern should be aligned with
	// ../../../../internal/manifests/loadprofile/node100_pod10k.yaml.
	deploymentNamePattern := "benchmark"

	rollingUpdateFn, ruCleanupFn, err := utils.DeployAndRepeatRollingUpdateDeployments(dpCtx,
		kubeCfgPath, deploymentNamePattern, total, replica, paddingBytes, restartInterval)
	if err != nil {
		dpCancel()
		return nil, fmt.Errorf("failed to setup workload: %w", err)
	}
	defer ruCleanupFn()

	err = dumpDeploymentReplicas(ctx, kubeCfgPath, deploymentNamePattern, total)
	if err != nil {
		return nil, err
	}

	podSize, err := getDeploymentPodSize(ctx, kubeCfgPath, deploymentNamePattern)
	if err != nil {
		return nil, err
	}

	// Round the pod size down to the nearest KiB for reporting.
	podSize = (podSize / 1024) * 1024

	go func() {
		defer wg.Done()

		// FIXME(weifu):
		//
		// DeployRunnerGroup should return ready notification.
		// The rolling update should run after runners.
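
		// rollingUpdateFn blocks here, rolling-updating the deployments
		// every restartInterval, until dpCtx is cancelled by the main
		// flow once DeployRunnerGroup below returns.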
		rollingUpdateFn()
	}()

	rgResult, derr := utils.DeployRunnerGroup(ctx,
		kubeCfgPath,
		cliCtx.GlobalString("runner-image"),
		rgCfgFile,
		cliCtx.GlobalString("runner-flowcontrol"),
		cliCtx.GlobalString("rg-affinity"),
	)
	dpCancel()
	wg.Wait()

	if derr != nil {
		return nil, derr
	}

	return &internaltypes.BenchmarkReport{
		Description: fmt.Sprintf(`
Environment: 100 virtual nodes managed by kwok-controller,
Workload: Deploy %d deployments with %d pods in total. Rolling-update the deployments one by one with an interval of %v`,
			total, total*replica, restartInterval),
		LoadSpec: *rgSpec,
		Result:   *rgResult,
		Info: map[string]interface{}{
			"podSizeInBytes": podSize,
			"interval":       restartInterval.String(),
		},
	}, nil
}

// dumpDeploymentReplicas logs the desired and ready replica counts of each
// benchmark deployment.
func dumpDeploymentReplicas(ctx context.Context, kubeCfgPath string, namePattern string, total int) error {
	infoLogger := log.GetLogger(ctx).WithKeyValues("level", "info")
	infoLogger.LogKV("msg", "dump deployment's replica information")

	cli, err := utils.BuildClientset(kubeCfgPath)
	if err != nil {
		return err
	}

	// Each deployment lives in a namespace of the same name.
	for i := 0; i < total; i++ {
		name := fmt.Sprintf("%s-%d", namePattern, i)
		ns := name

		dp, err := cli.AppsV1().Deployments(ns).Get(ctx, name, metav1.GetOptions{})
		if err != nil {
			return fmt.Errorf("failed to get deployment %s in namespace %s: %w", name, ns, err)
		}

		infoLogger.LogKV("msg", "dump Deployment status",
			"name", name,
			"ns", ns,
			"replica", *dp.Spec.Replicas,
			"readyReplicas", dp.Status.ReadyReplicas,
		)
	}
	return nil
}

// getDeploymentPodSize returns the JSON-encoded size in bytes of one pod
// created by the benchmark deployments. A single pod is sampled as
// representative since every deployment uses the same pod template.
func getDeploymentPodSize(ctx context.Context, kubeCfgPath string, namePattern string) (int, error) {
	ns := fmt.Sprintf("%s-0", namePattern)
	labelSelector := fmt.Sprintf("app=%s", namePattern)

	log.GetLogger(ctx).
		WithKeyValues("level", "info").
		LogKV("msg", "get the size of pod", "labelSelector", labelSelector, "namespace", ns)

	cli, err := utils.BuildClientset(kubeCfgPath)
	if err != nil {
		return 0, err
	}

	resp, err := cli.CoreV1().Pods(ns).List(ctx, metav1.ListOptions{
		LabelSelector: labelSelector,
		Limit:         1,
	})
	if err != nil {
		return 0, fmt.Errorf("failed to list pods with labelSelector %s: %w", labelSelector, err)
	}
	if len(resp.Items) == 0 {
		// NOTE: There is no underlying error to wrap here; the list call
		// succeeded but matched nothing.
		return 0, fmt.Errorf("no pod with labelSelector %s in namespace %s", labelSelector, ns)
	}

	pod := resp.Items[0]
	data, err := json.Marshal(pod)
	if err != nil {
		return 0, fmt.Errorf("failed to json.Marshal pod: %w", err)
	}
	return len(data), nil
}
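
// Example invocation (illustrative; the parent command path is assumed from
// this package's location under runkperf's bench commands, and the flag
// values shown are arbitrary):
//
//	runkperf bench node100_pod10k --deployments 20 --padding-bytes 1024 --interval 10s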