components/otelopscol/processor/casttosumprocessor/processor.go (58 lines of code) (raw):

// Copyright 2022 Google LLC // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package casttosumprocessor import ( "context" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/pmetric" "go.uber.org/zap" ) // TODO - This processor shares a lot of similar intent with the MetricsAdjuster present in the // prometheus receiver. The relevant code should be merged together and made available in a way // where it is available to all receivers. // see: https://github.com/open-telemetry/opentelemetry-collector/blob/6e5beaf43b325e63ec6f1e864d9746a0d051cc35/receiver/prometheusreceiver/internal/metrics_adjuster.go#L187 type CastToSumProcessor struct { Metrics []string logger *zap.Logger } func newCastToSumProcessor(config *Config, logger *zap.Logger) *CastToSumProcessor { return &CastToSumProcessor{ Metrics: config.Metrics, logger: logger, } } // ProcessMetrics implements the MProcessor interface. func (ctsp *CastToSumProcessor) ProcessMetrics(_ context.Context, metrics pmetric.Metrics) (pmetric.Metrics, error) { for i := 0; i < metrics.ResourceMetrics().Len(); i++ { rms := metrics.ResourceMetrics().At(i) ctsp.transformMetrics(rms) } return metrics, nil } func (ctsp *CastToSumProcessor) transformMetrics(rms pmetric.ResourceMetrics) { ilms := rms.ScopeMetrics() for j := 0; j < ilms.Len(); j++ { ilm := ilms.At(j).Metrics() for k := 0; k < ilm.Len(); k++ { metric := ilm.At(k) ctsp.processMetric(rms.Resource(), metric) } } } func sliceContains(names []string, name string) bool { for _, n := range names { if name == n { return true } } return false } // processMetric processes a supported metric. func (ctsp *CastToSumProcessor) processMetric(_ pcommon.Resource, metric pmetric.Metric) { if !sliceContains(ctsp.Metrics, metric.Name()) { return } if metric.Type() == pmetric.MetricTypeGauge { newMetric := pmetric.NewMetric() metric.CopyTo(newMetric) newMetric.SetEmptySum() metric.Gauge().DataPoints().CopyTo(newMetric.Sum().DataPoints()) newMetric.CopyTo(metric) } else if metric.Type() != pmetric.MetricTypeSum { ctsp.logger.Info("Configured metric %s is neither gauge nor sum", zap.String("metric", metric.Name())) } metric.Sum().SetIsMonotonic(true) metric.Sum().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative) }