pkg/operations/kubernetesupgrade/upgradecluster.go (385 lines of code) (raw):

// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT license.

package kubernetesupgrade

import (
	"context"
	"fmt"
	"math/rand"
	"strings"
	"time"

	"github.com/Azure/aks-engine-azurestack/pkg/api"
	"github.com/Azure/aks-engine-azurestack/pkg/api/common"
	"github.com/Azure/aks-engine-azurestack/pkg/armhelpers"
	"github.com/Azure/aks-engine-azurestack/pkg/armhelpers/utils"
	"github.com/Azure/aks-engine-azurestack/pkg/i18n"
	"github.com/Azure/aks-engine-azurestack/pkg/kubernetes"
	compute "github.com/Azure/azure-sdk-for-go/profile/p20200901/resourcemanager/compute/armcompute"
	"github.com/pkg/errors"
	"github.com/sirupsen/logrus"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/util/wait"
	util "k8s.io/client-go/util/retry"
)

// ClusterTopology contains resources of the cluster the upgrade operation
// is targeting
type ClusterTopology struct {
	DataModel      *api.ContainerService
	SubscriptionID string
	Location       string
	ResourceGroup  string
	NameSuffix     string

	AgentPoolsToUpgrade map[string]bool
	AgentPools          map[string]*AgentPoolTopology

	MasterVMs         *[]*compute.VirtualMachine
	UpgradedMasterVMs *[]*compute.VirtualMachine
}

// AgentPoolTopology contains agent VMs in a single pool
type AgentPoolTopology struct {
	Identifier       *string
	Name             *string
	AgentVMs         *[]*compute.VirtualMachine
	UpgradedAgentVMs *[]*compute.VirtualMachine
}

// UpgradeCluster upgrades a cluster with Orchestrator version X.X to version Y.Y.
// Right now upgrades are supported for Kubernetes clusters only.
type UpgradeCluster struct {
	Translator *i18n.Translator
	Logger     *logrus.Entry
	ClusterTopology
	Client             armhelpers.AKSEngineClient
	StepTimeout        *time.Duration
	CordonDrainTimeout *time.Duration
	UpgradeWorkFlow    UpgradeWorkFlow
	Force              bool
	ControlPlaneOnly   bool
	CurrentVersion     string
}

// MasterPoolName pool name
const MasterPoolName = "master"
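
// Illustrative caller sketch (not part of the upstream file): one way the fields above
// might be wired up before invoking the UpgradeCluster method defined below. The client,
// logger, kubeconfig, and version values are assumed placeholders, not taken from this
// package.
//
//	uc := &UpgradeCluster{
//		Translator: &i18n.Translator{},
//		Logger:     logrus.NewEntry(logrus.New()),
//		Client:     azClient, // hypothetical armhelpers.AKSEngineClient
//		ClusterTopology: ClusterTopology{
//			DataModel:           cs, // hypothetical *api.ContainerService
//			ResourceGroup:       "example-rg",
//			NameSuffix:          "29523965",
//			AgentPoolsToUpgrade: map[string]bool{"agentpool1": true},
//		},
//	}
//	if err := uc.UpgradeCluster(azClient, kubeConfig, "0.75.0"); err != nil {
//		uc.Logger.Fatal(err)
//	}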

// UpgradeCluster runs the workflow to upgrade a Kubernetes cluster.
func (uc *UpgradeCluster) UpgradeCluster(az armhelpers.AKSEngineClient, kubeConfig string, aksEngineVersion string) error {
	uc.MasterVMs = &[]*compute.VirtualMachine{}
	uc.UpgradedMasterVMs = &[]*compute.VirtualMachine{}
	uc.AgentPools = make(map[string]*AgentPoolTopology)

	var kubeClient kubernetes.Client
	if az != nil {
		timeout := time.Duration(60) * time.Minute
		k, err := az.GetKubernetesClient("", kubeConfig, interval, timeout)
		if err != nil {
			uc.Logger.Warnf("Failed to get a Kubernetes client: %v", err)
		}
		kubeClient = k
	}

	if err := uc.setNodesToUpgrade(kubeClient, uc.ResourceGroup); err != nil {
		return uc.Translator.Errorf("Error while querying ARM for resources: %+v", err)
	}

	if kubeClient != nil {
		ctx, cancel := context.WithTimeout(context.Background(), 150*time.Second)
		defer cancel()
		notReadyStream := uc.upgradedNotReadyStream(kubeClient, wait.Backoff{Steps: 15, Duration: 10 * time.Second})
		if err := uc.checkControlPlaneNodesStatus(ctx, notReadyStream); err != nil {
			uc.Logger.Error("Aborting the upgrade process to avoid potential control plane downtime")
			return errors.Wrap(err, "checking status of upgraded control plane nodes")
		}
	}

	kc := uc.DataModel.Properties.OrchestratorProfile.KubernetesConfig
	if kc != nil && kc.IsClusterAutoscalerEnabled() && !uc.ControlPlaneOnly {
		// pause the cluster-autoscaler before running upgrade and resume it afterward
		uc.Logger.Info("Pausing cluster autoscaler, replica count: 0")
		count, err := uc.SetClusterAutoscalerReplicaCount(kubeClient, 0)
		if err != nil {
			uc.Logger.Errorf("Failed to pause cluster-autoscaler: %v", err)
			if !uc.Force {
				return err
			}
		} else {
			defer func() {
				uc.Logger.Infof("Resuming cluster autoscaler, replica count: %d", count)
				if _, err = uc.SetClusterAutoscalerReplicaCount(kubeClient, count); err != nil {
					uc.Logger.Errorf("Failed to resume cluster-autoscaler: %v", err)
				}
			}()
		}
	}

	upgradeVersion := uc.DataModel.Properties.OrchestratorProfile.OrchestratorVersion
	what := "control plane and all nodes"
	if uc.ControlPlaneOnly {
		what = "control plane nodes"
	}
	uc.Logger.Infof("Upgrading %s to Kubernetes version %s", what, upgradeVersion)

	if err := uc.getUpgradeWorkflow(kubeConfig, aksEngineVersion).RunUpgrade(); err != nil {
		return err
	}

	what = "Cluster"
	if uc.ControlPlaneOnly {
		what = "Control plane"
	}
	uc.Logger.Infof("%s upgraded successfully to Kubernetes version %s", what, upgradeVersion)
	return nil
}
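
// The method above pauses the cluster-autoscaler for the duration of the upgrade and resumes
// it via defer. Distilled to a minimal sketch (error handling elided; names are the ones used
// in this file):
//
//	count, err := uc.SetClusterAutoscalerReplicaCount(kubeClient, 0) // pause: scale to 0
//	if err == nil {
//		defer uc.SetClusterAutoscalerReplicaCount(kubeClient, count) // resume original count
//	}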

// SetClusterAutoscalerReplicaCount changes the replica count of a cluster-autoscaler deployment.
func (uc *UpgradeCluster) SetClusterAutoscalerReplicaCount(kubeClient kubernetes.Client, replicaCount int32) (int32, error) {
	if kubeClient == nil {
		return 0, errors.New("no kubernetes client")
	}
	var count int32
	var err error
	const namespace, name, retries = "kube-system", "cluster-autoscaler", 10
	for attempt := 0; attempt < retries; attempt++ {
		deployment, getErr := kubeClient.GetDeployment(namespace, name)
		err = getErr
		if getErr == nil {
			count = *deployment.Spec.Replicas
			deployment.Spec.Replicas = &replicaCount
			if _, err = kubeClient.UpdateDeployment(namespace, deployment); err == nil {
				break
			}
		}
		sleepTime := time.Duration(rand.Intn(5))
		uc.Logger.Warnf("Failed to update cluster-autoscaler deployment: %v", err)
		uc.Logger.Infof("Retry updating cluster-autoscaler after %d seconds", sleepTime)
		time.Sleep(sleepTime * time.Second)
	}
	if err != nil {
		return 0, err
	}
	return count, nil
}

func (uc *UpgradeCluster) getUpgradeWorkflow(kubeConfig string, aksEngineVersion string) UpgradeWorkFlow {
	if uc.UpgradeWorkFlow != nil {
		return uc.UpgradeWorkFlow
	}
	u := &Upgrader{}
	u.Init(uc.Translator, uc.Logger, uc.ClusterTopology, uc.Client, kubeConfig, uc.StepTimeout, uc.CordonDrainTimeout, aksEngineVersion, uc.ControlPlaneOnly)
	u.CurrentVersion = uc.CurrentVersion
	u.Force = uc.Force
	return u
}

func (uc *UpgradeCluster) setNodesToUpgrade(kubeClient kubernetes.Client, resourceGroup string) error {
	goalVersion := uc.DataModel.Properties.OrchestratorProfile.OrchestratorVersion

	ctx, cancel := context.WithTimeout(context.Background(), armhelpers.DefaultARMOperationTimeout)
	defer cancel()
	vmList, err := uc.Client.ListVirtualMachines(ctx, resourceGroup)
	if err != nil {
		return err
	}
	for _, vm := range vmList {
		// Windows VMs contain a substring of the name suffix
		if !strings.Contains(*(vm.Name), uc.NameSuffix) && !strings.Contains(*(vm.Name), uc.NameSuffix[:4]+"k8s") {
			uc.Logger.Infof("Skipping VM: %s for upgrade as it does not belong to cluster with expected name suffix: %s", *vm.Name, uc.NameSuffix)
			continue
		}

		currentVersion := uc.getNodeVersion(kubeClient, strings.ToLower(*vm.Name), vm.Tags, true)
		if uc.Force {
			if currentVersion == "" {
				currentVersion = "Unknown"
			}
			uc.addVMToUpgradeSets(vm, currentVersion)
		} else {
			if currentVersion == "" {
				uc.Logger.Infof("Skipping VM: %s for upgrade as the orchestrator version could not be determined.", *vm.Name)
				continue
			}
			// If the current version is different than the desired version then we add the VM to the list of VMs to upgrade.
			if currentVersion != goalVersion {
				if err := uc.upgradable(currentVersion); err != nil {
					return err
				}
				uc.addVMToUpgradeSets(vm, currentVersion)
			} else if currentVersion == goalVersion {
				uc.addVMToFinishedSets(vm, currentVersion)
			}
		}
	}

	return nil
}
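
// setNodesToUpgrade above matches VMs to this cluster by name suffix: Linux VM names embed the
// full NameSuffix, while Windows VM names embed only its first four characters followed by
// "k8s". A minimal sketch, assuming a suffix of "29523965" and typical aks-engine VM names:
//
//	nameSuffix := "29523965"
//	strings.Contains("k8s-agentpool1-29523965-0", nameSuffix) // true  -> Linux VM is considered
//	strings.Contains("2952k8s000", nameSuffix[:4]+"k8s")      // true  -> Windows VM is considered
//	strings.Contains("unrelated-vm-0", nameSuffix)            // false -> VM is skipped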

func (uc *UpgradeCluster) upgradable(currentVersion string) error {
	nodeVersion := &api.OrchestratorProfile{
		OrchestratorType:    api.Kubernetes,
		OrchestratorVersion: currentVersion,
	}
	targetVersion := uc.DataModel.Properties.OrchestratorProfile.OrchestratorVersion
	orch, err := api.GetOrchestratorVersionProfile(nodeVersion, uc.DataModel.Properties.HasWindows(), uc.DataModel.Properties.IsAzureStackCloud())
	if err != nil {
		return err
	}
	for _, up := range orch.Upgrades {
		if up.OrchestratorVersion == targetVersion {
			return nil
		}
	}
	return errors.Errorf("%s cannot be upgraded to %s", currentVersion, targetVersion)
}

// getNodeVersion returns a node's current Kubernetes version via Kubernetes API or VM tag.
// For VMSS nodes, make sure OsProfile.ComputerName instead of VM name is used as the name here
// because the former is used as the K8s node name.
// Also, if the latest VMSS model is applied, then we can get the version info from the tags.
// Otherwise, we have to get the version via the K8s API. This is because VMSS does not support
// tags for individual instances, and old/new instances have the same tags.
func (uc *UpgradeCluster) getNodeVersion(client kubernetes.Client, name string, tags map[string]*string, getVersionFromTags bool) string {
	if getVersionFromTags {
		if tags != nil && tags["orchestrator"] != nil {
			parts := strings.Split(*tags["orchestrator"], ":")
			if len(parts) == 2 {
				return parts[1]
			}
		}
		uc.Logger.Warnf("Expected tag \"orchestrator\" not found for VM: %s. Using Kubernetes API to retrieve Kubernetes version.", name)
	}
	if client != nil {
		node, err := client.GetNode(name)
		if err == nil {
			return strings.TrimPrefix(node.Status.NodeInfo.KubeletVersion, "v")
		}
		uc.Logger.Warnf("Failed to get node %s: %v", name, err)
	}
	return ""
}
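
// getNodeVersion above prefers the VM's "orchestrator" tag when it is present. A minimal sketch
// of that path, assuming the tag carries a value of the form "Kubernetes:<version>":
//
//	raw := "Kubernetes:1.23.5" // assumed tag value
//	if parts := strings.Split(raw, ":"); len(parts) == 2 {
//		version := parts[1] // "1.23.5"
//		_ = version
//	}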
vm.Tags["orchestrator"] != nil { orchestrator = *vm.Tags["orchestrator"] } //TODO(sterbrec): extract this from add to agentPool // separate the upgrade/skip decision from the agentpool composition if isUpgradableVM { uc.Logger.Infof("Adding Agent VM: %s, orchestrator: %s to pool: %s (AgentVMs)", *vm.Name, orchestrator, poolIdentifier) *uc.AgentPools[poolIdentifier].AgentVMs = append(*uc.AgentPools[poolIdentifier].AgentVMs, vm) } else { uc.Logger.Infof("Adding Agent VM: %s, orchestrator: %s to pool: %s (UpgradedAgentVMs)", *vm.Name, orchestrator, poolIdentifier) *uc.AgentPools[poolIdentifier].UpgradedAgentVMs = append(*uc.AgentPools[poolIdentifier].UpgradedAgentVMs, vm) } return nil } func (uc *UpgradeCluster) addVMToUpgradeSets(vm *compute.VirtualMachine, currentVersion string) { if strings.Contains(*(vm.Name), fmt.Sprintf("%s-", common.LegacyControlPlaneVMPrefix)) { uc.Logger.Infof("Master VM name: %s, orchestrator: %s (MasterVMs)", *vm.Name, currentVersion) *uc.MasterVMs = append(*uc.MasterVMs, vm) } else { if err := uc.addVMToAgentPool(vm, true); err != nil { uc.Logger.Errorf("Failed to add VM %s to agent pool: %s", *vm.Name, err) } } } func (uc *UpgradeCluster) addVMToFinishedSets(vm *compute.VirtualMachine, currentVersion string) { if strings.Contains(*(vm.Name), fmt.Sprintf("%s-", common.LegacyControlPlaneVMPrefix)) { uc.Logger.Infof("Master VM name: %s, orchestrator: %s (UpgradedMasterVMs)", *vm.Name, currentVersion) *uc.UpgradedMasterVMs = append(*uc.UpgradedMasterVMs, vm) } else { if err := uc.addVMToAgentPool(vm, false); err != nil { uc.Logger.Errorf("Failed to add VM %s to agent pool: %s", *vm.Name, err) } } } // checkControlPlaneNodesStatus checks whether it is safe to proceed with the upgrade process // by looking at the status of previously upgraded control plane nodes. // // It returns an error if more than 1 of the already-upgraded control plane nodes are in the NotReady state. // To recreate the node, users have to manually update the "orchestrator" tag on the VM. 

func (uc *UpgradeCluster) addVMToUpgradeSets(vm *compute.VirtualMachine, currentVersion string) {
	if strings.Contains(*(vm.Name), fmt.Sprintf("%s-", common.LegacyControlPlaneVMPrefix)) {
		uc.Logger.Infof("Master VM name: %s, orchestrator: %s (MasterVMs)", *vm.Name, currentVersion)
		*uc.MasterVMs = append(*uc.MasterVMs, vm)
	} else {
		if err := uc.addVMToAgentPool(vm, true); err != nil {
			uc.Logger.Errorf("Failed to add VM %s to agent pool: %s", *vm.Name, err)
		}
	}
}

func (uc *UpgradeCluster) addVMToFinishedSets(vm *compute.VirtualMachine, currentVersion string) {
	if strings.Contains(*(vm.Name), fmt.Sprintf("%s-", common.LegacyControlPlaneVMPrefix)) {
		uc.Logger.Infof("Master VM name: %s, orchestrator: %s (UpgradedMasterVMs)", *vm.Name, currentVersion)
		*uc.UpgradedMasterVMs = append(*uc.UpgradedMasterVMs, vm)
	} else {
		if err := uc.addVMToAgentPool(vm, false); err != nil {
			uc.Logger.Errorf("Failed to add VM %s to agent pool: %s", *vm.Name, err)
		}
	}
}

// checkControlPlaneNodesStatus checks whether it is safe to proceed with the upgrade process
// by looking at the status of previously upgraded control plane nodes.
//
// It returns an error if more than one of the already-upgraded control plane nodes is in the NotReady state.
// To recreate the node, users have to manually update the "orchestrator" tag on the VM.
func (uc *UpgradeCluster) checkControlPlaneNodesStatus(ctx context.Context, upgradedNotReadyStream <-chan []string) error {
	if len(*uc.UpgradedMasterVMs) == 0 {
		return nil
	}
	uc.Logger.Infoln("Checking status of upgraded control plane nodes")
	upgradedNotReadyCount := 0
loop:
	for {
		select {
		case upgradedNotReady, ok := <-upgradedNotReadyStream:
			if !ok {
				break loop
			}
			upgradedNotReadyCount = len(upgradedNotReady)
		case <-ctx.Done():
			break loop
		}
	}
	// return an error if more than 1 upgraded node is not ready
	if upgradedNotReadyCount > 1 {
		uc.Logger.Error("At least 2 of the previously upgraded control plane nodes did not reach the NodeReady status")
		return errors.New("too many upgraded nodes are not ready")
	}
	return nil
}

func (uc *UpgradeCluster) upgradedNotReadyStream(client kubernetes.Client, backoff wait.Backoff) <-chan []string {
	alwaysRetry := func(_ error) bool { return true }
	upgraded := []string{}
	for _, vm := range *uc.UpgradedMasterVMs {
		upgraded = append(upgraded, *vm.Name)
	}
	stream := make(chan []string)
	go func() {
		defer close(stream)
		util.OnError(backoff, alwaysRetry, func() error { //nolint:errcheck
			upgradedNotReady, err := uc.getUpgradedNotReady(client, upgraded)
			if err != nil {
				return err
			}
			stream <- upgradedNotReady
			if len(upgradedNotReady) > 0 {
				return errors.New("retry to give NotReady nodes some extra time")
			}
			return nil
		})
	}()
	return stream
}

func (uc *UpgradeCluster) getUpgradedNotReady(client kubernetes.Client, upgraded []string) ([]string, error) {
	// TODO: control plane nodes currently carry both the node-role.kubernetes.io/master and
	// node-role.kubernetes.io/control-plane labels. If node-role.kubernetes.io/master is removed
	// in a future change, also update the following label selector.
	cpNodes, err := client.ListNodesByOptions(metav1.ListOptions{LabelSelector: "node-role.kubernetes.io/master"})
	if err != nil {
		return nil, err
	}
	nodeStatusMap := make(map[string]bool)
	for _, n := range cpNodes.Items {
		nodeStatusMap[n.Name] = kubernetes.IsNodeReady(&n)
	}
	upgradedNotReady := []string{}
	for _, vm := range upgraded {
		if ready, found := nodeStatusMap[vm]; found && !ready {
			upgradedNotReady = append(upgradedNotReady, vm)
		}
	}
	return upgradedNotReady, nil
}
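
// Per the TODO in getUpgradedNotReady above, control plane nodes currently carry both the
// node-role.kubernetes.io/master and node-role.kubernetes.io/control-plane labels. If the
// legacy master label is ever dropped, the lookup would presumably switch to the other label.
// Hypothetical sketch only, not the current behavior of this file:
//
//	cpNodes, err := client.ListNodesByOptions(metav1.ListOptions{
//		LabelSelector: "node-role.kubernetes.io/control-plane",
//	})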