plugins/processors/awsapplicationsignals/internal/cardinalitycontrol/metrics_limiter.go (335 lines of code) (raw):

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: MIT package cardinalitycontrol import ( "context" "fmt" "sort" "sync" "time" "go.opentelemetry.io/collector/pdata/pcommon" "go.uber.org/zap" "github.com/aws/amazon-cloudwatch-agent/plugins/processors/awsapplicationsignals/common" "github.com/aws/amazon-cloudwatch-agent/plugins/processors/awsapplicationsignals/config" ) const ( UnprocessedServiceOperationValue = "AllOtherOperations" UnprocessedRemoteServiceOperationValue = "AllOtherRemoteOperations" ) const ( defaultCMSDepth = 3 defaultCMSWidth = 5000 ) var awsDeclaredMetricAttributes = []string{ common.AttributeEKSClusterName, common.AttributeK8SClusterName, common.AttributeK8SNamespace, common.CWMetricAttributeEnvironment, common.CWMetricAttributeLocalService, common.CWMetricAttributeLocalOperation, common.CWMetricAttributeRemoteService, common.CWMetricAttributeRemoteOperation, common.CWMetricAttributeRemoteResourceIdentifier, common.CWMetricAttributeRemoteEnvironment, } type Limiter interface { Admit(name string, attributes, resourceAttributes pcommon.Map) (bool, error) } type MetricsLimiter struct { DropThreshold int LogDroppedMetrics bool RotationInterval time.Duration logger *zap.Logger ctx context.Context mapLock sync.RWMutex services map[string]*service } func NewMetricsLimiter(config *config.LimiterConfig, logger *zap.Logger) Limiter { logger.Info("creating metrics limiter with config", zap.Any("config", config)) ctx := config.ParentContext if ctx == nil { ctx = context.TODO() } limiter := &MetricsLimiter{ DropThreshold: config.Threshold, LogDroppedMetrics: config.LogDroppedMetrics, RotationInterval: config.RotationInterval, logger: logger, ctx: ctx, services: map[string]*service{}, } go func() { for { select { case <-ctx.Done(): return default: limiter.removeStaleServices() time.Sleep(config.GarbageCollectionInterval) } } }() logger.Info("metrics limiter created.") return limiter } func (m *MetricsLimiter) Admit(metricName string, attributes, resourceAttributes pcommon.Map) (bool, error) { labels, serviceName, found := m.filterAWSDeclaredAttributes(attributes, resourceAttributes) if !found { return true, nil } admitted := true m.mapLock.RLock() svc := m.services[serviceName] m.mapLock.RUnlock() if svc == nil { m.mapLock.Lock() svc = m.services[serviceName] if svc == nil { svc = newService(serviceName, m.DropThreshold, m.RotationInterval, m.ctx, m.logger) m.services[serviceName] = svc } m.mapLock.Unlock() } metricData := newMetricData(serviceName, metricName, labels) reserved, _ := attributes.Get(common.AttributeTmpReserved) if reserved.Bool() { attributes.Remove(common.AttributeTmpReserved) return true, nil } if !svc.admitMetricData(metricData) { svc.rollupMetricData(attributes) svc.totalRollup++ admitted = false if m.LogDroppedMetrics { m.logger.Debug(fmt.Sprintf("[%s] drop metric data", svc.name), zap.Any("labels", labels)) } } svc.totalMetricSent++ svc.rwLock.RLock() defer svc.rwLock.RUnlock() svc.totalCount++ svc.InsertMetricDataToPrimary(metricData) svc.InsertMetricDataToSecondary(metricData) return admitted, nil } func (m *MetricsLimiter) filterAWSDeclaredAttributes(attributes, resourceAttributes pcommon.Map) (map[string]string, string, bool) { svcNameAttr, exists := attributes.Get(common.CWMetricAttributeLocalService) if !exists { return nil, "", false } labels := map[string]string{} svcName := svcNameAttr.AsString() for _, attrKey := range awsDeclaredMetricAttributes { if attr, ok := attributes.Get(attrKey); ok { labels[attrKey] = attr.AsString() } } return labels, svcName, true } func (m *MetricsLimiter) removeStaleServices() { var svcToRemove []string for name, svc := range m.services { if svc.rotations > 3 { if svc.countSnapshot[0] == svc.countSnapshot[1] && svc.countSnapshot[1] == svc.countSnapshot[2] { svc.cancelFunc() svcToRemove = append(svcToRemove, name) } } } m.mapLock.Lock() defer m.mapLock.Unlock() for _, name := range svcToRemove { m.logger.Info("remove stale service " + name + ".") delete(m.services, name) } } type service struct { logger *zap.Logger name string cancelFunc context.CancelFunc rwLock sync.RWMutex primaryCMS *CountMinSketch primaryTopK *topKMetrics secondaryCMS *CountMinSketch secondaryTopK *topKMetrics totalCount int rotations int countSnapshot []int totalRollup int totalMetricSent int } func (s *service) InsertMetricDataToPrimary(md *MetricData) { s.primaryCMS.Insert(md) updatedFrequency := s.primaryCMS.Get(md) updatedMd := copyMetricDataWithUpdatedFrequency(md, updatedFrequency) s.primaryTopK.Push(md, updatedMd) } func (s *service) InsertMetricDataToSecondary(md *MetricData) { if s.secondaryCMS != nil { s.secondaryCMS.Insert(md) updatedFrequency := s.secondaryCMS.Get(md) updatedMd := copyMetricDataWithUpdatedFrequency(md, updatedFrequency) s.secondaryTopK.Push(md, updatedMd) } } // MetricData represents a key-value pair. type MetricData struct { hashKey string name string service string frequency int } func (m MetricData) HashKey() string { return m.hashKey } func (m MetricData) Frequency() int { return m.frequency } func newMetricData(serviceName, metricName string, labels map[string]string) *MetricData { hashID := sortAndConcatLabels(labels) return &MetricData{ hashKey: hashID, name: metricName, service: serviceName, frequency: 1, } } func copyMetricDataWithUpdatedFrequency(md *MetricData, frequency int) *MetricData { return &MetricData{ hashKey: md.hashKey, name: md.name, service: md.service, frequency: frequency, } } func sortAndConcatLabels(labels map[string]string) string { keys := make([]string, 0, len(labels)) for key := range labels { keys = append(keys, key) } sort.Strings(keys) var concatenatedLabels string for _, key := range keys { concatenatedLabels += labels[key] } keys = nil return concatenatedLabels } // topKMetrics represents the priority queue with a map for key lookup and a size limit. type topKMetrics struct { metricMap map[string]*MetricData minMetric *MetricData sizeLimit int } // newTopKMetrics creates a new topKMetrics with a specified size limit. func newTopKMetrics(sizeLimit int) *topKMetrics { return &topKMetrics{ metricMap: make(map[string]*MetricData), minMetric: nil, sizeLimit: sizeLimit, } } // Push adds a key-value pair to the priority queue. If the value already exists, it updates the frequency. func (t *topKMetrics) Push(oldMetric, newMetric *MetricData) { hashValue := oldMetric.hashKey if t.minMetric == nil { t.minMetric = oldMetric } _, found := t.metricMap[hashValue] if found { // Update the frequency. t.metricMap[hashValue].frequency = newMetric.frequency // Check if this oldMetric is the new minimum, find the new minMetric after the updates if t.minMetric.hashKey == hashValue { // Find the new minMetrics after update the frequency t.minMetric = t.findMinMetric() } return } // If exceeded size limit, delete the smallest if len(t.metricMap) >= t.sizeLimit { if newMetric.frequency > t.minMetric.frequency { delete(t.metricMap, t.minMetric.hashKey) t.metricMap[hashValue] = newMetric t.minMetric = t.findMinMetric() } } else { // Check if this newMetric is the new minimum. if newMetric.frequency < t.minMetric.frequency { t.minMetric = newMetric } t.metricMap[hashValue] = newMetric } } // findMinMetric removes and returns the key-value pair with the minimum value. func (t *topKMetrics) findMinMetric() *MetricData { // Find the new minimum metric and smallest frequency. var newMinMetric *MetricData smallestFrequency := int(^uint(0) >> 1) // Initialize with the maximum possible integer value for _, metric := range t.metricMap { if metric.frequency < smallestFrequency { smallestFrequency = metric.frequency newMinMetric = metric } } return newMinMetric } func (s *service) admitMetricData(metric *MetricData) bool { _, found := s.primaryTopK.metricMap[metric.hashKey] if len(s.primaryTopK.metricMap) < s.primaryTopK.sizeLimit || found { return true } return false } func (s *service) rollupMetricData(attributes pcommon.Map) { for _, indexAttr := range awsDeclaredMetricAttributes { if (indexAttr == common.CWMetricAttributeEnvironment) || (indexAttr == common.CWMetricAttributeLocalService) || (indexAttr == common.CWMetricAttributeRemoteService) { continue } if indexAttr == common.CWMetricAttributeLocalOperation { attributes.PutStr(indexAttr, UnprocessedServiceOperationValue) } else if indexAttr == common.CWMetricAttributeRemoteOperation { attributes.PutStr(indexAttr, UnprocessedRemoteServiceOperationValue) } else { attributes.PutStr(indexAttr, "-") } } } // As a starting point, you can use rules of thumb, such as setting the depth to be around 4-6 times the logarithm of the expected number of distinct items and the width based on your memory constraints. However, these are rough guidelines, and the optimal size will depend on your unique application and requirements. func newService(name string, limit int, rotationInterval time.Duration, parentCtx context.Context, logger *zap.Logger) *service { depth := defaultCMSDepth width := defaultCMSWidth ctx, cancel := context.WithCancel(parentCtx) svc := &service{ logger: logger, name: name, cancelFunc: cancel, primaryCMS: NewCountMinSketch(depth, width), primaryTopK: newTopKMetrics(limit), countSnapshot: make([]int, 3), } // Create a ticker to create a new countMinSketch every 1 hour rotationTicker := time.NewTicker(rotationInterval) //defer rotationTicker.Stop() // Create a goroutine to handle rotationTicker.C go func() { for { select { case <-rotationTicker.C: svc.logger.Info(fmt.Sprintf("[%s] rotating visit records, current rotation %d", name, svc.rotations)) if err := rotateVisitRecords(svc); err != nil { svc.logger.Error(fmt.Sprintf("[%s] failed to rotate visit records.", name), zap.Error(err)) } case <-ctx.Done(): return default: // Continue running the main program time.Sleep(1 * time.Second) } } }() svc.logger.Info(fmt.Sprintf("[%s] service entry is created.\n", name)) return svc } func rotateVisitRecords(svc *service) error { svc.rwLock.Lock() defer svc.rwLock.Unlock() cmsDepth := svc.primaryCMS.depth cmsWidth := svc.primaryCMS.width topKLimit := svc.primaryTopK.sizeLimit nextPrimaryCMS := svc.secondaryCMS nextPrimaryTopK := svc.secondaryTopK svc.secondaryCMS = NewCountMinSketch(cmsDepth, cmsWidth) svc.secondaryTopK = newTopKMetrics(topKLimit) if nextPrimaryCMS != nil && nextPrimaryTopK != nil { svc.primaryCMS = nextPrimaryCMS svc.primaryTopK = nextPrimaryTopK } else { svc.logger.Info(fmt.Sprintf("[%s] secondary visit records are nil.", svc.name)) } svc.countSnapshot[svc.rotations%3] = svc.totalCount svc.rotations++ return nil }