internal/deployers/eksapi/static_cluster.go (168 lines of code) (raw):

package eksapi

import (
	"bytes"
	"context"
	"fmt"
	"log"
	"strings"
	"time"

	"github.com/aws/aws-k8s-tester/internal/deployers/eksapi/templates"
	v1 "k8s.io/api/apps/v1"
	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"
	"k8s.io/klog"
	"sigs.k8s.io/controller-runtime/pkg/client"
	karpv1 "sigs.k8s.io/karpenter/pkg/apis/v1"
	"sigs.k8s.io/yaml"
)

const (
	// nvidiaNodePoolName is the name of the Karpenter NodePool created for
	// NVIDIA static clusters. It must match the name rendered by
	// templates.NvidiaStaticClusterNodepool.
	nvidiaNodePoolName = "nvidia"
	// busyboxDeploymentName must match the Deployment name rendered by
	// templates.BusyboxDeployment.
	busyboxDeploymentName = "busybox-deployment"
	// staticClusterNamespace is the namespace the busybox Deployment lives in.
	staticClusterNamespace = "default"
)

// StaticClusterManager provisions and tears down the node capacity needed by
// a "static" (pre-existing, long-lived) EKS cluster: a Karpenter NodePool for
// NVIDIA clusters plus a busybox Deployment that forces nodes to scale up.
type StaticClusterManager struct {
	k8sClient       *kubernetes.Clientset
	karpenterClient client.Client
	options         *deployerOptions
}

// NodeCondition reports whether a listed set of nodes satisfies a predicate
// (used by waitForNodeCondition to poll cluster state).
type NodeCondition func(nodes []corev1.Node) bool

// NewStaticClusterManager returns a manager for the given deployer options.
// SetK8sClient must be called before any other method is used.
func NewStaticClusterManager(options *deployerOptions) *StaticClusterManager {
	return &StaticClusterManager{
		options: options,
	}
}

// SetK8sClient builds the Kubernetes clientset and the controller-runtime
// (Karpenter) client from the given kubeconfig path. It exits the process on
// failure, since the deployer cannot proceed without working clients.
func (s *StaticClusterManager) SetK8sClient(kubeconfig string) {
	cfg, err := clientcmd.BuildConfigFromFlags("", kubeconfig)
	if err != nil {
		log.Fatalf("Failed to build kubeconfig: %v", err)
	}
	s.k8sClient, err = kubernetes.NewForConfig(cfg)
	if err != nil {
		log.Fatalf("Failed to create Kubernetes client: %v", err)
	}
	s.karpenterClient, err = client.New(cfg, client.Options{})
	if err != nil {
		log.Fatalf("Failed to create Karpenter client: %v", err)
	}
}

// EnsureNodeForStaticCluster makes sure the static cluster has schedulable
// capacity: it creates the NVIDIA NodePool (when applicable) and then deploys
// busybox pods, waiting until the expected nodes become Ready.
func (s *StaticClusterManager) EnsureNodeForStaticCluster() error {
	if err := s.CreateNodePool(); err != nil {
		return err
	}
	return s.DeployBusyboxAndWaitForNodes()
}

// TearDownNodeForStaticCluster reverses EnsureNodeForStaticCluster: it removes
// the busybox Deployment, waits for nodes to drain away, then deletes the
// NodePool. Both steps are idempotent.
func (s *StaticClusterManager) TearDownNodeForStaticCluster() error {
	if err := s.TearDownBusyboxAndNodes(); err != nil {
		return err
	}
	return s.TearDownNodePool()
}

// isNvidiaCluster reports whether the static cluster is an NVIDIA (GPU)
// cluster, inferred from a case-insensitive match on its name.
func (s *StaticClusterManager) isNvidiaCluster() bool {
	return strings.Contains(strings.ToLower(s.options.StaticClusterName), "nvidia")
}

// CreateNodePool renders and creates the NVIDIA Karpenter NodePool. It is a
// no-op for non-NVIDIA clusters and when the NodePool already exists. The
// node architecture (amd64/arm64) is inferred from the cluster name.
func (s *StaticClusterManager) CreateNodePool() error {
	if !s.isNvidiaCluster() {
		klog.Info("NVIDIA not in cluster name, skipping node pool creation")
		return nil
	}
	var arch string
	if strings.Contains(s.options.StaticClusterName, "x86_64") {
		arch = "amd64"
	} else if strings.Contains(s.options.StaticClusterName, "aarch64") {
		arch = "arm64"
	} else {
		return fmt.Errorf("unable to determine architecture from cluster name")
	}
	t := templates.NvidiaStaticClusterNodepool
	var buf bytes.Buffer
	if err := t.Execute(&buf, templates.NvidiaStaticClusterNodepoolTemplateData{
		Arch:          arch,
		InstanceTypes: s.options.InstanceTypes,
	}); err != nil {
		return err
	}
	nodePool := &karpv1.NodePool{}
	if err := yaml.Unmarshal(buf.Bytes(), nodePool); err != nil {
		return fmt.Errorf("failed to unmarshal nodepool YAML: %w", err)
	}
	ctx := context.TODO()
	existing := &karpv1.NodePool{}
	err := s.karpenterClient.Get(ctx, client.ObjectKey{Name: nodePool.Name}, existing)
	if client.IgnoreNotFound(err) != nil {
		return err
	}
	if errors.IsNotFound(err) {
		// Only create when absent; an existing NodePool is left untouched.
		return s.karpenterClient.Create(ctx, nodePool)
	}
	return nil
}

// TearDownNodePool deletes the NVIDIA NodePool. It is a no-op for non-NVIDIA
// clusters and treats a missing NodePool as success (idempotent teardown).
func (s *StaticClusterManager) TearDownNodePool() error {
	if !s.isNvidiaCluster() {
		klog.Info("NVIDIA not in cluster name, skipping node pool deletion")
		return nil
	}
	nodePool := &karpv1.NodePool{
		ObjectMeta: metav1.ObjectMeta{
			Name: nvidiaNodePoolName,
		},
	}
	if err := s.karpenterClient.Delete(context.TODO(), nodePool); err != nil {
		if errors.IsNotFound(err) {
			klog.Info("NodePool 'nvidia' not found, skipping deletion")
			return nil
		}
		return fmt.Errorf("failed to delete nodepool: %w", err)
	}
	klog.Info("NodePool deleted successfully")
	return nil
}

// DeployBusyboxAndWaitForNodes creates the busybox Deployment (sized to
// s.options.Nodes replicas via the template) and blocks until at least that
// many nodes report Ready, or the 15-minute timeout elapses.
func (s *StaticClusterManager) DeployBusyboxAndWaitForNodes() error {
	klog.Infof("Deploying busybox pods")
	t := templates.BusyboxDeployment
	var buf bytes.Buffer
	if err := t.Execute(&buf, templates.BusyboxDeploymentTemplateData{
		Nodes: s.options.Nodes,
	}); err != nil {
		return err
	}
	deployment := &v1.Deployment{}
	if err := yaml.Unmarshal(buf.Bytes(), deployment); err != nil {
		return fmt.Errorf("failed to unmarshal deployment: %w", err)
	}
	result, err := s.k8sClient.AppsV1().Deployments(staticClusterNamespace).Create(context.TODO(), deployment, metav1.CreateOptions{})
	if err != nil {
		return err
	}
	klog.Infof("Created deployment %q.\n", result.GetObjectMeta().GetName())
	return waitForNodeCondition(s.k8sClient, func(nodes []corev1.Node) bool {
		readyNodes := 0
		for _, node := range nodes {
			if isNodeReady(&node) {
				readyNodes++
			}
		}
		klog.Infof("Ready nodes: %d, Expected nodes: %d", readyNodes, s.options.Nodes)
		return readyNodes >= s.options.Nodes
	}, 15*time.Minute, "Waiting for nodes to be ready")
}

// TearDownBusyboxAndNodes deletes the busybox Deployment and waits (up to 30
// minutes) for all nodes to be removed. A missing Deployment is treated as
// success so teardown is idempotent, matching TearDownNodePool.
func (s *StaticClusterManager) TearDownBusyboxAndNodes() error {
	klog.Infof("Cleaning up busybox pods")
	err := s.k8sClient.AppsV1().Deployments(staticClusterNamespace).Delete(context.TODO(), busyboxDeploymentName, metav1.DeleteOptions{})
	if err != nil {
		if errors.IsNotFound(err) {
			klog.Info("Busybox deployment not found, skipping deletion")
		} else {
			return fmt.Errorf("failed to delete deployment: %w", err)
		}
	} else {
		klog.Info("Busybox deployment deleted successfully")
	}
	return waitForNodeCondition(s.k8sClient, func(nodes []corev1.Node) bool {
		return len(nodes) == 0
	}, 30*time.Minute, "Waiting for nodes to be removed")
}

// waitForNodeCondition polls the node list every 15 seconds until condition
// returns true or timeout elapses. PollUntilContextTimeout enforces the
// deadline itself, so no extra context.WithTimeout wrapper is needed.
func waitForNodeCondition(clientset *kubernetes.Clientset, condition NodeCondition, timeout time.Duration, description string) error {
	return wait.PollUntilContextTimeout(context.Background(), 15*time.Second, timeout, true, func(ctx context.Context) (bool, error) {
		nodes, err := clientset.CoreV1().Nodes().List(ctx, metav1.ListOptions{})
		if err != nil {
			return false, err
		}
		conditionMet := condition(nodes.Items)
		klog.Infof("%s: Current node count: %d", description, len(nodes.Items))
		return conditionMet, nil
	})
}