in pkg/operations/kubernetesupgrade/upgrader.go [332:566]
func (ku *Upgrader) upgradeAgentPools(ctx context.Context) error {
for _, agentPool := range ku.ClusterTopology.AgentPools {
// Upgrade Agent VMs
templateMap, parametersMap, err := ku.generateUpgradeTemplate(ku.ClusterTopology.DataModel, ku.AKSEngineVersion)
if err != nil {
ku.logger.Errorf("Error generating upgrade template: %v", err)
return ku.Translator.Errorf("Error generating upgrade template: %s", err.Error())
}
ku.logger.Infof("Prepping agent pool '%s' for upgrade...", *agentPool.Name)
preservePools := map[string]bool{*agentPool.Name: true}
transformer := &transform.Transformer{
Translator: ku.Translator,
}
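// A private jumpbox is not part of any agent pool; strip its resources so the upgrade deployment does not redeploy it.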
if ku.ClusterTopology.DataModel.Properties.OrchestratorProfile.KubernetesConfig.PrivateJumpboxProvision() {
err = transformer.RemoveJumpboxResourcesFromTemplate(ku.logger, templateMap)
if err != nil {
return ku.Translator.Errorf("error removing jumpbox resources from template: %s", err.Error())
}
}
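// Whether the masters use managed disks determines how master resources are normalized below.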
var isMasterManagedDisk bool
if ku.DataModel.Properties.MasterProfile != nil {
isMasterManagedDisk = ku.DataModel.Properties.MasterProfile.IsManagedDisks()
}
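// Clusters fronted by a Standard SKU load balancer need additional template normalization before scaling or upgrading.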
if ku.DataModel.Properties.OrchestratorProfile.KubernetesConfig.LoadBalancerSku == api.StandardLoadBalancerSku {
err = transformer.NormalizeForK8sSLBScalingOrUpgrade(ku.logger, templateMap)
if err != nil {
return ku.Translator.Errorf("error normalizing upgrade template for SLB: %s", err.Error())
}
}
if err = transformer.NormalizeResourcesForK8sAgentUpgrade(ku.logger, templateMap, isMasterManagedDisk, preservePools); err != nil {
ku.logger.Error(err.Error())
return ku.Translator.Errorf("error normalizing upgrade template resources for agent upgrade: %s", err.Error())
}
transformer.RemoveImmutableResourceProperties(ku.logger, templateMap)
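// Look up this pool's desired node count and profile in the cluster model.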
var agentCount int
var agentPoolProfile *api.AgentPoolProfile
for _, app := range ku.ClusterTopology.DataModel.Properties.AgentPoolProfiles {
if app.Name == *agentPool.Name {
agentCount = app.Count
agentPoolProfile = app
break
}
}
if agentCount == 0 {
ku.logger.Infof("Agent pool '%s' is empty", *agentPool.Name)
continue
}
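// Build the helper that drives the create, validate, and delete steps for individual nodes in this pool.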
upgradeAgentNode := UpgradeAgentNode{
Translator: ku.Translator,
logger: ku.logger,
}
upgradeAgentNode.TemplateMap = templateMap
upgradeAgentNode.ParametersMap = parametersMap
upgradeAgentNode.UpgradeContainerService = ku.ClusterTopology.DataModel
upgradeAgentNode.SubscriptionID = ku.ClusterTopology.SubscriptionID
upgradeAgentNode.ResourceGroup = ku.ClusterTopology.ResourceGroup
upgradeAgentNode.Client = ku.Client
upgradeAgentNode.kubeConfig = ku.kubeConfig
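// Fall back to package defaults when no step or cordon/drain timeouts were configured.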
if ku.stepTimeout == nil {
upgradeAgentNode.timeout = defaultTimeout
} else {
upgradeAgentNode.timeout = *ku.stepTimeout
}
if ku.cordonDrainTimeout == nil {
upgradeAgentNode.cordonDrainTimeout = defaultCordonDrainTimeout
} else {
upgradeAgentNode.cordonDrainTimeout = *ku.cordonDrainTimeout
}
agentVMs := make(map[int]*vmInfo)
// Go over upgraded VMs and verify provisioning state
// per https://docs.microsoft.com/en-us/rest/api/compute/virtualmachines/virtualmachines-state :
// - Creating: Indicates the virtual Machine is being created.
// - Updating: Indicates that there is an update operation in progress on the Virtual Machine.
// - Succeeded: Indicates that the operation executed on the virtual machine succeeded.
// - Deleting: Indicates that the virtual machine is being deleted.
// - Failed: Indicates that the update operation on the Virtual Machine failed.
// Delete VMs in 'bad' state. Such VMs will be re-created later in this function.
upgradedCount := 0
for _, vm := range *agentPool.UpgradedAgentVMs {
ku.logger.Infof("Agent VM %s in pool %s is already on the expected orchestrator version", *vm.Name, *agentPool.Name)
var vmProvisioningState string
if vm.Properties != nil && vm.Properties.ProvisioningState != nil {
vmProvisioningState = *vm.Properties.ProvisioningState
}
agentIndex, err := utils.GetVMNameIndex(*vm.Properties.StorageProfile.OSDisk.OSType, *vm.Name)
if err != nil {
ku.logger.Errorf("Error parsing index from agent VM name %s: %v", *vm.Name, err)
return err
}
switch vmProvisioningState {
case "Creating", "Updating", "Succeeded":
agentVMs[agentIndex] = &vmInfo{*vm.Name, vmStatusUpgraded}
upgradedCount++
case "Failed":
ku.logger.Infof("Deleting agent VM %s in provisioning state %s", *vm.Name, vmProvisioningState)
err = upgradeAgentNode.DeleteNode(vm.Name, false)
if err != nil {
ku.logger.Errorf("Error deleting agent VM %s: %v", *vm.Name, err)
return err
}
case "Deleting":
fallthrough
default:
ku.logger.Infof("Ignoring agent VM %s in provisioning state %s", *vm.Name, vmProvisioningState)
agentVMs[agentIndex] = &vmInfo{*vm.Name, vmStatusIgnored}
}
}
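// Every VM still in the pre-upgrade list needs an upgrade; index it so the upgrade loop below processes it.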
for _, vm := range *agentPool.AgentVMs {
agentIndex, err := utils.GetVMNameIndex(*vm.Properties.StorageProfile.OSDisk.OSType, *vm.Name)
if err != nil {
ku.logger.Errorf("Error parsing index from agent VM name %s: %v", *vm.Name, err)
return err
}
agentVMs[agentIndex] = &vmInfo{*vm.Name, vmStatusNotUpgraded}
}
toBeUpgradedCount := len(*agentPool.AgentVMs)
ku.logger.Infof("Starting upgrade of %d agent nodes (out of %d) in pool identifier: %s, name: %s...",
toBeUpgradedCount, agentCount, *agentPool.Identifier, *agentPool.Name)
// Create any missing nodes so the pool matches agentCount; nodes can be missing after a previous failed upgrade.
// If any nodes still need to be upgraded, create one extra node to take on the load from nodes being drained.
if toBeUpgradedCount > 0 {
agentCount++
}
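// Track the VMs created during this run so custom node properties can be copied onto them later.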
newCreatedVMs := []string{}
client, err := ku.getKubernetesClient(10 * time.Second)
if err != nil {
ku.logger.Errorf("Error getting Kubernetes client: %v", err)
return err
}
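// Create replacement nodes until the upgraded and pending counts together reach the desired total (including the extra surge node).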
for upgradedCount+toBeUpgradedCount < agentCount {
agentIndex := getAvailableIndex(agentVMs)
var vmName string
vmName, err = utils.GetK8sVMName(ku.DataModel.Properties, agentPoolProfile, agentIndex)
if err != nil {
ku.logger.Errorf("Error reconstructing agent VM name with index %d: %v", agentIndex, err)
return err
}
ku.logger.Infof("Creating new agent node %s (index %d)", vmName, agentIndex)
err = upgradeAgentNode.CreateNode(ctx, *agentPool.Name, agentIndex)
if err != nil {
ku.logger.Errorf("Error creating agent node %s (index %d): %v", vmName, agentIndex, err)
return err
}
err = upgradeAgentNode.Validate(&vmName)
if err != nil {
ku.logger.Infof("Error validating agent node %s (index %d): %v", vmName, agentIndex, err)
return err
}
newCreatedVMs = append(newCreatedVMs, vmName)
agentVMs[agentIndex] = &vmInfo{vmName, vmStatusUpgraded}
upgradedCount++
}
if toBeUpgradedCount == 0 {
ku.logger.Infof("No nodes to upgrade")
continue
}
// Upgrade nodes in agent pool
upgradedCount = 0
for agentIndex, vm := range agentVMs {
if vm.status != vmStatusNotUpgraded {
continue
}
ku.logger.Infof("Upgrading Agent VM: %s, pool name: %s", vm.name, *agentPool.Name)
// Copy custom properties from the old node to the new node unless PreserveNodesProperties in the AgentPoolProfile is explicitly set to false.
preserveNodesProperties := api.DefaultPreserveNodesProperties
if agentPoolProfile != nil && agentPoolProfile.PreserveNodesProperties != nil {
preserveNodesProperties = *agentPoolProfile.PreserveNodesProperties
}
if preserveNodesProperties {
if len(newCreatedVMs) > 0 {
newNodeName := newCreatedVMs[0]
newCreatedVMs = newCreatedVMs[1:]
ku.logger.Infof("Copying custom annotations, labels, taints from old node %s to new node %s...", vm.name, newNodeName)
err = ku.copyCustomPropertiesToNewNode(client, strings.ToLower(vm.name), newNodeName)
if err != nil {
ku.logger.Warningf("Failed to copy custom annotations, labels, taints from old node %s to new node %s: %v", vm.name, newNodeName, err)
}
}
}
err := upgradeAgentNode.DeleteNode(&vm.name, true)
if err != nil {
ku.logger.Errorf("Error deleting agent VM %s: %v", vm.name, err)
return err
}
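// Reconstruct the deterministic VM name for this index; the replacement node reuses it.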
vmName, err := utils.GetK8sVMName(ku.DataModel.Properties, agentPoolProfile, agentIndex)
if err != nil {
ku.logger.Errorf("Error fetching new VM name: %v", err)
return err
}
// Skip creating the last node; the extra node created earlier takes its place.
if upgradedCount == toBeUpgradedCount-1 {
ku.logger.Infof("Skipping creation of VM %s (index %d)", vmName, agentIndex)
delete(agentVMs, agentIndex)
} else {
err = upgradeAgentNode.CreateNode(ctx, *agentPool.Name, agentIndex)
if err != nil {
ku.logger.Errorf("Error creating upgraded agent VM %s: %v", vmName, err)
return err
}
err = upgradeAgentNode.Validate(&vmName)
if err != nil {
ku.logger.Errorf("Error validating upgraded agent VM %s: %v", vmName, err)
return err
}
newCreatedVMs = append(newCreatedVMs, vmName)
vm.status = vmStatusUpgraded
}
upgradedCount++
}
}
return nil
}
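// Note: getAvailableIndex is defined elsewhere in this package. A minimal,
// hypothetical sketch of the contract assumed above (return an index not
// occupied by any VM in the agentVMs map) might look like the following;
// the real implementation may choose indices differently:
//
//	func getAvailableIndex(vms map[int]*vmInfo) int {
//		maxIndex := -1
//		for index := range vms {
//			if index > maxIndex {
//				maxIndex = index
//			}
//		}
//		return maxIndex + 1
//	}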