pkg/clustermanager/cluster_manager.go (774 lines of code) (raw):

package clustermanager import ( "context" "errors" "fmt" "math" "net/url" "os" "regexp" "strings" "time" "github.com/go-logr/logr" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/utils/integer" clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1" "sigs.k8s.io/yaml" "github.com/aws/eks-anywhere/pkg/api/v1alpha1" tinkerbellv1 "github.com/aws/eks-anywhere/pkg/api/v1alpha1/thirdparty/tinkerbell/capt/v1beta1" "github.com/aws/eks-anywhere/pkg/clients/kubernetes" "github.com/aws/eks-anywhere/pkg/cluster" "github.com/aws/eks-anywhere/pkg/clusterapi" "github.com/aws/eks-anywhere/pkg/clustermanager/internal" "github.com/aws/eks-anywhere/pkg/constants" "github.com/aws/eks-anywhere/pkg/diagnostics" "github.com/aws/eks-anywhere/pkg/executables" "github.com/aws/eks-anywhere/pkg/filewriter" "github.com/aws/eks-anywhere/pkg/logger" "github.com/aws/eks-anywhere/pkg/providers" "github.com/aws/eks-anywhere/pkg/providers/tinkerbell" "github.com/aws/eks-anywhere/pkg/retrier" "github.com/aws/eks-anywhere/pkg/types" ) const ( maxRetries = 30 defaultBackOffPeriod = 5 * time.Second machineBackoff = 1 * time.Second defaultMachinesMinWait = 30 * time.Minute // DefaultMaxWaitPerMachine is the default max time the cluster manager will wait per a machine. DefaultMaxWaitPerMachine = 10 * time.Minute // DefaultClusterWait is the default max time the cluster manager will wait for the capi cluster to be in ready state. DefaultClusterWait = 60 * time.Minute // DefaultControlPlaneWait is the default time the cluster manager will wait for the control plane to be ready. DefaultControlPlaneWait = 60 * time.Minute // DefaultControlPlaneWaitAfterMove is the default max time the cluster manager will wait for the control plane to be in ready state after the capi move operation. DefaultControlPlaneWaitAfterMove = 15 * time.Minute // DefaultDeploymentWait is the default max time the cluster manager will wait for the deployment to be available. DefaultDeploymentWait = 30 * time.Minute // DefaultEtcdWait is the default time the cluster manager will wait for ectd to be ready. DefaultEtcdWait = 60 * time.Minute // DefaultUnhealthyMachineTimeout is the default timeout for an unhealthy machine health check. DefaultUnhealthyMachineTimeout = 5 * time.Minute // DefaultNodeStartupTimeout is the default timeout for a machine without a node to be considered to have failed machine health check. DefaultNodeStartupTimeout = 10 * time.Minute // DefaultClusterctlMoveTimeout is arbitrarily established. Equal to kubectl wait default timeouts. DefaultClusterctlMoveTimeout = 30 * time.Minute ) var ( clusterctlNetworkErrorRegex = regexp.MustCompile(`.*failed to connect to the management cluster:.*`) clusterctlMoveProvisionedInfraErrorRegex = regexp.MustCompile(`.*failed to check for provisioned infrastructure*`) kubectlResourceNotFoundRegex = regexp.MustCompile(`.*the server doesn't have a resource type "(.*)".*`) eksaClusterResourceType = fmt.Sprintf("clusters.%s", v1alpha1.GroupVersion.Group) ) type ClusterManager struct { eksaComponents EKSAComponents ClientFactory ClientFactory clusterClient ClusterClient retrier *retrier.Retrier writer filewriter.FileWriter diagnosticsFactory diagnostics.DiagnosticBundleFactory machineMaxWait time.Duration machineBackoff time.Duration machinesMinWait time.Duration controlPlaneWaitTimeout time.Duration controlPlaneWaitAfterMoveTimeout time.Duration externalEtcdWaitTimeout time.Duration unhealthyMachineTimeout time.Duration nodeStartupTimeout time.Duration clusterWaitTimeout time.Duration deploymentWaitTimeout time.Duration clusterctlMoveTimeout time.Duration } // ClientFactory builds Kubernetes clients. type ClientFactory interface { // BuildClientFromKubeconfig builds a Kubernetes client from a kubeconfig file. BuildClientFromKubeconfig(kubeconfigPath string) (kubernetes.Client, error) } // CAPIClient performs operations on a cluster-api management cluster. type CAPIClient interface { BackupManagement(ctx context.Context, cluster *types.Cluster, managementStatePath, clusterName string) error MoveManagement(ctx context.Context, from, target *types.Cluster, clusterName string) error InitInfrastructure(ctx context.Context, managementComponents *cluster.ManagementComponents, clusterSpec *cluster.Spec, cluster *types.Cluster, provider providers.Provider) error GetWorkloadKubeconfig(ctx context.Context, clusterName string, cluster *types.Cluster) ([]byte, error) } // EKSAComponents allows to manage the eks-a components installation in a cluster. type EKSAComponents interface { Install(ctx context.Context, log logr.Logger, cluster *types.Cluster, managementComponents *cluster.ManagementComponents, spec *cluster.Spec) error Upgrade(ctx context.Context, log logr.Logger, cluster *types.Cluster, currentManagementComponents, newManagementComponents *cluster.ManagementComponents, newSpec *cluster.Spec) (*types.ChangeDiff, error) } type ClusterManagerOpt func(*ClusterManager) // DefaultRetrier builds a retrier with the default configuration. func DefaultRetrier() *retrier.Retrier { return retrier.NewWithMaxRetries(maxRetries, defaultBackOffPeriod) } // New constructs a new ClusterManager. func New(client ClientFactory, clusterClient ClusterClient, writer filewriter.FileWriter, diagnosticBundleFactory diagnostics.DiagnosticBundleFactory, eksaComponents EKSAComponents, opts ...ClusterManagerOpt) *ClusterManager { c := &ClusterManager{ eksaComponents: eksaComponents, ClientFactory: client, clusterClient: clusterClient, writer: writer, retrier: DefaultRetrier(), diagnosticsFactory: diagnosticBundleFactory, machineMaxWait: DefaultMaxWaitPerMachine, machineBackoff: machineBackoff, machinesMinWait: defaultMachinesMinWait, controlPlaneWaitTimeout: DefaultControlPlaneWait, controlPlaneWaitAfterMoveTimeout: DefaultControlPlaneWaitAfterMove, externalEtcdWaitTimeout: DefaultEtcdWait, unhealthyMachineTimeout: DefaultUnhealthyMachineTimeout, nodeStartupTimeout: DefaultNodeStartupTimeout, clusterWaitTimeout: DefaultClusterWait, deploymentWaitTimeout: DefaultDeploymentWait, clusterctlMoveTimeout: DefaultClusterctlMoveTimeout, } for _, o := range opts { o(c) } return c } func WithControlPlaneWaitTimeout(timeout time.Duration) ClusterManagerOpt { return func(c *ClusterManager) { c.controlPlaneWaitTimeout = timeout } } func WithExternalEtcdWaitTimeout(timeout time.Duration) ClusterManagerOpt { return func(c *ClusterManager) { c.externalEtcdWaitTimeout = timeout } } func WithMachineBackoff(machineBackoff time.Duration) ClusterManagerOpt { return func(c *ClusterManager) { c.machineBackoff = machineBackoff } } func WithMachineMaxWait(machineMaxWait time.Duration) ClusterManagerOpt { return func(c *ClusterManager) { c.machineMaxWait = machineMaxWait } } func WithMachineMinWait(machineMinWait time.Duration) ClusterManagerOpt { return func(c *ClusterManager) { c.machinesMinWait = machineMinWait } } // WithUnhealthyMachineTimeout sets the timeout of an unhealthy machine health check. func WithUnhealthyMachineTimeout(timeout time.Duration) ClusterManagerOpt { return func(c *ClusterManager) { c.unhealthyMachineTimeout = timeout } } // WithNodeStartupTimeout sets the timeout of a machine without a node to be considered to have failed machine health check. func WithNodeStartupTimeout(timeout time.Duration) ClusterManagerOpt { return func(c *ClusterManager) { c.nodeStartupTimeout = timeout } } func WithRetrier(retrier *retrier.Retrier) ClusterManagerOpt { return func(c *ClusterManager) { c.retrier = retrier } } // WithNoTimeouts disables the timeout for all the waits and retries in cluster manager. func WithNoTimeouts() ClusterManagerOpt { return func(c *ClusterManager) { noTimeoutRetrier := retrier.NewWithNoTimeout() maxTime := time.Duration(math.MaxInt64) c.retrier = noTimeoutRetrier c.machinesMinWait = maxTime c.controlPlaneWaitTimeout = maxTime c.controlPlaneWaitAfterMoveTimeout = maxTime c.externalEtcdWaitTimeout = maxTime c.clusterWaitTimeout = maxTime c.deploymentWaitTimeout = maxTime c.clusterctlMoveTimeout = maxTime } } func clusterctlMoveWaitForInfrastructureRetryPolicy(totalRetries int, err error) (retry bool, wait time.Duration) { // Retry both network and cluster move errors. if match := (clusterctlNetworkErrorRegex.MatchString(err.Error()) || clusterctlMoveProvisionedInfraErrorRegex.MatchString(err.Error())); match { return true, exponentialRetryWaitTime(totalRetries) } return false, 0 } func clusterctlMoveRetryPolicy(totalRetries int, err error) (retry bool, wait time.Duration) { // Retry only network errors. if match := clusterctlNetworkErrorRegex.MatchString(err.Error()); match { return true, exponentialRetryWaitTime(totalRetries) } return false, 0 } func kubectlWaitRetryPolicy(totalRetries int, err error) (retry bool, wait time.Duration) { // Sometimes it is possible that the clusterctl move is successful, // but the clusters.cluster.x-k8s.io resource is not available on the cluster yet. // // Retry on transient 'server doesn't have a resource type' errors. // Use existing exponential backoff implementation for retry on these errors. if match := kubectlResourceNotFoundRegex.MatchString(err.Error()); match { return true, exponentialRetryWaitTime(totalRetries) } return false, 0 } func exponentialRetryWaitTime(totalRetries int) time.Duration { // Exponential backoff on errors. Retrier built-in backoff is linear, so implementing here. // Retrier first calls the policy before retry #1. We want it zero-based for exponentiation. if totalRetries < 1 { totalRetries = 1 } const networkFaultBaseRetryTime = 10 * time.Second const backoffFactor = 1.5 return time.Duration(float64(networkFaultBaseRetryTime) * math.Pow(backoffFactor, float64(totalRetries-1))) } // BackupCAPI takes backup of management cluster's resources during the upgrade process. func (c *ClusterManager) BackupCAPI(ctx context.Context, cluster *types.Cluster, managementStatePath, clusterName string) error { // Network errors, most commonly connection refused or timeout, can occur if either source // cluster becomes inaccessible during the move operation. If this occurs without retries, clusterctl // abandons the move operation, and fails cluster upgrade. // Retrying once connectivity is re-established completes the partial move. // Here we use a retrier, with the above defined clusterctlMoveRetryPolicy policy, to attempt to // wait out the network disruption and complete the move. // Keeping clusterctlMoveTimeout to the same as MoveManagement since both uses the same command with the differrent params. r := retrier.New(c.clusterctlMoveTimeout, retrier.WithRetryPolicy(clusterctlMoveRetryPolicy)) return c.backupCAPI(ctx, cluster, managementStatePath, clusterName, r) } // BackupCAPIWaitForInfrastructure takes backup of bootstrap cluster's resources during the upgrade process // like BackupCAPI but with a retry policy to wait for infrastructure provisioning in addition to network errors. func (c *ClusterManager) BackupCAPIWaitForInfrastructure(ctx context.Context, cluster *types.Cluster, managementStatePath, clusterName string) error { r := retrier.New(c.clusterctlMoveTimeout, retrier.WithRetryPolicy(clusterctlMoveWaitForInfrastructureRetryPolicy)) return c.backupCAPI(ctx, cluster, managementStatePath, clusterName, r) } func (c *ClusterManager) backupCAPI(ctx context.Context, cluster *types.Cluster, managementStatePath, clusterName string, retrier *retrier.Retrier) error { err := retrier.Retry(func() error { return c.clusterClient.BackupManagement(ctx, cluster, managementStatePath, clusterName) }) if err != nil { return fmt.Errorf("backing up CAPI resources of management cluster before moving to bootstrap cluster: %v", err) } return nil } func (c *ClusterManager) MoveCAPI(ctx context.Context, from, to *types.Cluster, clusterName string, clusterSpec *cluster.Spec, checkers ...types.NodeReadyChecker) error { logger.V(3).Info("Waiting for management machines to be ready before move") labels := []string{clusterv1.MachineControlPlaneNameLabel, clusterv1.MachineDeploymentNameLabel} if err := c.waitForNodesReady(ctx, from, clusterName, labels, checkers...); err != nil { return err } logger.V(3).Info("Waiting for management cluster to be ready before move") if err := c.clusterClient.WaitForClusterReady(ctx, from, c.clusterWaitTimeout.String(), clusterName); err != nil { return err } bootStrapClient, err := c.ClientFactory.BuildClientFromKubeconfig(from.KubeconfigFile) if err != nil { return fmt.Errorf("building bootstrap cluster client: %w", err) } if clusterSpec.Cluster.Spec.DatacenterRef.Kind == v1alpha1.TinkerbellDatacenterKind { r := retrier.New(c.clusterctlMoveTimeout, retrier.WithRetryPolicy(clusterctlMoveRetryPolicy)) err = r.Retry(func() error { return updateTinkerbellIPInBootstrapTinkerbellMachineTemplate(ctx, clusterSpec, bootStrapClient) }) if err != nil { return fmt.Errorf("updating Tinkerbell IP in tinkerbell machine templates: %w", err) } } // Network errors, most commonly connection refused or timeout, can occur if either source or target // cluster becomes inaccessible during the move operation. If this occurs without retries, clusterctl // abandons the move operation, leaving an unpredictable subset of the CAPI components copied to target // or deleted from source. Retrying once connectivity is re-established completes the partial move. // Here we use a retrier, with the above defined clusterctlMoveRetryPolicy policy, to attempt to // wait out the network disruption and complete the move. r := retrier.New(c.clusterctlMoveTimeout, retrier.WithRetryPolicy(clusterctlMoveRetryPolicy)) err = r.Retry(func() error { return c.clusterClient.MoveManagement(ctx, from, to, clusterName) }) if err != nil { return fmt.Errorf("moving CAPI management from source to target: %v", err) } logger.V(3).Info("Waiting for workload cluster control plane to be ready after move") r = retrier.New(c.clusterctlMoveTimeout, retrier.WithRetryPolicy(kubectlWaitRetryPolicy)) err = r.Retry(func() error { return c.clusterClient.WaitForControlPlaneReady(ctx, to, c.controlPlaneWaitAfterMoveTimeout.String(), clusterName) }) if err != nil { return err } logger.V(3).Info("Waiting for workload cluster control plane replicas to be ready after move") err = c.waitForControlPlaneReplicasReady(ctx, to, clusterSpec) if err != nil { return fmt.Errorf("waiting for workload cluster control plane replicas to be ready: %v", err) } logger.V(3).Info("Waiting for workload cluster machine deployment replicas to be ready after move") err = c.waitForMachineDeploymentReplicasReady(ctx, to, clusterSpec) if err != nil { return fmt.Errorf("waiting for workload cluster machinedeployment replicas to be ready: %v", err) } logger.V(3).Info("Waiting for machines to be ready after move") if err = c.waitForNodesReady(ctx, to, clusterName, labels, checkers...); err != nil { return err } return nil } // CreateRegistryCredSecret creates the registry-credentials secret on a managment cluster. func (c *ClusterManager) CreateRegistryCredSecret(ctx context.Context, mgmt *types.Cluster) error { secret := &corev1.Secret{ TypeMeta: metav1.TypeMeta{ Kind: "Secret", APIVersion: corev1.SchemeGroupVersion.Version, }, ObjectMeta: metav1.ObjectMeta{ Namespace: constants.EksaSystemNamespace, Name: "registry-credentials", }, Data: map[string][]byte{ "username": []byte(os.Getenv("REGISTRY_USERNAME")), "password": []byte(os.Getenv("REGISTRY_PASSWORD")), }, } return c.clusterClient.Apply(ctx, mgmt.KubeconfigFile, secret) } // InstallCAPI installs the cluster-api components in a cluster. func (c *ClusterManager) InstallCAPI(ctx context.Context, managementComponents *cluster.ManagementComponents, clusterSpec *cluster.Spec, cluster *types.Cluster, provider providers.Provider) error { err := c.clusterClient.InitInfrastructure(ctx, managementComponents, clusterSpec, cluster, provider) if err != nil { return fmt.Errorf("initializing capi resources in cluster: %v", err) } return c.waitForCAPI(ctx, cluster, provider, clusterSpec.Cluster.Spec.ExternalEtcdConfiguration != nil) } func (c *ClusterManager) waitForCAPI(ctx context.Context, cluster *types.Cluster, provider providers.Provider, externalEtcdTopology bool) error { err := c.waitForDeployments(ctx, internal.CAPIDeployments, cluster, c.deploymentWaitTimeout.String()) if err != nil { return err } if externalEtcdTopology { err := c.waitForDeployments(ctx, internal.ExternalEtcdDeployments, cluster, c.deploymentWaitTimeout.String()) if err != nil { return err } } err = c.waitForDeployments(ctx, provider.GetDeployments(), cluster, c.deploymentWaitTimeout.String()) if err != nil { return err } return nil } func (c *ClusterManager) waitForDeployments(ctx context.Context, deploymentsByNamespace map[string][]string, cluster *types.Cluster, timeout string) error { for namespace, deployments := range deploymentsByNamespace { for _, deployment := range deployments { err := c.clusterClient.WaitForDeployment(ctx, cluster, timeout, "Available", deployment, namespace) if err != nil { return fmt.Errorf("waiting for %s in namespace %s: %v", deployment, namespace, err) } } } return nil } func (c *ClusterManager) SaveLogsManagementCluster(ctx context.Context, spec *cluster.Spec, cluster *types.Cluster) error { if cluster == nil { return nil } if cluster.KubeconfigFile == "" { return nil } bundle, err := c.diagnosticsFactory.DiagnosticBundleManagementCluster(spec, cluster.KubeconfigFile) if err != nil { logger.V(5).Info("Error generating support bundle for management cluster", "error", err) return nil } return collectDiagnosticBundle(ctx, bundle) } func (c *ClusterManager) SaveLogsWorkloadCluster(ctx context.Context, provider providers.Provider, spec *cluster.Spec, cluster *types.Cluster) error { if cluster == nil { return nil } if cluster.KubeconfigFile == "" { return nil } bundle, err := c.diagnosticsFactory.DiagnosticBundleWorkloadCluster(spec, provider, cluster.KubeconfigFile) if err != nil { logger.V(5).Info("Error generating support bundle for workload cluster", "error", err) return nil } return collectDiagnosticBundle(ctx, bundle) } func collectDiagnosticBundle(ctx context.Context, bundle diagnostics.DiagnosticBundle) error { var sinceTimeValue *time.Time threeHours := "3h" sinceTimeValue, err := diagnostics.ParseTimeFromDuration(threeHours) if err != nil { logger.V(5).Info("Error parsing time options for support bundle generation", "error", err) return nil } err = bundle.CollectAndAnalyze(ctx, sinceTimeValue) if err != nil { logger.V(5).Info("Error collecting and saving logs", "error", err) } return nil } func (c *ClusterManager) waitForControlPlaneReplicasReady(ctx context.Context, managementCluster *types.Cluster, clusterSpec *cluster.Spec) error { isCpReady := func() error { return c.clusterClient.ValidateControlPlaneNodes(ctx, managementCluster, clusterSpec.Cluster.Name) } err := isCpReady() if err == nil { return nil } timeout := c.totalTimeoutForMachinesReadyWait(clusterSpec.Cluster.Spec.ControlPlaneConfiguration.Count) r := retrier.New(timeout) if err := r.Retry(isCpReady); err != nil { return fmt.Errorf("retries exhausted waiting for controlplane replicas to be ready: %v", err) } return nil } func (c *ClusterManager) waitForMachineDeploymentReplicasReady(ctx context.Context, managementCluster *types.Cluster, clusterSpec *cluster.Spec) error { ready, total := 0, 0 policy := func(_ int, _ error) (bool, time.Duration) { return true, c.machineBackoff * time.Duration(integer.IntMax(1, total-ready)) } var machineDeploymentReplicasCount int for _, workerNodeGroupConfiguration := range clusterSpec.Cluster.Spec.WorkerNodeGroupConfigurations { machineDeploymentReplicasCount += *workerNodeGroupConfiguration.Count } areMdReplicasReady := func() error { var err error ready, total, err = c.clusterClient.CountMachineDeploymentReplicasReady(ctx, clusterSpec.Cluster.Name, managementCluster.KubeconfigFile) if err != nil { return err } if ready != total { return fmt.Errorf("%d machine deployment replicas are not ready", total-ready) } return nil } timeout := c.totalTimeoutForMachinesReadyWait(machineDeploymentReplicasCount) r := retrier.New(timeout, retrier.WithRetryPolicy(policy)) if err := r.Retry(areMdReplicasReady); err != nil { return fmt.Errorf("retries exhausted waiting for machinedeployment replicas to be ready: %v", err) } return nil } // totalTimeoutForMachinesReadyWait calculates the total timeout when waiting for machines to be ready. // The timeout increases linearly with the number of machines but can never be less than the configured // minimun. func (c *ClusterManager) totalTimeoutForMachinesReadyWait(replicaCount int) time.Duration { timeout := time.Duration(replicaCount) * c.machineMaxWait if timeout <= c.machinesMinWait { timeout = c.machinesMinWait } return timeout } func (c *ClusterManager) waitForNodesReady(ctx context.Context, managementCluster *types.Cluster, clusterName string, labels []string, checkers ...types.NodeReadyChecker) error { totalNodes, err := c.getNodesCount(ctx, managementCluster, clusterName, labels) if err != nil { return fmt.Errorf("getting the total count of nodes: %v", err) } readyNodes := 0 policy := func(_ int, _ error) (bool, time.Duration) { return true, c.machineBackoff * time.Duration(integer.IntMax(1, totalNodes-readyNodes)) } areNodesReady := func() error { var err error readyNodes, err = c.countNodesReady(ctx, managementCluster, clusterName, labels, checkers...) if err != nil { return err } if readyNodes != totalNodes { logger.V(4).Info("Nodes are not ready yet", "total", totalNodes, "ready", readyNodes, "cluster name", clusterName) return errors.New("nodes are not ready yet") } logger.V(4).Info("Nodes ready", "total", totalNodes) return nil } err = areNodesReady() if err == nil { return nil } timeout := c.totalTimeoutForMachinesReadyWait(totalNodes) r := retrier.New(timeout, retrier.WithRetryPolicy(policy)) if err := r.Retry(areNodesReady); err != nil { return fmt.Errorf("retries exhausted waiting for machines to be ready: %v", err) } return nil } func (c *ClusterManager) getNodesCount(ctx context.Context, managementCluster *types.Cluster, clusterName string, labels []string) (int, error) { totalNodes := 0 labelsMap := make(map[string]interface{}, len(labels)) for _, label := range labels { labelsMap[label] = nil } if _, ok := labelsMap[clusterv1.MachineControlPlaneNameLabel]; ok { kcp, err := c.clusterClient.GetKubeadmControlPlane(ctx, managementCluster, clusterName, executables.WithCluster(managementCluster), executables.WithNamespace(constants.EksaSystemNamespace)) if err != nil { return 0, fmt.Errorf("getting KubeadmControlPlane for cluster %s: %v", clusterName, err) } totalNodes += int(*kcp.Spec.Replicas) } if _, ok := labelsMap[clusterv1.MachineDeploymentNameLabel]; ok { mds, err := c.clusterClient.GetMachineDeploymentsForCluster(ctx, clusterName, executables.WithCluster(managementCluster), executables.WithNamespace(constants.EksaSystemNamespace)) if err != nil { return 0, fmt.Errorf("getting KubeadmControlPlane for cluster %s: %v", clusterName, err) } for _, md := range mds { totalNodes += int(*md.Spec.Replicas) } } return totalNodes, nil } func (c *ClusterManager) countNodesReady(ctx context.Context, managementCluster *types.Cluster, clusterName string, labels []string, checkers ...types.NodeReadyChecker) (ready int, err error) { machines, err := c.clusterClient.GetMachines(ctx, managementCluster, clusterName) if err != nil { return 0, fmt.Errorf("getting machines resources from management cluster: %v", err) } for _, m := range machines { // Extracted from cluster-api: NodeRef is considered a better signal than InfrastructureReady, // because it ensures the node in the workload cluster is up and running. if !m.HasAnyLabel(labels) { continue } passed := true for _, checker := range checkers { if !checker(m.Status) { passed = false break } } if passed { ready += 1 } } return ready, nil } // Upgrade updates the eksa components in a cluster according to a Spec. func (c *ClusterManager) Upgrade(ctx context.Context, cluster *types.Cluster, currentManagementComponents, newManagementComponents *cluster.ManagementComponents, newSpec *cluster.Spec) (*types.ChangeDiff, error) { return c.eksaComponents.Upgrade(ctx, logger.Get(), cluster, currentManagementComponents, newManagementComponents, newSpec) } func (c *ClusterManager) CreateEKSANamespace(ctx context.Context, cluster *types.Cluster) error { return c.clusterClient.CreateNamespaceIfNotPresent(ctx, cluster.KubeconfigFile, constants.EksaSystemNamespace) } func (c *ClusterManager) ApplyBundles(ctx context.Context, clusterSpec *cluster.Spec, cluster *types.Cluster) error { bundleObj, err := yaml.Marshal(clusterSpec.Bundles) if err != nil { return fmt.Errorf("outputting bundle yaml: %v", err) } logger.V(1).Info("Applying Bundles to cluster") err = c.clusterClient.ApplyKubeSpecFromBytes(ctx, cluster, bundleObj) if err != nil { return fmt.Errorf("applying bundle spec: %v", err) } // We need to update this config map with the new upgrader images whenever we // apply a new Bundles object to the cluster in order to support in-place upgrades. cm, err := c.getUpgraderImagesFromBundle(ctx, cluster, clusterSpec) if err != nil { return fmt.Errorf("getting upgrader images from bundle: %v", err) } if err = c.clusterClient.Apply(ctx, cluster.KubeconfigFile, cm); err != nil { return fmt.Errorf("applying upgrader images config map: %v", err) } return nil } // ApplyReleases applies the EKSARelease manifest. func (c *ClusterManager) ApplyReleases(ctx context.Context, clusterSpec *cluster.Spec, cluster *types.Cluster) error { releaseObj, err := yaml.Marshal(clusterSpec.EKSARelease) if err != nil { return fmt.Errorf("outputting release yaml: %v", err) } logger.V(1).Info("Applying EKSARelease to cluster") err = c.clusterClient.ApplyKubeSpecFromBytes(ctx, cluster, releaseObj) if err != nil { return fmt.Errorf("applying release spec: %v", err) } return nil } // PauseCAPIWorkloadClusters pauses all workload CAPI clusters except the management cluster. func (c *ClusterManager) PauseCAPIWorkloadClusters(ctx context.Context, managementCluster *types.Cluster) error { clusters, err := c.clusterClient.GetClusters(ctx, managementCluster) if err != nil { return err } for _, w := range clusters { // skip pausing management cluster if w.Metadata.Name == managementCluster.Name { continue } if err = c.clusterClient.PauseCAPICluster(ctx, w.Metadata.Name, managementCluster.KubeconfigFile); err != nil { return err } } return nil } func (c *ClusterManager) resumeEksaReconcileForManagementAndWorkloadClusters(ctx context.Context, managementCluster *types.Cluster, clusterSpec *cluster.Spec, provider providers.Provider) error { clusters := &v1alpha1.ClusterList{} err := c.clusterClient.ListObjects(ctx, eksaClusterResourceType, clusterSpec.Cluster.Namespace, managementCluster.KubeconfigFile, clusters) if err != nil { return err } for _, w := range clusters.Items { if w.ManagedBy() != clusterSpec.Cluster.Name { continue } if err := c.resumeReconcileForCluster(ctx, managementCluster, &w, provider); err != nil { return err } } return nil } // ResumeEKSAControllerReconcile resumes a paused EKS-Anywhere cluster. func (c *ClusterManager) ResumeEKSAControllerReconcile(ctx context.Context, cluster *types.Cluster, clusterSpec *cluster.Spec, provider providers.Provider) error { // clear pause annotation clusterSpec.Cluster.ClearPauseAnnotation() provider.DatacenterConfig(clusterSpec).ClearPauseAnnotation() if clusterSpec.Cluster.IsSelfManaged() { return c.resumeEksaReconcileForManagementAndWorkloadClusters(ctx, cluster, clusterSpec, provider) } return c.resumeReconcileForCluster(ctx, cluster, clusterSpec.Cluster, provider) } func (c *ClusterManager) resumeReconcileForCluster(ctx context.Context, clusterCreds *types.Cluster, cluster *v1alpha1.Cluster, provider providers.Provider) error { pausedAnnotation := cluster.PausedAnnotation() err := c.clusterClient.RemoveAnnotationInNamespace(ctx, provider.DatacenterResourceType(), cluster.Spec.DatacenterRef.Name, pausedAnnotation, clusterCreds, cluster.Namespace) if err != nil { return fmt.Errorf("removing paused annotation when resuming datacenterconfig reconciliation: %v", err) } if provider.MachineResourceType() != "" { for _, machineConfigRef := range cluster.MachineConfigRefs() { err = c.clusterClient.RemoveAnnotationInNamespace(ctx, provider.MachineResourceType(), machineConfigRef.Name, pausedAnnotation, clusterCreds, cluster.Namespace) if err != nil { return fmt.Errorf("removing paused annotation when resuming reconciliation for machine config %s: %v", machineConfigRef.Name, err) } } } err = c.clusterClient.RemoveAnnotationInNamespace(ctx, cluster.ResourceType(), cluster.Name, pausedAnnotation, clusterCreds, cluster.Namespace) if err != nil { return fmt.Errorf("removing paused annotation when resuming cluster reconciliation: %v", err) } if err = c.clusterClient.RemoveAnnotationInNamespace(ctx, cluster.ResourceType(), cluster.Name, v1alpha1.ManagedByCLIAnnotation, clusterCreds, cluster.Namespace, ); err != nil { return fmt.Errorf("removing managed by CLI annotation when resuming cluster reconciliation: %v", err) } return nil } // ResumeCAPIWorkloadClusters resumes all workload CAPI clusters except the management cluster. func (c *ClusterManager) ResumeCAPIWorkloadClusters(ctx context.Context, managementCluster *types.Cluster) error { clusters, err := c.clusterClient.GetClusters(ctx, managementCluster) if err != nil { return err } for _, w := range clusters { // skip resuming management cluster if w.Metadata.Name == managementCluster.Name { continue } if err = c.clusterClient.ResumeCAPICluster(ctx, w.Metadata.Name, managementCluster.KubeconfigFile); err != nil { return err } } return nil } // AllowDeleteWhilePaused allows the deletion of paused clusters. func (c *ClusterManager) AllowDeleteWhilePaused(ctx context.Context, cluster *types.Cluster, clusterSpec *cluster.Spec) error { return c.allowDeleteWhilePaused(ctx, cluster, clusterSpec.Cluster) } func (c *ClusterManager) allowDeleteWhilePaused(ctx context.Context, clusterCreds *types.Cluster, cluster *v1alpha1.Cluster) error { allowDelete := map[string]string{v1alpha1.AllowDeleteWhenPausedAnnotation: "true"} if err := c.clusterClient.UpdateAnnotationInNamespace(ctx, cluster.ResourceType(), cluster.Name, allowDelete, clusterCreds, cluster.Namespace); err != nil { return fmt.Errorf("updating paused annotation in cluster reconciliation: %v", err) } return nil } func (c *ClusterManager) PauseEKSAControllerReconcile(ctx context.Context, cluster *types.Cluster, clusterSpec *cluster.Spec, provider providers.Provider) error { if clusterSpec.Cluster.IsSelfManaged() { return c.pauseEksaReconcileForManagementAndWorkloadClusters(ctx, cluster, clusterSpec, provider) } return c.pauseReconcileForCluster(ctx, cluster, clusterSpec.Cluster, provider) } func (c *ClusterManager) pauseEksaReconcileForManagementAndWorkloadClusters(ctx context.Context, managementCluster *types.Cluster, clusterSpec *cluster.Spec, provider providers.Provider) error { clusters := &v1alpha1.ClusterList{} err := c.clusterClient.ListObjects(ctx, eksaClusterResourceType, clusterSpec.Cluster.Namespace, managementCluster.KubeconfigFile, clusters) if err != nil { return err } for _, w := range clusters.Items { if w.ManagedBy() != clusterSpec.Cluster.Name { continue } if err := c.pauseReconcileForCluster(ctx, managementCluster, &w, provider); err != nil { return err } } return nil } func (c *ClusterManager) pauseReconcileForCluster(ctx context.Context, clusterCreds *types.Cluster, cluster *v1alpha1.Cluster, provider providers.Provider) error { pausedAnnotation := map[string]string{cluster.PausedAnnotation(): "true"} err := c.clusterClient.UpdateAnnotationInNamespace(ctx, provider.DatacenterResourceType(), cluster.Spec.DatacenterRef.Name, pausedAnnotation, clusterCreds, cluster.Namespace) if err != nil { return fmt.Errorf("updating annotation when pausing datacenterconfig reconciliation: %v", err) } if provider.MachineResourceType() != "" { for _, machineConfigRef := range cluster.MachineConfigRefs() { err = c.clusterClient.UpdateAnnotationInNamespace(ctx, provider.MachineResourceType(), machineConfigRef.Name, pausedAnnotation, clusterCreds, cluster.Namespace) if err != nil { return fmt.Errorf("updating annotation when pausing reconciliation for machine config %s: %v", machineConfigRef.Name, err) } } } err = c.clusterClient.UpdateAnnotationInNamespace(ctx, cluster.ResourceType(), cluster.Name, pausedAnnotation, clusterCreds, cluster.Namespace) if err != nil { return fmt.Errorf("updating paused annotation in cluster reconciliation: %v", err) } if err = c.clusterClient.UpdateAnnotationInNamespace(ctx, cluster.ResourceType(), cluster.Name, map[string]string{v1alpha1.ManagedByCLIAnnotation: "true"}, clusterCreds, cluster.Namespace, ); err != nil { return fmt.Errorf("updating managed by cli annotation in cluster when pausing cluster reconciliation: %v", err) } return nil } func (c *ClusterManager) GetCurrentClusterSpec(ctx context.Context, clus *types.Cluster, clusterName string) (*cluster.Spec, error) { eksaCluster, err := c.clusterClient.GetEksaCluster(ctx, clus, clusterName) if err != nil { return nil, fmt.Errorf("failed getting EKS-A cluster to build current cluster Spec: %v", err) } return c.buildSpecForCluster(ctx, clus, eksaCluster) } func (c *ClusterManager) buildSpecForCluster(ctx context.Context, clus *types.Cluster, eksaCluster *v1alpha1.Cluster) (*cluster.Spec, error) { client, err := c.ClientFactory.BuildClientFromKubeconfig(clus.KubeconfigFile) if err != nil { return nil, err } return cluster.BuildSpec(ctx, client, eksaCluster) } func (c *ClusterManager) getUpgraderImagesFromBundle(ctx context.Context, cluster *types.Cluster, cl *cluster.Spec) (*corev1.ConfigMap, error) { upgraderImages := make(map[string]string) for _, versionBundle := range cl.Bundles.Spec.VersionsBundles { eksD := versionBundle.EksD eksdVersion := fmt.Sprintf("%s-eks-%s-%s", eksD.KubeVersion, eksD.ReleaseChannel, strings.Split(eksD.Name, "-")[4]) if _, ok := upgraderImages[eksdVersion]; !ok { upgraderImages[eksdVersion] = versionBundle.Upgrader.Upgrader.URI } } upgraderConfigMap, err := c.clusterClient.GetConfigMap(ctx, cluster.KubeconfigFile, constants.UpgraderConfigMapName, constants.EksaSystemNamespace) if err != nil { if executables.IsKubectlNotFoundError(err) { return newUpgraderConfigMap(upgraderImages), nil } return nil, err } for version, image := range upgraderImages { upgraderConfigMap.Data[version] = image } return upgraderConfigMap, nil } func newUpgraderConfigMap(m map[string]string) *corev1.ConfigMap { return &corev1.ConfigMap{ TypeMeta: metav1.TypeMeta{ APIVersion: "v1", Kind: "ConfigMap", }, ObjectMeta: metav1.ObjectMeta{ Name: constants.UpgraderConfigMapName, Namespace: constants.EksaSystemNamespace, }, Data: m, } } // As the Tink stack gets redeployed in the management cluster tinkerbell IP changes from bootstrap IP // to the actual Tinkerbell IP specified in datacenter spec. We will need to update this IP in the // TinkerbellMachineTemplate as the previous bootStrap IP is no longer serving the Tink stack. // Also there is a new rollout once the eks-a controller comes up on the management cluster as it sees // the IP change in the template as a diff in spec. To prevent this from happening update the objects // in-place before the move. Since TinkerbellMachineTemplate is immutable we get the object, update // the IP, delete and recreate the object. // For long term, we want to revisit how we handle the bootstrap vs management cluster case in eks-a // controller specific to baremetal provider as the source of truth gets changed due to the nature of // tink stack being moved. // nolint:gocyclo func updateTinkerbellIPInBootstrapTinkerbellMachineTemplate(ctx context.Context, spec *cluster.Spec, client kubernetes.Client) error { logger.Info("Updating Tinkerbell stack IP from bootstrap to management cluster tinkerbell stack IP") tinkerbellMachineTemplates := tinkerbellv1.TinkerbellMachineTemplateList{} if err := client.List(ctx, &tinkerbellMachineTemplates); err != nil { return fmt.Errorf("retrieving tinkerbell machine templates: %w", err) } tinkIP := spec.TinkerbellDatacenter.Spec.TinkerbellIP for _, tinkMachineTemplate := range tinkerbellMachineTemplates.Items { isoURL := url.URL{ Scheme: "http", Host: fmt.Sprintf("%s:%s", tinkIP, tinkerbell.SmeeHTTPPort), // isoURL path is only served in the top level /iso path. Path: "/iso/hook.iso", } err := client.Delete(ctx, &tinkMachineTemplate) if err != nil { return fmt.Errorf("deleting tinkerebell machine template: %w", err) } tinkMachineTemplate.Spec.Template.Spec.BootOptions.ISOURL = isoURL.String() osImageURL := spec.TinkerbellDatacenter.Spec.OSImageURL // When an templateOverride is specified in the spec, we do not want to modify it. cpMachineCfg := spec.TinkerbellMachineConfigs[spec.Cluster.Spec.ControlPlaneConfiguration.MachineGroupRef.Name] if cpMachineCfg.Spec.TemplateRef.Name == "" && strings.Contains(tinkMachineTemplate.Name, clusterapi.ControlPlaneMachineTemplateName(spec.Cluster)) { if cpMachineCfg.Spec.OSImageURL != "" { osImageURL = cpMachineCfg.Spec.OSImageURL } tinkMachineTemplate, err = updateTemplateOverride(spec.Cluster, tinkMachineTemplate, osImageURL, tinkIP, cpMachineCfg.OSFamily()) if err != nil { return err } } // When an templateOverride is specified in the spec, we do not want to modify it. // We update the tinkebelltemplate config only for the corresponding worker node group. for _, wng := range spec.Cluster.Spec.WorkerNodeGroupConfigurations { wngMachineCfg := spec.TinkerbellMachineConfigs[wng.MachineGroupRef.Name] if wngMachineCfg.Spec.TemplateRef.Name == "" && strings.Contains(tinkMachineTemplate.Name, clusterapi.MachineDeploymentName(spec.Cluster, wng)) { if wngMachineCfg.Spec.OSImageURL != "" { osImageURL = wngMachineCfg.Spec.OSImageURL } tinkMachineTemplate, err = updateTemplateOverride(spec.Cluster, tinkMachineTemplate, osImageURL, tinkIP, wngMachineCfg.OSFamily()) if err != nil { return err } } } err = client.Create(ctx, &tinkMachineTemplate) if err != nil { return fmt.Errorf("creating tinkerebell machine template: %w", err) } } return nil } func updateTemplateOverride(clusterSpec *v1alpha1.Cluster, template tinkerbellv1.TinkerbellMachineTemplate, osImageOverride, tinkIP string, osFamily v1alpha1.OSFamily) (tinkerbellv1.TinkerbellMachineTemplate, error) { newOverride := v1alpha1.NewDefaultTinkerbellTemplateConfigCreate(clusterSpec, osImageOverride, tinkIP, tinkIP, osFamily) var err error template.Spec.Template.Spec.TemplateOverride, err = newOverride.ToTemplateString() if err != nil { return tinkerbellv1.TinkerbellMachineTemplate{}, fmt.Errorf("failed to get TinkerbellTemplateConfig: %w", err) } return template, nil }