plugins/processors/awsapplicationsignals/processor.go (309 lines of code) (raw):

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: MIT package awsapplicationsignals import ( "context" "unicode" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/pdata/ptrace" "go.uber.org/zap" "golang.org/x/text/cases" "golang.org/x/text/language" appsignalsconfig "github.com/aws/amazon-cloudwatch-agent/plugins/processors/awsapplicationsignals/config" "github.com/aws/amazon-cloudwatch-agent/plugins/processors/awsapplicationsignals/internal/cardinalitycontrol" "github.com/aws/amazon-cloudwatch-agent/plugins/processors/awsapplicationsignals/internal/metrichandlers" "github.com/aws/amazon-cloudwatch-agent/plugins/processors/awsapplicationsignals/internal/normalizer" "github.com/aws/amazon-cloudwatch-agent/plugins/processors/awsapplicationsignals/internal/resolver" "github.com/aws/amazon-cloudwatch-agent/plugins/processors/awsapplicationsignals/rules" ) const ( failedToProcessAttribute = "failed to process attributes" failedToProcessAttributeWithLimiter = "failed to process attributes with limiter, keep the data" ) var metricCaser = cases.Title(language.English) // this is used to Process some attributes (like IP addresses) to a generic form to reduce high cardinality type attributesMutator interface { Process(attributes, resourceAttributes pcommon.Map, isTrace bool) error } type allowListMutator interface { ShouldBeDropped(attributes pcommon.Map) (bool, error) } type stopper interface { Stop(context.Context) error } type awsapplicationsignalsprocessor struct { logger *zap.Logger config *appsignalsconfig.Config replaceActions *rules.ReplaceActions allowlistMutators []allowListMutator metricMutators []attributesMutator traceMutators []attributesMutator limiter cardinalitycontrol.Limiter aggregationMutator metrichandlers.AggregationMutator stoppers []stopper } func (ap *awsapplicationsignalsprocessor) StartMetrics(ctx context.Context, _ component.Host) error { attributesResolver := resolver.NewAttributesResolver(ap.config.Resolvers, ap.logger) ap.stoppers = []stopper{attributesResolver} attributesNormalizer := normalizer.NewAttributesNormalizer(ap.logger) ap.metricMutators = []attributesMutator{attributesResolver, attributesNormalizer} limiterConfig := ap.config.Limiter if limiterConfig == nil { limiterConfig = appsignalsconfig.NewDefaultLimiterConfig() } if limiterConfig.ParentContext == nil { limiterConfig.ParentContext = ctx } if !limiterConfig.Disabled { ap.limiter = cardinalitycontrol.NewMetricsLimiter(limiterConfig, ap.logger) } else { ap.logger.Info("metrics limiter is disabled.") } ap.replaceActions = rules.NewReplacer(ap.config.Rules, !limiterConfig.Disabled) pruner := metrichandlers.NewPruner() keeper := rules.NewKeeper(ap.config.Rules, !limiterConfig.Disabled) dropper := rules.NewDropper(ap.config.Rules) ap.allowlistMutators = []allowListMutator{pruner, keeper, dropper} ap.aggregationMutator = metrichandlers.NewAggregationMutator() return nil } func (ap *awsapplicationsignalsprocessor) StartTraces(_ context.Context, _ component.Host) error { attributesResolver := resolver.NewAttributesResolver(ap.config.Resolvers, ap.logger) attributesNormalizer := normalizer.NewAttributesNormalizer(ap.logger) customReplacer := rules.NewReplacer(ap.config.Rules, false) ap.stoppers = append(ap.stoppers, attributesResolver) ap.traceMutators = append(ap.traceMutators, attributesResolver, attributesNormalizer, customReplacer) return nil } func (ap *awsapplicationsignalsprocessor) Shutdown(ctx context.Context) error { for _, stopper := range ap.stoppers { err := stopper.Stop(ctx) if err != nil { ap.logger.Error("failed to stop", zap.Error(err)) } } return nil } func (ap *awsapplicationsignalsprocessor) processTraces(_ context.Context, td ptrace.Traces) (ptrace.Traces, error) { rss := td.ResourceSpans() for i := 0; i < rss.Len(); i++ { rs := rss.At(i) ilss := rs.ScopeSpans() resourceAttributes := rs.Resource().Attributes() for j := 0; j < ilss.Len(); j++ { ils := ilss.At(j) spans := ils.Spans() for k := 0; k < spans.Len(); k++ { span := spans.At(k) for _, Mutator := range ap.traceMutators { err := Mutator.Process(span.Attributes(), resourceAttributes, true) if err != nil { ap.logger.Debug("failed to Process span", zap.Error(err)) } } } } } return td, nil } func (ap *awsapplicationsignalsprocessor) processMetrics(ctx context.Context, md pmetric.Metrics) (pmetric.Metrics, error) { rms := md.ResourceMetrics() for i := 0; i < rms.Len(); i++ { rs := rms.At(i) ilms := rs.ScopeMetrics() resourceAttributes := rs.Resource().Attributes() for j := 0; j < ilms.Len(); j++ { ils := ilms.At(j) metrics := ils.Metrics() for k := 0; k < metrics.Len(); k++ { m := metrics.At(k) // Check if the first letter of the metric name is not capitalized if len(m.Name()) > 0 && !unicode.IsUpper(rune(m.Name()[0])) { m.SetName(metricCaser.String(m.Name())) // Ensure metric name is in sentence case } ap.processMetricAttributes(ctx, m, resourceAttributes) ap.aggregationMutator.ProcessMetrics(ctx, m, resourceAttributes) } } } return md, nil } // Attributes are provided for each log and trace, but not at the metric level // Need to process attributes for every data point within a metric. func (ap *awsapplicationsignalsprocessor) processMetricAttributes(_ context.Context, m pmetric.Metric, resourceAttribes pcommon.Map) { // This is a lot of repeated code, but since there is no single parent superclass // between metric data types, we can't use polymorphism. switch m.Type() { case pmetric.MetricTypeGauge: dps := m.Gauge().DataPoints() for i := 0; i < dps.Len(); i++ { for _, mutator := range ap.metricMutators { err := mutator.Process(dps.At(i).Attributes(), resourceAttribes, false) if err != nil { ap.logger.Debug(failedToProcessAttribute, zap.Error(err)) } } } dps.RemoveIf(func(d pmetric.NumberDataPoint) bool { for _, mutator := range ap.allowlistMutators { shouldBeDropped, err := mutator.ShouldBeDropped(d.Attributes()) if err != nil { ap.logger.Debug(failedToProcessAttribute, zap.Error(err)) } if shouldBeDropped { return true } } return false }) for i := 0; i < dps.Len(); i++ { err := ap.replaceActions.Process(dps.At(i).Attributes(), resourceAttribes, false) if err != nil { ap.logger.Debug(failedToProcessAttribute, zap.Error(err)) } } if ap.limiter != nil { for i := 0; i < dps.Len(); i++ { if _, err := ap.limiter.Admit(m.Name(), dps.At(i).Attributes(), resourceAttribes); err != nil { ap.logger.Debug(failedToProcessAttributeWithLimiter, zap.Error(err)) } } } case pmetric.MetricTypeSum: dps := m.Sum().DataPoints() for i := 0; i < dps.Len(); i++ { for _, mutator := range ap.metricMutators { err := mutator.Process(dps.At(i).Attributes(), resourceAttribes, false) if err != nil { ap.logger.Debug(failedToProcessAttribute, zap.Error(err)) } } } dps.RemoveIf(func(d pmetric.NumberDataPoint) bool { for _, mutator := range ap.allowlistMutators { shouldBeDropped, err := mutator.ShouldBeDropped(d.Attributes()) if err != nil { ap.logger.Debug(failedToProcessAttribute, zap.Error(err)) } if shouldBeDropped { return true } } return false }) for i := 0; i < dps.Len(); i++ { err := ap.replaceActions.Process(dps.At(i).Attributes(), resourceAttribes, false) if err != nil { ap.logger.Debug(failedToProcessAttribute, zap.Error(err)) } } if ap.limiter != nil { for i := 0; i < dps.Len(); i++ { if _, err := ap.limiter.Admit(m.Name(), dps.At(i).Attributes(), resourceAttribes); err != nil { ap.logger.Debug(failedToProcessAttributeWithLimiter, zap.Error(err)) } } } case pmetric.MetricTypeHistogram: dps := m.Histogram().DataPoints() for i := 0; i < dps.Len(); i++ { for _, mutator := range ap.metricMutators { err := mutator.Process(dps.At(i).Attributes(), resourceAttribes, false) if err != nil { ap.logger.Debug(failedToProcessAttribute, zap.Error(err)) } } } dps.RemoveIf(func(d pmetric.HistogramDataPoint) bool { for _, mutator := range ap.allowlistMutators { shouldBeDropped, err := mutator.ShouldBeDropped(d.Attributes()) if err != nil { ap.logger.Debug(failedToProcessAttribute, zap.Error(err)) } if shouldBeDropped { return true } } return false }) for i := 0; i < dps.Len(); i++ { err := ap.replaceActions.Process(dps.At(i).Attributes(), resourceAttribes, false) if err != nil { ap.logger.Debug(failedToProcessAttribute, zap.Error(err)) } } if ap.limiter != nil { for i := 0; i < dps.Len(); i++ { if _, err := ap.limiter.Admit(m.Name(), dps.At(i).Attributes(), resourceAttribes); err != nil { ap.logger.Debug(failedToProcessAttributeWithLimiter, zap.Error(err)) } } } case pmetric.MetricTypeExponentialHistogram: dps := m.ExponentialHistogram().DataPoints() for i := 0; i < dps.Len(); i++ { for _, mutator := range ap.metricMutators { err := mutator.Process(dps.At(i).Attributes(), resourceAttribes, false) if err != nil { ap.logger.Debug(failedToProcessAttribute, zap.Error(err)) } } } dps.RemoveIf(func(d pmetric.ExponentialHistogramDataPoint) bool { for _, mutator := range ap.allowlistMutators { shouldBeDropped, err := mutator.ShouldBeDropped(d.Attributes()) if err != nil { ap.logger.Debug(failedToProcessAttribute, zap.Error(err)) } if shouldBeDropped { return true } } return false }) for i := 0; i < dps.Len(); i++ { err := ap.replaceActions.Process(dps.At(i).Attributes(), resourceAttribes, false) if err != nil { ap.logger.Debug(failedToProcessAttribute, zap.Error(err)) } } if ap.limiter != nil { for i := 0; i < dps.Len(); i++ { if _, err := ap.limiter.Admit(m.Name(), dps.At(i).Attributes(), resourceAttribes); err != nil { ap.logger.Debug(failedToProcessAttributeWithLimiter, zap.Error(err)) } } } case pmetric.MetricTypeSummary: dps := m.Summary().DataPoints() for i := 0; i < dps.Len(); i++ { for _, mutator := range ap.metricMutators { err := mutator.Process(dps.At(i).Attributes(), resourceAttribes, false) if err != nil { ap.logger.Debug(failedToProcessAttribute, zap.Error(err)) } } } dps.RemoveIf(func(d pmetric.SummaryDataPoint) bool { for _, mutator := range ap.allowlistMutators { shouldBeDropped, err := mutator.ShouldBeDropped(d.Attributes()) if err != nil { ap.logger.Debug(failedToProcessAttribute, zap.Error(err)) } if shouldBeDropped { return true } } return false }) for i := 0; i < dps.Len(); i++ { err := ap.replaceActions.Process(dps.At(i).Attributes(), resourceAttribes, false) if err != nil { ap.logger.Debug(failedToProcessAttribute, zap.Error(err)) } } if ap.limiter != nil { for i := 0; i < dps.Len(); i++ { if _, err := ap.limiter.Admit(m.Name(), dps.At(i).Attributes(), resourceAttribes); err != nil { ap.logger.Debug(failedToProcessAttributeWithLimiter, zap.Error(err)) } } } default: ap.logger.Debug("Ignore unknown metric type", zap.String("type", m.Type().String())) } }