cluster-autoscaler/cloudprovider/aws/aws_manager.go (432 lines of code) (raw):

/* Copyright 2016 The Kubernetes Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ //go:generate go run ec2_instance_types/gen.go -region $AWS_REGION package aws import ( "errors" "fmt" "math/rand" "regexp" "strconv" "strings" "time" apiv1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/klog/v2" "k8s.io/autoscaler/cluster-autoscaler/cloudprovider" "k8s.io/autoscaler/cluster-autoscaler/cloudprovider/aws/aws-sdk-go/aws" "k8s.io/autoscaler/cluster-autoscaler/cloudprovider/aws/aws-sdk-go/service/autoscaling" "k8s.io/autoscaler/cluster-autoscaler/cloudprovider/aws/aws-sdk-go/service/ec2" "k8s.io/autoscaler/cluster-autoscaler/cloudprovider/aws/aws-sdk-go/service/eks" "k8s.io/autoscaler/cluster-autoscaler/config" "k8s.io/autoscaler/cluster-autoscaler/utils/gpu" ) const ( operationWaitTimeout = 5 * time.Second operationPollInterval = 100 * time.Millisecond maxRecordsReturnedByAPI = 100 maxAsgNamesPerDescribe = 100 refreshInterval = 1 * time.Minute autoDiscovererTypeASG = "asg" asgAutoDiscovererKeyTag = "tag" optionsTagsPrefix = "k8s.io/cluster-autoscaler/node-template/autoscaling-options/" labelAwsCSITopologyZone = "topology.ebs.csi.aws.com/zone" ) // AwsManager is handles aws communication and data caching. type AwsManager struct { awsService awsWrapper asgCache *asgCache lastRefresh time.Time instanceTypes map[string]*InstanceType managedNodegroupCache *managedNodegroupCache } type asgTemplate struct { InstanceType *InstanceType Region string Zone string Tags []*autoscaling.TagDescription } // createAwsManagerInternal allows for custom objects to be passed in by tests func createAWSManagerInternal( awsSDKProvider *awsSDKProvider, discoveryOpts cloudprovider.NodeGroupDiscoveryOptions, awsService *awsWrapper, instanceTypes map[string]*InstanceType, ) (*AwsManager, error) { klog.Infof("AWS SDK Version: %s", aws.SDKVersion) if awsService == nil { sess := awsSDKProvider.session awsService = &awsWrapper{autoscaling.New(sess), ec2.New(sess), eks.New(sess)} } specs, err := parseASGAutoDiscoverySpecs(discoveryOpts) if err != nil { return nil, err } cache, err := newASGCache(awsService, discoveryOpts.NodeGroupSpecs, specs) if err != nil { return nil, err } mngCache := newManagedNodeGroupCache(awsService) manager := &AwsManager{ awsService: *awsService, asgCache: cache, instanceTypes: instanceTypes, managedNodegroupCache: mngCache, } if err := manager.forceRefresh(); err != nil { return nil, err } return manager, nil } // CreateAwsManager constructs awsManager object. func CreateAwsManager(awsSDKProvider *awsSDKProvider, discoveryOpts cloudprovider.NodeGroupDiscoveryOptions, instanceTypes map[string]*InstanceType) (*AwsManager, error) { return createAWSManagerInternal(awsSDKProvider, discoveryOpts, nil, instanceTypes) } // Refresh is called before every main loop and can be used to dynamically update cloud provider state. // In particular the list of node groups returned by NodeGroups can change as a result of CloudProvider.Refresh(). func (m *AwsManager) Refresh() error { if m.lastRefresh.Add(refreshInterval).After(time.Now()) { return nil } return m.forceRefresh() } func (m *AwsManager) forceRefresh() error { if err := m.asgCache.regenerate(); err != nil { klog.Errorf("Failed to regenerate ASG cache: %v", err) return err } m.lastRefresh = time.Now() klog.V(2).Infof("Refreshed ASG list, next refresh after %v", m.lastRefresh.Add(refreshInterval)) return nil } // GetAsgForInstance returns AsgConfig of the given Instance func (m *AwsManager) GetAsgForInstance(instance AwsInstanceRef) *asg { return m.asgCache.FindForInstance(instance) } // Cleanup the ASG cache. func (m *AwsManager) Cleanup() { m.asgCache.Cleanup() } func (m *AwsManager) getAsgs() map[AwsRef]*asg { return m.asgCache.Get() } func (m *AwsManager) getAutoscalingOptions(ref AwsRef) map[string]string { return m.asgCache.GetAutoscalingOptions(ref) } // SetAsgSize sets ASG size. func (m *AwsManager) SetAsgSize(asg *asg, size int) error { return m.asgCache.SetAsgSize(asg, size) } // DeleteInstances deletes the given instances. All instances must be controlled by the same ASG. func (m *AwsManager) DeleteInstances(instances []*AwsInstanceRef) error { if err := m.asgCache.DeleteInstances(instances); err != nil { return err } klog.V(2).Infof("DeleteInstances was called: scheduling an ASG list refresh for next main loop evaluation") m.lastRefresh = time.Now().Add(-refreshInterval) return nil } // GetAsgNodes returns Asg nodes. func (m *AwsManager) GetAsgNodes(ref AwsRef) ([]AwsInstanceRef, error) { return m.asgCache.InstancesByAsg(ref) } // GetInstanceStatus returns the status of ASG nodes func (m *AwsManager) GetInstanceStatus(ref AwsInstanceRef) (*string, error) { return m.asgCache.InstanceStatus(ref) } func (m *AwsManager) getAsgTemplate(asg *asg) (*asgTemplate, error) { if len(asg.AvailabilityZones) < 1 { return nil, fmt.Errorf("unable to get first AvailabilityZone for ASG %q", asg.Name) } az := asg.AvailabilityZones[0] region := az[0 : len(az)-1] if len(asg.AvailabilityZones) > 1 { klog.V(4).Infof("Found multiple availability zones for ASG %q; using %s for %s label\n", asg.Name, az, apiv1.LabelZoneFailureDomain) } instanceTypeName, err := getInstanceTypeForAsg(m.asgCache, asg) if err != nil { return nil, err } if t, ok := m.instanceTypes[instanceTypeName]; ok { return &asgTemplate{ InstanceType: t, Region: region, Zone: az, Tags: asg.Tags, }, nil } return nil, fmt.Errorf("ASG %q uses the unknown EC2 instance type %q", asg.Name, instanceTypeName) } // GetAsgOptions parse options extracted from ASG tags and merges them with provided defaults func (m *AwsManager) GetAsgOptions(asg asg, defaults config.NodeGroupAutoscalingOptions) *config.NodeGroupAutoscalingOptions { options := m.getAutoscalingOptions(asg.AwsRef) if options == nil || len(options) == 0 { return &defaults } if stringOpt, found := options[config.DefaultScaleDownUtilizationThresholdKey]; found { if opt, err := strconv.ParseFloat(stringOpt, 64); err != nil { klog.Warningf("failed to convert asg %s %s tag to float: %v", asg.Name, config.DefaultScaleDownUtilizationThresholdKey, err) } else { defaults.ScaleDownUtilizationThreshold = opt } } if stringOpt, found := options[config.DefaultScaleDownGpuUtilizationThresholdKey]; found { if opt, err := strconv.ParseFloat(stringOpt, 64); err != nil { klog.Warningf("failed to convert asg %s %s tag to float: %v", asg.Name, config.DefaultScaleDownGpuUtilizationThresholdKey, err) } else { defaults.ScaleDownGpuUtilizationThreshold = opt } } if stringOpt, found := options[config.DefaultScaleDownUnneededTimeKey]; found { if opt, err := time.ParseDuration(stringOpt); err != nil { klog.Warningf("failed to convert asg %s %s tag to duration: %v", asg.Name, config.DefaultScaleDownUnneededTimeKey, err) } else { defaults.ScaleDownUnneededTime = opt } } if stringOpt, found := options[config.DefaultScaleDownUnreadyTimeKey]; found { if opt, err := time.ParseDuration(stringOpt); err != nil { klog.Warningf("failed to convert asg %s %s tag to duration: %v", asg.Name, config.DefaultScaleDownUnreadyTimeKey, err) } else { defaults.ScaleDownUnreadyTime = opt } } if stringOpt, found := options[config.DefaultIgnoreDaemonSetsUtilizationKey]; found { if opt, err := strconv.ParseBool(stringOpt); err != nil { klog.Warningf("failed to convert asg %s %s tag to bool: %v", asg.Name, config.DefaultIgnoreDaemonSetsUtilizationKey, err) } else { defaults.IgnoreDaemonSetsUtilization = opt } } return &defaults } func (m *AwsManager) buildNodeFromTemplate(asg *asg, template *asgTemplate) (*apiv1.Node, error) { node := apiv1.Node{} nodeName := fmt.Sprintf("%s-asg-%d", asg.Name, rand.Int63()) node.ObjectMeta = metav1.ObjectMeta{ Name: nodeName, SelfLink: fmt.Sprintf("/api/v1/nodes/%s", nodeName), Labels: map[string]string{}, } node.Status = apiv1.NodeStatus{ Capacity: apiv1.ResourceList{}, } // TODO: get a real value. node.Status.Capacity[apiv1.ResourcePods] = *resource.NewQuantity(110, resource.DecimalSI) node.Status.Capacity[apiv1.ResourceCPU] = *resource.NewQuantity(template.InstanceType.VCPU, resource.DecimalSI) node.Status.Capacity[gpu.ResourceNvidiaGPU] = *resource.NewQuantity(template.InstanceType.GPU, resource.DecimalSI) node.Status.Capacity[apiv1.ResourceMemory] = *resource.NewQuantity(template.InstanceType.MemoryMb*1024*1024, resource.DecimalSI) m.updateCapacityWithRequirementsOverrides(&node.Status.Capacity, asg.MixedInstancesPolicy) resourcesFromTags := extractAllocatableResourcesFromAsg(template.Tags) klog.V(5).Infof("Extracted resources from ASG tags %v", resourcesFromTags) for resourceName, val := range resourcesFromTags { node.Status.Capacity[apiv1.ResourceName(resourceName)] = *val } // TODO: use proper allocatable!! node.Status.Allocatable = node.Status.Capacity // GenericLabels node.Labels = cloudprovider.JoinStringMaps(node.Labels, buildGenericLabels(template, nodeName)) // NodeLabels node.Labels = cloudprovider.JoinStringMaps(node.Labels, extractLabelsFromAsg(template.Tags)) node.Spec.Taints = extractTaintsFromAsg(template.Tags) if nodegroupName, clusterName := node.Labels["nodegroup-name"], node.Labels["cluster-name"]; nodegroupName != "" && clusterName != "" { klog.V(5).Infof("Nodegroup %s in cluster %s is an EKS managed nodegroup.", nodegroupName, clusterName) // Call AWS EKS DescribeNodegroup API, check if keys already exist in Labels and do NOT overwrite mngLabels, err := m.managedNodegroupCache.getManagedNodegroupLabels(nodegroupName, clusterName) if err != nil { klog.Errorf("Failed to get labels from EKS DescribeNodegroup API for nodegroup %s in cluster %s because %s.", nodegroupName, clusterName, err) } else if mngLabels != nil && len(mngLabels) > 0 { node.Labels = joinNodeLabelsChoosingUserValuesOverAPIValues(node.Labels, mngLabels) klog.V(5).Infof("node.Labels : %+v\n", node.Labels) } mngTaints, err := m.managedNodegroupCache.getManagedNodegroupTaints(nodegroupName, clusterName) if err != nil { klog.Errorf("Failed to get taints from EKS DescribeNodegroup API for nodegroup %s in cluster %s because %s.", nodegroupName, clusterName, err) } else if mngTaints != nil && len(mngTaints) > 0 { node.Spec.Taints = append(node.Spec.Taints, mngTaints...) klog.V(5).Infof("node.Spec.Taints : %+v\n", node.Spec.Taints) } mngTags, err := m.managedNodegroupCache.getManagedNodegroupTags(nodegroupName, clusterName) if err != nil { klog.Errorf("Failed to get tags from EKS DescribeNodegroup API for nodegroup %s in cluster %s because %s.", nodegroupName, clusterName, err) } else if mngTags != nil && len(mngTags) > 0 { resourcesFromMngTags := extractAllocatableResourcesFromTags(mngTags) klog.V(5).Infof("Extracted resources from EKS nodegroup tags %v", resourcesFromTags) // ManagedNodeGroup resource-indicating tags override conflicting tags on the ASG if they exist for resourceName, val := range resourcesFromMngTags { node.Status.Capacity[apiv1.ResourceName(resourceName)] = *val } } } node.Status.Conditions = cloudprovider.BuildReadyConditions() return &node, nil } func joinNodeLabelsChoosingUserValuesOverAPIValues(extractedLabels map[string]string, mngLabels map[string]string) map[string]string { result := make(map[string]string) // Copy Generic Labels and Labels from ASG for k, v := range extractedLabels { result[k] = v } // Copy Labels from EKS DescribeNodegroup API call // If the there is a duplicate key, this will overwrite the ASG Tag specified values with the EKS DescribeNodegroup API values // We are overwriting them because it seems like EKS isn't sending the ASG Tags to Kubernetes itself // so scale ups based on the ASG Tag aren't working for k, v := range mngLabels { result[k] = v } return result } func (m *AwsManager) updateCapacityWithRequirementsOverrides(capacity *apiv1.ResourceList, policy *mixedInstancesPolicy) { if policy == nil || len(policy.instanceTypesOverrides) > 0 || policy.instanceRequirements == nil { return } instanceRequirements := policy.instanceRequirements if instanceRequirements.VCpuCount != nil && instanceRequirements.VCpuCount.Min != nil { (*capacity)[apiv1.ResourceCPU] = *resource.NewQuantity(*instanceRequirements.VCpuCount.Min, resource.DecimalSI) } if instanceRequirements.MemoryMiB != nil && instanceRequirements.MemoryMiB.Min != nil { (*capacity)[apiv1.ResourceMemory] = *resource.NewQuantity(*instanceRequirements.MemoryMiB.Min*1024*1024, resource.DecimalSI) } for _, manufacturer := range instanceRequirements.AcceleratorManufacturers { if *manufacturer == autoscaling.AcceleratorManufacturerNvidia { for _, acceleratorType := range instanceRequirements.AcceleratorTypes { if *acceleratorType == autoscaling.AcceleratorTypeGpu { (*capacity)[gpu.ResourceNvidiaGPU] = *resource.NewQuantity(*instanceRequirements.AcceleratorCount.Min, resource.DecimalSI) } } } } } func buildGenericLabels(template *asgTemplate, nodeName string) map[string]string { result := make(map[string]string) result[apiv1.LabelArchStable] = template.InstanceType.Architecture result[apiv1.LabelOSStable] = cloudprovider.DefaultOS result[apiv1.LabelInstanceTypeStable] = template.InstanceType.InstanceType result[apiv1.LabelTopologyRegion] = template.Region result[apiv1.LabelTopologyZone] = template.Zone result[labelAwsCSITopologyZone] = template.Zone result[apiv1.LabelHostname] = nodeName return result } func extractLabelsFromAsg(tags []*autoscaling.TagDescription) map[string]string { result := make(map[string]string) for _, tag := range tags { k := *tag.Key v := *tag.Value splits := strings.Split(k, "k8s.io/cluster-autoscaler/node-template/label/") // Extract EKS labels from ASG if len(splits) <= 1 { splits = strings.Split(k, "eks:") } if len(splits) > 1 { label := splits[1] if label != "" { result[label] = v } } } return result } func extractAutoscalingOptionsFromTags(tags []*autoscaling.TagDescription) map[string]string { options := make(map[string]string) for _, tag := range tags { if !strings.HasPrefix(aws.StringValue(tag.Key), optionsTagsPrefix) { continue } splits := strings.Split(aws.StringValue(tag.Key), optionsTagsPrefix) if len(splits) != 2 || splits[1] == "" { continue } options[splits[1]] = aws.StringValue(tag.Value) } return options } func extractAllocatableResourcesFromAsg(tags []*autoscaling.TagDescription) map[string]*resource.Quantity { result := make(map[string]*resource.Quantity) for _, tag := range tags { k := *tag.Key v := *tag.Value splits := strings.Split(k, "k8s.io/cluster-autoscaler/node-template/resources/") if len(splits) > 1 { label := splits[1] if label != "" { quantity, err := resource.ParseQuantity(v) if err != nil { continue } result[label] = &quantity } } } return result } func extractAllocatableResourcesFromTags(tags map[string]string) map[string]*resource.Quantity { result := make(map[string]*resource.Quantity) for k, v := range tags { splits := strings.Split(k, "k8s.io/cluster-autoscaler/node-template/resources/") if len(splits) > 1 { label := splits[1] if label != "" { quantity, err := resource.ParseQuantity(v) if err != nil { klog.Warningf("Failed to parse resource quanitity '%s' for resource '%s'", v, label) continue } result[label] = &quantity } } } return result } func extractTaintsFromAsg(tags []*autoscaling.TagDescription) []apiv1.Taint { taints := make([]apiv1.Taint, 0) for _, tag := range tags { k := *tag.Key v := *tag.Value // The tag value must be in the format <tag>:NoSchedule r, _ := regexp.Compile("(.*):(?:NoSchedule|NoExecute|PreferNoSchedule)") if r.MatchString(v) { splits := strings.Split(k, "k8s.io/cluster-autoscaler/node-template/taint/") if len(splits) > 1 { values := strings.SplitN(v, ":", 2) if len(values) > 1 { taints = append(taints, apiv1.Taint{ Key: splits[1], Value: values[0], Effect: apiv1.TaintEffect(values[1]), }) } } } } return taints } // An asgAutoDiscoveryConfig specifies how to autodiscover AWS ASGs. type asgAutoDiscoveryConfig struct { // Tags to match on. // Any ASG with all of the provided tag keys will be autoscaled. Tags map[string]string } // ParseASGAutoDiscoverySpecs returns any provided NodeGroupAutoDiscoverySpecs // parsed into configuration appropriate for ASG autodiscovery. func parseASGAutoDiscoverySpecs(o cloudprovider.NodeGroupDiscoveryOptions) ([]asgAutoDiscoveryConfig, error) { cfgs := make([]asgAutoDiscoveryConfig, len(o.NodeGroupAutoDiscoverySpecs)) var err error for i, spec := range o.NodeGroupAutoDiscoverySpecs { cfgs[i], err = parseASGAutoDiscoverySpec(spec) if err != nil { return nil, err } } return cfgs, nil } func parseASGAutoDiscoverySpec(spec string) (asgAutoDiscoveryConfig, error) { cfg := asgAutoDiscoveryConfig{} tokens := strings.SplitN(spec, ":", 2) if len(tokens) != 2 { return cfg, fmt.Errorf("invalid node group auto discovery spec specified via --node-group-auto-discovery: %s", spec) } discoverer := tokens[0] if discoverer != autoDiscovererTypeASG { return cfg, fmt.Errorf("unsupported discoverer specified: %s", discoverer) } param := tokens[1] kv := strings.SplitN(param, "=", 2) if len(kv) != 2 { return cfg, fmt.Errorf("invalid key=value pair %s", kv) } k, v := kv[0], kv[1] if k != asgAutoDiscovererKeyTag { return cfg, fmt.Errorf("unsupported parameter key \"%s\" is specified for discoverer \"%s\". The only supported key is \"%s\"", k, discoverer, asgAutoDiscovererKeyTag) } if v == "" { return cfg, errors.New("tag value not supplied") } p := strings.Split(v, ",") if len(p) == 0 { return cfg, fmt.Errorf("invalid ASG tag for auto discovery specified: ASG tag must not be empty") } cfg.Tags = make(map[string]string, len(p)) for _, label := range p { lp := strings.SplitN(label, "=", 2) if len(lp) > 1 { cfg.Tags[lp[0]] = lp[1] continue } cfg.Tags[lp[0]] = "" } return cfg, nil }