pkg/providers/pricing/pricing.go (346 lines of code) (raw):

/* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ package pricing import ( "context" "encoding/json" "fmt" "net/http" "strconv" "strings" "sync" "time" "sigs.k8s.io/controller-runtime/pkg/log" sdk "github.com/aws/karpenter-provider-aws/pkg/aws" "github.com/aws/karpenter-provider-aws/pkg/operator/options" "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/service/ec2" ec2types "github.com/aws/aws-sdk-go-v2/service/ec2/types" "github.com/aws/aws-sdk-go-v2/service/pricing" pricingtypes "github.com/aws/aws-sdk-go-v2/service/pricing/types" "github.com/samber/lo" "go.uber.org/multierr" "sigs.k8s.io/karpenter/pkg/utils/pretty" ) var initialOnDemandPrices = lo.Assign(InitialOnDemandPricesAWS, InitialOnDemandPricesUSGov, InitialOnDemandPricesCN) type Provider interface { LivenessProbe(*http.Request) error InstanceTypes() []ec2types.InstanceType OnDemandPrice(ec2types.InstanceType) (float64, bool) SpotPrice(ec2types.InstanceType, string) (float64, bool) UpdateOnDemandPricing(context.Context) error UpdateSpotPricing(context.Context) error } // DefaultProvider provides actual pricing data to the AWS cloud provider to allow it to make more informed decisions // regarding which instances to launch. This is initialized at startup with a periodically updated static price list to // support running in locations where pricing data is unavailable. In those cases the static pricing data provides a // relative ordering that is still more accurate than our previous pricing model. In the event that a pricing update // fails, the previous pricing information is retained and used which may be the static initial pricing data if pricing // updates never succeed. type DefaultProvider struct { ec2 sdk.EC2API pricing sdk.PricingAPI region string cm *pretty.ChangeMonitor muOnDemand sync.RWMutex onDemandPrices map[ec2types.InstanceType]float64 muSpot sync.RWMutex spotPrices map[ec2types.InstanceType]zonal spotPricingUpdated bool } // zonalPricing is used to capture the per-zone price // for spot data as well as the default price // based on on-demand price when the provisioningController first // comes up type zonal struct { defaultPrice float64 // Used until we get the spot pricing data prices map[string]float64 } func combineZonalPricing(pricingData ...zonal) zonal { z := newZonalPricing(0) for _, elem := range pricingData { if elem.defaultPrice != 0 { z.defaultPrice = elem.defaultPrice } for zone, price := range elem.prices { z.prices[zone] = price } } return z } func newZonalPricing(defaultPrice float64) zonal { z := zonal{ prices: map[string]float64{}, } z.defaultPrice = defaultPrice return z } // NewPricingAPI returns a pricing API configured based on a particular region func NewAPI(cfg aws.Config) *pricing.Client { // pricing API doesn't have an endpoint in all regions pricingAPIRegion := "us-east-1" if strings.HasPrefix(cfg.Region, "ap-") { pricingAPIRegion = "ap-south-1" } else if strings.HasPrefix(cfg.Region, "cn-") { pricingAPIRegion = "cn-northwest-1" } else if strings.HasPrefix(cfg.Region, "eu-") { pricingAPIRegion = "eu-central-1" } //create pricing config using pricing endpoint pricingCfg := cfg.Copy() pricingCfg.Region = pricingAPIRegion return pricing.NewFromConfig(pricingCfg) } func NewDefaultProvider(_ context.Context, pricing sdk.PricingAPI, ec2Api sdk.EC2API, region string) *DefaultProvider { p := &DefaultProvider{ region: region, ec2: ec2Api, pricing: pricing, cm: pretty.NewChangeMonitor(), } // sets the pricing data from the static default state for the provider p.Reset() return p } // InstanceTypes returns the list of all instance types for which either a spot or on-demand price is known. func (p *DefaultProvider) InstanceTypes() []ec2types.InstanceType { p.muOnDemand.RLock() p.muSpot.RLock() defer p.muOnDemand.RUnlock() defer p.muSpot.RUnlock() return lo.Union(lo.Keys(p.onDemandPrices), lo.Keys(p.spotPrices)) } // OnDemandPrice returns the last known on-demand price for a given instance type, returning an error if there is no // known on-demand pricing for the instance type. func (p *DefaultProvider) OnDemandPrice(instanceType ec2types.InstanceType) (float64, bool) { p.muOnDemand.RLock() defer p.muOnDemand.RUnlock() price, ok := p.onDemandPrices[instanceType] if !ok { return 0.0, false } return price, true } // SpotPrice returns the last known spot price for a given instance type and zone, returning an error // if there is no known spot pricing for that instance type or zone func (p *DefaultProvider) SpotPrice(instanceType ec2types.InstanceType, zone string) (float64, bool) { p.muSpot.RLock() defer p.muSpot.RUnlock() if val, ok := p.spotPrices[instanceType]; ok { if !p.spotPricingUpdated { return val.defaultPrice, true } if price, ok := p.spotPrices[instanceType].prices[zone]; ok { return price, true } return 0.0, false } return 0.0, false } func (p *DefaultProvider) UpdateOnDemandPricing(ctx context.Context) error { // standard on-demand instances var wg sync.WaitGroup var onDemandPrices, onDemandMetalPrices map[ec2types.InstanceType]float64 var onDemandErr, onDemandMetalErr error // if we are in isolated vpc, skip updating on demand pricing // as pricing api may not be available if options.FromContext(ctx).IsolatedVPC { if p.cm.HasChanged("on-demand-prices", nil) { log.FromContext(ctx).V(1).Info("running in an isolated VPC, on-demand pricing information will not be updated") } return nil } p.muOnDemand.Lock() defer p.muOnDemand.Unlock() wg.Add(1) go func() { defer wg.Done() onDemandPrices, onDemandErr = p.fetchOnDemandPricing(ctx, pricingtypes.Filter{ Field: aws.String("tenancy"), Type: "TERM_MATCH", Value: aws.String("Shared"), }, pricingtypes.Filter{ Field: aws.String("productFamily"), Type: "TERM_MATCH", Value: aws.String("Compute Instance"), }) }() // bare metal on-demand prices wg.Add(1) go func() { defer wg.Done() onDemandMetalPrices, onDemandMetalErr = p.fetchOnDemandPricing(ctx, pricingtypes.Filter{ Field: aws.String("tenancy"), Type: "TERM_MATCH", Value: aws.String("Dedicated"), }, pricingtypes.Filter{ Field: aws.String("productFamily"), Type: "TERM_MATCH", Value: aws.String("Compute Instance (bare metal)"), }) }() wg.Wait() err := multierr.Append(onDemandErr, onDemandMetalErr) if err != nil { return fmt.Errorf("retreiving on-demand pricing data, %w", err) } if len(onDemandPrices) == 0 || len(onDemandMetalPrices) == 0 { return fmt.Errorf("no on-demand pricing found") } // Maintain previously retrieved pricing data p.onDemandPrices = lo.Assign(p.onDemandPrices, onDemandPrices, onDemandMetalPrices) if p.cm.HasChanged("on-demand-prices", p.onDemandPrices) { log.FromContext(ctx).WithValues("instance-type-count", len(p.onDemandPrices)).V(1).Info("updated on-demand pricing") } return nil } func (p *DefaultProvider) fetchOnDemandPricing(ctx context.Context, additionalFilters ...pricingtypes.Filter) (map[ec2types.InstanceType]float64, error) { prices := map[ec2types.InstanceType]float64{} filters := append([]pricingtypes.Filter{ { Field: aws.String("regionCode"), Type: "TERM_MATCH", Value: aws.String(p.region), }, { Field: aws.String("serviceCode"), Type: "TERM_MATCH", Value: aws.String("AmazonEC2"), }, { Field: aws.String("preInstalledSw"), Type: "TERM_MATCH", Value: aws.String("NA"), }, { Field: aws.String("operatingSystem"), Type: "TERM_MATCH", Value: aws.String("Linux"), }, { Field: aws.String("capacitystatus"), Type: "TERM_MATCH", Value: aws.String("Used"), }, { Field: aws.String("marketoption"), Type: "TERM_MATCH", Value: aws.String("OnDemand"), }}, additionalFilters...) input := &pricing.GetProductsInput{ Filters: filters, ServiceCode: aws.String("AmazonEC2"), } paginator := pricing.NewGetProductsPaginator(p.pricing, input) for paginator.HasMorePages() { output, err := paginator.NextPage(ctx) if err != nil { return nil, fmt.Errorf("getting pricing data, %w", err) } prices = lo.Assign(prices, p.onDemandPage(ctx, output)) } return prices, nil } func (p *DefaultProvider) spotPage(ctx context.Context, output *ec2.DescribeSpotPriceHistoryOutput) map[ec2types.InstanceType]zonal { result := map[ec2types.InstanceType]zonal{} for _, sph := range output.SpotPriceHistory { spotPriceStr := aws.ToString(sph.SpotPrice) spotPrice, err := strconv.ParseFloat(spotPriceStr, 64) // these errors shouldn't occur, but if pricing API does have an error, we ignore the record if err != nil { log.FromContext(ctx).V(1).Info(fmt.Sprintf("unable to parse price record %#v", sph)) continue } if sph.Timestamp == nil { continue } instanceType := sph.InstanceType az := aws.ToString(sph.AvailabilityZone) _, ok := result[instanceType] if !ok { result[instanceType] = zonal{ prices: map[string]float64{}, } } result[instanceType].prices[az] = spotPrice } return result } // turning off cyclo here, it measures as a 12 due to all of the type checks of the pricing data which returns a deeply // nested map[string]interface{} // nolint: gocyclo func (p *DefaultProvider) onDemandPage(ctx context.Context, output *pricing.GetProductsOutput) map[ec2types.InstanceType]float64 { // this isn't the full pricing struct, just the portions we care about type priceItem struct { Product struct { Attributes struct { InstanceType string } } Terms struct { OnDemand map[string]struct { PriceDimensions map[string]struct { PricePerUnit map[string]string } } } } result := map[ec2types.InstanceType]float64{} currency := "USD" if strings.HasPrefix(p.region, "cn-") { currency = "CNY" } for _, outer := range output.PriceList { pItem := &priceItem{} if err := json.Unmarshal([]byte(outer), pItem); err != nil { log.FromContext(ctx).Error(err, "failed unmarshaling pricing data") } if pItem.Product.Attributes.InstanceType == "" { continue } for _, term := range pItem.Terms.OnDemand { for _, v := range term.PriceDimensions { price, err := strconv.ParseFloat(v.PricePerUnit[currency], 64) if err != nil || price == 0 { continue } result[ec2types.InstanceType(pItem.Product.Attributes.InstanceType)] = price } } } return result } // nolint: gocyclo func (p *DefaultProvider) UpdateSpotPricing(ctx context.Context) error { prices := map[ec2types.InstanceType]zonal{} p.muSpot.Lock() defer p.muSpot.Unlock() input := &ec2.DescribeSpotPriceHistoryInput{ ProductDescriptions: []string{ "Linux/UNIX", "Linux/UNIX (Amazon VPC)", }, // get the latest spot price for each instance type StartTime: aws.Time(time.Now()), } paginator := ec2.NewDescribeSpotPriceHistoryPaginator(p.ec2, input) for paginator.HasMorePages() { output, err := paginator.NextPage(ctx) if err != nil { return fmt.Errorf("retrieving spot pricing data, %w", err) } for it, z := range p.spotPage(ctx, output) { prices[it] = combineZonalPricing(prices[it], z) } } if len(prices) == 0 { return fmt.Errorf("no spot pricing found") } totalOfferings := 0 for it, zoneData := range prices { // Maintain previously retrieved pricing data p.spotPrices[it] = combineZonalPricing(p.spotPrices[it], zoneData) totalOfferings += len(zoneData.prices) } p.spotPricingUpdated = true if p.cm.HasChanged("spot-prices", p.spotPrices) { log.FromContext(ctx).WithValues( "instance-type-count", len(p.spotPrices), "offering-count", totalOfferings).V(1).Info("updated spot pricing with instance types and offerings") } return nil } func (p *DefaultProvider) LivenessProbe(_ *http.Request) error { // ensure we don't deadlock and nolint for the empty critical section p.muOnDemand.Lock() p.muSpot.Lock() //nolint: staticcheck p.muOnDemand.Unlock() p.muSpot.Unlock() return nil } func populateInitialSpotPricing(pricing map[ec2types.InstanceType]float64) map[ec2types.InstanceType]zonal { m := map[ec2types.InstanceType]zonal{} for it, price := range pricing { m[it] = newZonalPricing(price) } return m } func (p *DefaultProvider) Reset() { // see if we've got region specific pricing data staticPricing, ok := initialOnDemandPrices[p.region] if !ok { // and if not, fall back to the always available us-east-1 staticPricing = initialOnDemandPrices["us-east-1"] } p.onDemandPrices = staticPricing // default our spot pricing to the same as the on-demand pricing until a price update p.spotPrices = populateInitialSpotPricing(staticPricing) p.spotPricingUpdated = false }