otelcollector/otel-allocator/internal/allocation/strategy.go (101 lines of code) (raw):

// Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 package allocation import ( "fmt" "github.com/buraksezer/consistent" "github.com/go-logr/logr" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" "github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/internal/target" ) type AllocatorProvider func(log logr.Logger, opts ...Option) Allocator var ( strategies = map[string]Strategy{ leastWeightedStrategyName: newleastWeightedStrategy(), consistentHashingStrategyName: newConsistentHashingStrategy(), perNodeStrategyName: newPerNodeStrategy(), } // TargetsPerCollector records how many targets have been assigned to each collector. // It is currently the responsibility of the strategy to track this information. TargetsPerCollector = promauto.NewGaugeVec(prometheus.GaugeOpts{ Name: "opentelemetry_allocator_targets_per_collector", Help: "The number of targets for each collector.", }, []string{"collector_name", "strategy"}) CollectorsAllocatable = promauto.NewGaugeVec(prometheus.GaugeOpts{ Name: "opentelemetry_allocator_collectors_allocatable", Help: "Number of collectors the allocator is able to allocate to.", }, []string{"strategy"}) TimeToAssign = promauto.NewHistogramVec(prometheus.HistogramOpts{ Name: "opentelemetry_allocator_time_to_allocate", Help: "The time it takes to allocate", }, []string{"method", "strategy"}) TargetsRemaining = promauto.NewGauge(prometheus.GaugeOpts{ Name: "opentelemetry_allocator_targets_remaining", Help: "Number of targets kept after filtering.", }) TargetsUnassigned = promauto.NewGauge(prometheus.GaugeOpts{ Name: "opentelemetry_allocator_targets_unassigned", Help: "Number of targets that could not be assigned due to missing node label.", }) ) type Option func(Allocator) type Filter interface { Apply([]*target.Item) []*target.Item } func WithFilter(filter Filter) Option { return func(allocator Allocator) { allocator.SetFilter(filter) } } func WithFallbackStrategy(fallbackStrategy string) Option { var strategy, ok = strategies[fallbackStrategy] if fallbackStrategy != "" && !ok { panic(fmt.Errorf("unregistered strategy used as fallback: %s", fallbackStrategy)) } return func(allocator Allocator) { allocator.SetFallbackStrategy(strategy) } } func RecordTargetsKept(targets []*target.Item) { TargetsRemaining.Set(float64(len(targets))) } func New(name string, log logr.Logger, opts ...Option) (Allocator, error) { if strategy, ok := strategies[name]; ok { return newAllocator(log.WithValues("allocator", name), strategy, opts...), nil } return nil, fmt.Errorf("unregistered strategy: %s", name) } func GetRegisteredAllocatorNames() []string { var names []string for s := range strategies { names = append(names, s) } return names } type Allocator interface { SetCollectors(collectors map[string]*Collector) SetTargets(targets []*target.Item) TargetItems() map[target.ItemHash]*target.Item Collectors() map[string]*Collector GetTargetsForCollectorAndJob(collector string, job string) []*target.Item SetFilter(filter Filter) SetFallbackStrategy(strategy Strategy) } type Strategy interface { GetCollectorForTarget(map[string]*Collector, *target.Item) (*Collector, error) // SetCollectors exists for strategies where changing the collector set is potentially an expensive operation. // The caller must guarantee that the collectors map passed in GetCollectorForTarget is consistent with the latest // SetCollectors call. Strategies which don't need this information can just ignore it. SetCollectors(map[string]*Collector) GetName() string // SetFallbackStrategy adds fallback strategy for strategies whose main allocation method can sometimes leave targets unassigned SetFallbackStrategy(Strategy) } var _ consistent.Member = Collector{} // Collector Creates a struct that holds Collector information. // This struct will be parsed into endpoint with Collector and jobs info. // This struct can be extended with information like annotations and labels in the future. type Collector struct { Name string NodeName string NumTargets int } func (c Collector) Hash() string { return c.Name } func (c Collector) String() string { return c.Name } func NewCollector(name, node string) *Collector { return &Collector{Name: name, NodeName: node} }