plugins/processors/kueueattributes/processor.go (87 lines of code) (raw):

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: MIT package kueueattributes import ( "context" "strings" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/pmetric" "go.uber.org/zap" "github.com/aws/amazon-cloudwatch-agent/internal/containerinsightscommon" ) const ( kueueMetricsIdentifier = "kueue" ) var kueueLabelFilter = map[string]interface{}{ containerinsightscommon.ClusterNameKey: nil, containerinsightscommon.ClusterQueueNameKey: nil, containerinsightscommon.ClusterQueueStatusKey: nil, containerinsightscommon.ClusterQueueReasonKey: nil, containerinsightscommon.ClusterQueueResourceKey: nil, containerinsightscommon.Flavor: nil, containerinsightscommon.NodeNameKey: nil, } type kueueAttributesProcessor struct { *Config logger *zap.Logger labelFilter map[string]interface{} } func newKueueAttributesProcessor(config *Config, logger *zap.Logger) *kueueAttributesProcessor { d := &kueueAttributesProcessor{ Config: config, logger: logger, labelFilter: kueueLabelFilter, } return d } func (d *kueueAttributesProcessor) processMetrics(_ context.Context, md pmetric.Metrics) (pmetric.Metrics, error) { rms := md.ResourceMetrics() for i := 0; i < rms.Len(); i++ { rm := rms.At(i) sms := rm.ScopeMetrics() for j := 0; j < sms.Len(); j++ { metrics := sms.At(j).Metrics() for k := 0; k < metrics.Len(); k++ { m := metrics.At(k) d.processMetricAttributes(m) } } d.dropResourceMetricAttributes(rm) } return md, nil } func (d *kueueAttributesProcessor) processMetricAttributes(m pmetric.Metric) { // only decorate kueue metrics if !strings.HasPrefix(m.Name(), kueueMetricsIdentifier) { return } var dps pmetric.NumberDataPointSlice switch m.Type() { case pmetric.MetricTypeGauge: dps = m.Gauge().DataPoints() case pmetric.MetricTypeSum: dps = m.Sum().DataPoints() default: d.logger.Debug("Ignore unknown metric type", zap.String(containerinsightscommon.MetricType, m.Type().String())) } for i := 0; i < dps.Len(); i++ { d.filterAttributes(dps.At(i).Attributes()) } } func (d *kueueAttributesProcessor) filterAttributes(attributes pcommon.Map) { labels := d.labelFilter if len(labels) == 0 { return } // remove labels that are not in the keep list attributes.RemoveIf(func(k string, _ pcommon.Value) bool { if _, ok := labels[k]; ok { return false } return true }) } func (d *kueueAttributesProcessor) dropResourceMetricAttributes(resourceMetric pmetric.ResourceMetrics) { serviceNameKey := "service.name" attributes := resourceMetric.Resource().Attributes() serviceName, exists := attributes.Get(serviceNameKey) if exists && (serviceName.Str() == "containerInsightsKueueMetricsScraper") { resourceMetric.Resource().Attributes().Clear() } }