otelcollector/otel-allocator/internal/allocation/allocator.go

// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package allocation

import (
"errors"
"runtime"
"slices"
"sync"
"github.com/go-logr/logr"
"github.com/prometheus/client_golang/prometheus"
"github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/internal/diff"
"github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/internal/target"
)

/*
The Target Allocator serves an HTTP API exposing /jobs/<job_id>/targets.
Targets are assigned to collectors by the configured allocation Strategy.
The Target Allocator needs information about the collectors in order to set the URLs,
so it keeps a map of what each collector currently holds and updates it on new scrape target updates.
*/
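
// A minimal usage sketch (hypothetical wiring: the New constructor name,
// logger, and variable names are illustrative, not part of this file):
//
//	strategy, _ := New("least-weighted", logger)
//	alloc := newAllocator(logger, strategy)
//	alloc.SetCollectors(collectors) // map[string]*Collector
//	alloc.SetTargets(discoveredTargets)
//	perJob := alloc.GetTargetsForCollectorAndJob("collector-0", "my-job")
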
var _ Allocator = &allocator{}

// newAllocator returns an Allocator that assigns targets to collectors using
// the given Strategy, applying any provided Options.
func newAllocator(log logr.Logger, strategy Strategy, opts ...Option) Allocator {
	a := &allocator{
		strategy:                      strategy,
		collectors:                    make(map[string]*Collector),
		targetItems:                   make(map[target.ItemHash]*target.Item),
		targetItemsPerJobPerCollector: make(map[string]map[string]map[target.ItemHash]bool),
		log:                           log,
	}
	for _, opt := range opts {
		opt(a)
	}
	return a
}

type allocator struct {
strategy Strategy
// collectors is a map from a Collector's name to a Collector instance
// collectorKey -> collector pointer
collectors map[string]*Collector
// targetItems is a map from a target item's hash to the target items allocated state
// targetItem hash -> target item pointer
targetItems map[target.ItemHash]*target.Item
// collectorKey -> job -> target item hash -> true
targetItemsPerJobPerCollector map[string]map[string]map[target.ItemHash]bool
// m protects collectors, targetItems and targetItemsPerJobPerCollector for concurrent use.
m sync.RWMutex
log logr.Logger
filter Filter
}

// SetFilter sets the filtering hook to use.
func (a *allocator) SetFilter(filter Filter) {
a.filter = filter
}

// SetFallbackStrategy sets the fallback strategy to use.
func (a *allocator) SetFallbackStrategy(strategy Strategy) {
a.strategy.SetFallbackStrategy(strategy)
}

// SetTargets accepts a list of targets that will be used to make
// load balancing decisions. This method should be called when new targets are
// discovered or existing targets are shut down.
func (a *allocator) SetTargets(targets []*target.Item) {
timer := prometheus.NewTimer(TimeToAssign.WithLabelValues("SetTargets", a.strategy.GetName()))
defer timer.ObserveDuration()
if a.filter != nil {
targets = a.filter.Apply(targets)
}
RecordTargetsKept(targets)
concurrency := runtime.NumCPU() * 2 // determined experimentally
targetMap := buildTargetMap(targets, concurrency)
a.m.Lock()
defer a.m.Unlock()
// Check for target changes
targetsDiff := diff.Maps(a.targetItems, targetMap)
// If there are any additions or removals
if len(targetsDiff.Additions()) != 0 || len(targetsDiff.Removals()) != 0 {
a.handleTargets(targetsDiff)
}
}
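
// For example, a discovery loop might push targets like this (hypothetical
// constructor and arguments; the real signature lives in the target package):
//
//	item := target.NewItem("my-job", "10.0.0.1:9090", discoveredLabels, "")
//	alloc.SetTargets([]*target.Item{item})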

// SetCollectors sets the set of collectors, keyed by collector name.
// This method is called when Collectors are added or removed.
func (a *allocator) SetCollectors(collectors map[string]*Collector) {
timer := prometheus.NewTimer(TimeToAssign.WithLabelValues("SetCollectors", a.strategy.GetName()))
defer timer.ObserveDuration()
CollectorsAllocatable.WithLabelValues(a.strategy.GetName()).Set(float64(len(collectors)))
if len(collectors) == 0 {
a.log.Info("No collector instances present")
}
a.m.Lock()
defer a.m.Unlock()
// Check for collector changes
collectorsDiff := diff.Maps(a.collectors, collectors)
if len(collectorsDiff.Additions()) != 0 || len(collectorsDiff.Removals()) != 0 {
a.handleCollectors(collectorsDiff)
}
}
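
// For example (illustrative names; NewCollector is the constructor used below
// in handleCollectors):
//
//	alloc.SetCollectors(map[string]*Collector{
//		"collector-0": NewCollector("collector-0", "node-a"),
//		"collector-1": NewCollector("collector-1", "node-b"),
//	})
//
// Calling SetCollectors again without "collector-1" unassigns its targets and
// redistributes them across the remaining collectors.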

// GetTargetsForCollectorAndJob returns the target items currently assigned to
// the given collector for the given job. It returns an empty slice if the
// collector or job is unknown.
func (a *allocator) GetTargetsForCollectorAndJob(collector string, job string) []*target.Item {
a.m.RLock()
defer a.m.RUnlock()
	jobs, ok := a.targetItemsPerJobPerCollector[collector]
	if !ok {
		return []*target.Item{}
	}
	targetHashes, ok := jobs[job]
	if !ok {
		return []*target.Item{}
	}
	targetItemsCopy := make([]*target.Item, len(targetHashes))
	index := 0
	for targetHash := range targetHashes {
		targetItemsCopy[index] = a.targetItems[targetHash]
		index++
	}
return targetItemsCopy
}

// TargetItems returns a shallow copy of the targetItems map.
func (a *allocator) TargetItems() map[target.ItemHash]*target.Item {
a.m.RLock()
defer a.m.RUnlock()
targetItemsCopy := make(map[target.ItemHash]*target.Item)
for k, v := range a.targetItems {
targetItemsCopy[k] = v
}
return targetItemsCopy
}

// Collectors returns a shallow copy of the collectors map.
func (a *allocator) Collectors() map[string]*Collector {
a.m.RLock()
defer a.m.RUnlock()
collectorsCopy := make(map[string]*Collector)
for k, v := range a.collectors {
collectorsCopy[k] = v
}
return collectorsCopy
}

// handleTargets receives the new and removed targets and reconciles the current state.
// Any removals are removed from the allocator's targetItems and unassigned from the corresponding collector.
// Any net-new additions are assigned to a collector chosen by the allocation strategy.
func (a *allocator) handleTargets(changes diff.Changes[target.ItemHash, *target.Item]) {
	// Check for removals
	for k, item := range a.targetItems {
		// if the current item is in the removals list
		if _, ok := changes.Removals()[k]; ok {
			a.removeTargetItem(item)
		}
	}
	// Check for additions
	var assignmentErrors []error
	for k, item := range changes.Additions() {
		// Do nothing if the item is already there
		if _, ok := a.targetItems[k]; ok {
			continue
		}
		// TODO: track target -> collector relationship in a separate map
		item.CollectorName = ""
		// Add item to item pool and assign a collector
		if err := a.addTargetToTargetItems(item); err != nil {
			assignmentErrors = append(assignmentErrors, err)
		}
}
// Check for unassigned targets
unassignedTargets := len(assignmentErrors)
if unassignedTargets > 0 {
err := errors.Join(assignmentErrors...)
a.log.Info("Could not assign targets for some jobs", "targets", unassignedTargets, "error", err)
TargetsUnassigned.Set(float64(unassignedTargets))
}
}

// addTargetToTargetItems adds the target item to the pool and, if any
// collectors are registered, asks the strategy to pick an owner for it.
func (a *allocator) addTargetToTargetItems(tg *target.Item) error {
a.targetItems[tg.Hash()] = tg
if len(a.collectors) == 0 {
return nil
}
colOwner, err := a.strategy.GetCollectorForTarget(a.collectors, tg)
if err != nil {
return err
}
	// Check if this is a reassignment; if so, unassign first.
	// Note: the ordering here is important. We want to determine the new assignment before unassigning,
	// because the strategy might make use of the previous assignment information.
if _, ok := a.collectors[tg.CollectorName]; ok && tg.CollectorName != "" {
a.unassignTargetItem(tg)
}
tg.CollectorName = colOwner.Name
a.addCollectorTargetItemMapping(tg)
a.collectors[colOwner.Name].NumTargets++
	TargetsPerCollector.WithLabelValues(colOwner.Name, a.strategy.GetName()).Set(float64(a.collectors[colOwner.Name].NumTargets))
return nil
}

// unassignTargetItem unassigns the target item from its Collector. The target item is still tracked.
func (a *allocator) unassignTargetItem(item *target.Item) {
collectorName := item.CollectorName
if collectorName == "" {
return
}
c, ok := a.collectors[collectorName]
if !ok {
return
}
c.NumTargets--
TargetsPerCollector.WithLabelValues(item.CollectorName, a.strategy.GetName()).Set(float64(c.NumTargets))
delete(a.targetItemsPerJobPerCollector[item.CollectorName][item.JobName], item.Hash())
if len(a.targetItemsPerJobPerCollector[item.CollectorName][item.JobName]) == 0 {
delete(a.targetItemsPerJobPerCollector[item.CollectorName], item.JobName)
}
item.CollectorName = ""
}

// removeTargetItem unassigns the target item from its Collector and stops tracking it.
func (a *allocator) removeTargetItem(item *target.Item) {
a.unassignTargetItem(item)
delete(a.targetItems, item.Hash())
}

// removeCollector removes a Collector from the allocator.
func (a *allocator) removeCollector(collector *Collector) {
delete(a.collectors, collector.Name)
// Remove the collector from any target item records
for _, targetItems := range a.targetItemsPerJobPerCollector[collector.Name] {
for targetHash := range targetItems {
a.targetItems[targetHash].CollectorName = ""
}
}
delete(a.targetItemsPerJobPerCollector, collector.Name)
TargetsPerCollector.WithLabelValues(collector.Name, a.strategy.GetName()).Set(0)
}

// addCollectorTargetItemMapping keeps track of which collector has which jobs and targets.
// This allows the allocator to respond to HTTP calls without any extra allocations.
// The caller of this method must hold the lock.
func (a *allocator) addCollectorTargetItemMapping(tg *target.Item) {
if a.targetItemsPerJobPerCollector[tg.CollectorName] == nil {
a.targetItemsPerJobPerCollector[tg.CollectorName] = make(map[string]map[target.ItemHash]bool)
}
if a.targetItemsPerJobPerCollector[tg.CollectorName][tg.JobName] == nil {
a.targetItemsPerJobPerCollector[tg.CollectorName][tg.JobName] = make(map[target.ItemHash]bool)
}
a.targetItemsPerJobPerCollector[tg.CollectorName][tg.JobName][tg.Hash()] = true
}
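
// For illustration, after a single assignment the nested map has this shape
// (names and hash value are made up):
//
//	targetItemsPerJobPerCollector = {
//		"collector-0": {
//			"my-job": {0x9bdc6e2f: true},
//		},
//	}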

// handleCollectors receives the new and removed collectors and reconciles the current state.
// Any removals are removed from the allocator's collectors. New collectors are added to the allocator's collector map.
// Finally, all targets' collector assignments are updated.
func (a *allocator) handleCollectors(changes diff.Changes[string, *Collector]) {
	// Clear removed collectors
	for _, k := range changes.Removals() {
		a.removeCollector(k)
	}
	// Insert the new collectors
	for _, i := range changes.Additions() {
		a.collectors[i.Name] = NewCollector(i.Name, i.NodeName)
	}
a.collectors[i.Name] = NewCollector(i.Name, i.NodeName)
}
// Set collectors on the strategy
a.strategy.SetCollectors(a.collectors)
	// Re-allocate all targets
var assignmentErrors []error
for _, item := range a.targetItems {
err := a.addTargetToTargetItems(item)
if err != nil {
assignmentErrors = append(assignmentErrors, err)
item.CollectorName = ""
}
}
// Check for unassigned targets
unassignedTargets := len(assignmentErrors)
if unassignedTargets > 0 {
err := errors.Join(assignmentErrors...)
a.log.Info("Could not assign targets for some jobs", "targets", unassignedTargets, "error", err)
TargetsUnassigned.Set(float64(unassignedTargets))
}
}

const minChunkSize = 100 // for small target counts, it's not worth it to spawn a lot of goroutines

// buildTargetMap builds a map of targets, using their hashes as keys. It does this concurrently, and the concurrency
// is configurable via the concurrency parameter. We do this in parallel because target hashing is surprisingly
// expensive.
func buildTargetMap(targets []*target.Item, concurrency int) map[target.ItemHash]*target.Item {
// technically there may be duplicates, so this may overallocate, but in the majority of cases it will be exact
result := make(map[target.ItemHash]*target.Item, len(targets))
chunkSize := len(targets) / concurrency
chunkSize = max(chunkSize, minChunkSize)
wg := sync.WaitGroup{}
for chunk := range slices.Chunk(targets, chunkSize) {
wg.Add(1)
		go func(ch []*target.Item) {
			defer wg.Done()
			for _, item := range ch {
				// Hash is assumed to memoize its result on the item, so this
				// pass only warms the cache; the sequential map build below
				// then gets cache hits.
				item.Hash()
			}
		}(chunk)
}
wg.Wait()
for _, item := range targets {
result[item.Hash()] = item
}
return result
}
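
// As a worked example of the chunking above: with 10,000 targets and
// concurrency 16, chunkSize is max(10000/16, 100) = 625, so 16 goroutines hash
// 625 targets each; with 500 targets, chunkSize is max(500/16, 100) = 100,
// capping the fan-out at 5 goroutines.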