cmd/amazon-cloudwatch-agent-target-allocator/allocation/consistent_hashing.go (197 lines of code) (raw):

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 package allocation import ( "strings" "sync" "github.com/buraksezer/consistent" "github.com/cespare/xxhash/v2" "github.com/go-logr/logr" "github.com/prometheus/client_golang/prometheus" "github.com/aws/amazon-cloudwatch-agent-operator/cmd/amazon-cloudwatch-agent-target-allocator/diff" "github.com/aws/amazon-cloudwatch-agent-operator/cmd/amazon-cloudwatch-agent-target-allocator/target" ) var _ Allocator = &consistentHashingAllocator{} const consistentHashingStrategyName = "consistent-hashing" type hasher struct{} func (h hasher) Sum64(data []byte) uint64 { return xxhash.Sum64(data) } type consistentHashingAllocator struct { // m protects consistentHasher, collectors and targetItems for concurrent use. m sync.RWMutex consistentHasher *consistent.Consistent // collectors is a map from a Collector's name to a Collector instance // collectorKey -> collector pointer collectors map[string]*Collector // targetItems is a map from a target item's hash to the target items allocated state // targetItem hash -> target item pointer targetItems map[string]*target.Item // collectorKey -> job -> target item hash -> true targetItemsPerJobPerCollector map[string]map[string]map[string]bool log logr.Logger filter Filter } func newConsistentHashingAllocator(log logr.Logger, opts ...AllocationOption) Allocator { config := consistent.Config{ PartitionCount: 1061, ReplicationFactor: 5, Load: 1.1, Hasher: hasher{}, } consistentHasher := consistent.New(nil, config) chAllocator := &consistentHashingAllocator{ consistentHasher: consistentHasher, collectors: make(map[string]*Collector), targetItems: make(map[string]*target.Item), targetItemsPerJobPerCollector: make(map[string]map[string]map[string]bool), log: log, } for _, opt := range opts { opt(chAllocator) } return chAllocator } // SetFilter sets the filtering hook to use. 
func (c *consistentHashingAllocator) SetFilter(filter Filter) {
	c.filter = filter
}

// addCollectorTargetItemMapping records tg under its collector and job in
// targetItemsPerJobPerCollector, creating the nested maps the first time a
// collector or job is seen. Keeping this index up to date lets the allocator
// answer http calls without any extra allocations.
// The caller of this method has to acquire a lock.
func (c *consistentHashingAllocator) addCollectorTargetItemMapping(tg *target.Item) {
	jobs := c.targetItemsPerJobPerCollector[tg.CollectorName]
	if jobs == nil {
		jobs = make(map[string]map[string]bool)
		c.targetItemsPerJobPerCollector[tg.CollectorName] = jobs
	}

	hashes := jobs[tg.JobName]
	if hashes == nil {
		hashes = make(map[string]bool)
		jobs[tg.JobName] = hashes
	}

	hashes[tg.Hash()] = true
}

// addTargetToTargetItems assigns a target to the collector chosen by the consistent hasher and adds it to the
// allocator's targetItems.
// This method is called from within SetTargets and SetCollectors, which acquire the needed lock.
// This is only called after the collectors are cleared or when a new target has been found in the tempTargetMap.
// INVARIANT: c.collectors must have at least 1 collector set.
// NOTE: by not creating a new target item, there is the potential for a race condition where we modify this target
// item while it's being encoded by the server JSON handler.
func (c *consistentHashingAllocator) addTargetToTargetItems(tg *target.Item) { // Check if this is a reassignment, if so, decrement the previous collector's NumTargets if previousColName, ok := c.collectors[tg.CollectorName]; ok { previousColName.NumTargets-- delete(c.targetItemsPerJobPerCollector[tg.CollectorName][tg.JobName], tg.Hash()) TargetsPerCollector.WithLabelValues(previousColName.String(), consistentHashingStrategyName).Set(float64(c.collectors[previousColName.String()].NumTargets)) } colOwner := c.consistentHasher.LocateKey([]byte(strings.Join(tg.TargetURL, ""))) tg.CollectorName = colOwner.String() c.targetItems[tg.Hash()] = tg c.addCollectorTargetItemMapping(tg) c.collectors[colOwner.String()].NumTargets++ TargetsPerCollector.WithLabelValues(colOwner.String(), consistentHashingStrategyName).Set(float64(c.collectors[colOwner.String()].NumTargets)) } // handleTargets receives the new and removed targets and reconciles the current state. // Any removals are removed from the allocator's targetItems and unassigned from the corresponding collector. // Any net-new additions are assigned to the next available collector. func (c *consistentHashingAllocator) handleTargets(diff diff.Changes[*target.Item]) { // Check for removals for k, item := range c.targetItems { // if the current item is in the removals list if _, ok := diff.Removals()[k]; ok { col := c.collectors[item.CollectorName] col.NumTargets-- delete(c.targetItems, k) delete(c.targetItemsPerJobPerCollector[item.CollectorName][item.JobName], item.Hash()) TargetsPerCollector.WithLabelValues(item.CollectorName, consistentHashingStrategyName).Set(float64(col.NumTargets)) } } // Check for additions for k, item := range diff.Additions() { // Do nothing if the item is already there if _, ok := c.targetItems[k]; ok { continue } else { // Add item to item pool and assign a collector c.addTargetToTargetItems(item) } } } // handleCollectors receives the new and removed collectors and reconciles the current state. 
// Any removals are removed from the allocator's collectors. New collectors are added to the allocator's collector map. // Finally, update all targets' collectors to match the consistent hashing. func (c *consistentHashingAllocator) handleCollectors(diff diff.Changes[*Collector]) { // Clear removed collectors for _, k := range diff.Removals() { delete(c.collectors, k.Name) delete(c.targetItemsPerJobPerCollector, k.Name) c.consistentHasher.Remove(k.Name) TargetsPerCollector.WithLabelValues(k.Name, consistentHashingStrategyName).Set(0) } // Insert the new collectors for _, i := range diff.Additions() { c.collectors[i.Name] = NewCollector(i.Name) c.consistentHasher.Add(c.collectors[i.Name]) } // Re-Allocate all targets for _, item := range c.targetItems { c.addTargetToTargetItems(item) } } // SetTargets accepts a list of targets that will be used to make // load balancing decisions. This method should be called when there are // new targets discovered or existing targets are shutdown. func (c *consistentHashingAllocator) SetTargets(targets map[string]*target.Item) { timer := prometheus.NewTimer(TimeToAssign.WithLabelValues("SetTargets", consistentHashingStrategyName)) defer timer.ObserveDuration() if c.filter != nil { targets = c.filter.Apply(targets) } RecordTargetsKept(targets) c.m.Lock() defer c.m.Unlock() if len(c.collectors) == 0 { c.log.Info("No collector instances present, saving targets to allocate to collector(s)") // If there were no targets discovered previously, assign this as the new set of target items if len(c.targetItems) == 0 { c.log.Info("Not discovered any targets previously, saving targets found to the targetItems set") for k, item := range targets { c.targetItems[k] = item } } else { // If there were previously discovered targets, add or remove accordingly targetsDiffEmptyCollectorSet := diff.Maps(c.targetItems, targets) // Check for additions if len(targetsDiffEmptyCollectorSet.Additions()) > 0 { c.log.Info("New targets discovered, adding new 
targets to the targetItems set") for k, item := range targetsDiffEmptyCollectorSet.Additions() { // Do nothing if the item is already there if _, ok := c.targetItems[k]; ok { continue } else { // Add item to item pool c.targetItems[k] = item } } } // Check for deletions if len(targetsDiffEmptyCollectorSet.Removals()) > 0 { c.log.Info("Targets removed, Removing targets from the targetItems set") for k := range targetsDiffEmptyCollectorSet.Removals() { // Delete item from target items delete(c.targetItems, k) } } } return } // Check for target changes targetsDiff := diff.Maps(c.targetItems, targets) // If there are any additions or removals if len(targetsDiff.Additions()) != 0 || len(targetsDiff.Removals()) != 0 { c.handleTargets(targetsDiff) } } // SetCollectors sets the set of collectors with key=collectorName, value=Collector object. // This method is called when Collectors are added or removed. func (c *consistentHashingAllocator) SetCollectors(collectors map[string]*Collector) { timer := prometheus.NewTimer(TimeToAssign.WithLabelValues("SetCollectors", consistentHashingStrategyName)) defer timer.ObserveDuration() CollectorsAllocatable.WithLabelValues(consistentHashingStrategyName).Set(float64(len(collectors))) if len(collectors) == 0 { c.log.Info("No collector instances present") return } c.m.Lock() defer c.m.Unlock() // Check for collector changes collectorsDiff := diff.Maps(c.collectors, collectors) if len(collectorsDiff.Additions()) != 0 || len(collectorsDiff.Removals()) != 0 { c.handleCollectors(collectorsDiff) } c.log.Info("Setting collector completed") } func (c *consistentHashingAllocator) GetTargetsForCollectorAndJob(collector string, job string) []*target.Item { c.m.RLock() defer c.m.RUnlock() if _, ok := c.targetItemsPerJobPerCollector[collector]; !ok { return []*target.Item{} } if _, ok := c.targetItemsPerJobPerCollector[collector][job]; !ok { return []*target.Item{} } targetItemsCopy := make([]*target.Item, 
len(c.targetItemsPerJobPerCollector[collector][job])) index := 0 for targetHash := range c.targetItemsPerJobPerCollector[collector][job] { targetItemsCopy[index] = c.targetItems[targetHash] index++ } return targetItemsCopy } // TargetItems returns a shallow copy of the targetItems map. func (c *consistentHashingAllocator) TargetItems() map[string]*target.Item { c.m.RLock() defer c.m.RUnlock() targetItemsCopy := make(map[string]*target.Item) for k, v := range c.targetItems { targetItemsCopy[k] = v } return targetItemsCopy } // Collectors returns a shallow copy of the collectors map. func (c *consistentHashingAllocator) Collectors() map[string]*Collector { c.m.RLock() defer c.m.RUnlock() collectorsCopy := make(map[string]*Collector) for k, v := range c.collectors { collectorsCopy[k] = v } return collectorsCopy }