contrib/cmd/runkperf/commands/bench/cilium_cr_list.go:

// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

package bench

import (
	"context"
	"fmt"
	"sync/atomic"
	"time"

	internaltypes "github.com/Azure/kperf/contrib/internal/types"
	"github.com/Azure/kperf/contrib/log"
	"github.com/Azure/kperf/contrib/utils"
	"github.com/google/uuid"
	"github.com/urfave/cli"
	"golang.org/x/sync/errgroup"
)

const (
	numCRApplyWorkers      = 50
	maxNumCRApplyAttempts  = 5
	kubectlApplyTimeout    = 30 * time.Second
	progressReportInterval = 10 * time.Second

	installCiliumCRDsFlag = "install-cilium-crds"
	numCEPFlag            = "num-cilium-endpoints"
	numCIDFlag            = "num-cilium-identities"
)

var benchCiliumCustomResourceListCase = cli.Command{
	Name: "cilium_cr_list",
	Usage: `
Simulate workload with stale list requests for Cilium custom resources.

This benchmark MUST be run in a cluster *without* Cilium installed, so Cilium
doesn't delete or modify the synthetic CiliumEndpoint and CiliumIdentity
resources created in this test.
`,
	Flags: append(
		[]cli.Flag{
			cli.BoolTFlag{
				Name:  installCiliumCRDsFlag,
				Usage: "Install Cilium CRDs if they don't already exist (default: true)",
			},
			cli.IntFlag{
				Name:  numCIDFlag,
				Usage: "Number of CiliumIdentities to generate (default: 1000)",
				Value: 1000,
			},
			cli.IntFlag{
				Name:  numCEPFlag,
				Usage: "Number of CiliumEndpoints to generate (default: 1000)",
				Value: 1000,
			},
		},
		commonFlags...,
	),
	Action: func(cliCtx *cli.Context) error {
		_, err := renderBenchmarkReportInterceptor(ciliumCustomResourceListRun)(cliCtx)
		return err
	},
}
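
// An example invocation, assuming the runkperf binary built from
// contrib/cmd/runkperf (the flag values shown are the defaults):
//
//	runkperf bench cilium_cr_list \
//		--num-cilium-identities 1000 \
//		--num-cilium-endpoints 1000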

// ciliumCustomResourceListRun runs a benchmark that:
// (1) creates many Cilium custom resources (CiliumIdentity and CiliumEndpoint).
// (2) executes stale list requests against those resources.
// This simulates a "worst case" scenario in which Cilium performs many
// expensive list requests.
func ciliumCustomResourceListRun(cliCtx *cli.Context) (*internaltypes.BenchmarkReport, error) {
	ctx := context.Background()

	rgCfgFile, rgSpec, rgCfgFileDone, err := newLoadProfileFromEmbed(cliCtx, "loadprofile/cilium_cr_list.yaml")
	if err != nil {
		return nil, err
	}
	defer func() { _ = rgCfgFileDone() }()

	kubeCfgPath := cliCtx.GlobalString("kubeconfig")
	kr := utils.NewKubectlRunner(kubeCfgPath, "")

	if cliCtx.BoolT(installCiliumCRDsFlag) {
		if err := installCiliumCRDs(ctx, kr); err != nil {
			return nil, fmt.Errorf("failed to install Cilium CRDs: %w", err)
		}
	}

	numCID := cliCtx.Int(numCIDFlag)
	numCEP := cliCtx.Int(numCEPFlag)
	if err := loadCiliumData(ctx, kr, numCID, numCEP); err != nil {
		return nil, fmt.Errorf("failed to load Cilium data: %w", err)
	}

	rgResult, err := utils.DeployRunnerGroup(ctx,
		kubeCfgPath,
		cliCtx.GlobalString("runner-image"),
		rgCfgFile,
		cliCtx.GlobalString("runner-flowcontrol"),
		cliCtx.GlobalString("rg-affinity"),
	)
	if err != nil {
		return nil, fmt.Errorf("failed to deploy runner group: %w", err)
	}

	return &internaltypes.BenchmarkReport{
		Description: fmt.Sprintf(`Deploy %d CiliumIdentities and %d CiliumEndpoints, then run stale list requests against them`, numCID, numCEP),
		LoadSpec:    *rgSpec,
		Result:      *rgResult,
		Info: map[string]interface{}{
			"numCiliumIdentities": numCID,
			"numCiliumEndpoints":  numCEP,
		},
	}, nil
}

// ciliumCRDs lists the upstream CRD manifests this benchmark depends on.
var ciliumCRDs = []string{
	"https://raw.githubusercontent.com/cilium/cilium/refs/tags/v1.16.6/pkg/k8s/apis/cilium.io/client/crds/v2/ciliumendpoints.yaml",
	"https://raw.githubusercontent.com/cilium/cilium/refs/tags/v1.16.6/pkg/k8s/apis/cilium.io/client/crds/v2/ciliumidentities.yaml",
}

// installCiliumCRDs applies the CiliumEndpoint and CiliumIdentity CRD
// manifests so the synthetic custom resources can be created.
func installCiliumCRDs(ctx context.Context, kr *utils.KubectlRunner) error {
	log.GetLogger(ctx).WithKeyValues("level", "info").LogKV("msg", "Installing Cilium CRDs...")
	for _, crdURL := range ciliumCRDs {
		err := kr.Apply(ctx, kubectlApplyTimeout, crdURL)
		if err != nil {
			return fmt.Errorf("failed to apply CRD %s: %w", crdURL, err)
		}
	}
	return nil
}
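
// loadCiliumData creates numCID CiliumIdentities and numCEP CiliumEndpoints
// by fanning generated manifests out to numCRApplyWorkers apply workers. At
// the ~80 inserts/sec noted below, the default 2000 resources (1000
// CiliumIdentities plus 1000 CiliumEndpoints) load in roughly 25 seconds.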
func loadCiliumData(ctx context.Context, kr *utils.KubectlRunner, numCID int, numCEP int) error {
	totalNumResources := numCID + numCEP
	log.GetLogger(ctx).WithKeyValues("level", "info").LogKV("msg", "Loading Cilium data", "CiliumIdentities", numCID, "CiliumEndpoints", numCEP)

	// Parallelize kubectl apply to speed it up. Achieves ~80 inserts/sec.
	taskChan := make(chan string, numCRApplyWorkers*2)
	var appliedCount atomic.Uint64
	g, ctx := errgroup.WithContext(ctx)
	for i := 0; i < numCRApplyWorkers; i++ {
		g.Go(func() error {
			for {
				select {
				case <-ctx.Done():
					return ctx.Err()
				case ciliumResourceData, ok := <-taskChan:
					if !ok {
						return nil // taskChan closed
					}
					var err error
					for attempt := 0; attempt < maxNumCRApplyAttempts; attempt++ {
						err = kr.ServerSideApplyWithData(ctx, kubectlApplyTimeout, ciliumResourceData)
						if err == nil {
							appliedCount.Add(1)
							break
						} else if attempt < maxNumCRApplyAttempts-1 {
							log.GetLogger(ctx).WithKeyValues("level", "warn").LogKV("msg", "failed to apply cilium resource data, will retry", "error", err)
						}
					}
					if err != nil {
						// The last retry failed, so give up.
						return fmt.Errorf("failed to apply cilium resource data: %w", err)
					}
				}
			}
		})
	}

	// Report progress periodically.
	reporterDoneChan := make(chan struct{})
	g.Go(func() error {
		timer := time.NewTicker(progressReportInterval)
		defer timer.Stop()
		for {
			select {
			case <-reporterDoneChan:
				return nil
			case <-ctx.Done():
				return ctx.Err()
			case <-timer.C:
				c := appliedCount.Load()
				percent := int(float64(c) / float64(totalNumResources) * 100)
				log.GetLogger(ctx).WithKeyValues("level", "info").LogKV("msg", fmt.Sprintf("applied %d/%d cilium resources (%d%%)", c, totalNumResources, percent))
			}
		}
	})

	// Generate CiliumIdentity and CiliumEndpoint CRs to be applied by the worker goroutines.
	g.Go(func() error {
		for i := 0; i < numCID; i++ {
			select {
			case <-ctx.Done():
				return ctx.Err()
			case taskChan <- generateCiliumIdentity():
			}
		}
		for i := 0; i < numCEP; i++ {
			select {
			case <-ctx.Done():
				return ctx.Err()
			case taskChan <- generateCiliumEndpoint():
			}
		}
		close(taskChan)         // signal to consumer goroutines that we're done.
		close(reporterDoneChan) // stop the progress reporter.
		return nil
	})

	if err := g.Wait(); err != nil {
		return err
	}
	log.GetLogger(ctx).WithKeyValues("level", "info").LogKV("msg", fmt.Sprintf("loaded %d CiliumIdentities and %d CiliumEndpoints", numCID, numCEP))
	return nil
}

// generateCiliumIdentity returns a synthetic CiliumIdentity manifest with a
// random name and a fixed set of security labels modeled on a kube-system
// coredns pod.
func generateCiliumIdentity() string {
	identityName := uuid.New().String()
	return fmt.Sprintf(`
apiVersion: cilium.io/v2
kind: CiliumIdentity
metadata:
  name: "%s"
security-labels:
  k8s:io.cilium.k8s.namespace.labels.control-plane: "true"
  k8s:io.cilium.k8s.namespace.labels.kubernetes.azure.com/managedby: aks
  k8s:io.cilium.k8s.namespace.labels.kubernetes.io/cluster-service: "true"
  k8s:io.cilium.k8s.namespace.labels.kubernetes.io/metadata.name: kube-system
  k8s:io.cilium.k8s.policy.cluster: default
  k8s:io.cilium.k8s.policy.serviceaccount: coredns
  k8s:io.kubernetes.pod.namespace: kube-system
  k8s:k8s-app: kube-dns
  k8s:kubernetes.azure.com/managedby: aks
  k8s:version: v20`, identityName)
}

// generateCiliumEndpoint returns a synthetic CiliumEndpoint manifest with a
// random name and a fixed status modeled on a kube-system coredns pod.
func generateCiliumEndpoint() string {
	cepName := uuid.New().String()
	return fmt.Sprintf(`
apiVersion: cilium.io/v2
kind: CiliumEndpoint
metadata:
  name: "%s"
status:
  encryption: {}
  external-identifiers:
    container-id: 790d85075c394a8384f8b1a0fec62e2316c2556d175dab0c1fe676e5a6d92f33
    k8s-namespace: kube-system
    k8s-pod-name: coredns-54b69f46b8-dbcdl
    pod-name: kube-system/coredns-54b69f46b8-dbcdl
  id: 1453
  identity:
    id: 0000001
    labels:
    - k8s:io.cilium.k8s.namespace.labels.control-plane=true
    - k8s:io.cilium.k8s.namespace.labels.kubernetes.azure.com/managedby=aks
    - k8s:io.cilium.k8s.namespace.labels.kubernetes.io/cluster-service=true
    - k8s:io.cilium.k8s.namespace.labels.kubernetes.io/metadata.name=kube-system
    - k8s:io.cilium.k8s.policy.cluster=default
    - k8s:io.cilium.k8s.policy.serviceaccount=coredns
    - k8s:io.kubernetes.pod.namespace=kube-system
    - k8s:k8s-app=kube-dns
    - k8s:kubernetes.azure.com/managedby=aks
    - k8s:version=v20
  named-ports:
  - name: dns
    port: 53
    protocol: UDP
  - name: dns-tcp
    port: 53
    protocol: TCP
  - name: metrics
    port: 9153
    protocol: TCP
  networking:
    addressing:
    - ipv4: 10.244.1.38
    node: 10.224.0.4
  policy:
    egress:
      enforcing: false
      state: <status disabled>
    ingress:
      enforcing: false
      state: <status disabled>
  state: ready
  visibility-policy-status: <status disabled>
`, cepName)
}
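
// For reference, the kind of stale list this benchmark drives can also be
// issued by hand: resourceVersion=0 asks the apiserver to serve the list from
// its watch cache rather than etcd. This is a hypothetical manual check, not
// part of the benchmark itself:
//
//	kubectl get --raw "/apis/cilium.io/v2/ciliumidentities?resourceVersion=0"
//	kubectl get --raw "/apis/cilium.io/v2/ciliumendpoints?resourceVersion=0"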