pkg/operations/kubernetesupgrade/upgrader.go
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT license.
package kubernetesupgrade
import (
"context"
"encoding/json"
"fmt"
"strings"
"time"
"github.com/Azure/aks-engine-azurestack/pkg/api"
"github.com/Azure/aks-engine-azurestack/pkg/api/common"
"github.com/Azure/aks-engine-azurestack/pkg/armhelpers"
"github.com/Azure/aks-engine-azurestack/pkg/armhelpers/utils"
"github.com/Azure/aks-engine-azurestack/pkg/engine"
"github.com/Azure/aks-engine-azurestack/pkg/engine/transform"
"github.com/Azure/aks-engine-azurestack/pkg/helpers"
"github.com/Azure/aks-engine-azurestack/pkg/helpers/to"
"github.com/Azure/aks-engine-azurestack/pkg/i18n"
"github.com/Azure/aks-engine-azurestack/pkg/kubernetes"
"github.com/Azure/aks-engine-azurestack/pkg/operations"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
appsv1 "k8s.io/api/apps/v1"
v1 "k8s.io/api/core/v1"
policyv1beta1 "k8s.io/api/policy/v1beta1"
rbacv1 "k8s.io/api/rbac/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
// Upgrader holds information on upgrading an AKS cluster
type Upgrader struct {
Translator *i18n.Translator
logger *logrus.Entry
ClusterTopology
Client armhelpers.AKSEngineClient
kubeConfig string
stepTimeout *time.Duration
cordonDrainTimeout *time.Duration
AKSEngineVersion string
CurrentVersion string
ControlPlaneOnly bool
Force bool
}
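// vmStatus represents the upgrade state of a single VM as observed during an upgrade pass.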
type vmStatus int
const (
defaultTimeout = time.Minute * 20
defaultCordonDrainTimeout = time.Minute * 20
nodePropertiesCopyTimeout = time.Minute * 5
getResourceTimeout = time.Minute * 1
perNodeUpgradeTimeout = time.Minute * 20
vmStatusUpgraded vmStatus = iota
vmStatusNotUpgraded
vmStatusIgnored
)
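// vmInfo pairs a VM name with its upgrade status so the agent pool upgrade loop can track which VMs still need to be replaced.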
type vmInfo struct {
name string
status vmStatus
}
// Init initializes an upgrader struct
func (ku *Upgrader) Init(translator *i18n.Translator, logger *logrus.Entry, clusterTopology ClusterTopology, client armhelpers.AKSEngineClient, kubeConfig string, stepTimeout *time.Duration, cordonDrainTimeout *time.Duration, aksEngineVersion string, controlPlaneOnly bool) {
ku.Translator = translator
ku.logger = logger
ku.ClusterTopology = clusterTopology
ku.Client = client
ku.kubeConfig = kubeConfig
ku.stepTimeout = stepTimeout
ku.cordonDrainTimeout = cordonDrainTimeout
ku.AKSEngineVersion = aksEngineVersion
ku.ControlPlaneOnly = controlPlaneOnly
}
// RunUpgrade runs the upgrade pipeline
func (ku *Upgrader) RunUpgrade() error {
if err := ku.validatePodSecurityPolices(); err != nil {
if ku.Force {
ku.logger.Warning("Error validating PodSecurityPolices")
} else {
ku.logger.Warning("Error validating PodSecurityPolices. Consider using --force if you really want to proceed")
return errors.Wrap(err, "error validating PodSecurityPolices")
}
}
controlPlaneUpgradeTimeout := perNodeUpgradeTimeout
if ku.ClusterTopology.DataModel.Properties.MasterProfile.Count > 0 {
controlPlaneUpgradeTimeout = perNodeUpgradeTimeout * time.Duration(ku.ClusterTopology.DataModel.Properties.MasterProfile.Count)
}
ctxControlPlane, cancelControlPlane := context.WithTimeout(context.Background(), controlPlaneUpgradeTimeout)
defer cancelControlPlane()
if err := ku.upgradeMasterNodes(ctxControlPlane); err != nil {
return err
}
ku.handleUnreconcilableAddons()
if ku.ControlPlaneOnly {
return nil
}
// Scale the agent node upgrade timeout by the number of agent nodes that still need to be upgraded.
var numNodesToUpgrade int
for _, agentPool := range ku.ClusterTopology.AgentPools {
if agentPool != nil && agentPool.AgentVMs != nil {
numNodesToUpgrade += len(*agentPool.AgentVMs)
}
}
nodesUpgradeTimeout := perNodeUpgradeTimeout
if numNodesToUpgrade > 0 {
nodesUpgradeTimeout = perNodeUpgradeTimeout * time.Duration(numNodesToUpgrade)
}
ctxNodes, cancelNodes := context.WithTimeout(context.Background(), nodesUpgradeTimeout)
defer cancelNodes()
return ku.upgradeAgentPools(ctxNodes)
}
// handleUnreconcilableAddons handles addon upgrades that addon-manager cannot reconcile by itself.
// This method fails silently; otherwise it would break the test "Should not fail if a Kubernetes client cannot be created" (upgradecluster_test.go).
func (ku *Upgrader) handleUnreconcilableAddons() {
upgradeVersion := ku.DataModel.Properties.OrchestratorProfile.OrchestratorVersion
// kube-proxy upgrade fails from v1.15 to 1.16: https://github.com/Azure/aks-engine/issues/3557
// deleting daemonset so addon-manager recreates instead of patching
if !common.IsKubernetesVersionGe(ku.CurrentVersion, "1.16.0") && common.IsKubernetesVersionGe(upgradeVersion, "1.16.0") {
ku.logger.Infof("Attempting to delete kube-proxy daemonset.")
client, err := ku.getKubernetesClient(getResourceTimeout)
if err != nil {
ku.logger.Errorf("Error getting Kubernetes client: %v", err)
return
}
err = client.DeleteDaemonSet(&appsv1.DaemonSet{
ObjectMeta: metav1.ObjectMeta{
Namespace: "kube-system",
Name: common.KubeProxyAddonName,
},
})
if err != nil {
ku.logger.Errorf("Error deleting kube-proxy daemonset: %v", err)
} else {
ku.logger.Infof("Deleted kube-proxy daemonset. Addon-manager will recreate it.")
}
}
// metrics-server upgrade fails from v1.15 to 1.16 as the addon mode is EnsureExists for pre-v1.16 clusters
if !common.IsKubernetesVersionGe(ku.CurrentVersion, "1.16.0") && common.IsKubernetesVersionGe(upgradeVersion, "1.16.0") {
ku.logger.Infof("Attempting to delete metrics-server deployment.")
client, err := ku.getKubernetesClient(getResourceTimeout)
if err != nil {
ku.logger.Errorf("Error getting Kubernetes client: %v", err)
return
}
err = client.DeleteClusterRole(&rbacv1.ClusterRole{
ObjectMeta: metav1.ObjectMeta{
Name: "system:metrics-server",
},
})
if err != nil {
ku.logger.Errorf("Error deleting metrics-server cluster role: %v", err)
}
err = client.DeleteDeployment(&appsv1.Deployment{
ObjectMeta: metav1.ObjectMeta{
Namespace: "kube-system",
Name: common.MetricsServerAddonName,
},
})
if err != nil {
ku.logger.Errorf("Error deleting metrics-server deployment: %v", err)
} else {
ku.logger.Infof("Deleted metrics-server deployment. Addon-manager will recreate it.")
}
}
}
// Validate will run validation post upgrade
func (ku *Upgrader) Validate() error {
return nil
}
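// upgradeMasterNodes recreates any master VMs missing from the cluster, then upgrades each remaining
// master in place: delete the old VM, redeploy a VM with the same index from the generated upgrade
// template, and validate the new node before moving on to the next one.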
func (ku *Upgrader) upgradeMasterNodes(ctx context.Context) error {
if ku.ClusterTopology.DataModel.Properties.MasterProfile == nil {
return nil
}
ku.logger.Infof("Master nodes StorageProfile: %s", ku.ClusterTopology.DataModel.Properties.MasterProfile.StorageProfile)
// Upgrade Master VMs
templateMap, parametersMap, err := ku.generateUpgradeTemplate(ku.ClusterTopology.DataModel, ku.AKSEngineVersion)
if err != nil {
return ku.Translator.Errorf("error generating upgrade template: %s", err.Error())
}
ku.logger.Infof("Prepping master nodes for upgrade...")
transformer := &transform.Transformer{
Translator: ku.Translator,
}
if ku.ClusterTopology.DataModel.Properties.OrchestratorProfile.KubernetesConfig.PrivateJumpboxProvision() {
err = transformer.RemoveJumpboxResourcesFromTemplate(ku.logger, templateMap)
if err != nil {
return ku.Translator.Errorf("error removing jumpbox resources from template: %s", err.Error())
}
}
if ku.DataModel.Properties.OrchestratorProfile.KubernetesConfig.LoadBalancerSku == api.StandardLoadBalancerSku {
err = transformer.NormalizeForK8sSLBScalingOrUpgrade(ku.logger, templateMap)
if err != nil {
return ku.Translator.Errorf("error normalizing upgrade template for SLB: %s", err.Error())
}
}
if to.Bool(ku.DataModel.Properties.OrchestratorProfile.KubernetesConfig.EnableEncryptionWithExternalKms) {
err = transformer.RemoveKMSResourcesFromTemplate(ku.logger, templateMap)
if err != nil {
return ku.Translator.Errorf("error removing KMS resources from template: %s", err.Error())
}
}
if err = transformer.NormalizeResourcesForK8sMasterUpgrade(ku.logger, templateMap, ku.DataModel.Properties.MasterProfile.IsManagedDisks(), nil); err != nil {
ku.logger.Error(err.Error())
return err
}
transformer.RemoveImmutableResourceProperties(ku.logger, templateMap)
upgradeMasterNode := UpgradeMasterNode{
Translator: ku.Translator,
logger: ku.logger,
}
upgradeMasterNode.TemplateMap = templateMap
upgradeMasterNode.ParametersMap = parametersMap
upgradeMasterNode.UpgradeContainerService = ku.ClusterTopology.DataModel
upgradeMasterNode.ResourceGroup = ku.ClusterTopology.ResourceGroup
upgradeMasterNode.SubscriptionID = ku.ClusterTopology.SubscriptionID
upgradeMasterNode.Client = ku.Client
upgradeMasterNode.kubeConfig = ku.kubeConfig
if ku.stepTimeout == nil {
upgradeMasterNode.timeout = defaultTimeout
} else {
upgradeMasterNode.timeout = *ku.stepTimeout
}
expectedMasterCount := ku.ClusterTopology.DataModel.Properties.MasterProfile.Count
mastersUpgradedCount := len(*ku.ClusterTopology.UpgradedMasterVMs)
mastersToUpgradeCount := expectedMasterCount - mastersUpgradedCount
ku.logger.Infof("Total expected master count: %d", expectedMasterCount)
ku.logger.Infof("Master nodes that need to be upgraded: %d", mastersToUpgradeCount)
ku.logger.Infof("Master nodes that have been upgraded: %d", mastersUpgradedCount)
ku.logger.Infof("Starting upgrade of master nodes...")
masterNodesInCluster := len(*ku.ClusterTopology.MasterVMs) + mastersUpgradedCount
ku.logger.Infof("masterNodesInCluster: %d", masterNodesInCluster)
if masterNodesInCluster > expectedMasterCount {
return ku.Translator.Errorf("Total count of master VMs: %d exceeded expected count: %d", masterNodesInCluster, expectedMasterCount)
}
// This condition is possible if the previous upgrade operation failed during master
// VM upgrade when a master VM was deleted but creation of upgraded master did not run.
if masterNodesInCluster < expectedMasterCount {
ku.logger.Infof(
"Found missing master VMs in the cluster. Reconstructing names of missing master VMs for recreation during upgrade...")
}
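// Index the surviving master VMs by the numeric suffix in their names so that missing indices
// can be recreated before the in-place upgrade loop below.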
existingMastersIndex := make(map[int]bool)
for _, vm := range *ku.ClusterTopology.MasterVMs {
masterIndex, _ := utils.GetVMNameIndex(*vm.Properties.StorageProfile.OSDisk.OSType, *vm.Name)
existingMastersIndex[masterIndex] = true
}
mastersToCreate := expectedMasterCount - masterNodesInCluster
ku.logger.Infof("Expected master count: %d, Creating %d more master VMs", expectedMasterCount, mastersToCreate)
// NOTE: this is NOT completely idempotent because it assumes that
// the OS disk has been deleted
for i := 0; i < mastersToCreate; i++ {
masterIndexToCreate := 0
for existingMastersIndex[masterIndexToCreate] {
masterIndexToCreate++
}
ku.logger.Infof("Creating upgraded master VM with index: %d", masterIndexToCreate)
err = upgradeMasterNode.CreateNode(ctx, "master", masterIndexToCreate)
if err != nil {
ku.logger.Infof("Error creating upgraded master VM with index: %d", masterIndexToCreate)
return err
}
tempVMName := ""
err = upgradeMasterNode.Validate(&tempVMName)
if err != nil {
ku.logger.Infof("Error validating upgraded master VM with index: %d", masterIndexToCreate)
return err
}
existingMastersIndex[masterIndexToCreate] = true
}
upgradedMastersIndex := make(map[int]bool)
for _, vm := range *ku.ClusterTopology.UpgradedMasterVMs {
ku.logger.Infof("Master VM: %s is upgraded to expected orchestrator version", *vm.Name)
masterIndex, _ := utils.GetVMNameIndex(*vm.Properties.StorageProfile.OSDisk.OSType, *vm.Name)
upgradedMastersIndex[masterIndex] = true
}
for _, vm := range *ku.ClusterTopology.MasterVMs {
ku.logger.Infof("Upgrading Master VM: %s", *vm.Name)
masterIndex, _ := utils.GetVMNameIndex(*vm.Properties.StorageProfile.OSDisk.OSType, *vm.Name)
err = upgradeMasterNode.DeleteNode(vm.Name, false)
if err != nil {
ku.logger.Infof("Error deleting master VM: %s, err: %v", *vm.Name, err)
return err
}
err = upgradeMasterNode.CreateNode(ctx, "master", masterIndex)
if err != nil {
ku.logger.Infof("Error creating upgraded master VM: %s", *vm.Name)
return err
}
err = upgradeMasterNode.Validate(vm.Name)
if err != nil {
ku.logger.Infof("Error validating upgraded master VM: %s", *vm.Name)
return err
}
upgradedMastersIndex[masterIndex] = true
}
return nil
}
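// upgradeAgentPools upgrades every agent pool in the cluster topology. For each pool it normalizes
// the generated upgrade template, creates an extra node to absorb workload from draining nodes, then
// deletes and recreates each outdated node, optionally copying custom node properties to its replacement.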
func (ku *Upgrader) upgradeAgentPools(ctx context.Context) error {
for _, agentPool := range ku.ClusterTopology.AgentPools {
// Upgrade Agent VMs
templateMap, parametersMap, err := ku.generateUpgradeTemplate(ku.ClusterTopology.DataModel, ku.AKSEngineVersion)
if err != nil {
ku.logger.Errorf("Error generating upgrade template: %v", err)
return ku.Translator.Errorf("Error generating upgrade template: %s", err.Error())
}
ku.logger.Infof("Prepping agent pool '%s' for upgrade...", *agentPool.Name)
preservePools := map[string]bool{*agentPool.Name: true}
transformer := &transform.Transformer{
Translator: ku.Translator,
}
if ku.ClusterTopology.DataModel.Properties.OrchestratorProfile.KubernetesConfig.PrivateJumpboxProvision() {
err = transformer.RemoveJumpboxResourcesFromTemplate(ku.logger, templateMap)
if err != nil {
return ku.Translator.Errorf("error removing jumpbox resources from template: %s", err.Error())
}
}
var isMasterManagedDisk bool
if ku.DataModel.Properties.MasterProfile != nil {
isMasterManagedDisk = ku.DataModel.Properties.MasterProfile.IsManagedDisks()
}
if ku.DataModel.Properties.OrchestratorProfile.KubernetesConfig.LoadBalancerSku == api.StandardLoadBalancerSku {
err = transformer.NormalizeForK8sSLBScalingOrUpgrade(ku.logger, templateMap)
if err != nil {
return ku.Translator.Errorf("error normalizing upgrade template for SLB: %s", err.Error())
}
}
if err = transformer.NormalizeResourcesForK8sAgentUpgrade(ku.logger, templateMap, isMasterManagedDisk, preservePools); err != nil {
ku.logger.Error(err.Error())
return ku.Translator.Errorf("Error generating upgrade template: %s", err.Error())
}
transformer.RemoveImmutableResourceProperties(ku.logger, templateMap)
var agentCount int
var agentPoolProfile *api.AgentPoolProfile
for _, app := range ku.ClusterTopology.DataModel.Properties.AgentPoolProfiles {
if app.Name == *agentPool.Name {
agentCount = app.Count
agentPoolProfile = app
break
}
}
if agentCount == 0 {
ku.logger.Infof("Agent pool '%s' is empty", *agentPool.Name)
continue
}
upgradeAgentNode := UpgradeAgentNode{
Translator: ku.Translator,
logger: ku.logger,
}
upgradeAgentNode.TemplateMap = templateMap
upgradeAgentNode.ParametersMap = parametersMap
upgradeAgentNode.UpgradeContainerService = ku.ClusterTopology.DataModel
upgradeAgentNode.SubscriptionID = ku.ClusterTopology.SubscriptionID
upgradeAgentNode.ResourceGroup = ku.ClusterTopology.ResourceGroup
upgradeAgentNode.Client = ku.Client
upgradeAgentNode.kubeConfig = ku.kubeConfig
if ku.stepTimeout == nil {
upgradeAgentNode.timeout = defaultTimeout
} else {
upgradeAgentNode.timeout = *ku.stepTimeout
}
if ku.cordonDrainTimeout == nil {
upgradeAgentNode.cordonDrainTimeout = defaultCordonDrainTimeout
} else {
upgradeAgentNode.cordonDrainTimeout = *ku.cordonDrainTimeout
}
agentVMs := make(map[int]*vmInfo)
// Go over upgraded VMs and verify provisioning state
// per https://docs.microsoft.com/en-us/rest/api/compute/virtualmachines/virtualmachines-state :
// - Creating: Indicates the virtual Machine is being created.
// - Updating: Indicates that there is an update operation in progress on the Virtual Machine.
// - Succeeded: Indicates that the operation executed on the virtual machine succeeded.
// - Deleting: Indicates that the virtual machine is being deleted.
// - Failed: Indicates that the update operation on the Virtual Machine failed.
// Delete VMs in 'bad' state. Such VMs will be re-created later in this function.
upgradedCount := 0
for _, vm := range *agentPool.UpgradedAgentVMs {
ku.logger.Infof("Agent VM: %s, pool name: %s on expected orchestrator version", *vm.Name, *agentPool.Name)
var vmProvisioningState string
if vm.Properties != nil && vm.Properties.ProvisioningState != nil {
vmProvisioningState = *vm.Properties.ProvisioningState
}
agentIndex, _ := utils.GetVMNameIndex(*vm.Properties.StorageProfile.OSDisk.OSType, *vm.Name)
switch vmProvisioningState {
case "Creating", "Updating", "Succeeded":
agentVMs[agentIndex] = &vmInfo{*vm.Name, vmStatusUpgraded}
upgradedCount++
case "Failed":
ku.logger.Infof("Deleting agent VM %s in provisioning state %s", *vm.Name, vmProvisioningState)
err = upgradeAgentNode.DeleteNode(vm.Name, false)
if err != nil {
ku.logger.Errorf("Error deleting agent VM %s: %v", *vm.Name, err)
return err
}
case "Deleting":
fallthrough
default:
ku.logger.Infof("Ignoring agent VM %s in provisioning state %s", *vm.Name, vmProvisioningState)
agentVMs[agentIndex] = &vmInfo{*vm.Name, vmStatusIgnored}
}
}
for _, vm := range *agentPool.AgentVMs {
agentIndex, _ := utils.GetVMNameIndex(*vm.Properties.StorageProfile.OSDisk.OSType, *vm.Name)
agentVMs[agentIndex] = &vmInfo{*vm.Name, vmStatusNotUpgraded}
}
toBeUpgradedCount := len(*agentPool.AgentVMs)
ku.logger.Infof("Starting upgrade of %d agent nodes (out of %d) in pool identifier: %s, name: %s...",
toBeUpgradedCount, agentCount, *agentPool.Identifier, *agentPool.Name)
// Create missing nodes to match agentCount; nodes may be missing due to a previous upgrade failure.
// If there are nodes that need to be upgraded, create one extra node to take on the load from the nodes being upgraded.
if toBeUpgradedCount > 0 {
agentCount++
}
newCreatedVMs := []string{}
client, err := ku.getKubernetesClient(10 * time.Second)
if err != nil {
ku.logger.Errorf("Error getting Kubernetes client: %v", err)
return err
}
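// Create nodes until the pool reaches the target count (agentCount includes one extra surge node when
// there are nodes to upgrade). getAvailableIndex picks the lowest free index so VM names stay consistent.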
for upgradedCount+toBeUpgradedCount < agentCount {
agentIndex := getAvailableIndex(agentVMs)
var vmName string
vmName, err = utils.GetK8sVMName(ku.DataModel.Properties, agentPoolProfile, agentIndex)
if err != nil {
ku.logger.Errorf("Error reconstructing agent VM name with index %d: %v", agentIndex, err)
return err
}
ku.logger.Infof("Creating new agent node %s (index %d)", vmName, agentIndex)
err = upgradeAgentNode.CreateNode(ctx, *agentPool.Name, agentIndex)
if err != nil {
ku.logger.Errorf("Error creating agent node %s (index %d): %v", vmName, agentIndex, err)
return err
}
err = upgradeAgentNode.Validate(&vmName)
if err != nil {
ku.logger.Infof("Error validating agent node %s (index %d): %v", vmName, agentIndex, err)
return err
}
newCreatedVMs = append(newCreatedVMs, vmName)
agentVMs[agentIndex] = &vmInfo{vmName, vmStatusUpgraded}
upgradedCount++
}
if toBeUpgradedCount == 0 {
ku.logger.Infof("No nodes to upgrade")
continue
}
// Upgrade nodes in agent pool
upgradedCount = 0
for agentIndex, vm := range agentVMs {
if vm.status != vmStatusNotUpgraded {
continue
}
ku.logger.Infof("Upgrading Agent VM: %s, pool name: %s", vm.name, *agentPool.Name)
// Copy custom properties from the old node to the new node unless PreserveNodesProperties in the AgentPoolProfile is explicitly set to false.
preserveNodesProperties := api.DefaultPreserveNodesProperties
if agentPoolProfile != nil && agentPoolProfile.PreserveNodesProperties != nil {
preserveNodesProperties = *agentPoolProfile.PreserveNodesProperties
}
if preserveNodesProperties {
if len(newCreatedVMs) > 0 {
newNodeName := newCreatedVMs[0]
newCreatedVMs = newCreatedVMs[1:]
ku.logger.Infof("Copying custom annotations, labels, taints from old node %s to new node %s...", vm.name, newNodeName)
err = ku.copyCustomPropertiesToNewNode(client, strings.ToLower(vm.name), newNodeName)
if err != nil {
ku.logger.Warningf("Failed to copy custom annotations, labels, taints from old node %s to new node %s: %v", vm.name, newNodeName, err)
}
}
}
err := upgradeAgentNode.DeleteNode(&vm.name, true)
if err != nil {
ku.logger.Errorf("Error deleting agent VM %s: %v", vm.name, err)
return err
}
vmName, err := utils.GetK8sVMName(ku.DataModel.Properties, agentPoolProfile, agentIndex)
if err != nil {
ku.logger.Errorf("Error fetching new VM name: %v", err)
return err
}
// Do not create the last replacement node; the extra node created earlier already takes its place.
if upgradedCount == toBeUpgradedCount-1 {
ku.logger.Infof("Skipping creation of VM %s (index %d)", vmName, agentIndex)
delete(agentVMs, agentIndex)
} else {
err = upgradeAgentNode.CreateNode(ctx, *agentPool.Name, agentIndex)
if err != nil {
ku.logger.Errorf("Error creating upgraded agent VM %s: %v", vmName, err)
return err
}
err = upgradeAgentNode.Validate(&vmName)
if err != nil {
ku.logger.Errorf("Error validating upgraded agent VM %s: %v", vmName, err)
return err
}
newCreatedVMs = append(newCreatedVMs, vmName)
vm.status = vmStatusUpgraded
}
upgradedCount++
}
}
return nil
}
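// generateUpgradeTemplate applies upgrade defaults to the container service definition, regenerates the
// ARM template and parameters, and returns both unmarshaled into maps ready for transformation.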
func (ku *Upgrader) generateUpgradeTemplate(upgradeContainerService *api.ContainerService, aksEngineVersion string) (map[string]interface{}, map[string]interface{}, error) {
var err error
ctx := engine.Context{
Translator: ku.Translator,
}
templateGenerator, err := engine.InitializeTemplateGenerator(ctx)
if err != nil {
return nil, nil, ku.Translator.Errorf("failed to initialize template generator: %s", err.Error())
}
_, err = upgradeContainerService.SetPropertiesDefaults(api.PropertiesDefaultsParams{
IsScale: false,
IsUpgrade: true,
PkiKeySize: helpers.DefaultPkiKeySize,
})
if err != nil {
return nil, nil, ku.Translator.Errorf("error in SetPropertiesDefaults: %s", err.Error())
}
var templateJSON string
var parametersJSON string
if templateJSON, parametersJSON, err = templateGenerator.GenerateTemplateV2(upgradeContainerService, engine.DefaultGeneratorCode, aksEngineVersion); err != nil {
return nil, nil, ku.Translator.Errorf("error generating upgrade template: %s", err.Error())
}
var template interface{}
var parameters interface{}
err = json.Unmarshal([]byte(templateJSON), &template)
if err != nil {
return nil, nil, ku.Translator.Errorf("error while unmarshaling the ARM template JSON: %s", err.Error())
}
err = json.Unmarshal([]byte(parametersJSON), &parameters)
if err != nil {
return nil, nil, ku.Translator.Errorf("error while unmarshaling the ARM parameters JSON: %s", err.Error())
}
templateMap := template.(map[string]interface{})
parametersMap := parameters.(map[string]interface{})
return templateMap, parametersMap, nil
}
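// copyCustomPropertiesToNewNode cordons and drains the new node, then retries copying custom annotations,
// labels, and taints from the old node until it succeeds or nodePropertiesCopyTimeout expires.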
func (ku *Upgrader) copyCustomPropertiesToNewNode(client kubernetes.Client, oldNodeName string, newNodeName string) error {
// The new node is created without any taints, so Kubernetes might schedule pods on it before the taints/annotations/labels
// are copied over from the corresponding old node. Drain the new node first, then copy over the node properties.
// Note: SafelyDrainNodeWithClient() sets the node's Unschedulable field to true; copyCustomNodeProperties sets it back to false.
var cordonDrainTimeout time.Duration
if ku.cordonDrainTimeout == nil {
cordonDrainTimeout = defaultCordonDrainTimeout
} else {
cordonDrainTimeout = *ku.cordonDrainTimeout
}
err := operations.SafelyDrainNodeWithClient(client, ku.logger, newNodeName, cordonDrainTimeout)
if err != nil {
ku.logger.Warningf("Error draining agent VM %s. Proceeding with copying node properties. Error: %v", newNodeName, err)
}
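// Poll both nodes in the background and retry the copy every few seconds; the buffered channel signals
// success to the timeout-guarded select below.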
ch := make(chan struct{}, 1)
go func() {
for {
oldNode, err := client.GetNode(oldNodeName)
if err != nil {
ku.logger.Debugf("Failed to get properties of the old node %s: %v", oldNodeName, err)
time.Sleep(time.Second * 5)
continue
}
newNode, err := client.GetNode(newNodeName)
if err != nil {
ku.logger.Debugf("Failed to get properties of the new node %s: %v", newNodeName, err)
time.Sleep(time.Second * 5)
continue
}
err = ku.copyCustomNodeProperties(client, oldNodeName, oldNode, newNodeName, newNode)
if err != nil {
ku.logger.Debugf("Failed to copy custom annotations, labels, taints from old node %s to new node %s: %v", oldNodeName, newNodeName, err)
time.Sleep(time.Second * 5)
} else {
// Signal success and stop the polling goroutine.
ch <- struct{}{}
return
}
}
}()
for {
select {
case <-ch:
ku.logger.Infof("Successfully copied custom annotations, labels, taints from old node %s to new node %s.", oldNodeName, newNodeName)
return nil
case <-time.After(nodePropertiesCopyTimeout):
err := fmt.Errorf("Copying custom annotations, labels, taints from old node %s to new node %s can't complete within %v", oldNodeName, newNodeName, nodePropertiesCopyTimeout)
ku.logger.Error(err.Error())
return err
}
}
}
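// copyCustomNodeProperties copies annotations, labels, and taints that exist on the old node but not on
// the new one (rewriting the old node name wherever it appears in values), updates the new node, and
// finally marks it schedulable again.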
func (ku *Upgrader) copyCustomNodeProperties(client kubernetes.Client, oldNodeName string, oldNode *v1.Node, newNodeName string, newNode *v1.Node) error {
// copy additional custom annotations from old node to new node
if oldNode.Annotations != nil {
if newNode.Annotations == nil {
newNode.Annotations = map[string]string{}
}
for k, v := range oldNode.Annotations {
if _, ok := newNode.Annotations[k]; !ok {
newNode.Annotations[k] = strings.Replace(v, oldNodeName, newNodeName, -1)
}
}
}
// copy additional custom labels from old node to new node
if oldNode.Labels != nil {
if newNode.Labels == nil {
newNode.Labels = map[string]string{}
}
for k, v := range oldNode.Labels {
if _, ok := newNode.Labels[k]; !ok {
newNode.Labels[k] = strings.Replace(v, oldNodeName, newNodeName, -1)
}
}
}
// copy Taints from old node to new node
if oldNode.Spec.Taints != nil {
newNode.Spec.Taints = append([]v1.Taint{}, oldNode.Spec.Taints...)
for i := range newNode.Spec.Taints {
newNode.Spec.Taints[i].Value = strings.Replace(newNode.Spec.Taints[i].Value, oldNodeName, newNodeName, -1)
}
}
newNode, err := client.UpdateNode(newNode)
if err != nil {
ku.logger.Warningf("Failed to update the new node %s: %v", newNodeName, err)
return err
}
newNode.Spec.Unschedulable = false
_, err = client.UpdateNode(newNode)
return err
}
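// getKubernetesClient returns a Kubernetes client pointed at the cluster's API server FQDN, using the
// upgrader's kubeconfig and the given request timeout.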
func (ku *Upgrader) getKubernetesClient(timeout time.Duration) (kubernetes.Client, error) {
apiserverURL := ku.DataModel.Properties.GetMasterFQDN()
return ku.Client.GetKubernetesClient(
apiserverURL,
ku.kubeConfig,
interval,
timeout)
}
// getAvailableIndex returns the lowest unused index within the range of existing agent indices,
// or the next index past the highest one if there are no gaps.
func getAvailableIndex(vms map[int]*vmInfo) int {
maxIndex := 0
for indx := range vms {
if indx > maxIndex {
maxIndex = indx
}
}
for indx := 0; indx < maxIndex; indx++ {
if _, found := vms[indx]; !found {
return indx
}
}
return maxIndex + 1
}
// validatePodSecurityPolices heuristically checks whether the user still needs to manually migrate from
// PodSecurityPolicy (PSP) to Pod Security Admission (PSA) before upgrading to Kubernetes v1.25+.
func (ku *Upgrader) validatePodSecurityPolices() error {
if common.IsKubernetesVersionGe(ku.CurrentVersion, common.PodSecurityPolicyRemovedVersion) {
return nil
}
ku.logger.Debug("Checking no user-created PodSecurityPolicies still present.")
client, err := ku.getKubernetesClient(getResourceTimeout)
if err != nil {
return errors.Wrap(err, "error getting Kubernetes client")
}
policies, err := client.ListPodSecurityPolices(metav1.ListOptions{})
if err != nil {
return errors.Wrap(err, "error listing PodSecurityPolices")
}
next := ku.DataModel.Properties.OrchestratorProfile.OrchestratorVersion
return validateUserCreatedPodSecurityPolices(ku.CurrentVersion, next, policies.Items)
}
// validateUserCreatedPodSecurityPolices returns an error if user-created PodSecurityPolicies appear to be
// present and the upgrade crosses the Kubernetes version in which PSP support was removed.
func validateUserCreatedPodSecurityPolices(currentVersion, upgradeVersion string, policies []policyv1beta1.PodSecurityPolicy) error {
if common.IsKubernetesVersionGe(currentVersion, common.PodSecurityPolicyRemovedVersion) {
return nil
}
if !common.IsKubernetesVersionGe(upgradeVersion, common.PodSecurityPolicyRemovedVersion) {
return nil
}
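// The cluster is expected to contain only the two default policies, "privileged" and "restricted";
// any additional policy, or a policy with a different name, is assumed to be user-created.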
if len(policies) > 2 {
return errors.New("user-created PodSecurityPolices found in the cluster (try 'kubectl get psp'), " +
"migrate from PodSecurityPolices before upgrading to Kubernetes v1.25+, " +
"see https://github.com/Azure/aks-engine-azurestack/blob/master/docs/topics/pod-security.md")
}
for _, policy := range policies {
if policy.Name != "privileged" && policy.Name != "restricted" {
return errors.New("user-created PodSecurityPolices found in the cluster (try 'kubectl get psp'), " +
"migrate from PodSecurityPolices before upgrading to Kubernetes v1.25+, " +
"see https://github.com/Azure/aks-engine-azurestack/blob/master/docs/topics/pod-security.md")
}
}
return nil
}