contrib/cmd/runkperf/commands/data/configmaps/configmap.go (340 lines of code) (raw):

// Copyright (c) Microsoft Corporation. // Licensed under the MIT License. package configmaps import ( "context" "crypto/rand" "fmt" "math/big" "os" "strconv" "strings" "text/tabwriter" "golang.org/x/sync/errgroup" "github.com/Azure/kperf/cmd/kperf/commands/utils" "k8s.io/client-go/tools/clientcmd" "k8s.io/client-go/util/flowcontrol" "github.com/urfave/cli" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" ) var appLebel = "runkperf" var Command = cli.Command{ Name: "configmap", ShortName: "cm", Usage: "Manage configmaps", Flags: []cli.Flag{ cli.StringFlag{ Name: "kubeconfig", Usage: "Path to the kubeconfig file", Value: utils.DefaultKubeConfigPath, }, cli.StringFlag{ Name: "namespace", Usage: "Namespace to use with commands. If the namespace does not exist, it will be created.", Value: "default", }, }, Subcommands: []cli.Command{ configmapAddCommand, configmapDelCommand, configmapListCommand, }, } var configmapAddCommand = cli.Command{ Name: "add", Usage: "Add configmap set", ArgsUsage: "NAME of the configmaps set", Flags: []cli.Flag{ cli.IntFlag{ Name: "size", Usage: "The size of each configmap (Unit: KiB)", Value: 100, }, cli.IntFlag{ Name: "group-size", Usage: "The size of each configmap group", Value: 10, }, cli.IntFlag{ Name: "total", Usage: "Total amount of configmaps", Value: 10, }, }, Action: func(cliCtx *cli.Context) error { if cliCtx.NArg() != 1 { return fmt.Errorf("required only one argument as configmaps set name: %v", cliCtx.Args()) } cmName := strings.TrimSpace(cliCtx.Args().Get(0)) if len(cmName) == 0 { return fmt.Errorf("required non-empty configmap set name") } kubeCfgPath := cliCtx.GlobalString("kubeconfig") size := cliCtx.Int("size") groupSize := cliCtx.Int("group-size") total := cliCtx.Int("total") // Check if the flags are set correctly err := checkConfigmapParams(size, groupSize, total) if err != nil { return err } namespace := cliCtx.GlobalString("namespace") err = prepareNamespace(kubeCfgPath, namespace) if err != nil { return err } clientset, err := newClientsetWithRateLimiter(kubeCfgPath, 30, 10) if err != nil { return err } err = createConfigmaps(clientset, namespace, cmName, size, groupSize, total) if err != nil { return err } fmt.Printf("Created configmap %s with size %d KiB, group-size %d, total %d\n", cmName, size, groupSize, total) return nil }, } var configmapDelCommand = cli.Command{ Name: "delete", ShortName: "del", ArgsUsage: "NAME", Usage: "Delete a configmaps set", Action: func(cliCtx *cli.Context) error { if cliCtx.NArg() != 1 { return fmt.Errorf("required only one configmaps set name") } cmName := strings.TrimSpace(cliCtx.Args().Get(0)) if len(cmName) == 0 { return fmt.Errorf("required non-empty configmaps set name") } namespace := cliCtx.GlobalString("namespace") kubeCfgPath := cliCtx.GlobalString("kubeconfig") labelSelector := fmt.Sprintf("app=%s,cmName=%s", appLebel, cmName) clientset, err := newClientsetWithRateLimiter(kubeCfgPath, 30, 10) if err != nil { return err } // Delete each configmap err = deleteConfigmaps(clientset, labelSelector, namespace) if err != nil { return err } fmt.Printf("Deleted configmap %s in %s namespace\n", cmName, namespace) return nil }, } var configmapListCommand = cli.Command{ Name: "list", Usage: "List generated configmaps", ArgsUsage: "NAME", Action: func(cliCtx *cli.Context) error { namespace := cliCtx.GlobalString("namespace") kubeCfgPath := cliCtx.GlobalString("kubeconfig") clientset, err := newClientsetWithRateLimiter(kubeCfgPath, 30, 10) if err != nil { return err } const ( minWidth = 1 tabWidth = 12 padding = 3 padChar = ' ' flags = 0 ) tw := tabwriter.NewWriter(os.Stdout, minWidth, tabWidth, padding, padChar, flags) fmt.Fprintln(tw, "NAME\tSIZE\tGROUP_SIZE\tTOTAL\t") // Build the label selector // If no args are provided, list all configmaps with the label app=runkperf // If args are provided, list all configmaps with the label app=runkperf and cmName in (args) var labelSelector string if cliCtx.NArg() == 0 { labelSelector = fmt.Sprintf("app=%s", appLebel) } else { args := cliCtx.Args() namesStr := strings.Join(args, ",") labelSelector = fmt.Sprintf("app=%s, cmName in (%s)", appLebel, namesStr) } cmMap := make(map[string][]int) err = listConfigmapsByName(clientset, labelSelector, namespace, cmMap) if err != nil { return err } for key, value := range cmMap { fmt.Fprintf(tw, "%s\t%d\t%d\t%d\n", key, value[0], value[1], value[2], ) } return tw.Flush() }, } func prepareNamespace(kubeCfgPath string, namespace string) error { if namespace == "" { return fmt.Errorf("namespace cannot be empty") } if namespace == "default" { return nil } clientset, err := newClientsetWithRateLimiter(kubeCfgPath, 30, 10) if err != nil { return err } _, err = clientset.CoreV1().Namespaces().Create(context.TODO(), &corev1.Namespace{ ObjectMeta: metav1.ObjectMeta{ Name: namespace, }, }, metav1.CreateOptions{}) if err != nil { // If the namespace already exists, ignore the error if errors.IsAlreadyExists(err) { return nil } return fmt.Errorf("failed to create namespace %s: %v", namespace, err) } return nil } func checkConfigmapParams(size int, groupSize int, total int) error { if size <= 0 { return fmt.Errorf("size must be greater than 0") } if groupSize <= 0 { return fmt.Errorf("group-size must be greater than 0") } if total <= 0 { return fmt.Errorf("total amount must be greater than 0") } if groupSize > total { return fmt.Errorf("group-size must be less than or equal to total") } return nil } func newClientsetWithRateLimiter(kubeCfgPath string, qps float32, burst int) (*kubernetes.Clientset, error) { config, err := clientcmd.BuildConfigFromFlags("", kubeCfgPath) if err != nil { return nil, err } config.QPS = qps config.RateLimiter = flowcontrol.NewTokenBucketRateLimiter(qps, burst) clientset, err := kubernetes.NewForConfig(config) if err != nil { return nil, err } return clientset, nil } var letterRunes = []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ") func randString(n int) (string, error) { if n <= 0 { return "", fmt.Errorf("length must be positive") } b := make([]rune, n) for i := range b { random, err := rand.Int(rand.Reader, big.NewInt(int64(len(letterRunes)))) if err != nil { return "", fmt.Errorf("error generating random number: %w", err) } b[i] = letterRunes[int(random.Int64())] } return string(b), nil } func createConfigmaps(clientset *kubernetes.Clientset, namespace string, cmName string, size int, groupSize int, total int) error { // Generate configmaps in parallel with fixed group size // and random data for i := 0; i < total; i = i + groupSize { ownerID := i g := new(errgroup.Group) for j := i; j < i+groupSize && j < total; j++ { g.Go(func() error { cli := clientset.CoreV1().ConfigMaps(namespace) name := fmt.Sprintf("%s-cm-%s-%d", appLebel, cmName, j) cm := &corev1.ConfigMap{} cm.Name = name // Set the labels for the configmap to easily identify in delete or list commands cm.Labels = map[string]string{ "ownerID": strconv.Itoa(ownerID), "app": appLebel, "cmName": cmName, } data, err := randString(size) if err != nil { return fmt.Errorf("failed to generate random string for configmap %s: %v", name, err) } cm.Data = map[string]string{ "data": data, } _, err = cli.Create(context.TODO(), cm, metav1.CreateOptions{}) if err != nil { return fmt.Errorf("failed to create configmap %s: %v", name, err) } return nil }) } if err := g.Wait(); err != nil { return err } } return nil } func deleteConfigmaps(clientset *kubernetes.Clientset, labelSelector string, namespace string) error { // List all configmaps with the label selector configMaps, err := listConfigmaps(clientset, labelSelector, namespace) if err != nil { return err } if len(configMaps.Items) == 0 { return fmt.Errorf("no configmaps set found in namespace: %s", namespace) } // Delete each configmap in parallel with fixed group size n, batch := len(configMaps.Items), 10 for i := 0; i < n; i = i + batch { g := new(errgroup.Group) for j := i; j < i+batch && j < n; j++ { g.Go(func() error { err := clientset.CoreV1().ConfigMaps(namespace).Delete(context.TODO(), configMaps.Items[j].Name, metav1.DeleteOptions{}) if err != nil && !errors.IsNotFound(err) { // Ignore not found errors return fmt.Errorf("failed to delete configmap %s: %v", configMaps.Items[j].Name, err) } return nil }) } if err := g.Wait(); err != nil { return err } } return nil } func listConfigmaps(clientset *kubernetes.Clientset, labelSelector string, namespace string) (*corev1.ConfigMapList, error) { configMaps, err := clientset.CoreV1().ConfigMaps(namespace).List(context.TODO(), metav1.ListOptions{LabelSelector: labelSelector}) if err != nil { return nil, fmt.Errorf("failed to list configmaps: %v", err) } return configMaps, nil } // Get info of configmaps by name func listConfigmapsByName(clientset *kubernetes.Clientset, labelSelector string, namespace string, cmMap map[string][]int) error { configMaps, err := listConfigmaps(clientset, labelSelector, namespace) if err != nil { return err } for _, cm := range configMaps.Items { name, ok := cm.Labels["cmName"] if !ok { return fmt.Errorf("failed to find the cmName of configmap %s", cm.Name) } _, ok = cmMap[name] if !ok { // Initialize the map with default values // size, group-size, total in int list cmMap[name] = []int{0, 0, 0} // Get the size of the configmap _, ok = cm.Data["data"] if ok { cmMap[name][0] = len(cm.Data["data"]) } } // Increment the total count of configmaps cmMap[name][2]++ if cmMap[name][1] != 0 { continue } ownerID, ok := cm.Labels["ownerID"] if !ok { return fmt.Errorf("failed to find the ownerID of configmap %s", name) } if ownerIDInt, err := strconv.Atoi(ownerID); err == nil { // Use the ownerID to get the group size if ownerIDInt > cmMap[name][1] { cmMap[name][1] = ownerIDInt } } else { return fmt.Errorf("failed to convert ownerID %s to int: %v", ownerID, err) } } return nil }