connector/sumconnector/connector.go (160 lines of code) (raw):

// Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 package sumconnector // import "github.com/open-telemetry/opentelemetry-collector-contrib/connector/sumconnector" import ( "context" "errors" "fmt" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/plog" "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/pdata/ptrace" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottldatapoint" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottllog" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottlmetric" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottlspan" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottlspanevent" ) // sum can sum attribute values from spans, span event, metrics, data points, or log records // and emit the sums onto a metrics pipeline. type sum struct { metricsConsumer consumer.Metrics component.StartFunc component.ShutdownFunc spansMetricDefs map[string]metricDef[ottlspan.TransformContext] spanEventsMetricDefs map[string]metricDef[ottlspanevent.TransformContext] metricsMetricDefs map[string]metricDef[ottlmetric.TransformContext] dataPointsMetricDefs map[string]metricDef[ottldatapoint.TransformContext] logsMetricDefs map[string]metricDef[ottllog.TransformContext] } func (c *sum) Capabilities() consumer.Capabilities { return consumer.Capabilities{MutatesData: false} } func (c *sum) ConsumeTraces(ctx context.Context, td ptrace.Traces) error { var multiError error sumMetrics := pmetric.NewMetrics() sumMetrics.ResourceMetrics().EnsureCapacity(td.ResourceSpans().Len()) for i := 0; i < td.ResourceSpans().Len(); i++ { resourceSpan := td.ResourceSpans().At(i) spansSummer := newSummer[ottlspan.TransformContext](c.spansMetricDefs) spanEventsSummer := newSummer[ottlspanevent.TransformContext](c.spanEventsMetricDefs) for j := 0; j < resourceSpan.ScopeSpans().Len(); j++ { scopeSpan := resourceSpan.ScopeSpans().At(j) for k := 0; k < scopeSpan.Spans().Len(); k++ { span := scopeSpan.Spans().At(k) sCtx := ottlspan.NewTransformContext(span, scopeSpan.Scope(), resourceSpan.Resource(), scopeSpan, resourceSpan) multiError = errors.Join(multiError, spansSummer.update(ctx, span.Attributes(), sCtx)) for l := 0; l < span.Events().Len(); l++ { event := span.Events().At(l) eCtx := ottlspanevent.NewTransformContext(event, span, scopeSpan.Scope(), resourceSpan.Resource(), scopeSpan, resourceSpan) multiError = errors.Join(multiError, spanEventsSummer.update(ctx, event.Attributes(), eCtx)) } } } if len(spansSummer.sums)+len(spanEventsSummer.sums) == 0 { continue // don't add an empty resource } sumResource := sumMetrics.ResourceMetrics().AppendEmpty() resourceSpan.Resource().Attributes().CopyTo(sumResource.Resource().Attributes()) sumResource.ScopeMetrics().EnsureCapacity(resourceSpan.ScopeSpans().Len()) sumScope := sumResource.ScopeMetrics().AppendEmpty() spansSummer.appendMetricsTo(sumScope.Metrics()) spanEventsSummer.appendMetricsTo(sumScope.Metrics()) } if multiError != nil { return multiError } return c.metricsConsumer.ConsumeMetrics(ctx, sumMetrics) } func (c *sum) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) error { var multiError error sumMetrics := pmetric.NewMetrics() sumMetrics.ResourceMetrics().EnsureCapacity(md.ResourceMetrics().Len()) for i := 0; i < md.ResourceMetrics().Len(); i++ { resourceMetric := md.ResourceMetrics().At(i) metricsSummer := newSummer[ottlmetric.TransformContext](c.metricsMetricDefs) dataPointsSummer := newSummer[ottldatapoint.TransformContext](c.dataPointsMetricDefs) for j := 0; j < resourceMetric.ScopeMetrics().Len(); j++ { scopeMetrics := resourceMetric.ScopeMetrics().At(j) for k := 0; k < scopeMetrics.Metrics().Len(); k++ { metric := scopeMetrics.Metrics().At(k) mCtx := ottlmetric.NewTransformContext(metric, scopeMetrics.Metrics(), scopeMetrics.Scope(), resourceMetric.Resource(), scopeMetrics, resourceMetric) multiError = errors.Join(multiError, metricsSummer.update(ctx, pcommon.NewMap(), mCtx)) //exhaustive:enforce // For metric types each must be handled in exactly the same way // Switch case required because each type calls DataPoints() differently switch metric.Type() { case pmetric.MetricTypeGauge: dps := metric.Gauge().DataPoints() for i := 0; i < dps.Len(); i++ { dCtx := ottldatapoint.NewTransformContext(dps.At(i), metric, scopeMetrics.Metrics(), scopeMetrics.Scope(), resourceMetric.Resource(), scopeMetrics, resourceMetric) multiError = errors.Join(multiError, dataPointsSummer.update(ctx, dps.At(i).Attributes(), dCtx)) } case pmetric.MetricTypeSum: dps := metric.Sum().DataPoints() for i := 0; i < dps.Len(); i++ { dCtx := ottldatapoint.NewTransformContext(dps.At(i), metric, scopeMetrics.Metrics(), scopeMetrics.Scope(), resourceMetric.Resource(), scopeMetrics, resourceMetric) multiError = errors.Join(multiError, dataPointsSummer.update(ctx, dps.At(i).Attributes(), dCtx)) } case pmetric.MetricTypeSummary: dps := metric.Summary().DataPoints() for i := 0; i < dps.Len(); i++ { dCtx := ottldatapoint.NewTransformContext(dps.At(i), metric, scopeMetrics.Metrics(), scopeMetrics.Scope(), resourceMetric.Resource(), scopeMetrics, resourceMetric) multiError = errors.Join(multiError, dataPointsSummer.update(ctx, dps.At(i).Attributes(), dCtx)) } case pmetric.MetricTypeHistogram: dps := metric.Histogram().DataPoints() for i := 0; i < dps.Len(); i++ { dCtx := ottldatapoint.NewTransformContext(dps.At(i), metric, scopeMetrics.Metrics(), scopeMetrics.Scope(), resourceMetric.Resource(), scopeMetrics, resourceMetric) multiError = errors.Join(multiError, dataPointsSummer.update(ctx, dps.At(i).Attributes(), dCtx)) } case pmetric.MetricTypeExponentialHistogram: dps := metric.ExponentialHistogram().DataPoints() for i := 0; i < dps.Len(); i++ { dCtx := ottldatapoint.NewTransformContext(dps.At(i), metric, scopeMetrics.Metrics(), scopeMetrics.Scope(), resourceMetric.Resource(), scopeMetrics, resourceMetric) multiError = errors.Join(multiError, dataPointsSummer.update(ctx, dps.At(i).Attributes(), dCtx)) } case pmetric.MetricTypeEmpty: multiError = errors.Join(multiError, fmt.Errorf("metric %q: invalid metric type: %v", metric.Name(), metric.Type())) } } } if len(metricsSummer.sums)+len(dataPointsSummer.sums) == 0 { continue // don't add an empty resource } sumResource := sumMetrics.ResourceMetrics().AppendEmpty() resourceMetric.Resource().Attributes().CopyTo(sumResource.Resource().Attributes()) sumResource.ScopeMetrics().EnsureCapacity(resourceMetric.ScopeMetrics().Len()) sumScope := sumResource.ScopeMetrics().AppendEmpty() metricsSummer.appendMetricsTo(sumScope.Metrics()) dataPointsSummer.appendMetricsTo(sumScope.Metrics()) } if multiError != nil { return multiError } return c.metricsConsumer.ConsumeMetrics(ctx, sumMetrics) } func (c *sum) ConsumeLogs(ctx context.Context, ld plog.Logs) error { var multiError error sumMetrics := pmetric.NewMetrics() sumMetrics.ResourceMetrics().EnsureCapacity(ld.ResourceLogs().Len()) for i := 0; i < ld.ResourceLogs().Len(); i++ { resourceLog := ld.ResourceLogs().At(i) summer := newSummer[ottllog.TransformContext](c.logsMetricDefs) for j := 0; j < resourceLog.ScopeLogs().Len(); j++ { scopeLogs := resourceLog.ScopeLogs().At(j) for k := 0; k < scopeLogs.LogRecords().Len(); k++ { logRecord := scopeLogs.LogRecords().At(k) lCtx := ottllog.NewTransformContext(logRecord, scopeLogs.Scope(), resourceLog.Resource(), scopeLogs, resourceLog) multiError = errors.Join(multiError, summer.update(ctx, logRecord.Attributes(), lCtx)) } } if len(summer.sums) == 0 { continue // don't add an empty resource } sumResource := sumMetrics.ResourceMetrics().AppendEmpty() resourceLog.Resource().Attributes().CopyTo(sumResource.Resource().Attributes()) sumResource.ScopeMetrics().EnsureCapacity(resourceLog.ScopeLogs().Len()) sumScope := sumResource.ScopeMetrics().AppendEmpty() summer.appendMetricsTo(sumScope.Metrics()) } if multiError != nil { return multiError } return c.metricsConsumer.ConsumeMetrics(ctx, sumMetrics) }