otelcollector/otel-allocator/internal/allocation/consistent_hashing.go (55 lines of code) (raw):

// Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 package allocation import ( "fmt" "github.com/buraksezer/consistent" "github.com/cespare/xxhash/v2" "github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/internal/target" ) const consistentHashingStrategyName = "consistent-hashing" type hasher struct{} func (h hasher) Sum64(data []byte) uint64 { return xxhash.Sum64(data) } var _ Strategy = &consistentHashingStrategy{} type consistentHashingStrategy struct { config consistent.Config consistentHasher *consistent.Consistent } func newConsistentHashingStrategy() Strategy { config := consistent.Config{ PartitionCount: 1061, ReplicationFactor: 5, Load: 1.1, Hasher: hasher{}, } consistentHasher := consistent.New(nil, config) chStrategy := &consistentHashingStrategy{ consistentHasher: consistentHasher, config: config, } return chStrategy } func (s *consistentHashingStrategy) GetName() string { return consistentHashingStrategyName } func (s *consistentHashingStrategy) GetCollectorForTarget(collectors map[string]*Collector, item *target.Item) (*Collector, error) { hashKey := item.TargetURL member := s.consistentHasher.LocateKey([]byte(hashKey)) collectorName := member.String() collector, ok := collectors[collectorName] if !ok { return nil, fmt.Errorf("unknown collector %s", collectorName) } return collector, nil } func (s *consistentHashingStrategy) SetCollectors(collectors map[string]*Collector) { // we simply recreate the hasher with the new member set // this isn't any more expensive than doing a diff and then applying the change var members []consistent.Member if len(collectors) > 0 { members = make([]consistent.Member, 0, len(collectors)) for _, collector := range collectors { members = append(members, collector) } } s.consistentHasher = consistent.New(members, s.config) } func (s *consistentHashingStrategy) SetFallbackStrategy(fallbackStrategy Strategy) {}