receiver/adapter/accumulator/accumulator.go (161 lines of code) (raw):

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: MIT package accumulator import ( "context" "fmt" "sync" "time" "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/metric" "github.com/influxdata/telegraf/models" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/pdata/pmetric" "go.uber.org/multierr" "go.uber.org/zap" "github.com/aws/amazon-cloudwatch-agent/internal/util" ) // OtelAccumulator implements the telegraf.Accumulator interface, but works as an OTel plugin by passing the metrics // onward to the next consumer type OtelAccumulator interface { // Accumulator Interface https://github.com/influxdata/telegraf/blob/381dc2272390cd9de1ce2b047a953f8337b55647/accumulator.go telegraf.Accumulator // GetOtelMetrics return the final OTEL metric that were gathered by scrape controller for each plugin GetOtelMetrics() pmetric.Metrics } /* otelAccumulator struct @input Telegraf input plugin @logger Zap Logger @precision Round the timestamp during collection @metrics Otel Metrics which stacks multiple metrics through AddCounter, AddGauge, etc before resetting */ type otelAccumulator struct { input *models.RunningInput isServiceInput bool ctx context.Context consumer consumer.Metrics logger *zap.Logger precision time.Duration metrics pmetric.Metrics mutex sync.Mutex } func NewAccumulator(input *models.RunningInput, ctx context.Context, consumer consumer.Metrics, logger *zap.Logger) OtelAccumulator { _, isServiceInput := input.Input.(telegraf.ServiceInput) return &otelAccumulator{ input: input, isServiceInput: isServiceInput, ctx: ctx, consumer: consumer, logger: logger, precision: time.Nanosecond, metrics: pmetric.NewMetrics(), } } func (o *otelAccumulator) AddGauge(measurement string, fields map[string]interface{}, tags map[string]string, t ...time.Time) { o.addMetric(measurement, tags, fields, telegraf.Gauge, t...) } func (o *otelAccumulator) AddCounter(measurement string, fields map[string]interface{}, tags map[string]string, t ...time.Time) { o.addMetric(measurement, tags, fields, telegraf.Counter, t...) } // AddSummary is only being used by OpenTelemetry and Prometheus. https://github.com/influxdata/telegraf/search?q=AddSummary // However, we already have a Prometheus Receiver which uses AddFields so there is actually no use case for AddSummary. func (o *otelAccumulator) AddSummary(measurement string, fields map[string]interface{}, tags map[string]string, t ...time.Time) { o.logger.Error("CloudWatchAgent's adapter does not support Telegraf Summary.") } func (o *otelAccumulator) AddHistogram(measurement string, fields map[string]interface{}, tags map[string]string, t ...time.Time) { o.addMetric(measurement, tags, fields, telegraf.Histogram, t...) } func (o *otelAccumulator) AddFields(measurement string, fields map[string]interface{}, tags map[string]string, t ...time.Time) { o.addMetric(measurement, tags, fields, telegraf.Untyped, t...) } func (o *otelAccumulator) AddMetric(m telegraf.Metric) { m.SetTime(m.Time().Round(o.precision)) o.convertToOtelMetricsAndAddMetric(m) } func (o *otelAccumulator) SetPrecision(precision time.Duration) { o.precision = precision } func (o *otelAccumulator) AddError(err error) { if err == nil { return } o.logger.Error("Error with adapter", zap.Error(err)) } // addMetric implements from addFields https://github.com/influxdata/telegraf/blob/381dc2272390cd9de1ce2b047a953f8337b55647/agent/accumulator.go#L86-L97 // which will filter the subset metrics and modify metadata on the metrics (e.g name) func (o *otelAccumulator) addMetric( measurement string, tags map[string]string, fields map[string]interface{}, metricType telegraf.ValueType, t ...time.Time, ) { m := metric.New(measurement, tags, fields, o.getTime(t), metricType) o.convertToOtelMetricsAndAddMetric(m) } // convertToOtelMetricsAndAddMetric converts Telegraf's Metric model to OTEL Stream Model // and add the OTEl Metric to channel func (o *otelAccumulator) convertToOtelMetricsAndAddMetric(m telegraf.Metric) { mMetric, err := o.modifyMetricAndConvertToOtelValue(m) if err != nil { o.logger.Warn( "Conversion of metric values failed", zap.String("name", m.Name()), zap.Any("tags", m.Tags()), zap.Any("fields", m.Fields()), zap.Any("type", m.Type()), zap.Error(err), ) } if mMetric == nil { return } oMetric, err := ConvertTelegrafToOtelMetrics(mMetric.Name(), mMetric.Fields(), mMetric.Tags(), mMetric.Type(), mMetric.Time()) if err != nil { o.logger.Warn("Convert to Otel Metric failed", zap.Any("name", oMetric), zap.Any("tags", mMetric.Tags()), zap.Any("fields", mMetric.Fields()), zap.Any("type", mMetric.Type()), zap.Error(err)) return } // Gather and Start can add metrics concurrently. Therefore, a mutex ensures thread-safe access to the resource metrics o.mutex.Lock() defer o.mutex.Unlock() if o.isServiceInput { err := o.consumer.ConsumeMetrics(o.ctx, oMetric) if err != nil { o.AddError(err) } } else { oMetric.ResourceMetrics().MoveAndAppendTo(o.metrics.ResourceMetrics()) } } // GetOtelMetrics return the final OTEL metric that were gathered by scrape controller for each plugin func (o *otelAccumulator) GetOtelMetrics() pmetric.Metrics { finalMetrics := o.metrics o.metrics = pmetric.NewMetrics() return finalMetrics } // modifyMetricAndConvertToOtelValue modifies metric by filtering metrics, add prefix for each field in metrics, etc // and convert to value supported by OTEL (int64 and float64). // Distributions are not modified yet. func (o *otelAccumulator) modifyMetricAndConvertToOtelValue(m telegraf.Metric) (telegraf.Metric, error) { if len(m.Fields()) == 0 { return nil, nil } // MakeMetric modifies metrics (e.g filter metrics, add prefix for measurement) by customer config // https://github.com/influxdata/telegraf/blob/5479df2eb5e8401773d604a83590d789a158c735/models/running_input.go#L91-L114 mMetric := o.input.MakeMetric(m) if mMetric == nil { return nil, nil } if m.Type() == telegraf.Histogram { return mMetric, nil } // Otel only supports numeric data. Therefore, filter unsupported data type and convert metrics value to corresponding value before // converting the data model // https://github.com/open-telemetry/opentelemetry-collector/blob/bdc3e22d28006b6c9496568bd8d8bcf0aa1e4950/pdata/pmetric/metrics.go#L106-L113 var errs error for field, value := range mMetric.Fields() { // Convert all int,uint to int64 and float to float64 and bool to int. otelValue, err := util.ToOtelValue(value) if err != nil { errs = multierr.Append(errs, fmt.Errorf("field (%q): %w", field, err)) } if otelValue == nil { mMetric.RemoveField(field) } else if value != otelValue { mMetric.AddField(field, otelValue) } } if len(mMetric.Fields()) == 0 { return nil, fmt.Errorf("empty metrics after converting fields: %w", errs) } return mMetric, nil } // Adapted from https://github.com/influxdata/telegraf/blob/b526945c64a56450b836656a6a2002b8bf748b78/agent/accumulator.go#L112 func (o *otelAccumulator) getTime(t []time.Time) time.Time { var timestamp time.Time if len(t) > 0 { timestamp = t[0] } else { timestamp = time.Now() } return timestamp.Round(o.precision) } // TrackingAccumulator is an Accumulator that provides a signal when the // metric has been fully processed. It drives to solve these two issues // * https://github.com/influxdata/telegraf/issues/2905 // * https://github.com/influxdata/telegraf/issues/2919 // However, it will panic if the delivered message is reach to a certain threshold // https://github.com/aws/telegraf/blob/066eb60aa48d74bf63dcd4e10b8f13db12b43c3b/agent/accumulator.go#L155-L159against // which against CWA's goal (independent between input and output, etc) // and can be solved by using OTEL Exporter persistent queue // https://github.com/open-telemetry/opentelemetry-collector/tree/eebe590a465702b9f6b2a257ba3ab9735dd10152/exporter/exporterhelper#persistent-queue func (o *otelAccumulator) WithTracking(maxTracked int) telegraf.TrackingAccumulator { o.logger.Error("CloudWatchAgent's adapter does not support tracking metrics.") return nil }