cluster-autoscaler/cloudprovider/azure/azure_scale_set.go
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package azure
import (
"fmt"
"math/rand"
"strings"
"sync"
"time"
apiv1 "k8s.io/api/core/v1"
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
"k8s.io/autoscaler/cluster-autoscaler/config"
"k8s.io/autoscaler/cluster-autoscaler/config/dynamic"
"k8s.io/autoscaler/cluster-autoscaler/simulator/framework"
klog "k8s.io/klog/v2"
"sigs.k8s.io/cloud-provider-azure/pkg/retry"
"github.com/Azure/azure-sdk-for-go/services/compute/mgmt/2022-08-01/compute"
"github.com/Azure/go-autorest/autorest/azure"
"github.com/Azure/go-autorest/autorest/to"
)
var (
defaultVmssInstancesRefreshPeriod = 5 * time.Minute
vmssContextTimeout = 3 * time.Minute
asyncContextTimeout = 30 * time.Minute
vmssSizeMutex sync.Mutex
)
const (
provisioningStateCreating string = "Creating"
provisioningStateDeleting string = "Deleting"
provisioningStateFailed string = "Failed"
provisioningStateMigrating string = "Migrating"
provisioningStateSucceeded string = "Succeeded"
provisioningStateUpdating string = "Updating"
)
// ScaleSet implements the NodeGroup interface.
type ScaleSet struct {
azureRef
manager *AzureManager
minSize int
maxSize int
enableForceDelete bool
enableDynamicInstanceList bool
enableDetailedCSEMessage bool
// Current Size (Number of VMs)
// curSize tracks (and caches) the number of VMs in this ScaleSet.
// It is periodically updated from vmss.Sku.Capacity, with VMSS itself coming
// either from azure.Cache (which periodically does VMSS.List)
// or from direct VMSS.Get (always used for Spot).
curSize int64
// sizeRefreshPeriod is how often curSize is refreshed from vmss.Sku.Capacity.
// (Set from azureCache.refreshInterval = VmssCacheTTL or [defaultMetadataCache]refreshInterval = 1min)
sizeRefreshPeriod time.Duration
// lastSizeRefresh is the time curSize was last refreshed from vmss.Sku.Capacity.
// Together with sizeRefreshPeriod, it is used to determine if it is time to refresh curSize.
lastSizeRefresh time.Time
	// getVmssSizeRefreshPeriod is how often curSize should be refreshed when a direct VMSS.Get call is used (i.e. for Spot).
	// (Set from GetVmssSizeRefreshPeriod, if specified; flag: get-vmss-size-refresh-period, default 30s.)
	getVmssSizeRefreshPeriod time.Duration
// sizeMutex protects curSize (the number of VMs in the ScaleSet) from concurrent access
sizeMutex sync.Mutex
InstanceCache
// uses Azure Dedicated Host
dedicatedHost bool
enableFastDeleteOnFailedProvisioning bool
}
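// exampleSizeCacheIsFresh is an illustrative sketch (not used by the provider)
// of the refresh policy described in the struct comments above: curSize is
// trusted until lastSizeRefresh plus the applicable refresh period has passed,
// and Spot scale sets use the shorter getVmssSizeRefreshPeriod so evictions are
// noticed sooner. getCurSize below implements the real logic.
func exampleSizeCacheIsFresh(scaleSet *ScaleSet, spot bool) bool {
	period := scaleSet.sizeRefreshPeriod
	if spot {
		period = scaleSet.getVmssSizeRefreshPeriod
	}
	return scaleSet.lastSizeRefresh.Add(period).After(time.Now())
}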
// NewScaleSet creates a new ScaleSet.
func NewScaleSet(spec *dynamic.NodeGroupSpec, az *AzureManager, curSize int64, dedicatedHost bool) (*ScaleSet, error) {
scaleSet := &ScaleSet{
azureRef: azureRef{
Name: spec.Name,
},
minSize: spec.MinSize,
maxSize: spec.MaxSize,
manager: az,
curSize: curSize,
sizeRefreshPeriod: az.azureCache.refreshInterval,
InstanceCache: InstanceCache{
instancesRefreshJitter: az.config.VmssVmsCacheJitter,
},
enableForceDelete: az.config.EnableForceDelete,
enableDynamicInstanceList: az.config.EnableDynamicInstanceList,
enableDetailedCSEMessage: az.config.EnableDetailedCSEMessage,
dedicatedHost: dedicatedHost,
}
if az.config.VmssVirtualMachinesCacheTTLInSeconds != 0 {
scaleSet.instancesRefreshPeriod = time.Duration(az.config.VmssVirtualMachinesCacheTTLInSeconds) * time.Second
} else {
scaleSet.instancesRefreshPeriod = defaultVmssInstancesRefreshPeriod
}
if az.config.GetVmssSizeRefreshPeriod != 0 {
scaleSet.getVmssSizeRefreshPeriod = time.Duration(az.config.GetVmssSizeRefreshPeriod) * time.Second
} else {
scaleSet.getVmssSizeRefreshPeriod = az.azureCache.refreshInterval
}
if az.config.EnableDetailedCSEMessage {
klog.V(2).Infof("enableDetailedCSEMessage: %t", scaleSet.enableDetailedCSEMessage)
}
scaleSet.enableFastDeleteOnFailedProvisioning = az.config.EnableFastDeleteOnFailedProvisioning
return scaleSet, nil
}
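// exampleNewScaleSet is an illustrative sketch of constructing a ScaleSet from
// a node group spec. The AzureManager value, the VMSS name and the initial
// size of 5 are placeholders; in the provider the spec is normally parsed from
// the --nodes flag rather than built literally like this.
func exampleNewScaleSet(az *AzureManager) (*ScaleSet, error) {
	spec := &dynamic.NodeGroupSpec{
		Name:    "vmss-agentpool-example", // hypothetical VMSS name
		MinSize: 1,
		MaxSize: 10,
	}
	// curSize 5 pretends the VMSS currently runs five instances; dedicatedHost is false.
	return NewScaleSet(spec, az, 5, false)
}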
// MinSize returns minimum size of the node group.
func (scaleSet *ScaleSet) MinSize() int {
return scaleSet.minSize
}
// Exist checks if the node group really exists on the cloud provider side.
// It allows telling a theoretical node group apart from a real one.
func (scaleSet *ScaleSet) Exist() bool {
return true
}
// Create creates the node group on the cloud provider side.
func (scaleSet *ScaleSet) Create() (cloudprovider.NodeGroup, error) {
return nil, cloudprovider.ErrAlreadyExist
}
// Delete deletes the node group on the cloud provider side.
// This will be executed only for autoprovisioned node groups, once their size drops to 0.
func (scaleSet *ScaleSet) Delete() error {
return cloudprovider.ErrNotImplemented
}
// Autoprovisioned returns true if the node group is autoprovisioned.
func (scaleSet *ScaleSet) Autoprovisioned() bool {
return false
}
// GetOptions returns NodeGroupAutoscalingOptions that should be used for this particular
// NodeGroup. Returning a nil will result in using default options.
func (scaleSet *ScaleSet) GetOptions(defaults config.NodeGroupAutoscalingOptions) (*config.NodeGroupAutoscalingOptions, error) {
template, err := scaleSet.getVMSSFromCache()
if err != nil {
klog.Errorf("failed to get information for VMSS: %s", scaleSet.Name)
		// Note: We don't return an error here and instead fall back to the defaults.
		// Callers treat any error other than cloudprovider.ErrNotImplemented as fatal
		// (they check `if err != nil && err != cloudprovider.ErrNotImplemented`),
		// so the error return value is effectively reserved for "not implemented".
return nil, nil
}
return scaleSet.manager.GetScaleSetOptions(*template.Name, defaults), nil
}
// MaxSize returns maximum size of the node group.
func (scaleSet *ScaleSet) MaxSize() int {
return scaleSet.maxSize
}
func (scaleSet *ScaleSet) getVMSSFromCache() (compute.VirtualMachineScaleSet, error) {
allVMSS := scaleSet.manager.azureCache.getScaleSets()
if _, exists := allVMSS[scaleSet.Name]; !exists {
return compute.VirtualMachineScaleSet{}, fmt.Errorf("could not find vmss: %s", scaleSet.Name)
}
return allVMSS[scaleSet.Name], nil
}
func (scaleSet *ScaleSet) getCurSize() (int64, *GetVMSSFailedError) {
scaleSet.sizeMutex.Lock()
defer scaleSet.sizeMutex.Unlock()
set, err := scaleSet.getVMSSFromCache()
if err != nil {
klog.Errorf("failed to get information for VMSS: %s, error: %v", scaleSet.Name, err)
return -1, newGetVMSSFailedError(err, true)
}
	// Note: the previous special case that returned the in-memory size while the VMSS
	// provisioning state was "Updating" has been removed. In that state the cached curSize
	// (proactively adjusted by CA) used to be returned, unless it was -1, in which case it
	// was better to initialize it from the API.
effectiveSizeRefreshPeriod := scaleSet.sizeRefreshPeriod
// If the scale set is Spot, we want to have a more fresh view of the Sku.Capacity field.
// This is because evictions can happen
// at any given point in time, even before VMs are materialized as
// nodes. We should be able to react to those and have the autoscaler
// readjust the goal again to force restoration.
if isSpot(&set) {
effectiveSizeRefreshPeriod = scaleSet.getVmssSizeRefreshPeriod
}
if scaleSet.lastSizeRefresh.Add(effectiveSizeRefreshPeriod).After(time.Now()) {
klog.V(3).Infof("VMSS: %s, returning in-memory size: %d", scaleSet.Name, scaleSet.curSize)
return scaleSet.curSize, nil
}
// If the scale set is on Spot, make a GET VMSS call to fetch more updated fresh info
if isSpot(&set) {
ctx, cancel := getContextWithCancel()
defer cancel()
var rerr *retry.Error
set, rerr = scaleSet.manager.azClient.virtualMachineScaleSetsClient.Get(ctx, scaleSet.manager.config.ResourceGroup, scaleSet.Name)
if rerr != nil {
klog.Errorf("failed to get information for VMSS: %s, error: %v", scaleSet.Name, rerr)
return -1, newGetVMSSFailedError(rerr.Error(), rerr.IsNotFound())
}
}
vmssSizeMutex.Lock()
curSize := *set.Sku.Capacity
vmssSizeMutex.Unlock()
if scaleSet.curSize != curSize {
// Invalidate the instance cache if the capacity has changed.
klog.V(5).Infof("VMSS %q size changed from: %d to %d, invalidating instance cache", scaleSet.Name, scaleSet.curSize, curSize)
scaleSet.invalidateInstanceCache()
}
klog.V(3).Infof("VMSS: %s, in-memory size: %d, new size: %d", scaleSet.Name, scaleSet.curSize, curSize)
scaleSet.curSize = curSize
scaleSet.lastSizeRefresh = time.Now()
return scaleSet.curSize, nil
}
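// exampleHandleGetVMSSFailedError is an illustrative sketch of how a caller of
// getCurSize can use the typed error to tell a missing VMSS apart from other
// failures, mirroring what Nodes() does further below: a scale set that no
// longer exists is reported as empty rather than as an error.
func exampleHandleGetVMSSFailedError(scaleSet *ScaleSet) (int64, error) {
	size, getVMSSError := scaleSet.getCurSize()
	if getVMSSError == nil {
		return size, nil
	}
	if getVMSSError.notFound {
		// The VMSS no longer exists; report zero capacity instead of failing.
		return 0, nil
	}
	return -1, getVMSSError.error
}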
// getScaleSetSize gets Scale Set size.
func (scaleSet *ScaleSet) getScaleSetSize() (int64, error) {
	// First, get the current size of the ScaleSet.
	size, getVMSSError := scaleSet.getCurSize()
	if getVMSSError != nil {
		klog.V(3).Infof("getScaleSetSize: failed to get size for scale set %s (size: %d, err: %v)", scaleSet.Name, size, getVMSSError.error)
		return size, getVMSSError.error
	}
	if size == -1 {
		klog.V(3).Infof("getScaleSetSize: scale set %s is still under initialization (size: -1)", scaleSet.Name)
	}
	return size, nil
}
// waitForCreateOrUpdateInstances waits for the outcome of a VMSS capacity update initiated via CreateOrUpdateAsync.
func (scaleSet *ScaleSet) waitForCreateOrUpdateInstances(future *azure.Future) {
var err error
defer func() {
// Invalidate instanceCache on success and failure. Failure might have created a few instances, but it is very rare.
scaleSet.invalidateInstanceCache()
if err != nil {
klog.Errorf("Failed to update the capacity for vmss %s with error %v, invalidate the cache so as to get the real size from API", scaleSet.Name, err)
// Invalidate the VMSS size cache in order to fetch the size from the API.
scaleSet.invalidateLastSizeRefreshWithLock()
scaleSet.manager.invalidateCache()
}
}()
ctx, cancel := getContextWithTimeout(asyncContextTimeout)
defer cancel()
klog.V(3).Infof("Calling virtualMachineScaleSetsClient.WaitForCreateOrUpdateResult(%s)", scaleSet.Name)
httpResponse, err := scaleSet.manager.azClient.virtualMachineScaleSetsClient.WaitForCreateOrUpdateResult(ctx, future, scaleSet.manager.config.ResourceGroup)
isSuccess, err := isSuccessHTTPResponse(httpResponse, err)
if isSuccess {
klog.V(3).Infof("waitForCreateOrUpdateInstances(%s) success", scaleSet.Name)
return
}
klog.Errorf("waitForCreateOrUpdateInstances(%s) failed, err: %v", scaleSet.Name, err)
}
// setScaleSetSize sets ScaleSet size.
func (scaleSet *ScaleSet) setScaleSetSize(size int64, delta int) error {
vmssInfo, err := scaleSet.getVMSSFromCache()
if err != nil {
klog.Errorf("Failed to get information for VMSS (%q): %v", scaleSet.Name, err)
return err
}
requiredInstances := delta
	// Send a scale request only if we still need additional instances.
if requiredInstances > 0 {
klog.V(3).Infof("Remaining unsatisfied count is %d. Attempting to increase scale set %q "+
"capacity", requiredInstances, scaleSet.Name)
err := scaleSet.createOrUpdateInstances(&vmssInfo, size)
if err != nil {
klog.Errorf("Failed to increase capacity for scale set %q to %d: %v", scaleSet.Name, requiredInstances, err)
return err
}
}
return nil
}
// TargetSize returns the current TARGET size of the node group. It is possible that the
// number is different from the number of nodes registered in Kubernetes.
func (scaleSet *ScaleSet) TargetSize() (int, error) {
size, err := scaleSet.getScaleSetSize()
return int(size), err
}
// IncreaseSize increases Scale Set size
func (scaleSet *ScaleSet) IncreaseSize(delta int) error {
if delta <= 0 {
return fmt.Errorf("size increase must be positive")
}
size, err := scaleSet.getScaleSetSize()
if err != nil {
return err
}
if size == -1 {
return fmt.Errorf("the scale set %s is under initialization, skipping IncreaseSize", scaleSet.Name)
}
if int(size)+delta > scaleSet.MaxSize() {
return fmt.Errorf("size increase too large - desired:%d max:%d", int(size)+delta, scaleSet.MaxSize())
}
return scaleSet.setScaleSetSize(size+int64(delta), delta)
}
// AtomicIncreaseSize is not implemented.
func (scaleSet *ScaleSet) AtomicIncreaseSize(delta int) error {
return cloudprovider.ErrNotImplemented
}
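// exampleScaleUpWithinBounds is an illustrative sketch (not called by the
// autoscaler) of how the sizing methods above compose: read the current
// target, clamp the desired size to MaxSize, and request only the difference
// through IncreaseSize.
func exampleScaleUpWithinBounds(scaleSet *ScaleSet, desired int) error {
	current, err := scaleSet.TargetSize()
	if err != nil {
		return err
	}
	if desired > scaleSet.MaxSize() {
		desired = scaleSet.MaxSize()
	}
	if desired <= current {
		return nil // already at or above the desired size
	}
	return scaleSet.IncreaseSize(desired - current)
}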
// GetScaleSetVms returns the list of VMs in the given scale set.
func (scaleSet *ScaleSet) GetScaleSetVms() ([]compute.VirtualMachineScaleSetVM, *retry.Error) {
ctx, cancel := getContextWithTimeout(vmssContextTimeout)
defer cancel()
vmList, rerr := scaleSet.manager.azClient.virtualMachineScaleSetVMsClient.List(ctx, scaleSet.manager.config.ResourceGroup,
scaleSet.Name, string(compute.InstanceViewTypesInstanceView))
klog.V(4).Infof("GetScaleSetVms: scaleSet.Name: %s, vmList: %v", scaleSet.Name, vmList)
if rerr != nil {
klog.Errorf("VirtualMachineScaleSetVMsClient.List failed for %s: %v", scaleSet.Name, rerr)
return nil, rerr
}
return vmList, nil
}
// GetFlexibleScaleSetVms returns the list of VMs in a Flexible-orchestration scale set.
func (scaleSet *ScaleSet) GetFlexibleScaleSetVms() ([]compute.VirtualMachine, *retry.Error) {
klog.V(4).Infof("GetScaleSetVms: starts")
ctx, cancel := getContextWithTimeout(vmssContextTimeout)
defer cancel()
	// Get VMSS info from the cache to obtain the ID; currently ScaleSet itself does not store ID info.
vmssInfo, err := scaleSet.getVMSSFromCache()
if err != nil {
klog.Errorf("Failed to get information for VMSS (%q): %v", scaleSet.Name, err)
var rerr = &retry.Error{
RawError: err,
}
return nil, rerr
}
vmList, rerr := scaleSet.manager.azClient.virtualMachinesClient.ListVmssFlexVMsWithoutInstanceView(ctx, *vmssInfo.ID)
if rerr != nil {
klog.Errorf("VirtualMachineScaleSetVMsClient.List failed for %s: %v", scaleSet.Name, rerr)
return nil, rerr
}
klog.V(4).Infof("GetFlexibleScaleSetVms: scaleSet.Name: %s, vmList: %v", scaleSet.Name, vmList)
return vmList, nil
}
// DecreaseTargetSize decreases the target size of the node group. This function
// doesn't permit to delete any existing node and can be used only to reduce the
// request for new nodes that have not been yet fulfilled. Delta should be negative.
// It is assumed that the cloud provider will not delete the existing nodes
// when there is an option to just decrease the target.
func (scaleSet *ScaleSet) DecreaseTargetSize(delta int) error {
// VMSS size should be changed automatically after the Node deletion, hence this operation is not required.
// To prevent some unreproducible bugs, an extra refresh of cache is needed.
scaleSet.invalidateInstanceCache()
_, err := scaleSet.getScaleSetSize()
if err != nil {
klog.Warningf("DecreaseTargetSize: failed with error: %v", err)
}
return err
}
// Belongs returns true if the given node belongs to the NodeGroup.
func (scaleSet *ScaleSet) Belongs(node *apiv1.Node) (bool, error) {
klog.V(6).Infof("Check if node belongs to this scale set: scaleset:%v, node:%v\n", scaleSet, node)
ref := &azureRef{
Name: node.Spec.ProviderID,
}
targetAsg, err := scaleSet.manager.GetNodeGroupForInstance(ref)
if err != nil {
return false, err
}
if targetAsg == nil {
return false, fmt.Errorf("%s doesn't belong to a known scale set", node.Name)
}
if !strings.EqualFold(targetAsg.Id(), scaleSet.Id()) {
return false, nil
}
return true, nil
}
func (scaleSet *ScaleSet) createOrUpdateInstances(vmssInfo *compute.VirtualMachineScaleSet, newSize int64) error {
if vmssInfo == nil {
return fmt.Errorf("vmssInfo cannot be nil while increating scaleSet capacity")
}
scaleSet.sizeMutex.Lock()
defer scaleSet.sizeMutex.Unlock()
// Update the new capacity to cache.
vmssSizeMutex.Lock()
vmssInfo.Sku.Capacity = &newSize
vmssSizeMutex.Unlock()
// Compose a new VMSS for updating.
op := compute.VirtualMachineScaleSet{
Name: vmssInfo.Name,
Sku: vmssInfo.Sku,
Location: vmssInfo.Location,
}
if vmssInfo.ExtendedLocation != nil {
op.ExtendedLocation = &compute.ExtendedLocation{
Name: vmssInfo.ExtendedLocation.Name,
Type: vmssInfo.ExtendedLocation.Type,
}
klog.V(3).Infof("Passing ExtendedLocation information if it is not nil, with Edge Zone name:(%s)", *op.ExtendedLocation.Name)
}
ctx, cancel := getContextWithTimeout(vmssContextTimeout)
defer cancel()
klog.V(3).Infof("Waiting for virtualMachineScaleSetsClient.CreateOrUpdateAsync(%s)", scaleSet.Name)
future, rerr := scaleSet.manager.azClient.virtualMachineScaleSetsClient.CreateOrUpdateAsync(ctx, scaleSet.manager.config.ResourceGroup, scaleSet.Name, op)
if rerr != nil {
klog.Errorf("virtualMachineScaleSetsClient.CreateOrUpdate for scale set %q failed: %+v", scaleSet.Name, rerr)
return rerr.Error()
}
// Proactively set the VMSS size so autoscaler makes better decisions.
scaleSet.curSize = newSize
scaleSet.lastSizeRefresh = time.Now()
go scaleSet.waitForCreateOrUpdateInstances(future)
return nil
}
// DeleteInstances deletes the given instances. All instances must be controlled by the same nodegroup.
func (scaleSet *ScaleSet) DeleteInstances(instances []*azureRef, hasUnregisteredNodes bool) error {
if len(instances) == 0 {
return nil
}
klog.V(3).Infof("Deleting vmss instances %v", instances)
commonAsg, err := scaleSet.manager.GetNodeGroupForInstance(instances[0])
if err != nil {
return err
}
instancesToDelete := []*azureRef{}
for _, instance := range instances {
err = scaleSet.verifyNodeGroup(instance, commonAsg.Id())
if err != nil {
return err
}
if cpi, found, err := scaleSet.getInstanceByProviderID(instance.Name); found && err == nil && cpi.Status != nil &&
cpi.Status.State == cloudprovider.InstanceDeleting {
klog.V(3).Infof("Skipping deleting instance %s as its current state is deleting", instance.Name)
continue
}
instancesToDelete = append(instancesToDelete, instance)
}
// nothing to delete
if len(instancesToDelete) == 0 {
klog.V(3).Infof("No new instances eligible for deletion, skipping")
return nil
}
instanceIDs := []string{}
for _, instance := range instancesToDelete {
instanceID, err := getLastSegment(instance.Name)
if err != nil {
klog.Errorf("getLastSegment failed with error: %v", err)
return err
}
instanceIDs = append(instanceIDs, instanceID)
}
requiredIds := &compute.VirtualMachineScaleSetVMInstanceRequiredIDs{
InstanceIds: &instanceIDs,
}
ctx, cancel := getContextWithTimeout(vmssContextTimeout)
defer cancel()
future, rerr := scaleSet.deleteInstances(ctx, requiredIds, commonAsg.Id())
if rerr != nil {
klog.Errorf("virtualMachineScaleSetsClient.DeleteInstancesAsync for instances %v for %s failed: %+v", requiredIds.InstanceIds, scaleSet.Name, rerr)
return rerr.Error()
}
if !scaleSet.manager.config.StrictCacheUpdates {
// Proactively decrement scale set size so that we don't
// go below minimum node count if cache data is stale
// only do it for non-unregistered nodes
if !hasUnregisteredNodes {
scaleSet.sizeMutex.Lock()
scaleSet.curSize -= int64(len(instanceIDs))
scaleSet.lastSizeRefresh = time.Now()
scaleSet.sizeMutex.Unlock()
}
// Proactively set the status of the instances to be deleted in cache
for _, instance := range instancesToDelete {
scaleSet.setInstanceStatusByProviderID(instance.Name, cloudprovider.InstanceStatus{State: cloudprovider.InstanceDeleting})
}
}
go scaleSet.waitForDeleteInstances(future, requiredIds)
return nil
}
func (scaleSet *ScaleSet) waitForDeleteInstances(future *azure.Future, requiredIds *compute.VirtualMachineScaleSetVMInstanceRequiredIDs) {
ctx, cancel := getContextWithTimeout(asyncContextTimeout)
defer cancel()
klog.V(3).Infof("Calling virtualMachineScaleSetsClient.WaitForDeleteInstancesResult(%v) for %s", requiredIds.InstanceIds, scaleSet.Name)
httpResponse, err := scaleSet.manager.azClient.virtualMachineScaleSetsClient.WaitForDeleteInstancesResult(ctx, future, scaleSet.manager.config.ResourceGroup)
isSuccess, err := isSuccessHTTPResponse(httpResponse, err)
if isSuccess {
klog.V(3).Infof(".WaitForDeleteInstancesResult(%v) for %s success", requiredIds.InstanceIds, scaleSet.Name)
if scaleSet.manager.config.StrictCacheUpdates {
if err := scaleSet.manager.forceRefresh(); err != nil {
klog.Errorf("forceRefresh failed with error: %v", err)
}
scaleSet.invalidateInstanceCache()
}
return
}
if !scaleSet.manager.config.StrictCacheUpdates {
// On failure, invalidate the instanceCache - cannot have instances in deletingState
scaleSet.invalidateInstanceCache()
}
klog.Errorf("WaitForDeleteInstancesResult(%v) for %s failed with error: %v", requiredIds.InstanceIds, scaleSet.Name, err)
}
// DeleteNodes deletes the nodes from the group.
func (scaleSet *ScaleSet) DeleteNodes(nodes []*apiv1.Node) error {
klog.V(8).Infof("Delete nodes requested: %q\n", nodes)
size, err := scaleSet.getScaleSetSize()
if err != nil {
return err
}
if int(size) <= scaleSet.MinSize() {
return fmt.Errorf("min size reached, nodes will not be deleted")
}
// Distinguish between unregistered node deletion and normal node deletion
refs := make([]*azureRef, 0, len(nodes))
hasUnregisteredNodes := false
unregisteredRefs := make([]*azureRef, 0, len(nodes))
for _, node := range nodes {
belongs, err := scaleSet.Belongs(node)
if err != nil {
return err
}
		if !belongs {
return fmt.Errorf("%s belongs to a different asg than %s", node.Name, scaleSet.Id())
}
		ref := &azureRef{
			Name: node.Spec.ProviderID,
		}
		if node.Annotations[cloudprovider.FakeNodeReasonAnnotation] == cloudprovider.FakeNodeUnregistered {
			klog.V(5).Infof("Node %s is unregistered, appending to the unregistered list", node.Name)
			hasUnregisteredNodes = true
			unregisteredRefs = append(unregisteredRefs, ref)
		} else {
			refs = append(refs, ref)
		}
}
if len(unregisteredRefs) > 0 {
klog.V(3).Infof("Removing unregisteredNodes: %v", unregisteredRefs)
return scaleSet.DeleteInstances(unregisteredRefs, true)
}
return scaleSet.DeleteInstances(refs, hasUnregisteredNodes)
}
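// exampleIsUnregisteredNode is an illustrative sketch of the check DeleteNodes
// uses above to split nodes into registered and unregistered sets: unregistered
// nodes are placeholders created by the core autoscaler and are marked with the
// FakeNodeReasonAnnotation.
func exampleIsUnregisteredNode(node *apiv1.Node) bool {
	return node.Annotations[cloudprovider.FakeNodeReasonAnnotation] == cloudprovider.FakeNodeUnregistered
}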
// ForceDeleteNodes deletes nodes from the group regardless of constraints.
func (scaleSet *ScaleSet) ForceDeleteNodes(nodes []*apiv1.Node) error {
return cloudprovider.ErrNotImplemented
}
// Id returns ScaleSet id.
func (scaleSet *ScaleSet) Id() string {
return scaleSet.Name
}
// Debug returns a debug string for the Scale Set.
func (scaleSet *ScaleSet) Debug() string {
return fmt.Sprintf("%s (%d:%d)", scaleSet.Id(), scaleSet.MinSize(), scaleSet.MaxSize())
}
// TemplateNodeInfo returns a node template for this scale set.
func (scaleSet *ScaleSet) TemplateNodeInfo() (*framework.NodeInfo, error) {
template, err := scaleSet.getVMSSFromCache()
if err != nil {
return nil, err
}
inputLabels := map[string]string{}
inputTaints := ""
node, err := buildNodeFromTemplate(scaleSet.Name, inputLabels, inputTaints, template, scaleSet.manager, scaleSet.enableDynamicInstanceList)
if err != nil {
return nil, err
}
nodeInfo := framework.NewNodeInfo(node, nil, &framework.PodInfo{Pod: cloudprovider.BuildKubeProxy(scaleSet.Name)})
return nodeInfo, nil
}
// Nodes returns a list of all nodes that belong to this node group.
func (scaleSet *ScaleSet) Nodes() ([]cloudprovider.Instance, error) {
curSize, getVMSSError := scaleSet.getCurSize()
if getVMSSError != nil {
klog.Errorf("Failed to get current size for vmss %q: %v", scaleSet.Name, getVMSSError.error)
if getVMSSError.notFound {
return []cloudprovider.Instance{}, nil // Don't return error if VMSS not found
}
return nil, getVMSSError.error // We want to return error if other errors occur.
}
scaleSet.instanceMutex.Lock()
defer scaleSet.instanceMutex.Unlock()
if int64(len(scaleSet.instanceCache)) == curSize &&
scaleSet.lastInstanceRefresh.Add(scaleSet.instancesRefreshPeriod).After(time.Now()) {
klog.V(4).Infof("Nodes: returns with curSize %d", curSize)
return scaleSet.instanceCache, nil
}
	// Force-update the instanceCache because the cached instance count doesn't match curSize or the cache has expired.
err := scaleSet.updateInstanceCache()
if err != nil {
return nil, err
}
klog.V(4).Infof("Nodes: returns")
return scaleSet.instanceCache, nil
}
// buildScaleSetCacheForFlex is used by orchestrationMode == compute.Flexible
func (scaleSet *ScaleSet) buildScaleSetCacheForFlex() error {
klog.V(3).Infof("buildScaleSetCacheForFlex: resetting instance Cache for scaleSet %s",
scaleSet.Name)
splay := rand.New(rand.NewSource(time.Now().UnixNano())).Intn(scaleSet.instancesRefreshJitter + 1)
lastRefresh := time.Now().Add(-time.Second * time.Duration(splay))
vms, rerr := scaleSet.GetFlexibleScaleSetVms()
if rerr != nil {
if isAzureRequestsThrottled(rerr) {
// Log a warning and update the instance refresh time so that it would retry after cache expiration
klog.Warningf("GetFlexibleScaleSetVms() is throttled with message %v, would return the cached instances", rerr)
scaleSet.lastInstanceRefresh = lastRefresh
return nil
}
return rerr.Error()
}
scaleSet.instanceCache = buildInstanceCacheForFlex(vms, scaleSet.enableFastDeleteOnFailedProvisioning)
scaleSet.lastInstanceRefresh = lastRefresh
return nil
}
func (scaleSet *ScaleSet) buildScaleSetCacheForUniform() error {
klog.V(3).Infof("updateInstanceCache: resetting instance Cache for scaleSet %s",
scaleSet.Name)
splay := rand.New(rand.NewSource(time.Now().UnixNano())).Intn(scaleSet.instancesRefreshJitter + 1)
lastRefresh := time.Now().Add(-time.Second * time.Duration(splay))
vms, rerr := scaleSet.GetScaleSetVms()
if rerr != nil {
if isAzureRequestsThrottled(rerr) {
// Log a warning and update the instance refresh time so that it would retry later.
// Ensure to retry no sooner than rerr.RetryAfter
klog.Warningf("updateInstanceCache: GetScaleSetVms() is throttled with message %v, would return the cached instances", rerr)
nextRefresh := lastRefresh.Add(scaleSet.instancesRefreshPeriod)
if nextRefresh.Before(rerr.RetryAfter) {
delay := rerr.RetryAfter.Sub(nextRefresh)
lastRefresh = lastRefresh.Add(delay)
}
scaleSet.lastInstanceRefresh = lastRefresh
return nil
}
return rerr.Error()
}
instances := []cloudprovider.Instance{}
	// Note that the GetScaleSetVms() result is not used directly because the List endpoint
	// returns resource IDs in a format that is not consistent with the Get endpoint.
for i := range vms {
		// An empty resource ID indicates the instance may be in a deleting state.
if *vms[i].ID == "" {
continue
}
resourceID, err := convertResourceGroupNameToLower(*vms[i].ID)
if err != nil {
// This shouldn't happen. Log a warning message for tracking.
klog.Warningf("updateInstanceCache: buildInstanceCache.convertResourceGroupNameToLower failed with error: %v", err)
continue
}
instances = append(instances, cloudprovider.Instance{
Id: azurePrefix + resourceID,
Status: scaleSet.instanceStatusFromVM(&vms[i]),
})
}
scaleSet.instanceCache = instances
scaleSet.lastInstanceRefresh = lastRefresh
return nil
}
// As with the Uniform path above, the raw List results are not used directly:
// resource IDs are normalized (resource group lowercased) before being cached.
// buildInstanceCacheForFlex is used by orchestrationMode == compute.Flexible.
func buildInstanceCacheForFlex(vms []compute.VirtualMachine, enableFastDeleteOnFailedProvisioning bool) []cloudprovider.Instance {
var instances []cloudprovider.Instance
for _, vm := range vms {
powerState := vmPowerStateRunning
if vm.InstanceView != nil && vm.InstanceView.Statuses != nil {
powerState = vmPowerStateFromStatuses(*vm.InstanceView.Statuses)
}
addVMToCache(&instances, vm.ID, vm.ProvisioningState, powerState, enableFastDeleteOnFailedProvisioning)
}
return instances
}
// addVMToCache used by orchestrationMode == compute.Flexible
func addVMToCache(instances *[]cloudprovider.Instance, id, provisioningState *string, powerState string, enableFastDeleteOnFailedProvisioning bool) {
	// An empty resource ID indicates the instance may be in a deleting state.
if len(*id) == 0 {
return
}
resourceID, err := convertResourceGroupNameToLower(*id)
if err != nil {
// This shouldn't happen. Log a warning message for tracking.
klog.Warningf("buildInstanceCache.convertResourceGroupNameToLower failed with error: %v", err)
return
}
*instances = append(*instances, cloudprovider.Instance{
Id: azurePrefix + resourceID,
Status: instanceStatusFromProvisioningStateAndPowerState(resourceID, provisioningState, powerState, enableFastDeleteOnFailedProvisioning),
})
}
// instanceStatusFromProvisioningStateAndPowerState converts the VM provisioning state (and power state)
// to a cloudprovider.InstanceStatus. Used by orchestrationMode == compute.Flexible.
// Suggestion: reunify this with scaleSet.instanceStatusFromVM().
func instanceStatusFromProvisioningStateAndPowerState(resourceID string, provisioningState *string, powerState string, enableFastDeleteOnFailedProvisioning bool) *cloudprovider.InstanceStatus {
if provisioningState == nil {
return nil
}
klog.V(5).Infof("Getting vm instance provisioning state %s for %s", *provisioningState, resourceID)
status := &cloudprovider.InstanceStatus{}
switch *provisioningState {
case provisioningStateDeleting:
status.State = cloudprovider.InstanceDeleting
case provisioningStateCreating:
status.State = cloudprovider.InstanceCreating
case provisioningStateFailed:
status.State = cloudprovider.InstanceRunning
if enableFastDeleteOnFailedProvisioning {
			// Provisioning can fail either during instance creation or after the instance is running.
			// Per https://learn.microsoft.com/en-us/azure/virtual-machines/states-billing#provisioning-states,
			// ProvisioningState represents the most recent provisioning state, therefore only report
			// InstanceCreating errors when the power state indicates the instance has not yet started running.
if !isRunningVmPowerState(powerState) {
klog.V(4).Infof("VM %s reports failed provisioning state with non-running power state: %s", resourceID, powerState)
status.State = cloudprovider.InstanceCreating
status.ErrorInfo = &cloudprovider.InstanceErrorInfo{
ErrorClass: cloudprovider.OutOfResourcesErrorClass,
ErrorCode: "provisioning-state-failed",
ErrorMessage: "Azure failed to provision a node for this node group",
}
} else {
klog.V(5).Infof("VM %s reports a failed provisioning state but is running (%s)", resourceID, powerState)
status.State = cloudprovider.InstanceRunning
}
}
default:
status.State = cloudprovider.InstanceRunning
}
return status
}
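// exampleFailedProvisioningStatus is an illustrative sketch of the mapping
// above: with fast delete enabled, a VM whose latest provisioning attempt
// failed while it was not running is reported as InstanceCreating with an
// OutOfResourcesErrorClass error. The resource ID and the "stopped" power
// state are placeholder values, not taken from a real VM.
func exampleFailedProvisioningStatus() *cloudprovider.InstanceStatus {
	state := provisioningStateFailed
	return instanceStatusFromProvisioningStateAndPowerState(
		"hypothetical-resource-id", &state, "stopped", true)
}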
func isSpot(vmss *compute.VirtualMachineScaleSet) bool {
return vmss != nil && vmss.VirtualMachineScaleSetProperties != nil &&
vmss.VirtualMachineScaleSetProperties.VirtualMachineProfile != nil &&
vmss.VirtualMachineScaleSetProperties.VirtualMachineProfile.Priority == compute.Spot
}
func (scaleSet *ScaleSet) invalidateLastSizeRefreshWithLock() {
scaleSet.sizeMutex.Lock()
scaleSet.lastSizeRefresh = time.Now().Add(-1 * scaleSet.sizeRefreshPeriod)
scaleSet.sizeMutex.Unlock()
}
func (scaleSet *ScaleSet) getOrchestrationMode() (compute.OrchestrationMode, error) {
vmss, err := scaleSet.getVMSSFromCache()
if err != nil {
klog.Errorf("failed to get information for VMSS: %s, error: %v", scaleSet.Name, err)
return "", err
}
return vmss.OrchestrationMode, nil
}
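// exampleRefreshByOrchestrationMode is an illustrative sketch of how the two
// cache builders above are typically selected: Uniform scale sets list VMs via
// the VMSS VM client, while Flexible scale sets list plain VMs. The actual
// dispatch lives in the instance cache code, not in this file.
func exampleRefreshByOrchestrationMode(scaleSet *ScaleSet) error {
	mode, err := scaleSet.getOrchestrationMode()
	if err != nil {
		return err
	}
	if mode == compute.Flexible {
		return scaleSet.buildScaleSetCacheForFlex()
	}
	return scaleSet.buildScaleSetCacheForUniform()
}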
func (scaleSet *ScaleSet) cseErrors(extensions *[]compute.VirtualMachineExtensionInstanceView) ([]string, bool) {
var errs []string
failed := false
if extensions != nil {
for _, extension := range *extensions {
if strings.EqualFold(to.String(extension.Name), vmssCSEExtensionName) && extension.Statuses != nil {
for _, status := range *extension.Statuses {
if status.Level == "Error" {
errs = append(errs, to.String(status.Message))
failed = true
}
}
}
}
}
return errs, failed
}
func (scaleSet *ScaleSet) getSKU() string {
vmssInfo, err := scaleSet.getVMSSFromCache()
if err != nil {
klog.Errorf("Failed to get information for VMSS (%q): %v", scaleSet.Name, err)
return ""
}
return to.String(vmssInfo.Sku.Name)
}
func (scaleSet *ScaleSet) verifyNodeGroup(instance *azureRef, commonNgID string) error {
ng, err := scaleSet.manager.GetNodeGroupForInstance(instance)
if err != nil {
return err
}
if !strings.EqualFold(ng.Id(), commonNgID) {
return fmt.Errorf("cannot delete instance (%s) which don't belong to the same Scale Set (%q)",
instance.Name, commonNgID)
}
return nil
}
// GetVMSSFailedError is used to differentiate between
// NotFound and other errors
type GetVMSSFailedError struct {
notFound bool
error error
}
func newGetVMSSFailedError(err error, notFound bool) *GetVMSSFailedError {
	return &GetVMSSFailedError{
		error:    err,
		notFound: notFound,
	}
}
func (v *GetVMSSFailedError) Error() string {
return v.error.Error()
}