pkg/providers/pricing/pricing.go (235 lines of code) (raw):
/*
Portions Copyright (c) Microsoft Corporation.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package pricing
import (
"context"
"errors"
"fmt"
"net/http"
"strings"
"sync"
"time"
"github.com/samber/lo"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/karpenter/pkg/utils/pretty"
"github.com/Azure/karpenter-provider-azure/pkg/providers/pricing/client"
)
const (
	// pricingUpdatePeriod is the interval between pricing refresh attempts once the initial startup update is done.
	pricingUpdatePeriod = 12 * time.Hour
	// defaultRegion is the region whose static price list is used when the requested region has no static data.
	defaultRegion = "eastus"
)
// Provider provides actual pricing data to the Azure cloud provider to allow it to make more informed decisions
// regarding which instances to launch. This is initialized at startup with a periodically updated static price list to
// support running in locations where pricing data is unavailable. In those cases the static pricing data provides a
// relative ordering that is still more accurate than our previous pricing model. In the event that a pricing update
// fails, the previous pricing information is retained and used which may be the static initial pricing data if pricing
// updates never succeed.
type Provider struct {
// pricing is the API client used to page through product prices.
pricing client.PricingAPI
// region is the Azure region the prices are fetched for; it is also used to pick the static price list.
region string
// cm is consulted before logging a price update so the update is only logged when the prices changed.
cm *pretty.ChangeMonitor
// mu guards the update times and price maps below.
mu sync.RWMutex
// onDemandUpdateTime is the time the on-demand prices were last replaced.
onDemandUpdateTime time.Time
// onDemandPrices maps an instance type name to its on-demand price.
onDemandPrices map[string]float64
// spotUpdateTime is the time the spot prices were last replaced.
spotUpdateTime time.Time
// spotPrices maps an instance type name to its spot price.
spotPrices map[string]float64
}
// Err is the error returned by the pricing fetch and update methods. It wraps the underlying error and carries the
// times at which the on-demand and spot prices were last updated, so callers can report how old the retained pricing
// data is. A field is left at its zero value when the failing operation does not populate it.
type Err struct {
error
// lastOnDemandUpdateTime is the on-demand update time recorded when the error was created.
lastOnDemandUpdateTime time.Time
// lastSpotUpdateTime is the spot update time recorded when the error was created.
lastSpotUpdateTime time.Time
}
// NewAPI returns a pricing API client created with client.New.
func NewAPI() client.PricingAPI {
return client.New()
}
// NewProvider builds a Provider seeded with the static price list for region (or for defaultRegion when region has no
// static data) and starts a background goroutine that keeps the prices fresh. The goroutine performs one update
// immediately, waits for startAsync to be signaled, and then updates every pricingUpdatePeriod until ctx is done.
func NewProvider(ctx context.Context, pricing client.PricingAPI, region string, startAsync <-chan struct{}) *Provider {
	// prefer region specific static prices, otherwise use the always available defaultRegion list
	seedPrices, found := initialOnDemandPrices[region]
	if !found {
		seedPrices = initialOnDemandPrices[defaultRegion]
	}

	provider := &Provider{
		pricing:            pricing,
		region:             region,
		cm:                 pretty.NewChangeMonitor(),
		onDemandUpdateTime: initialPriceUpdate,
		onDemandPrices:     seedPrices,
		spotUpdateTime:     initialPriceUpdate,
		// until the first successful price update, spot prices mirror the on-demand ones
		spotPrices: seedPrices,
	}

	ctx = log.IntoContext(ctx, log.FromContext(ctx).WithName("pricing"))

	refreshLoop := func() {
		// one update right away at startup, before leader election
		provider.updatePricing(ctx)
		waitStarted := time.Now()

		// block until we are elected leader, or give up if we are told to exit
		select {
		case <-ctx.Done():
			return
		case <-startAsync:
		}

		// the election may have taken longer than a full period; refresh before the periodic polling starts
		if waited := time.Since(waitStarted); waited > pricingUpdatePeriod {
			provider.updatePricing(ctx)
		}

		for {
			select {
			case <-ctx.Done():
				return
			case <-time.After(pricingUpdatePeriod):
			}
			provider.updatePricing(ctx)
		}
	}
	go refreshLoop()

	return provider
}
// InstanceTypes returns the names of all instance types for which an on-demand or a spot price is known.
func (p *Provider) InstanceTypes() []string {
	p.mu.RLock()
	defer p.mu.RUnlock()

	onDemandNames := lo.Keys(p.onDemandPrices)
	spotNames := lo.Keys(p.spotPrices)
	return lo.Union(onDemandNames, spotNames)
}
// OnDemandLastUpdated returns the time that the on-demand pricing was last updated. The value is read while holding
// the provider's read lock.
func (p *Provider) OnDemandLastUpdated() time.Time {
p.mu.RLock()
defer p.mu.RUnlock()
return p.onDemandUpdateTime
}
// SpotLastUpdated returns the time that the spot pricing was last updated. The value is read while holding the
// provider's read lock.
func (p *Provider) SpotLastUpdated() time.Time {
p.mu.RLock()
defer p.mu.RUnlock()
return p.spotUpdateTime
}
// OnDemandPrice looks up the last known on-demand price for instanceType. The boolean result is false, and the price
// is zero, when no on-demand price is known for that instance type.
func (p *Provider) OnDemandPrice(instanceType string) (float64, bool) {
	p.mu.RLock()
	defer p.mu.RUnlock()

	if known, found := p.onDemandPrices[instanceType]; found {
		return known, true
	}
	return 0.0, false
}
// SpotPrice looks up the last known spot price for instanceType. The boolean result is false, and the price is zero,
// when no spot price is known for that instance type.
func (p *Provider) SpotPrice(instanceType string) (float64, bool) {
	p.mu.RLock()
	defer p.mu.RUnlock()

	if known, found := p.spotPrices[instanceType]; found {
		return known, true
	}
	return 0.0, false
}
// updatePricing fetches the current price list and applies it to the provider. It does nothing when ctx is already
// done. A fetch failure is logged (unless ctx was cancelled meanwhile) and leaves the existing prices untouched. On
// success the fetched items are split into on-demand and spot prices, which are applied by two goroutines that this
// method waits for; a failure of either update is logged.
func (p *Provider) updatePricing(ctx context.Context) {
	if ctx.Err() != nil {
		return
	}

	collected := make(map[client.Item]bool)
	if fetchErr := p.fetchPricing(ctx, processPage(collected)); fetchErr != nil {
		// a cancelled context is an expected way to fail, so only report other failures
		if ctx.Err() == nil {
			log.FromContext(ctx).Error(fetchErr, fmt.Sprintf("error fetching updated pricing for region %s, using existing pricing data, on-demand: %s, spot: %s", p.region, fetchErr.lastOnDemandUpdateTime.Format(time.RFC3339), fetchErr.lastSpotUpdateTime.Format(time.RFC3339)))
		}
		return
	}

	onDemand, spot := categorizePrices(collected)

	var pending sync.WaitGroup
	pending.Add(2)
	go func() {
		defer pending.Done()
		updateErr := p.UpdateOnDemandPricing(ctx, onDemand)
		if updateErr == nil {
			return
		}
		log.FromContext(ctx).Error(updateErr, fmt.Sprintf("error updating on-demand pricing for region %s, using existing pricing data from %s", p.region, updateErr.lastOnDemandUpdateTime.Format(time.RFC3339)))
	}()
	go func() {
		defer pending.Done()
		updateErr := p.UpdateSpotPricing(ctx, spot)
		if updateErr == nil {
			return
		}
		log.FromContext(ctx).Error(updateErr, fmt.Sprintf("error updating spot pricing for region %s, using existing pricing data from %s", p.region, updateErr.lastSpotUpdateTime.Format(time.RFC3339)))
	}()
	pending.Wait()
}
// UpdateOnDemandPricing replaces the provider's on-demand prices with a copy of onDemandPrices and records the
// current time as the on-demand update time. An empty or nil map is rejected: the existing prices are kept and an
// *Err carrying the previous on-demand update time is returned. The update is logged only when the change monitor
// reports that the prices changed.
func (p *Provider) UpdateOnDemandPricing(ctx context.Context, onDemandPrices map[string]float64) *Err {
	p.mu.Lock()
	defer p.mu.Unlock()

	if len(onDemandPrices) == 0 {
		return &Err{error: errors.New("no on-demand pricing found"), lastOnDemandUpdateTime: p.onDemandUpdateTime}
	}

	// store a private copy so later changes to the caller's map cannot affect the provider
	updated := lo.Assign(onDemandPrices)
	p.onDemandPrices = updated
	p.onDemandUpdateTime = time.Now()

	if !p.cm.HasChanged("on-demand-prices", updated) {
		return nil
	}
	log.FromContext(ctx).WithValues("instance-type-count", len(updated)).Info(fmt.Sprintf("updated on-demand pricing for region %s", p.region))
	return nil
}
// fetchPricing pages through the consumption prices, in USD, of virtual machines in the provider's region and hands
// every page to pageHandler. It returns nil on success. On failure it returns an *Err that wraps the client error and
// carries the times at which the on-demand and spot prices were last updated.
//
// The provider lock is deliberately NOT held while the pages are retrieved: the request is network bound and holding
// the exclusive lock for its whole duration would block every price lookup and the liveness probe. The lock is only
// taken, in read mode, to snapshot the update times for the returned error. p.region and p.pricing are read without
// the lock; nothing in this file writes them after NewProvider.
func (p *Provider) fetchPricing(ctx context.Context, pageHandler func(output *client.ProductsPricePage)) *Err {
	filters := []*client.Filter{
		{
			Field:    "priceType",
			Operator: client.Equals,
			Value:    "Consumption",
		},
		{
			Field:    "currencyCode",
			Operator: client.Equals,
			Value:    "USD",
		},
		{
			Field:    "serviceFamily",
			Operator: client.Equals,
			Value:    "Compute",
		},
		{
			Field:    "serviceName",
			Operator: client.Equals,
			Value:    "Virtual Machines",
		},
		{
			Field:    "armRegionName",
			Operator: client.Equals,
			Value:    p.region,
		}}

	if err := p.pricing.GetProductsPricePages(ctx, filters, pageHandler); err != nil {
		// the update times are guarded by mu, so read them under the read lock
		p.mu.RLock()
		defer p.mu.RUnlock()
		return &Err{error: err, lastOnDemandUpdateTime: p.onDemandUpdateTime, lastSpotUpdateTime: p.spotUpdateTime}
	}
	return nil
}
// processPage returns a page handler that records the items of each page it is given as keys of prices. Items whose
// product name ends in " Windows" and items whose meter name ends in " Low Priority" are skipped.
func processPage(prices map[client.Item]bool) func(page *client.ProductsPricePage) {
	return func(page *client.ProductsPricePage) {
		for _, item := range page.Items {
			switch {
			case strings.HasSuffix(item.ProductName, " Windows"):
				// Windows prices are not collected
			case strings.HasSuffix(item.MeterName, " Low Priority"):
				// low priority VMs are not spot VMs, see
				// https://learn.microsoft.com/en-us/azure/batch/batch-spot-vms#differences-between-spot-and-low-priority-vms
			default:
				prices[item] = true
			}
		}
	}
}
// UpdateSpotPricing replaces the provider's spot prices with a copy of spotPrices and records the current time as the
// spot update time. An empty or nil map is rejected: the existing prices are kept and an *Err carrying the previous
// spot update time is returned. The update is logged only when the change monitor reports that the prices changed.
func (p *Provider) UpdateSpotPricing(ctx context.Context, spotPrices map[string]float64) *Err {
	p.mu.Lock()
	defer p.mu.Unlock()

	if len(spotPrices) == 0 {
		return &Err{error: errors.New("no spot pricing found"), lastSpotUpdateTime: p.spotUpdateTime}
	}

	// store a private copy so later changes to the caller's map cannot affect the provider
	updated := lo.Assign(spotPrices)
	p.spotPrices = updated
	p.spotUpdateTime = time.Now()

	if !p.cm.HasChanged("spot-prices", updated) {
		return nil
	}
	log.FromContext(ctx).WithValues("instance-type-count", len(updated)).Info(fmt.Sprintf("updated spot pricing for region %s", p.region))
	return nil
}
// categorizePrices splits the collected price items into two maps keyed by ARM SKU name: the first holds on-demand
// retail prices, the second spot retail prices. An item is treated as spot when its SKU name ends in " Spot". When
// several items of one category share an ARM SKU name, the one visited last wins; map iteration order is unspecified.
func categorizePrices(prices map[client.Item]bool) (map[string]float64, map[string]float64) {
	onDemand := make(map[string]float64)
	spot := make(map[string]float64)
	for item := range prices {
		bucket := onDemand
		if strings.HasSuffix(item.SkuName, " Spot") {
			bucket = spot
		}
		bucket[item.ArmSkuName] = item.RetailPrice
	}
	return onDemand, spot
}
// LivenessProbe acquires and immediately releases the provider's write lock and always returns nil. The call blocks
// for as long as another goroutine holds the lock, so it only completes once the lock can be obtained. The request
// argument is ignored.
func (p *Provider) LivenessProbe(_ *http.Request) error {
// ensure we don't deadlock and nolint for the empty critical section
p.mu.Lock()
//nolint: staticcheck
p.mu.Unlock()
return nil
}
// Reset returns the provider's pricing data to the state NewProvider starts from: both the on-demand and the spot
// prices are set to the static price list for the provider's region (or for defaultRegion when the region has no
// static data) and both update times are set to initialPriceUpdate.
func (p *Provider) Reset() {
	// see if we've got region specific pricing data
	staticPricing, ok := initialOnDemandPrices[p.region]
	if !ok {
		// and if not, fall back to the always available eastus
		staticPricing = initialOnDemandPrices[defaultRegion]
	}

	p.mu.Lock()
	defer p.mu.Unlock()
	p.onDemandPrices = staticPricing
	p.onDemandUpdateTime = initialPriceUpdate
	// spot pricing defaults to the on-demand list until a price update, exactly as in NewProvider; without this a
	// reset would leave previously fetched spot prices and their update time in place
	p.spotPrices = staticPricing
	p.spotUpdateTime = initialPriceUpdate
}