pkg/providers/instancetype/instancetype.go (234 lines of code) (raw):
/*
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package instancetype
import (
"context"
"fmt"
"sync"
"sync/atomic"
"k8s.io/apimachinery/pkg/api/resource"
"sigs.k8s.io/karpenter/pkg/scheduling"
awscache "github.com/aws/karpenter-provider-aws/pkg/cache"
"github.com/aws/karpenter-provider-aws/pkg/providers/amifamily"
"github.com/aws/karpenter-provider-aws/pkg/providers/capacityreservation"
"github.com/aws/karpenter-provider-aws/pkg/providers/instancetype/offering"
"github.com/aws/karpenter-provider-aws/pkg/providers/pricing"
"github.com/mitchellh/hashstructure/v2"
"github.com/patrickmn/go-cache"
corev1 "k8s.io/api/core/v1"
"sigs.k8s.io/controller-runtime/pkg/log"
karpv1 "sigs.k8s.io/karpenter/pkg/apis/v1"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/ec2"
ec2types "github.com/aws/aws-sdk-go-v2/service/ec2/types"
"github.com/samber/lo"
"k8s.io/apimachinery/pkg/util/sets"
v1 "github.com/aws/karpenter-provider-aws/pkg/apis/v1"
sdk "github.com/aws/karpenter-provider-aws/pkg/aws"
"github.com/aws/karpenter-provider-aws/pkg/providers/subnet"
"sigs.k8s.io/karpenter/pkg/cloudprovider"
"sigs.k8s.io/karpenter/pkg/utils/pretty"
)
// Provider resolves the instance types that can be used with an EC2NodeClass.
type Provider interface {
	// List resolves the instance types for nodeClass, returning an error if they cannot be resolved.
	List(ctx context.Context, nodeClass *v1.EC2NodeClass) ([]*cloudprovider.InstanceType, error)
}
// DefaultProvider is the EC2-backed Provider. It keeps in-memory snapshots of the DescribeInstanceTypes and
// DescribeInstanceTypeOfferings results (refreshed by UpdateInstanceTypes / UpdateInstanceTypeOfferings) and
// caches fully resolved instance types built from those snapshots.
type DefaultProvider struct {
	ec2api sdk.EC2API
	subnetProvider subnet.Provider
	instanceTypesResolver Resolver
	// instanceTypesInfo and instanceTypesOfferings hold raw EC2 data, stored *before* considering insufficient
	// capacity errors from the unavailableOfferings cache. Fully resolved instance types are cached separately in
	// instanceTypesCache; see List for the exact cache key.
	//
	// muInstanceTypesInfo guards instanceTypesInfo and instanceTypesSeqNum: UpdateInstanceTypes writes them under
	// the write lock and List reads them under the read lock.
	muInstanceTypesInfo sync.RWMutex
	// instanceTypesInfo is the result of DescribeInstanceTypes (filtered to HVM and x86_64/arm64 in UpdateInstanceTypes).
	// TODO @engedaam: Look into only storing the needed EC2InstanceTypeInfo
	instanceTypesInfo []ec2types.InstanceTypeInfo
	// muInstanceTypesOfferings guards instanceTypesOfferings, allZones, and instanceTypesOfferingsSeqNum:
	// UpdateInstanceTypeOfferings writes them under the write lock and List reads them under the read lock.
	muInstanceTypesOfferings sync.RWMutex
	// instanceTypesOfferings maps an instance type name to the set of availability zones EC2 reports offering it in.
	instanceTypesOfferings map[string]sets.Set[string]
	// allZones is the union of every zone that appears in instanceTypesOfferings.
	allZones sets.Set[string]
	// instanceTypesCache holds resolved []*cloudprovider.InstanceType values (without offerings), keyed by List.
	instanceTypesCache *cache.Cache
	// discoveredCapacityCache holds memory capacity observed on real nodes, keyed "<instance type>-<AMI hash>";
	// when present it overrides the resolved memory capacity.
	discoveredCapacityCache *cache.Cache
	// cm detects whether refreshed EC2 data differs from the previous refresh (gates seq-num bumps and logging).
	cm *pretty.ChangeMonitor
	// instanceTypesSeqNum is a monotonically increasing change counter used to avoid the expensive hashing operation on instance types
	instanceTypesSeqNum uint64
	// instanceTypesOfferingsSeqNum is a monotonically increasing change counter used to avoid the expensive hashing
	// operation on instance type offerings
	instanceTypesOfferingsSeqNum uint64
	// offeringProvider injects offerings into resolved instance types on every List call.
	offeringProvider *offering.DefaultProvider
}
// NewDefaultProvider builds a DefaultProvider from its caches and collaborating providers.
//
// pricingProvider, capacityReservationProvider, unavailableOfferingsCache, and offeringCache are only handed to
// the embedded offering provider. The instance type and offering snapshots start out empty, so List returns an
// error until UpdateInstanceTypes and UpdateInstanceTypeOfferings have populated them.
func NewDefaultProvider(
	instanceTypesCache *cache.Cache,
	offeringCache *cache.Cache,
	discoveredCapacityCache *cache.Cache,
	ec2api sdk.EC2API,
	subnetProvider subnet.Provider,
	pricingProvider pricing.Provider,
	capacityReservationProvider capacityreservation.Provider,
	unavailableOfferingsCache *awscache.UnavailableOfferings,
	instanceTypesResolver Resolver,
) *DefaultProvider {
	offerings := offering.NewDefaultProvider(
		pricingProvider,
		capacityReservationProvider,
		unavailableOfferingsCache,
		offeringCache,
	)
	p := &DefaultProvider{
		ec2api:                  ec2api,
		subnetProvider:          subnetProvider,
		instanceTypesResolver:   instanceTypesResolver,
		instanceTypesCache:      instanceTypesCache,
		discoveredCapacityCache: discoveredCapacityCache,
		cm:                      pretty.NewChangeMonitor(),
		offeringProvider:        offerings,
	}
	// Non-nil empty snapshots; the sequence numbers keep their zero value.
	p.instanceTypesInfo = []ec2types.InstanceTypeInfo{}
	p.instanceTypesOfferings = map[string]sets.Set[string]{}
	return p
}
// List returns the instance types that can be used with nodeClass.
//
// Everything except offerings is resolved once per unique combination of:
//   - the current instance type and offering snapshots (tracked by their sequence numbers),
//   - nodeClass.Status.AMIs,
//   - the zones of nodeClass.Status.Subnets, and
//   - the resolver's CacheKey for nodeClass,
//
// and stored in instanceTypesCache. Offerings are injected on every call (see below).
//
// An error is returned when no instance types or offerings have been discovered yet, or when nodeClass has no
// resolved subnets.
//
//nolint:gocyclo
func (p *DefaultProvider) List(ctx context.Context, nodeClass *v1.EC2NodeClass) ([]*cloudprovider.InstanceType, error) {
	// Both snapshots (plus their sequence numbers and allZones) must stay stable while the key is computed and the
	// instance types are resolved. Take the info lock before the offerings lock wherever both are needed.
	p.muInstanceTypesInfo.RLock()
	defer p.muInstanceTypesInfo.RUnlock()
	p.muInstanceTypesOfferings.RLock()
	defer p.muInstanceTypesOfferings.RUnlock()

	if len(p.instanceTypesInfo) == 0 {
		return nil, fmt.Errorf("no instance types found")
	}
	if len(p.instanceTypesOfferings) == 0 {
		return nil, fmt.Errorf("no instance types offerings found")
	}
	if len(nodeClass.Status.Subnets) == 0 {
		return nil, fmt.Errorf("no subnets found")
	}
	subnetZones := sets.New(lo.Map(nodeClass.Status.Subnets, func(s v1.Subnet, _ int) string {
		return s.Zone
	})...)

	// Hash errors are ignored: the inputs are plain data and are presumably always hashable. A failed hash would
	// only make the cache key coarser, not incorrect across snapshot changes (the sequence numbers still differ).
	subnetZonesHash, _ := hashstructure.Hash(subnetZones, hashstructure.FormatV2, &hashstructure.HashOptions{SlicesAsSets: true})
	// The AMI hash forces a rebuild when the resolved AMIs change; it is also part of the discovered-capacity key.
	amiHash, _ := hashstructure.Hash(nodeClass.Status.AMIs, hashstructure.FormatV2, &hashstructure.HashOptions{SlicesAsSets: true})
	key := fmt.Sprintf("%d-%d-%016x-%016x-%016x",
		p.instanceTypesSeqNum,
		p.instanceTypesOfferingsSeqNum,
		amiHash,
		subnetZonesHash,
		p.instanceTypesResolver.CacheKey(nodeClass),
	)

	var instanceTypes []*cloudprovider.InstanceType
	if item, ok := p.instanceTypesCache.Get(key); ok {
		// The cached slice is shared by every caller with the same key and is NOT copied here, so it must be
		// treated as read-only. NOTE(review): this relies on InjectOfferings building a new result rather than
		// reordering or mutating this slice in place — confirm.
		instanceTypes = item.([]*cloudprovider.InstanceType)
	} else {
		instanceTypes = p.resolveInstanceTypes(ctx, nodeClass, amiHash)
		p.instanceTypesCache.SetDefault(key, instanceTypes)
	}
	// Offerings aren't cached along with the rest of the instance type info because reserved offerings need to have up to
	// date capacity information. Rather than incurring a cache miss each time an instance is launched into a reserved
	// offering (or terminated), offerings are injected to the cached instance types on each call. Note that on-demand and
	// spot offerings are still cached - only reserved offerings are generated each time.
	return p.offeringProvider.InjectOfferings(
		ctx,
		instanceTypes,
		nodeClass,
		p.allZones,
	), nil
}
// resolveInstanceTypes converts every discovered InstanceTypeInfo into a cloudprovider.InstanceType for nodeClass,
// skipping any for which the resolver returns nil. If a memory capacity was previously observed on a real node of
// the same instance type for the same AMI set (amiHash), that value replaces the resolved memory capacity. Each
// instance type's default vCPU count and memory size (in bytes) are also published via InstanceTypeVCPU and
// InstanceTypeMemory.
//
// It reads instanceTypesInfo and instanceTypesOfferings without locking; List calls it with both read locks held.
func (p *DefaultProvider) resolveInstanceTypes(
	ctx context.Context,
	nodeClass *v1.EC2NodeClass,
	amiHash uint64,
) []*cloudprovider.InstanceType {
	// Zone name -> zone ID for the node class's subnets (a later subnet in the same zone wins).
	zoneIDByZone := make(map[string]string, len(nodeClass.Status.Subnets))
	for _, sn := range nodeClass.Status.Subnets {
		zoneIDByZone[sn.Zone] = sn.ZoneID
	}

	resolved := []*cloudprovider.InstanceType{}
	for i := range p.instanceTypesInfo {
		info := p.instanceTypesInfo[i]
		name := string(info.InstanceType)

		it := p.instanceTypesResolver.Resolve(ctx, info, p.instanceTypesOfferings[name].UnsortedList(), zoneIDByZone, nodeClass)
		if it == nil {
			continue
		}
		capacityKey := fmt.Sprintf("%s-%016x", it.Name, amiHash)
		if observed, found := p.discoveredCapacityCache.Get(capacityKey); found {
			it.Capacity[corev1.ResourceMemory] = observed.(resource.Quantity)
		}

		vcpus := float64(lo.FromPtr(info.VCpuInfo.DefaultVCpus))
		InstanceTypeVCPU.Set(vcpus, map[string]string{instanceTypeLabel: name})
		memoryBytes := float64(lo.FromPtr(info.MemoryInfo.SizeInMiB) * 1024 * 1024)
		InstanceTypeMemory.Set(memoryBytes, map[string]string{instanceTypeLabel: name})

		resolved = append(resolved, it)
	}
	return resolved
}
// UpdateInstanceTypes refreshes the instance type snapshot by paging through DescribeInstanceTypes, restricted to
// HVM virtualization and x86_64/arm64 architectures. instanceTypesSeqNum is only bumped (and the discovery logged)
// when the result differs from the previous refresh, so unchanged data keeps producing the same List cache keys.
// If any page fails, the previous snapshot is kept and the wrapped error is returned.
func (p *DefaultProvider) UpdateInstanceTypes(ctx context.Context) error {
	// DO NOT REMOVE THIS LOCK ----------------------------------------------------------------------------
	// The write lock is held for the entire refresh so that concurrent refreshes run one at a time (avoiding
	// overlapping EC2 calls) and so that List never observes the snapshot and its sequence number mid-update.
	p.muInstanceTypesInfo.Lock()
	defer p.muInstanceTypesInfo.Unlock()

	filters := []ec2types.Filter{
		{Name: aws.String("supported-virtualization-type"), Values: []string{"hvm"}},
		{Name: aws.String("processor-info.supported-architecture"), Values: []string{"x86_64", "arm64"}},
	}
	pager := ec2.NewDescribeInstanceTypesPaginator(p.ec2api, &ec2.DescribeInstanceTypesInput{Filters: filters})

	var discovered []ec2types.InstanceTypeInfo
	for pager.HasMorePages() {
		page, err := pager.NextPage(ctx)
		if err != nil {
			return fmt.Errorf("describing instance types, %w", err)
		}
		discovered = append(discovered, page.InstanceTypes...)
	}

	// Bump the sequence number only on a real change so identical snapshots don't mint new cache keys.
	if changed := p.cm.HasChanged("instance-types", discovered); changed {
		atomic.AddUint64(&p.instanceTypesSeqNum, 1)
		log.FromContext(ctx).WithValues("count", len(discovered)).V(1).Info("discovered instance types")
	}
	p.instanceTypesInfo = discovered
	return nil
}
// UpdateInstanceTypeOfferings refreshes the instance type -> availability zones map by paging through
// DescribeInstanceTypeOfferings (location type availability-zone), and recomputes allZones as the union of those
// zones. instanceTypesOfferingsSeqNum is only bumped (and the discovery logged) when the map differs from the
// previous refresh. If any page fails, the previous map and zones are kept and the wrapped error is returned.
func (p *DefaultProvider) UpdateInstanceTypeOfferings(ctx context.Context) error {
	// DO NOT REMOVE THIS LOCK ----------------------------------------------------------------------------
	// The write lock makes concurrent refreshes run one at a time; overlapping DescribeInstanceTypeOfferings
	// sweeps generate A LOT of extra memory from their responses. It also keeps List from observing the offerings,
	// allZones, and the sequence number mid-update.
	// TODO @joinnis: This can be made more efficient by holding a Read lock and only obtaining the Write if not in cache
	p.muInstanceTypesOfferings.Lock()
	defer p.muInstanceTypesOfferings.Unlock()

	zonesByType := map[string]sets.Set[string]{}
	allZones := sets.New[string]()
	pager := ec2.NewDescribeInstanceTypeOfferingsPaginator(p.ec2api, &ec2.DescribeInstanceTypeOfferingsInput{
		LocationType: ec2types.LocationTypeAvailabilityZone,
	})
	for pager.HasMorePages() {
		page, err := pager.NextPage(ctx)
		if err != nil {
			return fmt.Errorf("describing instance type zone offerings, %w", err)
		}
		for _, o := range page.InstanceTypeOfferings {
			name, zone := string(o.InstanceType), lo.FromPtr(o.Location)
			zones, ok := zonesByType[name]
			if !ok {
				zones = sets.New[string]()
				zonesByType[name] = zones
			}
			zones.Insert(zone)
			allZones.Insert(zone)
		}
	}

	// Bump the sequence number only on a real change so identical snapshots don't mint new cache keys.
	if p.cm.HasChanged("instance-type-offering", zonesByType) {
		atomic.AddUint64(&p.instanceTypesOfferingsSeqNum, 1)
		log.FromContext(ctx).WithValues("instance-type-count", len(zonesByType)).V(1).Info("discovered offerings for instance types")
	}
	p.instanceTypesOfferings = zonesByType

	if p.cm.HasChanged("zones", allZones) {
		log.FromContext(ctx).WithValues("zones", allZones.UnsortedList()).V(1).Info("discovered zones")
	}
	p.allZones = allZones
	return nil
}
// UpdateInstanceTypeCapacityFromNode records the memory capacity reported by a launched node so later List calls
// use the observed value in place of the resolved one.
//
// The observation is only recorded when the NodeClaim's image is still one of the AMIs that nodeClass currently
// maps to this instance type, because the cache key includes a hash of nodeClass.Status.AMIs. An existing entry is
// only replaced by a value less than or equal to it, so the cache keeps the smallest capacity seen.
//
// Nodes without an instance type label or without a reported memory capacity are ignored. Caching a zero
// capacity would make the instance type appear to have no memory. Because only smaller values replace cached
// ones, that zero could not be corrected while the entry remains cached.
func (p *DefaultProvider) UpdateInstanceTypeCapacityFromNode(ctx context.Context, node *corev1.Node, nodeClaim *karpv1.NodeClaim, nodeClass *v1.EC2NodeClass) error {
	instanceTypeName := node.Labels[corev1.LabelInstanceTypeStable]
	if instanceTypeName == "" {
		return nil
	}
	actualCapacity := node.Status.Capacity.Memory()
	if actualCapacity.IsZero() {
		// Memory() yields a zero quantity when the node has not reported memory capacity; nothing to learn yet.
		return nil
	}

	// Get mappings for most recent AMIs
	amiMap := amifamily.MapToInstanceTypes([]*cloudprovider.InstanceType{{
		Name:         instanceTypeName,
		Requirements: scheduling.NewLabelRequirements(node.Labels),
	}}, nodeClass.Status.AMIs)
	// Ensure NodeClaim AMI is current
	if !lo.ContainsBy(amiMap[nodeClaim.Status.ImageID], func(i *cloudprovider.InstanceType) bool {
		return i.Name == instanceTypeName
	}) {
		return nil
	}

	// The hash error is ignored to mirror List, which derives the same key from the same input.
	amiHash, _ := hashstructure.Hash(nodeClass.Status.AMIs, hashstructure.FormatV2, &hashstructure.HashOptions{SlicesAsSets: true})
	key := fmt.Sprintf("%s-%016x", instanceTypeName, amiHash)

	// Update cache if non-existent or actual capacity is less than or equal to cached value
	if cachedCapacity, ok := p.discoveredCapacityCache.Get(key); !ok || actualCapacity.Cmp(cachedCapacity.(resource.Quantity)) <= 0 {
		log.FromContext(ctx).WithValues("memory-capacity", actualCapacity, "instance-type", instanceTypeName).V(1).Info("updating discovered capacity cache")
		p.discoveredCapacityCache.SetDefault(key, *actualCapacity)
	}
	return nil
}
// Reset discards the discovered instance type and offering snapshots and flushes the resolved instance type and
// discovered capacity caches. List returns an error until both snapshots are refreshed again.
//
// The mutexes that guard these fields in every other method are taken here as well, info before offerings (the
// same order List uses). Without them, Reset could race with an in-flight List or refresh.
func (p *DefaultProvider) Reset() {
	p.muInstanceTypesInfo.Lock()
	defer p.muInstanceTypesInfo.Unlock()
	p.muInstanceTypesOfferings.Lock()
	defer p.muInstanceTypesOfferings.Unlock()

	p.instanceTypesInfo = []ec2types.InstanceTypeInfo{}
	p.instanceTypesOfferings = map[string]sets.Set[string]{}
	// allZones is derived from the offerings; clear it so stale zones don't outlive the offerings they came from.
	p.allZones = sets.New[string]()
	p.instanceTypesCache.Flush()
	p.discoveredCapacityCache.Flush()
}