pkg/operations/kubernetesupgrade/upgradeagentnode.go
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT license.

package kubernetesupgrade

import (
	"context"
	"fmt"
	"math/rand"
	"strings"
	"time"

	"github.com/pkg/errors"

	"github.com/Azure/aks-engine-azurestack/pkg/api"
	"github.com/Azure/aks-engine-azurestack/pkg/armhelpers"
	"github.com/Azure/aks-engine-azurestack/pkg/i18n"
	"github.com/Azure/aks-engine-azurestack/pkg/kubernetes"
	"github.com/Azure/aks-engine-azurestack/pkg/operations"
	"github.com/sirupsen/logrus"
	apierrors "k8s.io/apimachinery/pkg/api/errors"
	v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

const (
	// interval is the polling interval passed to GetKubernetesClient
	interval = time.Second * 1
	// retry is the delay between node readiness checks in Validate
	retry = time.Second * 5
)

// Compile-time check to verify that UpgradeAgentNode implements UpgradeNode
var _ UpgradeNode = &UpgradeAgentNode{}

// UpgradeAgentNode upgrades an agent node to a newer version of Kubernetes
type UpgradeAgentNode struct {
	Translator              *i18n.Translator
	logger                  *logrus.Entry
	TemplateMap             map[string]interface{}
	ParametersMap           map[string]interface{}
	UpgradeContainerService *api.ContainerService
	SubscriptionID          string
	ResourceGroup           string
	Client                  armhelpers.AKSEngineClient
	kubeConfig              string
	timeout                 time.Duration
	cordonDrainTimeout      time.Duration
}

// DeleteNode takes state/resources of the master/agent node from ListNodeResources,
// backs up/preserves state as needed by a specific version of Kubernetes, and then
// deletes the node.
// The 'drain' flag is used to invoke the 'cordon and drain' flow.
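// Draining is best effort: a drain failure is logged and tolerated so that a
// single unhealthy node cannot block the upgrade, and the VM is deleted regardless.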
func (kan *UpgradeAgentNode) DeleteNode(vmName *string, drain bool) error {
	kubeAPIServerURL := kan.UpgradeContainerService.Properties.MasterProfile.FQDN
	if vmName == nil || *vmName == "" {
		return errors.Errorf("Error deleting VM: VM name was empty")
	}
	nodeName := strings.ToLower(*vmName)
	client, err := kan.Client.GetKubernetesClient(kubeAPIServerURL, kan.kubeConfig, interval, kan.timeout)
	if err != nil {
		return err
	}
	// Cordon and drain the node
	if drain {
		err = operations.SafelyDrainNodeWithClient(client, kan.logger, nodeName, kan.cordonDrainTimeout)
		if err != nil {
			kan.logger.Warningf("Error draining agent VM %s. Proceeding with deletion. Error: %v", *vmName, err)
			// Proceed with deletion anyways
		}
	}
	// Delete VM in ARM
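	// CleanDeleteVirtualMachine is expected to remove the VM along with its
	// associated NIC and disk resources rather than leaving them orphaned.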
	if err = operations.CleanDeleteVirtualMachine(kan.Client, kan.logger, kan.SubscriptionID, kan.ResourceGroup, *vmName); err != nil {
		return err
	}
	// Delete the Node object in the API server; a NotFound error is tolerated
	// because the node may already have been deregistered.
	if err = client.DeleteNode(nodeName); err != nil {
		statusErr, ok := err.(*apierrors.StatusError)
		if ok && statusErr.ErrStatus.Reason != v1.StatusReasonNotFound {
			kan.logger.Warnf("Node %s got an error while deregistering: %#v", *vmName, err)
		}
	}
	return nil
}

// CreateNode creates a new agent node running the targeted version of Kubernetes
func (kan *UpgradeAgentNode) CreateNode(ctx context.Context, poolName string, agentNo int) error {
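	// The ARM template's VM copy loop starts at the pool offset, so setting the
	// pool count to agentNo+1 and the pool offset to agentNo below should restrict
	// the deployment to the single VM that is being upgraded.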
	poolCountParameter := kan.ParametersMap[poolName+"Count"].(map[string]interface{})
	poolCountParameter["value"] = agentNo + 1
	agentCount := poolCountParameter["value"]
	kan.logger.Infof("Agent pool: %s, set count to: %d temporarily during upgrade. Upgrading agent: %d",
		poolName, agentCount, agentNo)
	poolOffsetVarName := poolName + "Offset"
	templateVariables := kan.TemplateMap["variables"].(map[string]interface{})
	templateVariables[poolOffsetVarName] = agentNo
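	// Give each upgrade deployment a unique name using a timestamp and a random suffix.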
	random := rand.New(rand.NewSource(time.Now().UnixNano()))
	deploymentSuffix := random.Int31()
	deploymentName := fmt.Sprintf("k8s-upgrade-%s-%d-%s-%d", poolName, agentNo, time.Now().Format("06-01-02T15.04.05"), deploymentSuffix)
	return armhelpers.DeployTemplateSync(kan.Client, kan.logger, kan.ResourceGroup, deploymentName, kan.TemplateMap, kan.ParametersMap)
}

// Validate verifies that the agent node has been upgraded as expected.
func (kan *UpgradeAgentNode) Validate(vmName *string) error {
	if vmName == nil || *vmName == "" {
		kan.logger.Warningf("VM name was empty. Skipping node condition check")
		return nil
	}
	nodeName := strings.ToLower(*vmName)
	kan.logger.Infof("Validating %s", nodeName)
	apiserverURL := kan.UpgradeContainerService.Properties.MasterProfile.FQDN
	client, err := kan.Client.GetKubernetesClient(apiserverURL, kan.kubeConfig, interval, kan.timeout)
	if err != nil {
		return &armhelpers.DeploymentValidationError{Err: err}
	}
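	// Poll the API server until the node reports Ready: retryTimer drives the
	// readiness checks (every 'retry' seconds) while timeoutTimer bounds the overall
	// wait; on timeout the node is deleted and validation fails.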
	retryTimer := time.NewTimer(time.Millisecond)
	timeoutTimer := time.NewTimer(kan.timeout)
	for {
		select {
		case <-timeoutTimer.C:
			retryTimer.Stop()
			err := kan.DeleteNode(vmName, false)
			if err != nil {
				kan.logger.Errorf("Error deleting agent VM %s: %v", *vmName, err)
				return &armhelpers.DeploymentValidationError{Err: kan.Translator.Errorf("Node was not ready within %v. Failed to delete node %s, error: %v", kan.timeout, *vmName, err)}
			}
			return &armhelpers.DeploymentValidationError{Err: kan.Translator.Errorf("Node was not ready within %v. Node %s was deleted.", kan.timeout, *vmName)}
		case <-retryTimer.C:
			agentNode, err := client.GetNode(nodeName)
			if err != nil {
				kan.logger.Infof("Agent node: %s status error: %v", nodeName, err)
				retryTimer.Reset(retry)
			} else if kubernetes.IsNodeReady(agentNode) {
				kan.logger.Infof("Agent node: %s is ready", nodeName)
				timeoutTimer.Stop()
				return nil
			} else {
				kan.logger.Infof("Agent node: %s not ready yet...", nodeName)
				retryTimer.Reset(retry)
			}
		}
	}
}