pkg/pricing/pricing.go (206 lines of code) (raw):
/*
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package pricing
import (
"context"
"errors"
"net/http"
"strings"
"sync"
"time"
"github.com/Azure/aks-node-viewer/pkg/pricing/client"
"github.com/samber/lo"
)
// Provider provides actual pricing data to the Azure cloud provider to allow it to make more informed decisions
// regarding which instances to launch. This is initialized at startup with a periodically updated static price list to
// support running in locations where pricing data is unavailable. In those cases the static pricing data provides a
// relative ordering that is still more accurate than our previous pricing model. In the event that a pricing update
// fails, the previous pricing information is retained and used which may be the static initial pricing data if pricing
// updates never succeed.
type Provider struct {
// pricing is the API client through which price pages are fetched.
pricing client.PricingAPI
// region is the value price queries use for the "armRegionName" filter.
region string
// mu guards the update times and price maps declared below it.
mu sync.RWMutex
// onDemandUpdateTime is the time associated with the current contents of onDemandPrices.
onDemandUpdateTime time.Time
// onDemandPrices maps an instance type (ARM SKU name) to its on-demand retail price.
onDemandPrices map[string]float64
// spotUpdateTime is the time associated with the current contents of spotPrices.
spotUpdateTime time.Time
// spotPrices maps an instance type (ARM SKU name) to its spot retail price.
spotPrices map[string]float64
}
// Err is the error type returned by the pricing update methods. It wraps the underlying cause and records the update
// time of the price data that stayed in place when the update failed.
type Err struct {
	// error is the underlying cause; embedding it promotes its Error method onto Err.
	error
	// lastUpdateTime is the update time of the price data that was retained.
	lastUpdateTime time.Time
}

// Unwrap returns the underlying cause so that errors.Is and errors.As can inspect what is wrapped by an *Err.
// It returns nil when called on a nil receiver.
func (e *Err) Unwrap() error {
	if e == nil {
		return nil
	}
	return e.error
}
// NewAPI returns the pricing API client created by client.New.
func NewAPI() client.PricingAPI {
return client.New()
}
// NewProvider creates a Provider for region that fetches prices through pricing. The provider is seeded with the
// static price list for region, or with the eastus list when region has none, and an initial price update is started
// in a background goroutine using ctx; NewProvider does not wait for that update to finish.
func NewProvider(ctx context.Context, pricing client.PricingAPI, region string) *Provider {
	// prefer region specific static prices and fall back to the always available eastus list
	seed, found := initialOnDemandPrices[region]
	if !found {
		seed = initialOnDemandPrices["eastus"]
	}

	// spot prices start out identical to the on-demand prices until a price update replaces them
	provider := &Provider{
		pricing:            pricing,
		region:             region,
		onDemandPrices:     seed,
		onDemandUpdateTime: initialPriceUpdate,
		spotPrices:         seed,
		spotUpdateTime:     initialPriceUpdate,
	}

	// perform the initial price update at startup without blocking the caller
	go provider.updatePricing(ctx)
	return provider
}
// InstanceTypes returns every instance type for which an on-demand price or a spot price is known.
func (p *Provider) InstanceTypes() []string {
	p.mu.RLock()
	defer p.mu.RUnlock()

	onDemandTypes := lo.Keys(p.onDemandPrices)
	spotTypes := lo.Keys(p.spotPrices)
	return lo.Union(onDemandTypes, spotTypes)
}
// OnDemandLastUpdated reports when the on-demand prices were last updated. The value is read under the read lock.
func (p *Provider) OnDemandLastUpdated() time.Time {
	p.mu.RLock()
	updated := p.onDemandUpdateTime
	p.mu.RUnlock()
	return updated
}
// SpotLastUpdated reports when the spot prices were last updated. The value is read under the read lock.
func (p *Provider) SpotLastUpdated() time.Time {
	p.mu.RLock()
	updated := p.spotUpdateTime
	p.mu.RUnlock()
	return updated
}
// OnDemandPrice looks up the last known on-demand price for instanceType. The boolean result is false, and the price
// is zero, when no on-demand price is known for that instance type.
func (p *Provider) OnDemandPrice(instanceType string) (float64, bool) {
	p.mu.RLock()
	defer p.mu.RUnlock()

	if known, found := p.onDemandPrices[instanceType]; found {
		return known, true
	}
	return 0.0, false
}
// SpotPrice looks up the last known spot price for instanceType. The boolean result is false, and the price is zero,
// when no spot price is known for that instance type.
func (p *Provider) SpotPrice(instanceType string) (float64, bool) {
	p.mu.RLock()
	defer p.mu.RUnlock()

	if known, found := p.spotPrices[instanceType]; found {
		return known, true
	}
	return 0.0, false
}
// updatePricing refreshes the on-demand and the spot prices concurrently and returns once both attempts have
// finished. It is best effort: a failed update leaves the previously stored prices in place, so the errors returned by
// the two update methods are intentionally discarded here rather than propagated.
func (p *Provider) updatePricing(ctx context.Context) {
	var wg sync.WaitGroup
	wg.Add(2)
	go func() {
		defer wg.Done()
		// best effort: on failure the existing on-demand prices remain in use
		_ = p.UpdateOnDemandPricing(ctx)
	}()
	go func() {
		defer wg.Done()
		// best effort: on failure the existing spot prices remain in use
		_ = p.UpdateSpotPricing(ctx)
	}()
	wg.Wait()
}
// UpdateOnDemandPricing fetches the on-demand prices for the provider's region and, on success, replaces the stored
// on-demand prices and sets the on-demand update time to the current time.
//
// It returns nil on success. If the fetch fails, or succeeds without yielding any price, the stored prices are left
// untouched and the returned *Err carries the cause together with the update time of the prices that remain in use.
// The result is a concrete *Err: compare it against nil before assigning it to an error variable, because a nil *Err
// stored in an error interface does not compare equal to nil.
func (p *Provider) UpdateOnDemandPricing(ctx context.Context) *Err {
	// fetch before taking the lock so that readers are not blocked while the pricing API is being queried
	prices := map[string]float64{}
	fetchErr := p.fetchPricing(ctx, onDemandPage(prices))

	p.mu.Lock()
	defer p.mu.Unlock()
	if fetchErr != nil {
		return &Err{error: fetchErr, lastUpdateTime: p.onDemandUpdateTime}
	}
	if len(prices) == 0 {
		return &Err{error: errors.New("no on-demand pricing found"), lastUpdateTime: p.onDemandUpdateTime}
	}

	// store a copy so the map captured by the page handler is never shared with readers
	p.onDemandPrices = lo.Assign(prices)
	p.onDemandUpdateTime = time.Now()
	return nil
}
// fetchPricing asks the pricing API for the price pages of USD consumption prices for virtual machines in the
// provider's region and passes pageHandler on to receive them. It returns whatever error the pricing API call returns.
func (p *Provider) fetchPricing(ctx context.Context, pageHandler func(output *client.ProductsPricePage)) error {
	// criteria that are the same for every region; all filters are equality matches
	filters := []*client.Filter{
		{Field: "priceType", Operator: client.Equals, Value: "Consumption"},
		{Field: "currencyCode", Operator: client.Equals, Value: "USD"},
		{Field: "serviceFamily", Operator: client.Equals, Value: "Compute"},
		{Field: "serviceName", Operator: client.Equals, Value: "Virtual Machines"},
	}

	// restrict the result to the region this provider was created for
	regionFilter := &client.Filter{Field: "armRegionName", Operator: client.Equals, Value: p.region}
	filters = append(filters, regionFilter)

	return p.pricing.GetProductsPricePages(ctx, filters, pageHandler)
}
// onDemandPage returns a page handler that records on-demand prices into prices, keyed by ARM SKU name. Items whose
// product name ends in " Windows", whose meter name ends in " Low Priority" or whose SKU name ends in " Spot" are
// ignored. A later item with the same ARM SKU name overwrites an earlier one.
func onDemandPage(prices map[string]float64) func(page *client.ProductsPricePage) {
	return func(page *client.ProductsPricePage) {
		for _, item := range page.Items {
			windows := strings.HasSuffix(item.ProductName, " Windows")
			// low priority is a Batch offering distinct from spot, see
			// https://learn.microsoft.com/en-us/azure/batch/batch-spot-vms#differences-between-spot-and-low-priority-vms
			lowPriority := strings.HasSuffix(item.MeterName, " Low Priority")
			spot := strings.HasSuffix(item.SkuName, " Spot")

			if !windows && !lowPriority && !spot {
				prices[item.ArmSkuName] = item.RetailPrice
			}
		}
	}
}
// UpdateSpotPricing fetches the spot prices for the provider's region and, on success, replaces the stored spot
// prices and sets the spot update time to the current time.
//
// It returns nil on success. If the fetch fails, or succeeds without yielding any price, the stored prices are left
// untouched and the returned *Err carries the cause together with the update time of the prices that remain in use.
// The result is a concrete *Err: compare it against nil before assigning it to an error variable, because a nil *Err
// stored in an error interface does not compare equal to nil.
func (p *Provider) UpdateSpotPricing(ctx context.Context) *Err {
	// fetch before taking the lock so that readers are not blocked while the pricing API is being queried
	prices := map[string]float64{}
	fetchErr := p.fetchPricing(ctx, spotPage(prices))

	p.mu.Lock()
	defer p.mu.Unlock()
	if fetchErr != nil {
		return &Err{error: fetchErr, lastUpdateTime: p.spotUpdateTime}
	}
	if len(prices) == 0 {
		return &Err{error: errors.New("no spot pricing found"), lastUpdateTime: p.spotUpdateTime}
	}

	// store a copy so the map captured by the page handler is never shared with readers
	p.spotPrices = lo.Assign(prices)
	p.spotUpdateTime = time.Now()
	return nil
}
// spotPage returns a page handler that records spot prices into prices, keyed by ARM SKU name. Only items whose SKU
// name ends in " Spot" are recorded; items whose product name ends in " Windows" or whose meter name ends in
// " Low Priority" are ignored. A later item with the same ARM SKU name overwrites an earlier one.
func spotPage(prices map[string]float64) func(page *client.ProductsPricePage) {
	return func(page *client.ProductsPricePage) {
		for _, item := range page.Items {
			windows := strings.HasSuffix(item.ProductName, " Windows")
			// low priority is a Batch offering distinct from spot, see
			// https://learn.microsoft.com/en-us/azure/batch/batch-spot-vms#differences-between-spot-and-low-priority-vms
			lowPriority := strings.HasSuffix(item.MeterName, " Low Priority")
			spot := strings.HasSuffix(item.SkuName, " Spot")

			if spot && !windows && !lowPriority {
				prices[item.ArmSkuName] = item.RetailPrice
			}
		}
	}
}
// LivenessProbe checks that the provider's mutex can still be acquired. It takes the write lock and releases it
// immediately, so the call returns only once the lock could be obtained. It does not use req and always returns nil.
func (p *Provider) LivenessProbe(req *http.Request) error {
// ensure we don't deadlock and nolint for the empty critical section
p.mu.Lock()
//nolint: staticcheck
p.mu.Unlock()
return nil
}