pkg/providers/instance/instance.go (569 lines of code) (raw):

/* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ package instance import ( "context" "errors" "fmt" "math" "sort" "strings" awsmiddleware "github.com/aws/aws-sdk-go-v2/aws/middleware" "github.com/awslabs/operatorpkg/aws/middleware" "github.com/awslabs/operatorpkg/serrors" "sigs.k8s.io/karpenter/pkg/events" "sigs.k8s.io/karpenter/pkg/utils/resources" sdk "github.com/aws/karpenter-provider-aws/pkg/aws" "github.com/aws/karpenter-provider-aws/pkg/utils" "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/service/ec2" ec2types "github.com/aws/aws-sdk-go-v2/service/ec2/types" "github.com/samber/lo" "go.uber.org/multierr" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/util/sets" "sigs.k8s.io/controller-runtime/pkg/log" karpv1 "sigs.k8s.io/karpenter/pkg/apis/v1" v1 "github.com/aws/karpenter-provider-aws/pkg/apis/v1" "github.com/aws/karpenter-provider-aws/pkg/batcher" "github.com/aws/karpenter-provider-aws/pkg/cache" awserrors "github.com/aws/karpenter-provider-aws/pkg/errors" "github.com/aws/karpenter-provider-aws/pkg/operator/options" "github.com/aws/karpenter-provider-aws/pkg/providers/capacityreservation" "github.com/aws/karpenter-provider-aws/pkg/providers/launchtemplate" "github.com/aws/karpenter-provider-aws/pkg/providers/subnet" "sigs.k8s.io/karpenter/pkg/cloudprovider" "sigs.k8s.io/karpenter/pkg/scheduling" ) const ( instanceTypeFlexibilityThreshold = 5 // falling back to on-demand without flexibility risks insufficient capacity errors maxInstanceTypes = 60 ) var ( instanceStateFilter = ec2types.Filter{ Name: aws.String("instance-state-name"), Values: []string{ string(ec2types.InstanceStateNamePending), string(ec2types.InstanceStateNameRunning), string(ec2types.InstanceStateNameStopping), string(ec2types.InstanceStateNameStopped), string(ec2types.InstanceStateNameShuttingDown), }, } ) type Provider interface { Create(context.Context, *v1.EC2NodeClass, *karpv1.NodeClaim, map[string]string, []*cloudprovider.InstanceType) (*Instance, error) Get(context.Context, string) (*Instance, error) List(context.Context) ([]*Instance, error) Delete(context.Context, string) error CreateTags(context.Context, string, map[string]string) error } type DefaultProvider struct { region string recorder events.Recorder ec2api sdk.EC2API unavailableOfferings *cache.UnavailableOfferings subnetProvider subnet.Provider launchTemplateProvider launchtemplate.Provider ec2Batcher *batcher.EC2API capacityReservationProvider capacityreservation.Provider } func NewDefaultProvider( ctx context.Context, region string, recorder events.Recorder, ec2api sdk.EC2API, unavailableOfferings *cache.UnavailableOfferings, subnetProvider subnet.Provider, launchTemplateProvider launchtemplate.Provider, capacityReservationProvider capacityreservation.Provider, ) *DefaultProvider { return &DefaultProvider{ region: region, recorder: recorder, ec2api: ec2api, unavailableOfferings: unavailableOfferings, subnetProvider: subnetProvider, launchTemplateProvider: launchTemplateProvider, ec2Batcher: batcher.EC2(ctx, ec2api), capacityReservationProvider: capacityReservationProvider, } } func (p *DefaultProvider) Create(ctx context.Context, nodeClass *v1.EC2NodeClass, nodeClaim *karpv1.NodeClaim, tags map[string]string, instanceTypes []*cloudprovider.InstanceType) (*Instance, error) { // We filter out instance type that don't have an available offering that supports the capacity type capacityType := getCapacityType(nodeClaim, instanceTypes) fleetInstance, err := p.launchInstance(ctx, nodeClass, nodeClaim, capacityType, instanceTypes, tags) if awserrors.IsLaunchTemplateNotFound(err) { // retry once if launch template is not found. This allows karpenter to generate a new LT if the // cache was out-of-sync on the first try fleetInstance, err = p.launchInstance(ctx, nodeClass, nodeClaim, capacityType, instanceTypes, tags) } if err != nil { return nil, err } var capacityReservation string if capacityType == karpv1.CapacityTypeReserved { capacityReservation = p.getCapacityReservationIDForInstance( string(fleetInstance.InstanceType), *fleetInstance.LaunchTemplateAndOverrides.Overrides.AvailabilityZone, instanceTypes, ) } return NewInstanceFromFleet( fleetInstance, tags, capacityType, capacityReservation, lo.Contains(lo.Keys(nodeClaim.Spec.Resources.Requests), v1.ResourceEFA), ), nil } func (p *DefaultProvider) Get(ctx context.Context, id string) (*Instance, error) { out, err := p.ec2Batcher.DescribeInstances(ctx, &ec2.DescribeInstancesInput{ InstanceIds: []string{id}, Filters: []ec2types.Filter{instanceStateFilter}, }) if awserrors.IsNotFound(err) { return nil, cloudprovider.NewNodeClaimNotFoundError(err) } if err != nil { return nil, fmt.Errorf("failed to describe ec2 instances, %w", err) } instances, err := instancesFromOutput(ctx, out) if err != nil { return nil, fmt.Errorf("getting instances from output, %w", err) } if len(instances) != 1 { return nil, fmt.Errorf("expected a single instance, %w", err) } return instances[0], nil } func (p *DefaultProvider) List(ctx context.Context) ([]*Instance, error) { var out = &ec2.DescribeInstancesOutput{} paginator := ec2.NewDescribeInstancesPaginator(p.ec2api, &ec2.DescribeInstancesInput{ Filters: []ec2types.Filter{ { Name: aws.String("tag-key"), Values: []string{v1.NodePoolTagKey}, }, { Name: aws.String("tag-key"), Values: []string{v1.NodeClassTagKey}, }, { Name: aws.String(fmt.Sprintf("tag:%s", v1.EKSClusterNameTagKey)), Values: []string{options.FromContext(ctx).ClusterName}, }, instanceStateFilter, }, }) for paginator.HasMorePages() { page, err := paginator.NextPage(ctx) if err != nil { return nil, fmt.Errorf("describing ec2 instances, %w", err) } out.Reservations = append(out.Reservations, page.Reservations...) } instances, err := instancesFromOutput(ctx, out) return instances, cloudprovider.IgnoreNodeClaimNotFoundError(err) } func (p *DefaultProvider) Delete(ctx context.Context, id string) error { out, err := p.Get(ctx, id) if err != nil { return err } // Check if the instance is already shutting-down to reduce the number of terminate-instance calls we make thereby // reducing our overall QPS. Due to EC2's eventual consistency model, the result of the terminate-instance or // describe-instance call may return a not found error even when the instance is not terminated - // https://docs.aws.amazon.com/ec2/latest/devguide/eventual-consistency.html. In this case, the instance will get // picked up by the garbage collection controller and will be cleaned up eventually. if out.State != ec2types.InstanceStateNameShuttingDown { if _, err := p.ec2Batcher.TerminateInstances(ctx, &ec2.TerminateInstancesInput{ InstanceIds: []string{id}, }); err != nil { return err } } return nil } func (p *DefaultProvider) CreateTags(ctx context.Context, id string, tags map[string]string) error { ec2Tags := lo.MapToSlice(tags, func(key, value string) ec2types.Tag { return ec2types.Tag{Key: aws.String(key), Value: aws.String(value)} }) if _, err := p.ec2api.CreateTags(ctx, &ec2.CreateTagsInput{ Resources: []string{id}, Tags: ec2Tags, }); err != nil { if awserrors.IsNotFound(err) { return cloudprovider.NewNodeClaimNotFoundError(fmt.Errorf("tagging instance, %w", err)) } return fmt.Errorf("tagging instance, %w", err) } return nil } func (p *DefaultProvider) launchInstance( ctx context.Context, nodeClass *v1.EC2NodeClass, nodeClaim *karpv1.NodeClaim, capacityType string, instanceTypes []*cloudprovider.InstanceType, tags map[string]string, ) (ec2types.CreateFleetInstance, error) { zonalSubnets, err := p.subnetProvider.ZonalSubnetsForLaunch(ctx, nodeClass, instanceTypes, capacityType) if err != nil { return ec2types.CreateFleetInstance{}, cloudprovider.NewCreateError(fmt.Errorf("getting subnets, %w", err), "SubnetResolutionFailed", "Error getting subnets") } // Get Launch Template Configs, which may differ due to GPU or Architecture requirements launchTemplateConfigs, err := p.getLaunchTemplateConfigs(ctx, nodeClass, nodeClaim, instanceTypes, zonalSubnets, capacityType, tags) if err != nil { reason, message := awserrors.ToReasonMessage(err) return ec2types.CreateFleetInstance{}, cloudprovider.NewCreateError(fmt.Errorf("getting launch template configs, %w", err), reason, fmt.Sprintf("Error getting launch template configs: %s", message)) } if err := p.checkODFallback(nodeClaim, instanceTypes, launchTemplateConfigs); err != nil { log.FromContext(ctx).Error(err, "failed while checking on-demand fallback") } // Create fleet createFleetInput := GetCreateFleetInput(nodeClass, capacityType, tags, launchTemplateConfigs) if capacityType == karpv1.CapacityTypeSpot { createFleetInput.SpotOptions = &ec2types.SpotOptionsRequest{AllocationStrategy: ec2types.SpotAllocationStrategyPriceCapacityOptimized} } else { createFleetInput.OnDemandOptions = &ec2types.OnDemandOptionsRequest{AllocationStrategy: ec2types.FleetOnDemandAllocationStrategyLowestPrice} } createFleetOutput, err := p.ec2Batcher.CreateFleet(ctx, createFleetInput) p.subnetProvider.UpdateInflightIPs(createFleetInput, createFleetOutput, instanceTypes, lo.Values(zonalSubnets), capacityType) if err != nil { reason, message := awserrors.ToReasonMessage(err) if awserrors.IsLaunchTemplateNotFound(err) { for _, lt := range launchTemplateConfigs { p.launchTemplateProvider.InvalidateCache(ctx, aws.ToString(lt.LaunchTemplateSpecification.LaunchTemplateName), aws.ToString(lt.LaunchTemplateSpecification.LaunchTemplateId)) } return ec2types.CreateFleetInstance{}, cloudprovider.NewCreateError(fmt.Errorf("launch templates not found when creating fleet request, %w", err), reason, fmt.Sprintf("Launch templates not found when creating fleet request: %s", message)) } return ec2types.CreateFleetInstance{}, cloudprovider.NewCreateError(fmt.Errorf("creating fleet request, %w", err), reason, fmt.Sprintf("Error creating fleet request: %s", message)) } p.updateUnavailableOfferingsCache(ctx, createFleetOutput.Errors, capacityType, nodeClaim, instanceTypes) if len(createFleetOutput.Instances) == 0 || len(createFleetOutput.Instances[0].InstanceIds) == 0 { requestID, _ := awsmiddleware.GetRequestIDMetadata(createFleetOutput.ResultMetadata) return ec2types.CreateFleetInstance{}, serrors.Wrap( combineFleetErrors(createFleetOutput.Errors), middleware.AWSRequestIDLogKey, requestID, middleware.AWSOperationNameLogKey, "CreateFleet", middleware.AWSServiceNameLogKey, "EC2", middleware.AWSStatusCodeLogKey, 200, middleware.AWSErrorCodeLogKey, "UnfulfillableCapacity", ) } return createFleetOutput.Instances[0], nil } func GetCreateFleetInput(nodeClass *v1.EC2NodeClass, capacityType string, tags map[string]string, launchTemplateConfigs []ec2types.FleetLaunchTemplateConfigRequest) *ec2.CreateFleetInput { return &ec2.CreateFleetInput{ Type: ec2types.FleetTypeInstant, Context: nodeClass.Spec.Context, LaunchTemplateConfigs: launchTemplateConfigs, TargetCapacitySpecification: &ec2types.TargetCapacitySpecificationRequest{ DefaultTargetCapacityType: lo.Ternary( capacityType == karpv1.CapacityTypeReserved, ec2types.DefaultTargetCapacityType(karpv1.CapacityTypeOnDemand), ec2types.DefaultTargetCapacityType(capacityType), ), TotalTargetCapacity: aws.Int32(1), }, TagSpecifications: []ec2types.TagSpecification{ {ResourceType: ec2types.ResourceTypeInstance, Tags: utils.EC2MergeTags(tags)}, {ResourceType: ec2types.ResourceTypeVolume, Tags: utils.EC2MergeTags(tags)}, {ResourceType: ec2types.ResourceTypeFleet, Tags: utils.EC2MergeTags(tags)}, }, } } func (p *DefaultProvider) checkODFallback(nodeClaim *karpv1.NodeClaim, instanceTypes []*cloudprovider.InstanceType, launchTemplateConfigs []ec2types.FleetLaunchTemplateConfigRequest) error { // only evaluate for on-demand fallback if the capacity type for the request is OD and both OD and spot are allowed in requirements if getCapacityType(nodeClaim, instanceTypes) != karpv1.CapacityTypeOnDemand || !scheduling.NewNodeSelectorRequirementsWithMinValues(nodeClaim.Spec.Requirements...).Get(karpv1.CapacityTypeLabelKey).Has(karpv1.CapacityTypeSpot) { return nil } // loop through the LT configs for currently considered instance types to get the flexibility count instanceTypeZones := map[string]struct{}{} for _, ltc := range launchTemplateConfigs { for _, override := range ltc.Overrides { instanceTypeZones[string(override.InstanceType)] = struct{}{} } } if len(instanceTypes) < instanceTypeFlexibilityThreshold { return fmt.Errorf("at least %d instance types are recommended when flexible to spot but requesting on-demand, "+ "the current provisioning request only has %d instance type options", instanceTypeFlexibilityThreshold, len(instanceTypes)) } return nil } func (p *DefaultProvider) getLaunchTemplateConfigs( ctx context.Context, nodeClass *v1.EC2NodeClass, nodeClaim *karpv1.NodeClaim, instanceTypes []*cloudprovider.InstanceType, zonalSubnets map[string]*subnet.Subnet, capacityType string, tags map[string]string, ) ([]ec2types.FleetLaunchTemplateConfigRequest, error) { var launchTemplateConfigs []ec2types.FleetLaunchTemplateConfigRequest launchTemplates, err := p.launchTemplateProvider.EnsureAll(ctx, nodeClass, nodeClaim, instanceTypes, capacityType, tags) if err != nil { return nil, fmt.Errorf("getting launch templates, %w", err) } requirements := scheduling.NewNodeSelectorRequirementsWithMinValues(nodeClaim.Spec.Requirements...) requirements[karpv1.CapacityTypeLabelKey] = scheduling.NewRequirement(karpv1.CapacityTypeLabelKey, corev1.NodeSelectorOpIn, capacityType) for _, launchTemplate := range launchTemplates { launchTemplateConfig := ec2types.FleetLaunchTemplateConfigRequest{ Overrides: p.getOverrides(launchTemplate.InstanceTypes, zonalSubnets, requirements, launchTemplate.ImageID, launchTemplate.CapacityReservationID), LaunchTemplateSpecification: &ec2types.FleetLaunchTemplateSpecificationRequest{ LaunchTemplateName: aws.String(launchTemplate.Name), Version: aws.String("$Latest"), }, } if len(launchTemplateConfig.Overrides) > 0 { launchTemplateConfigs = append(launchTemplateConfigs, launchTemplateConfig) } } if len(launchTemplateConfigs) == 0 { return nil, fmt.Errorf("no capacity offerings are currently available given the constraints") } return launchTemplateConfigs, nil } // getOverrides creates and returns launch template overrides for the cross product of InstanceTypes and subnets (with subnets being constrained by // zones and the offerings in InstanceTypes) func (p *DefaultProvider) getOverrides( instanceTypes []*cloudprovider.InstanceType, zonalSubnets map[string]*subnet.Subnet, reqs scheduling.Requirements, image, capacityReservationID string, ) []ec2types.FleetLaunchTemplateOverridesRequest { // Unwrap all the offerings to a flat slice that includes a pointer // to the parent instance type name type offeringWithParentName struct { *cloudprovider.Offering parentInstanceTypeName ec2types.InstanceType } var filteredOfferings []offeringWithParentName for _, it := range instanceTypes { ofs := it.Offerings.Available().Compatible(reqs) // If we are generating a launch template for a specific capacity reservation, we only want to include the offering // for that capacity reservation when generating overrides. if capacityReservationID != "" { ofs = ofs.Compatible(scheduling.NewRequirements(scheduling.NewRequirement( cloudprovider.ReservationIDLabel, corev1.NodeSelectorOpIn, capacityReservationID, ))) } for _, o := range ofs { filteredOfferings = append(filteredOfferings, offeringWithParentName{ Offering: o, parentInstanceTypeName: ec2types.InstanceType(it.Name), }) } } var overrides []ec2types.FleetLaunchTemplateOverridesRequest for _, offering := range filteredOfferings { subnet, ok := zonalSubnets[offering.Zone()] if !ok { continue } overrides = append(overrides, ec2types.FleetLaunchTemplateOverridesRequest{ InstanceType: offering.parentInstanceTypeName, SubnetId: lo.ToPtr(subnet.ID), ImageId: lo.ToPtr(image), // This is technically redundant, but is useful if we have to parse insufficient capacity errors from // CreateFleet so that we can figure out the zone rather than additional API calls to look up the subnet AvailabilityZone: lo.ToPtr(subnet.Zone), }) } return overrides } func (p *DefaultProvider) updateUnavailableOfferingsCache( ctx context.Context, errs []ec2types.CreateFleetError, capacityType string, nodeClaim *karpv1.NodeClaim, instanceTypes []*cloudprovider.InstanceType, ) { if capacityType != karpv1.CapacityTypeReserved { for _, err := range errs { if awserrors.IsUnfulfillableCapacity(err) { p.unavailableOfferings.MarkUnavailableForFleetErr(ctx, err, capacityType) } if awserrors.IsServiceLinkedRoleCreationNotPermitted(err) { p.unavailableOfferings.MarkCapacityTypeUnavailable(karpv1.CapacityTypeSpot) p.recorder.Publish(SpotServiceLinkedRoleCreationFailure(nodeClaim)) } } return } reservationIDs := make([]string, 0, len(errs)) for i := range errs { id := p.getCapacityReservationIDForInstance( string(errs[i].LaunchTemplateAndOverrides.Overrides.InstanceType), lo.FromPtr(errs[i].LaunchTemplateAndOverrides.Overrides.AvailabilityZone), instanceTypes, ) reservationIDs = append(reservationIDs, id) log.FromContext(ctx).WithValues( "reason", lo.FromPtr(errs[i].ErrorCode), "instance-type", errs[i].LaunchTemplateAndOverrides.Overrides.InstanceType, "zone", lo.FromPtr(errs[i].LaunchTemplateAndOverrides.Overrides.AvailabilityZone), "capacity-reservation-id", id, ).V(1).Info("marking capacity reservation unavailable") } p.capacityReservationProvider.MarkUnavailable(reservationIDs...) } func (p *DefaultProvider) getCapacityReservationIDForInstance(instance, zone string, instanceTypes []*cloudprovider.InstanceType) string { for _, it := range instanceTypes { if it.Name != instance { continue } for _, o := range it.Offerings { if o.CapacityType() != karpv1.CapacityTypeReserved || o.Zone() != zone { continue } return o.ReservationID() } } // note: this is an invariant that the caller must enforce, should not occur at runtime panic("reservation ID doesn't exist for reserved launch") } // getCapacityType selects the capacity type based on the flexibility of the NodeClaim and the available offerings. // Prioritization is as follows: reserved, spot, on-demand. func getCapacityType(nodeClaim *karpv1.NodeClaim, instanceTypes []*cloudprovider.InstanceType) string { for _, capacityType := range []string{karpv1.CapacityTypeReserved, karpv1.CapacityTypeSpot} { requirements := scheduling.NewNodeSelectorRequirementsWithMinValues(nodeClaim.Spec.Requirements...) if !requirements.Get(karpv1.CapacityTypeLabelKey).Has(capacityType) { continue } requirements[karpv1.CapacityTypeLabelKey] = scheduling.NewRequirement(karpv1.CapacityTypeLabelKey, corev1.NodeSelectorOpIn, capacityType) for _, it := range instanceTypes { if len(it.Offerings.Available().Compatible(requirements)) != 0 { return capacityType } } } return karpv1.CapacityTypeOnDemand } func FilterRejectInstanceTypes(nodeClaim *karpv1.NodeClaim, instanceTypes []*cloudprovider.InstanceType) ([]*cloudprovider.InstanceType, []*cloudprovider.InstanceType, error) { var err error schedulingRequirements := scheduling.NewNodeSelectorRequirementsWithMinValues(nodeClaim.Spec.Requirements...) // We filter out non-reserved instances regardless of the min-values settings, since if the launch is eligible for // reserved instances that's all we'll include in our fleet request. if reqs := schedulingRequirements; reqs.Get(karpv1.CapacityTypeLabelKey).Has(karpv1.CapacityTypeReserved) { filtered, rejected := filterRejectReservedInstanceTypes(reqs, instanceTypes) if _, err = cloudprovider.InstanceTypes(filtered).SatisfiesMinValues(schedulingRequirements); err != nil { return nil, nil, cloudprovider.NewCreateError(fmt.Errorf("failed to construct CreateFleet request while respecting minValues requirements, %w", err), "InstanceTypeFilteringFailed", "Failed to filter instance types while respecting minValues") } if len(filtered) > 0 { // TODO: Support FilterReject on Truncating instance types filtered, err = cloudprovider.InstanceTypes(filtered).Truncate(schedulingRequirements, maxInstanceTypes) if err != nil { return nil, nil, cloudprovider.NewCreateError(fmt.Errorf("truncating instance types, %w", err), "InstanceTypeFilteringFailed", "Error truncating instance types based on the passed-in requirements") } return filtered, rejected, nil } } // Only filter the instances if there are no minValues in the requirement. var rejected []*cloudprovider.InstanceType filtered := instanceTypes if !schedulingRequirements.HasMinValues() { var r []*cloudprovider.InstanceType filtered, r = filterRejectExoticInstanceTypes(filtered) rejected = append(rejected, r...) // If we could potentially launch either a spot or on-demand node, we want to filter out the spot instance types that // are more expensive than the cheapest on-demand type. if isMixedCapacityLaunch(nodeClaim, filtered) { filtered, r = filterRejectUnwantedSpot(filtered) rejected = append(rejected, r...) } } // TODO: Support FilterReject on Truncating instance types filtered, err = cloudprovider.InstanceTypes(filtered).Truncate(schedulingRequirements, maxInstanceTypes) if err != nil { return nil, nil, cloudprovider.NewCreateError(fmt.Errorf("truncating instance types, %w", err), "InstanceTypeFilteringFailed", "Error truncating instance types based on the passed-in requirements") } return filtered, rejected, nil } // filterReservedInstanceTypes is used to filter the provided set of instance types to only include those with // available reserved offerings if the nodeclaim is compatible. If there are no available reserved offerings, no // filtering is applied. func filterRejectReservedInstanceTypes(nodeClaimRequirements scheduling.Requirements, instanceTypes []*cloudprovider.InstanceType) ([]*cloudprovider.InstanceType, []*cloudprovider.InstanceType) { nodeClaimRequirements[karpv1.CapacityTypeLabelKey] = scheduling.NewRequirement(karpv1.CapacityTypeLabelKey, corev1.NodeSelectorOpIn, karpv1.CapacityTypeReserved) var reservedInstanceTypes []*cloudprovider.InstanceType var nonReservedInstanceTypes []*cloudprovider.InstanceType for _, it := range instanceTypes { // We only want to include a single offering per pool (instance type / AZ combo). This is due to a limitation in the // CreateFleet API, which limits calls to specifying a single override per pool. We'll choose to launch into the pool // with the most capacity. zonalOfferings := map[string]*cloudprovider.Offering{} for _, o := range it.Offerings.Available().Compatible(nodeClaimRequirements) { if current, ok := zonalOfferings[o.Zone()]; !ok || o.ReservationCapacity > current.ReservationCapacity { zonalOfferings[o.Zone()] = o } } if len(zonalOfferings) == 0 { nonReservedInstanceTypes = append(nonReservedInstanceTypes, it) continue } // WARNING: It is only safe to mutate the slice containing the offerings, not the offerings themselves. The individual // offerings are cached, but not the slice storing them. This helps keep the launch path simple, but changes to the // caching strategy employed by the InstanceType provider could result in unexpected behavior. it.Offerings = lo.Values(zonalOfferings) reservedInstanceTypes = append(reservedInstanceTypes, it) } return reservedInstanceTypes, nonReservedInstanceTypes } // isMixedCapacityLaunch returns true if nodepools and available offerings could potentially allow either a spot or // and on-demand node to launch func isMixedCapacityLaunch(nodeClaim *karpv1.NodeClaim, instanceTypes []*cloudprovider.InstanceType) bool { requirements := scheduling.NewNodeSelectorRequirementsWithMinValues(nodeClaim.Spec.Requirements...) // requirements must allow both if !requirements.Get(karpv1.CapacityTypeLabelKey).Has(karpv1.CapacityTypeSpot) || !requirements.Get(karpv1.CapacityTypeLabelKey).Has(karpv1.CapacityTypeOnDemand) { return false } hasSpotOfferings := false hasODOffering := false if requirements.Get(karpv1.CapacityTypeLabelKey).Has(karpv1.CapacityTypeSpot) { for _, instanceType := range instanceTypes { for _, offering := range instanceType.Offerings.Available() { if requirements.Compatible(offering.Requirements, scheduling.AllowUndefinedWellKnownLabels) != nil { continue } if offering.Requirements.Get(karpv1.CapacityTypeLabelKey).Any() == karpv1.CapacityTypeSpot { hasSpotOfferings = true } else { hasODOffering = true } } } } return hasSpotOfferings && hasODOffering } // filterUnwantedSpot is used to filter out spot types that are more expensive than the cheapest on-demand type that we // could launch during mixed capacity-type launches func filterRejectUnwantedSpot(instanceTypes []*cloudprovider.InstanceType) ([]*cloudprovider.InstanceType, []*cloudprovider.InstanceType) { cheapestOnDemand := math.MaxFloat64 // first, find the price of our cheapest available on-demand instance type that could support this node for _, it := range instanceTypes { for _, o := range it.Offerings.Available() { if o.Requirements.Get(karpv1.CapacityTypeLabelKey).Any() == karpv1.CapacityTypeOnDemand && o.Price < cheapestOnDemand { cheapestOnDemand = o.Price } } } // Filter out any types where the cheapest offering, which should be spot, is more expensive than the cheapest // on-demand instance type that would have worked. This prevents us from getting a larger more-expensive spot // instance type compared to the cheapest sufficiently large on-demand instance type return lo.FilterReject(instanceTypes, func(item *cloudprovider.InstanceType, index int) bool { available := item.Offerings.Available() if len(available) == 0 { return false } return available.Cheapest().Price <= cheapestOnDemand }) } // filterRejectExoticInstanceTypes is used to eliminate less desirable instance types (like GPUs) from the list of possible instance types when // a set of more appropriate instance types would work. If a set of more desirable instance types is not found, then the original slice // of instance types are returned. func filterRejectExoticInstanceTypes(instanceTypes []*cloudprovider.InstanceType) ([]*cloudprovider.InstanceType, []*cloudprovider.InstanceType) { var genericInstanceTypes []*cloudprovider.InstanceType var exoticInstanceTypes []*cloudprovider.InstanceType for _, it := range instanceTypes { // deprioritize metal even if our opinionated filter isn't applied due to something like an instance family // requirement if _, ok := lo.Find(it.Requirements.Get(v1.LabelInstanceSize).Values(), func(size string) bool { return strings.Contains(size, "metal") }); ok { exoticInstanceTypes = append(exoticInstanceTypes, it) continue } if !resources.IsZero(it.Capacity[v1.ResourceAWSNeuron]) || !resources.IsZero(it.Capacity[v1.ResourceAWSNeuronCore]) || !resources.IsZero(it.Capacity[v1.ResourceAMDGPU]) || !resources.IsZero(it.Capacity[v1.ResourceNVIDIAGPU]) || !resources.IsZero(it.Capacity[v1.ResourceHabanaGaudi]) { exoticInstanceTypes = append(exoticInstanceTypes, it) continue } genericInstanceTypes = append(genericInstanceTypes, it) } // if we got some subset of instance types, then prefer to use those if len(genericInstanceTypes) != 0 { return genericInstanceTypes, exoticInstanceTypes } return instanceTypes, nil } func instancesFromOutput(ctx context.Context, out *ec2.DescribeInstancesOutput) ([]*Instance, error) { if len(out.Reservations) == 0 { return nil, cloudprovider.NewNodeClaimNotFoundError(fmt.Errorf("instance not found")) } instances := lo.Flatten(lo.Map(out.Reservations, func(r ec2types.Reservation, _ int) []ec2types.Instance { return r.Instances })) if len(instances) == 0 { return nil, cloudprovider.NewNodeClaimNotFoundError(fmt.Errorf("instance not found")) } // Get a consistent ordering for instances sort.Slice(instances, func(i, j int) bool { return aws.ToString(instances[i].InstanceId) < aws.ToString(instances[j].InstanceId) }) return lo.Map(instances, func(i ec2types.Instance, _ int) *Instance { return NewInstance(ctx, i) }), nil } func combineFleetErrors(fleetErrs []ec2types.CreateFleetError) (errs error) { unique := sets.NewString() for _, err := range fleetErrs { unique.Insert(fmt.Sprintf("%s: %s", aws.ToString(err.ErrorCode), aws.ToString(err.ErrorMessage))) } for errorCode := range unique { errs = multierr.Append(errs, errors.New(errorCode)) } // If all the Fleet errors are ICE errors then we should wrap the combined error in the generic ICE error iceErrorCount := lo.CountBy(fleetErrs, func(err ec2types.CreateFleetError) bool { return awserrors.IsUnfulfillableCapacity(err) || awserrors.IsServiceLinkedRoleCreationNotPermitted(err) }) if iceErrorCount == len(fleetErrs) { return cloudprovider.NewInsufficientCapacityError(fmt.Errorf("with fleet error(s), %w", errs)) } reason, message := awserrors.ToReasonMessage(errs) return cloudprovider.NewCreateError(errs, reason, message) }