in plugins/processors/awsapplicationsignals/processor.go [158:340]
// processMetricAttributes runs the processor's attribute pipeline over every
// data point of m. For each supported metric type the data points go through
// the same stages, in this order:
//
//  1. every metric mutator processes each data point's attributes,
//  2. data points that any allowlist mutator reports as droppable are removed,
//  3. the replace actions process the attributes of the remaining data points,
//  4. if a limiter is configured, each remaining data point is passed to it.
//
// Errors from any stage are logged at debug level and processing continues
// with the next mutator or data point. A metric whose type matches none of the
// cases is logged at debug level and left untouched.
//
// The concrete data point slice types differ per metric type, so each case
// adapts its slice to the shared helpers through a small accessor that maps an
// index to that data point's attribute map.
func (ap *awsapplicationsignalsprocessor) processMetricAttributes(_ context.Context, m pmetric.Metric, resourceAttribes pcommon.Map) {
	switch m.Type() {
	case pmetric.MetricTypeGauge:
		dps := m.Gauge().DataPoints()
		attrsAt := func(i int) pcommon.Map { return dps.At(i).Attributes() }
		ap.mutateDataPointAttributes(dps.Len(), attrsAt, resourceAttribes)
		dps.RemoveIf(func(d pmetric.NumberDataPoint) bool {
			return ap.shouldDropDataPoint(d.Attributes())
		})
		// Len is re-read here because RemoveIf may have shrunk the slice.
		ap.finalizeDataPointAttributes(m.Name(), dps.Len(), attrsAt, resourceAttribes)
	case pmetric.MetricTypeSum:
		dps := m.Sum().DataPoints()
		attrsAt := func(i int) pcommon.Map { return dps.At(i).Attributes() }
		ap.mutateDataPointAttributes(dps.Len(), attrsAt, resourceAttribes)
		dps.RemoveIf(func(d pmetric.NumberDataPoint) bool {
			return ap.shouldDropDataPoint(d.Attributes())
		})
		ap.finalizeDataPointAttributes(m.Name(), dps.Len(), attrsAt, resourceAttribes)
	case pmetric.MetricTypeHistogram:
		dps := m.Histogram().DataPoints()
		attrsAt := func(i int) pcommon.Map { return dps.At(i).Attributes() }
		ap.mutateDataPointAttributes(dps.Len(), attrsAt, resourceAttribes)
		dps.RemoveIf(func(d pmetric.HistogramDataPoint) bool {
			return ap.shouldDropDataPoint(d.Attributes())
		})
		ap.finalizeDataPointAttributes(m.Name(), dps.Len(), attrsAt, resourceAttribes)
	case pmetric.MetricTypeExponentialHistogram:
		dps := m.ExponentialHistogram().DataPoints()
		attrsAt := func(i int) pcommon.Map { return dps.At(i).Attributes() }
		ap.mutateDataPointAttributes(dps.Len(), attrsAt, resourceAttribes)
		dps.RemoveIf(func(d pmetric.ExponentialHistogramDataPoint) bool {
			return ap.shouldDropDataPoint(d.Attributes())
		})
		ap.finalizeDataPointAttributes(m.Name(), dps.Len(), attrsAt, resourceAttribes)
	case pmetric.MetricTypeSummary:
		dps := m.Summary().DataPoints()
		attrsAt := func(i int) pcommon.Map { return dps.At(i).Attributes() }
		ap.mutateDataPointAttributes(dps.Len(), attrsAt, resourceAttribes)
		dps.RemoveIf(func(d pmetric.SummaryDataPoint) bool {
			return ap.shouldDropDataPoint(d.Attributes())
		})
		ap.finalizeDataPointAttributes(m.Name(), dps.Len(), attrsAt, resourceAttribes)
	default:
		ap.logger.Debug("Ignore unknown metric type", zap.String("type", m.Type().String()))
	}
}

// mutateDataPointAttributes applies every metric mutator, in order, to each of
// the n attribute maps produced by attrsAt. A mutator error is logged at debug
// level and the remaining mutators still run.
func (ap *awsapplicationsignalsprocessor) mutateDataPointAttributes(n int, attrsAt func(int) pcommon.Map, resourceAttribes pcommon.Map) {
	for i := 0; i < n; i++ {
		attrs := attrsAt(i)
		for _, mutator := range ap.metricMutators {
			if err := mutator.Process(attrs, resourceAttribes, false); err != nil {
				ap.logger.Debug(failedToProcessAttribute, zap.Error(err))
			}
		}
	}
}

// shouldDropDataPoint reports whether any allowlist mutator asks for the data
// point carrying attrs to be dropped. Mutators are consulted in order and the
// first one that answers true decides. An error from a mutator is logged at
// debug level; the boolean it returned alongside the error is still honored.
func (ap *awsapplicationsignalsprocessor) shouldDropDataPoint(attrs pcommon.Map) bool {
	for _, mutator := range ap.allowlistMutators {
		shouldBeDropped, err := mutator.ShouldBeDropped(attrs)
		if err != nil {
			ap.logger.Debug(failedToProcessAttribute, zap.Error(err))
		}
		if shouldBeDropped {
			return true
		}
	}
	return false
}

// finalizeDataPointAttributes runs the post-filter stages over the n attribute
// maps produced by attrsAt: first the replace actions on every data point,
// then, only when a limiter is configured, the limiter on every data point.
// The two stages are kept as separate passes so that every data point has been
// through the replace actions before the limiter sees the first one.
func (ap *awsapplicationsignalsprocessor) finalizeDataPointAttributes(metricName string, n int, attrsAt func(int) pcommon.Map, resourceAttribes pcommon.Map) {
	for i := 0; i < n; i++ {
		if err := ap.replaceActions.Process(attrsAt(i), resourceAttribes, false); err != nil {
			ap.logger.Debug(failedToProcessAttribute, zap.Error(err))
		}
	}
	if ap.limiter == nil {
		return
	}
	for i := 0; i < n; i++ {
		// Only the error is inspected; the first result of Admit is discarded.
		if _, err := ap.limiter.Admit(metricName, attrsAt(i), resourceAttribes); err != nil {
			ap.logger.Debug(failedToProcessAttributeWithLimiter, zap.Error(err))
		}
	}
}