cluster-autoscaler/cloudprovider/aws/auto_scaling_groups.go (508 lines of code) (raw):
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package aws
import (
"fmt"
"reflect"
"strings"
"sync"
"time"
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider/aws/aws-sdk-go/aws"
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider/aws/aws-sdk-go/service/autoscaling"
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider/aws/aws-sdk-go/service/ec2"
"k8s.io/autoscaler/cluster-autoscaler/config/dynamic"
klog "k8s.io/klog/v2"
)
const (
scaleToZeroSupported = true
placeholderInstanceNamePrefix = "i-placeholder"
placeholderUnfulfillableStatus = "placeholder-cannot-be-fulfilled"
)
type asgCache struct {
registeredAsgs map[AwsRef]*asg
asgToInstances map[AwsRef][]AwsInstanceRef
instanceToAsg map[AwsInstanceRef]*asg
instanceStatus map[AwsInstanceRef]*string
instanceLifecycle map[AwsInstanceRef]*string
asgInstanceTypeCache *instanceTypeExpirationStore
mutex sync.Mutex
awsService *awsWrapper
interrupt chan struct{}
asgAutoDiscoverySpecs []asgAutoDiscoveryConfig
explicitlyConfigured map[AwsRef]bool
autoscalingOptions map[AwsRef]map[string]string
}
type launchTemplate struct {
name string
version string
}
type mixedInstancesPolicy struct {
launchTemplate *launchTemplate
instanceTypesOverrides []string
instanceRequirementsOverrides *autoscaling.InstanceRequirements
instanceRequirements *ec2.InstanceRequirements
}
type asg struct {
AwsRef
minSize int
maxSize int
curSize int
lastUpdateTime time.Time
AvailabilityZones []string
LaunchConfigurationName string
LaunchTemplate *launchTemplate
MixedInstancesPolicy *mixedInstancesPolicy
Tags []*autoscaling.TagDescription
}
func newASGCache(awsService *awsWrapper, explicitSpecs []string, autoDiscoverySpecs []asgAutoDiscoveryConfig) (*asgCache, error) {
registry := &asgCache{
registeredAsgs: make(map[AwsRef]*asg, 0),
awsService: awsService,
asgToInstances: make(map[AwsRef][]AwsInstanceRef),
instanceToAsg: make(map[AwsInstanceRef]*asg),
instanceStatus: make(map[AwsInstanceRef]*string),
instanceLifecycle: make(map[AwsInstanceRef]*string),
asgInstanceTypeCache: newAsgInstanceTypeCache(awsService),
interrupt: make(chan struct{}),
asgAutoDiscoverySpecs: autoDiscoverySpecs,
explicitlyConfigured: make(map[AwsRef]bool),
autoscalingOptions: make(map[AwsRef]map[string]string),
}
if err := registry.parseExplicitAsgs(explicitSpecs); err != nil {
return nil, err
}
return registry, nil
}
// Use a function variable for ease of testing
var getInstanceTypeForAsg = func(m *asgCache, group *asg) (string, error) {
if obj, found, _ := m.asgInstanceTypeCache.GetByKey(group.AwsRef.Name); found {
return obj.(instanceTypeCachedObject).instanceType, nil
}
result, err := m.awsService.getInstanceTypesForAsgs([]*asg{group})
if err != nil {
return "", fmt.Errorf("could not get instance type for %s: %w", group.AwsRef.Name, err)
}
if instanceType, ok := result[group.AwsRef.Name]; ok {
return instanceType, nil
}
return "", fmt.Errorf("could not find instance type for %s", group.AwsRef.Name)
}
// Fetch explicitly configured ASGs. These ASGs should never be unregistered
// during refreshes, even if they no longer exist in AWS.
func (m *asgCache) parseExplicitAsgs(specs []string) error {
for _, spec := range specs {
asg, err := m.buildAsgFromSpec(spec)
if err != nil {
return fmt.Errorf("failed to parse node group spec: %v", err)
}
m.explicitlyConfigured[asg.AwsRef] = true
m.register(asg)
}
return nil
}
// Register ASG. Returns the registered ASG.
func (m *asgCache) register(asg *asg) *asg {
if existing, asgExists := m.registeredAsgs[asg.AwsRef]; asgExists {
if reflect.DeepEqual(existing, asg) {
return existing
}
// Explicit registered groups should always use the manually provided min/max
// values and the not the ones returned by the API
if !m.explicitlyConfigured[asg.AwsRef] {
existing.minSize = asg.minSize
existing.maxSize = asg.maxSize
}
existing.curSize = asg.curSize
// Those information are mainly required to create templates when scaling
// from zero
existing.AvailabilityZones = asg.AvailabilityZones
existing.LaunchConfigurationName = asg.LaunchConfigurationName
existing.LaunchTemplate = asg.LaunchTemplate
existing.MixedInstancesPolicy = asg.MixedInstancesPolicy
existing.Tags = asg.Tags
klog.V(4).Infof("Updated ASG cache for %s. min/max/current is %d/%d/%d", asg.AwsRef.Name, existing.minSize, existing.maxSize, existing.curSize)
return existing
}
klog.V(1).Infof("Registering ASG %s", asg.AwsRef.Name)
m.registeredAsgs[asg.AwsRef] = asg
return asg
}
// Unregister ASG. Returns the unregistered ASG.
func (m *asgCache) unregister(a *asg) *asg {
if _, asgExists := m.registeredAsgs[a.AwsRef]; asgExists {
klog.V(1).Infof("Unregistered ASG %s", a.AwsRef.Name)
delete(m.registeredAsgs, a.AwsRef)
}
return a
}
func (m *asgCache) buildAsgFromSpec(spec string) (*asg, error) {
s, err := dynamic.SpecFromString(spec, scaleToZeroSupported)
if err != nil {
return nil, fmt.Errorf("failed to parse node group spec: %v", err)
}
asg := &asg{
AwsRef: AwsRef{Name: s.Name},
minSize: s.MinSize,
maxSize: s.MaxSize,
}
return asg, nil
}
// Get returns the currently registered ASGs
func (m *asgCache) Get() map[AwsRef]*asg {
m.mutex.Lock()
defer m.mutex.Unlock()
return m.registeredAsgs
}
// GetAutoscalingOptions return autoscaling options strings obtained from ASG tags.
func (m *asgCache) GetAutoscalingOptions(ref AwsRef) map[string]string {
m.mutex.Lock()
defer m.mutex.Unlock()
return m.autoscalingOptions[ref]
}
// FindForInstance returns AsgConfig of the given Instance
func (m *asgCache) FindForInstance(instance AwsInstanceRef) *asg {
m.mutex.Lock()
defer m.mutex.Unlock()
return m.findForInstance(instance)
}
func (m *asgCache) findForInstance(instance AwsInstanceRef) *asg {
if asg, found := m.instanceToAsg[instance]; found {
return asg
}
return nil
}
// InstancesByAsg returns the nodes of an ASG
func (m *asgCache) InstancesByAsg(ref AwsRef) ([]AwsInstanceRef, error) {
m.mutex.Lock()
defer m.mutex.Unlock()
if instances, found := m.asgToInstances[ref]; found {
return instances, nil
}
return nil, fmt.Errorf("error while looking for instances of ASG: %s", ref)
}
func (m *asgCache) InstanceStatus(ref AwsInstanceRef) (*string, error) {
m.mutex.Lock()
defer m.mutex.Unlock()
if status, found := m.instanceStatus[ref]; found {
return status, nil
}
return nil, fmt.Errorf("could not find instance %v", ref)
}
func (m *asgCache) findInstanceLifecycle(ref AwsInstanceRef) (*string, error) {
if lifecycle, found := m.instanceLifecycle[ref]; found {
return lifecycle, nil
}
return nil, fmt.Errorf("could not find instance %v", ref)
}
func (m *asgCache) SetAsgSize(asg *asg, size int) error {
m.mutex.Lock()
defer m.mutex.Unlock()
return m.setAsgSizeNoLock(asg, size)
}
func (m *asgCache) setAsgSizeNoLock(asg *asg, size int) error {
params := &autoscaling.SetDesiredCapacityInput{
AutoScalingGroupName: aws.String(asg.Name),
DesiredCapacity: aws.Int64(int64(size)),
HonorCooldown: aws.Bool(false),
}
klog.V(0).Infof("Setting asg %s size to %d", asg.Name, size)
start := time.Now()
_, err := m.awsService.SetDesiredCapacity(params)
observeAWSRequest("SetDesiredCapacity", err, start)
if err != nil {
return err
}
// Proactively set the ASG size so autoscaler makes better decisions
asg.lastUpdateTime = start
asg.curSize = size
return nil
}
func (m *asgCache) decreaseAsgSizeByOneNoLock(asg *asg) error {
return m.setAsgSizeNoLock(asg, asg.curSize-1)
}
// DeleteInstances deletes the given instances. All instances must be controlled by the same ASG.
func (m *asgCache) DeleteInstances(instances []*AwsInstanceRef) error {
m.mutex.Lock()
defer m.mutex.Unlock()
if len(instances) == 0 {
return nil
}
commonAsg := m.findForInstance(*instances[0])
if commonAsg == nil {
return fmt.Errorf("can't delete instance %s, which is not part of an ASG", instances[0].Name)
}
for _, instance := range instances {
asg := m.findForInstance(*instance)
if asg != commonAsg {
instanceIds := make([]string, len(instances))
for i, instance := range instances {
instanceIds[i] = instance.Name
}
return fmt.Errorf("can't delete instances %s as they belong to at least two different ASGs (%s and %s)", strings.Join(instanceIds, ","), commonAsg.Name, asg.Name)
}
}
placeHolderInstancesCount := m.GetPlaceHolderInstancesCount(instances)
// Check if there are any placeholder instances in the list.
if placeHolderInstancesCount > 0 {
// Log the check for placeholders in the ASG.
klog.V(4).Infof("Detected %d placeholder instance(s) in ASG %s",
placeHolderInstancesCount, commonAsg.Name)
asgNames := []string{commonAsg.Name}
asgDetail, err := m.awsService.getAutoscalingGroupsByNames(asgNames)
if err != nil {
klog.Errorf("Error retrieving ASG details %s: %v", commonAsg.Name, err)
return err
}
activeInstancesInAsg := len(asgDetail[0].Instances)
desiredCapacityInAsg := int(*asgDetail[0].DesiredCapacity)
klog.V(4).Infof("asg %s has placeholders instances with desired capacity = %d and active instances = %d. updating ASG to match active instances count",
commonAsg.Name, desiredCapacityInAsg, activeInstancesInAsg)
// If the difference between the active instances and the desired capacity is greater than 1,
// it means that the ASG is under-provisioned and the desired capacity is not being reached.
// In this case, we would reduce the size of ASG by the count of unprovisioned instances
// which is equal to the total count of active instances in ASG
err = m.setAsgSizeNoLock(commonAsg, activeInstancesInAsg)
if err != nil {
klog.Errorf("Error reducing ASG %s size to %d: %v", commonAsg.Name, activeInstancesInAsg, err)
return err
}
}
for _, instance := range instances {
if m.isPlaceholderInstance(instance) {
// skipping placeholder as placeholder instances don't exist
// and we have already reduced ASG size during placeholder check.
continue
}
// check if the instance is already terminating - if it is, don't bother terminating again
// as doing so causes unnecessary API calls and can cause the curSize cached value to decrement
// unnecessarily.
lifecycle, err := m.findInstanceLifecycle(*instance)
if err != nil {
return err
}
if lifecycle != nil &&
*lifecycle == autoscaling.LifecycleStateTerminated ||
*lifecycle == autoscaling.LifecycleStateTerminating ||
*lifecycle == autoscaling.LifecycleStateTerminatingWait ||
*lifecycle == autoscaling.LifecycleStateTerminatingProceed {
klog.V(2).Infof("instance %s is already terminating in state %s, will skip instead", instance.Name, *lifecycle)
continue
}
params := &autoscaling.TerminateInstanceInAutoScalingGroupInput{
InstanceId: aws.String(instance.Name),
ShouldDecrementDesiredCapacity: aws.Bool(true),
}
start := time.Now()
resp, err := m.awsService.TerminateInstanceInAutoScalingGroup(params)
observeAWSRequest("TerminateInstanceInAutoScalingGroup", err, start)
if err != nil {
return err
}
klog.V(4).Infof(*resp.Activity.Description)
// Proactively decrement the size so autoscaler makes better decisions
commonAsg.curSize--
}
return nil
}
// isPlaceholderInstance checks if the given instance is only a placeholder
func (m *asgCache) isPlaceholderInstance(instance *AwsInstanceRef) bool {
return strings.HasPrefix(instance.Name, placeholderInstanceNamePrefix)
}
// Fetch automatically discovered ASGs. These ASGs should be unregistered if
// they no longer exist in AWS.
func (m *asgCache) buildAsgTags() map[string]string {
groupTags := map[string]string{}
for _, spec := range m.asgAutoDiscoverySpecs {
for k, v := range spec.Tags {
groupTags[k] = v
}
}
return groupTags
}
func (m *asgCache) buildAsgNames() []string {
refreshNames := make([]string, len(m.explicitlyConfigured))
i := 0
for k := range m.explicitlyConfigured {
refreshNames[i] = k.Name
i++
}
return refreshNames
}
// regenerate the cached view of explicitly configured and auto-discovered ASGs
func (m *asgCache) regenerate() error {
m.mutex.Lock()
defer m.mutex.Unlock()
newInstanceToAsgCache := make(map[AwsInstanceRef]*asg)
newAsgToInstancesCache := make(map[AwsRef][]AwsInstanceRef)
newInstanceStatusMap := make(map[AwsInstanceRef]*string)
newInstanceLifecycleMap := make(map[AwsInstanceRef]*string)
// Fetch details of all ASGs
refreshNames := m.buildAsgNames()
klog.V(4).Infof("Regenerating instance to ASG map for ASG names: %v", refreshNames)
namedGroups, err := m.awsService.getAutoscalingGroupsByNames(refreshNames)
if err != nil {
return err
}
refreshTags := m.buildAsgTags()
klog.V(4).Infof("Regenerating instance to ASG map for ASG tags: %v", refreshTags)
taggedGroups, err := m.awsService.getAutoscalingGroupsByTags(refreshTags)
if err != nil {
return err
}
groups := append(namedGroups, taggedGroups...)
// If currently any ASG has more Desired than running Instances, introduce placeholders
// for the instances to come up. This is required to track Desired instances that
// will never come up, like with Spot Request that can't be fulfilled
groups = m.createPlaceholdersForDesiredNonStartedInstances(groups)
// Register or update ASGs
exists := make(map[AwsRef]bool)
for _, group := range groups {
asg, err := m.buildAsgFromAWS(group)
if err != nil {
return err
}
exists[asg.AwsRef] = true
asg = m.register(asg)
newAsgToInstancesCache[asg.AwsRef] = make([]AwsInstanceRef, len(group.Instances))
for i, instance := range group.Instances {
ref := m.buildInstanceRefFromAWS(instance)
newInstanceToAsgCache[ref] = asg
newAsgToInstancesCache[asg.AwsRef][i] = ref
newInstanceStatusMap[ref] = instance.HealthStatus
newInstanceLifecycleMap[ref] = instance.LifecycleState
}
}
// Unregister no longer existing auto-discovered ASGs
for _, asg := range m.registeredAsgs {
if !exists[asg.AwsRef] && !m.explicitlyConfigured[asg.AwsRef] {
m.unregister(asg)
}
}
err = m.asgInstanceTypeCache.populate(m.registeredAsgs)
if err != nil {
klog.Warningf("Failed to fully populate ASG->instanceType mapping: %v", err)
}
// Rebuild autoscaling options cache
newAutoscalingOptions := make(map[AwsRef]map[string]string)
for _, asg := range m.registeredAsgs {
options := extractAutoscalingOptionsFromTags(asg.Tags)
if !reflect.DeepEqual(m.autoscalingOptions[asg.AwsRef], options) {
klog.V(4).Infof("Extracted autoscaling options from %q ASG tags: %v", asg.Name, options)
}
newAutoscalingOptions[asg.AwsRef] = options
}
m.asgToInstances = newAsgToInstancesCache
m.instanceToAsg = newInstanceToAsgCache
m.autoscalingOptions = newAutoscalingOptions
m.instanceStatus = newInstanceStatusMap
m.instanceLifecycle = newInstanceLifecycleMap
return nil
}
func (m *asgCache) createPlaceholdersForDesiredNonStartedInstances(groups []*autoscaling.Group) []*autoscaling.Group {
for _, g := range groups {
desired := *g.DesiredCapacity
realInstances := int64(len(g.Instances))
if desired <= realInstances {
continue
}
klog.V(4).Infof("Instance group %s has only %d instances created while requested count is %d. "+
"Creating placeholder instances.", *g.AutoScalingGroupName, realInstances, desired)
healthStatus := ""
isAvailable, err := m.isNodeGroupAvailable(g)
if err != nil {
klog.V(4).Infof("Could not check instance availability, creating placeholder node anyways: %v", err)
} else if !isAvailable {
klog.Warningf("Instance group %s cannot provision any more nodes!", *g.AutoScalingGroupName)
healthStatus = placeholderUnfulfillableStatus
}
for i := realInstances; i < desired; i++ {
id := fmt.Sprintf("%s-%s-%d", placeholderInstanceNamePrefix, *g.AutoScalingGroupName, i)
g.Instances = append(g.Instances, &autoscaling.Instance{
InstanceId: &id,
AvailabilityZone: g.AvailabilityZones[0],
HealthStatus: &healthStatus,
})
}
}
return groups
}
func (m *asgCache) isNodeGroupAvailable(group *autoscaling.Group) (bool, error) {
input := &autoscaling.DescribeScalingActivitiesInput{
AutoScalingGroupName: group.AutoScalingGroupName,
}
start := time.Now()
response, err := m.awsService.DescribeScalingActivities(input)
observeAWSRequest("DescribeScalingActivities", err, start)
if err != nil {
return true, err // If we can't describe the scaling activities we assume the node group is available
}
for _, activity := range response.Activities {
asgRef := AwsRef{Name: *group.AutoScalingGroupName}
if a, ok := m.registeredAsgs[asgRef]; ok {
lut := a.lastUpdateTime
if activity.StartTime.Before(lut) {
break
} else if *activity.StatusCode == "Failed" {
klog.Warningf("ASG %s scaling failed with %s", asgRef.Name, *activity)
return false, nil
}
} else {
klog.V(4).Infof("asg %v is not registered yet, skipping DescribeScalingActivities check", asgRef.Name)
}
}
return true, nil
}
func (m *asgCache) buildAsgFromAWS(g *autoscaling.Group) (*asg, error) {
spec := dynamic.NodeGroupSpec{
Name: aws.StringValue(g.AutoScalingGroupName),
MinSize: int(aws.Int64Value(g.MinSize)),
MaxSize: int(aws.Int64Value(g.MaxSize)),
SupportScaleToZero: scaleToZeroSupported,
}
if verr := spec.Validate(); verr != nil {
return nil, fmt.Errorf("failed to create node group spec: %v", verr)
}
asg := &asg{
AwsRef: AwsRef{Name: spec.Name},
minSize: spec.MinSize,
maxSize: spec.MaxSize,
curSize: int(aws.Int64Value(g.DesiredCapacity)),
AvailabilityZones: aws.StringValueSlice(g.AvailabilityZones),
LaunchConfigurationName: aws.StringValue(g.LaunchConfigurationName),
Tags: g.Tags,
}
if g.LaunchTemplate != nil {
asg.LaunchTemplate = buildLaunchTemplateFromSpec(g.LaunchTemplate)
}
if g.MixedInstancesPolicy != nil {
getInstanceTypes := func(overrides []*autoscaling.LaunchTemplateOverrides) []string {
res := []string{}
for _, override := range overrides {
if override.InstanceType != nil {
res = append(res, *override.InstanceType)
}
}
return res
}
getInstanceTypeRequirements := func(overrides []*autoscaling.LaunchTemplateOverrides) *autoscaling.InstanceRequirements {
if len(overrides) == 1 && overrides[0].InstanceRequirements != nil {
return overrides[0].InstanceRequirements
}
return nil
}
asg.MixedInstancesPolicy = &mixedInstancesPolicy{
launchTemplate: buildLaunchTemplateFromSpec(g.MixedInstancesPolicy.LaunchTemplate.LaunchTemplateSpecification),
instanceTypesOverrides: getInstanceTypes(g.MixedInstancesPolicy.LaunchTemplate.Overrides),
instanceRequirementsOverrides: getInstanceTypeRequirements(g.MixedInstancesPolicy.LaunchTemplate.Overrides),
}
if len(asg.MixedInstancesPolicy.instanceTypesOverrides) == 0 {
instanceRequirements, err := m.getInstanceRequirementsFromMixedInstancesPolicy(asg.MixedInstancesPolicy)
if err != nil {
return nil, fmt.Errorf("unable to retrieve instance requirements from mixed instance policy, err: %v", err)
}
asg.MixedInstancesPolicy.instanceRequirements = instanceRequirements
}
if len(asg.MixedInstancesPolicy.instanceTypesOverrides) != 0 && asg.MixedInstancesPolicy.instanceRequirementsOverrides != nil {
return nil, fmt.Errorf("invalid setup of both instance type and instance requirements overrides configured")
}
}
return asg, nil
}
func (m *asgCache) getInstanceRequirementsFromMixedInstancesPolicy(policy *mixedInstancesPolicy) (*ec2.InstanceRequirements, error) {
instanceRequirements := &ec2.InstanceRequirements{}
if policy.instanceRequirementsOverrides != nil {
var err error
instanceRequirements, err = m.awsService.getEC2RequirementsFromAutoscaling(policy.instanceRequirementsOverrides)
if err != nil {
return nil, err
}
} else if policy.launchTemplate != nil {
templateData, err := m.awsService.getLaunchTemplateData(policy.launchTemplate.name, policy.launchTemplate.version)
if err != nil {
return nil, err
}
if templateData.InstanceRequirements != nil {
instanceRequirements = templateData.InstanceRequirements
}
}
return instanceRequirements, nil
}
func (m *asgCache) buildInstanceRefFromAWS(instance *autoscaling.Instance) AwsInstanceRef {
providerID := fmt.Sprintf("aws:///%s/%s", aws.StringValue(instance.AvailabilityZone), aws.StringValue(instance.InstanceId))
return AwsInstanceRef{
ProviderID: providerID,
Name: aws.StringValue(instance.InstanceId),
}
}
// Cleanup closes the channel to signal the go routine to stop that is handling the cache
func (m *asgCache) Cleanup() {
close(m.interrupt)
}
// GetPlaceHolderInstancesCount returns count of placeholder instances in the cache
func (m *asgCache) GetPlaceHolderInstancesCount(instances []*AwsInstanceRef) int {
placeholderInstancesCount := 0
for _, instance := range instances {
if strings.HasPrefix(instance.Name, placeholderInstanceNamePrefix) {
placeholderInstancesCount++
}
}
return placeholderInstancesCount
}