connector/signaltometricsconnector/connector.go (216 lines of code) (raw):

// Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 package signaltometricsconnector // import "github.com/open-telemetry/opentelemetry-collector-contrib/connector/signaltometricsconnector" import ( "context" "fmt" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/plog" "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/pdata/ptrace" "go.uber.org/zap" "github.com/open-telemetry/opentelemetry-collector-contrib/connector/signaltometricsconnector/internal/aggregator" "github.com/open-telemetry/opentelemetry-collector-contrib/connector/signaltometricsconnector/internal/model" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottldatapoint" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottllog" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottlspan" ) type signalToMetrics struct { next consumer.Metrics collectorInstanceInfo model.CollectorInstanceInfo logger *zap.Logger spanMetricDefs []model.MetricDef[ottlspan.TransformContext] dpMetricDefs []model.MetricDef[ottldatapoint.TransformContext] logMetricDefs []model.MetricDef[ottllog.TransformContext] component.StartFunc component.ShutdownFunc } func (sm *signalToMetrics) Capabilities() consumer.Capabilities { return consumer.Capabilities{MutatesData: false} } func (sm *signalToMetrics) ConsumeTraces(ctx context.Context, td ptrace.Traces) error { if len(sm.spanMetricDefs) == 0 { return nil } processedMetrics := pmetric.NewMetrics() processedMetrics.ResourceMetrics().EnsureCapacity(td.ResourceSpans().Len()) aggregator := aggregator.NewAggregator[ottlspan.TransformContext](processedMetrics) for i := 0; i < td.ResourceSpans().Len(); i++ { resourceSpan := td.ResourceSpans().At(i) resourceAttrs := resourceSpan.Resource().Attributes() for j := 0; j < resourceSpan.ScopeSpans().Len(); j++ { scopeSpan := resourceSpan.ScopeSpans().At(j) for k := 0; k < scopeSpan.Spans().Len(); k++ { span := scopeSpan.Spans().At(k) spanAttrs := span.Attributes() for _, md := range sm.spanMetricDefs { filteredSpanAttrs, ok := md.FilterAttributes(spanAttrs) if !ok { continue } // The transform context is created from original attributes so that the // OTTL expressions are also applied on the original attributes. tCtx := ottlspan.NewTransformContext(span, scopeSpan.Scope(), resourceSpan.Resource(), scopeSpan, resourceSpan) if md.Conditions != nil { match, err := md.Conditions.Eval(ctx, tCtx) if err != nil { return fmt.Errorf("failed to evaluate conditions: %w", err) } if !match { sm.logger.Debug("condition not matched, skipping", zap.String("name", md.Key.Name)) continue } } filteredResAttrs := md.FilterResourceAttributes(resourceAttrs, sm.collectorInstanceInfo) if err := aggregator.Aggregate(ctx, tCtx, md, filteredResAttrs, filteredSpanAttrs, 1); err != nil { return err } } } } } aggregator.Finalize(sm.spanMetricDefs) return sm.next.ConsumeMetrics(ctx, processedMetrics) } func (sm *signalToMetrics) ConsumeMetrics(ctx context.Context, m pmetric.Metrics) error { if len(sm.dpMetricDefs) == 0 { return nil } processedMetrics := pmetric.NewMetrics() processedMetrics.ResourceMetrics().EnsureCapacity(m.ResourceMetrics().Len()) aggregator := aggregator.NewAggregator[ottldatapoint.TransformContext](processedMetrics) for i := 0; i < m.ResourceMetrics().Len(); i++ { resourceMetric := m.ResourceMetrics().At(i) resourceAttrs := resourceMetric.Resource().Attributes() for j := 0; j < resourceMetric.ScopeMetrics().Len(); j++ { scopeMetric := resourceMetric.ScopeMetrics().At(j) for k := 0; k < scopeMetric.Metrics().Len(); k++ { metrics := scopeMetric.Metrics() metric := metrics.At(k) for _, md := range sm.dpMetricDefs { filteredResAttrs := md.FilterResourceAttributes(resourceAttrs, sm.collectorInstanceInfo) aggregate := func(dp any, dpAttrs pcommon.Map) error { // The transform context is created from original attributes so that the // OTTL expressions are also applied on the original attributes. tCtx := ottldatapoint.NewTransformContext(dp, metric, metrics, scopeMetric.Scope(), resourceMetric.Resource(), scopeMetric, resourceMetric) if md.Conditions != nil { match, err := md.Conditions.Eval(ctx, tCtx) if err != nil { return fmt.Errorf("failed to evaluate conditions: %w", err) } if !match { sm.logger.Debug("condition not matched, skipping", zap.String("name", md.Key.Name)) return nil } } return aggregator.Aggregate(ctx, tCtx, md, filteredResAttrs, dpAttrs, 1) } //exhaustive:enforce switch metric.Type() { case pmetric.MetricTypeGauge: dps := metric.Gauge().DataPoints() for l := 0; l < dps.Len(); l++ { dp := dps.At(l) filteredDPAttrs, ok := md.FilterAttributes(dp.Attributes()) if !ok { continue } if err := aggregate(dp, filteredDPAttrs); err != nil { return err } } case pmetric.MetricTypeSum: dps := metric.Sum().DataPoints() for l := 0; l < dps.Len(); l++ { dp := dps.At(l) filteredDPAttrs, ok := md.FilterAttributes(dp.Attributes()) if !ok { continue } if err := aggregate(dp, filteredDPAttrs); err != nil { return err } } case pmetric.MetricTypeSummary: dps := metric.Summary().DataPoints() for l := 0; l < dps.Len(); l++ { dp := dps.At(l) filteredDPAttrs, ok := md.FilterAttributes(dp.Attributes()) if !ok { continue } if err := aggregate(dp, filteredDPAttrs); err != nil { return err } } case pmetric.MetricTypeHistogram: dps := metric.Histogram().DataPoints() for l := 0; l < dps.Len(); l++ { dp := dps.At(l) filteredDPAttrs, ok := md.FilterAttributes(dp.Attributes()) if !ok { continue } if err := aggregate(dp, filteredDPAttrs); err != nil { return err } } case pmetric.MetricTypeExponentialHistogram: dps := metric.ExponentialHistogram().DataPoints() for l := 0; l < dps.Len(); l++ { dp := dps.At(l) filteredDPAttrs, ok := md.FilterAttributes(dp.Attributes()) if !ok { continue } if err := aggregate(dp, filteredDPAttrs); err != nil { return err } } case pmetric.MetricTypeEmpty: continue } } } } } aggregator.Finalize(sm.dpMetricDefs) return sm.next.ConsumeMetrics(ctx, processedMetrics) } func (sm *signalToMetrics) ConsumeLogs(ctx context.Context, logs plog.Logs) error { if len(sm.logMetricDefs) == 0 { return nil } processedMetrics := pmetric.NewMetrics() processedMetrics.ResourceMetrics().EnsureCapacity(logs.ResourceLogs().Len()) aggregator := aggregator.NewAggregator[ottllog.TransformContext](processedMetrics) for i := 0; i < logs.ResourceLogs().Len(); i++ { resourceLog := logs.ResourceLogs().At(i) resourceAttrs := resourceLog.Resource().Attributes() for j := 0; j < resourceLog.ScopeLogs().Len(); j++ { scopeLog := resourceLog.ScopeLogs().At(j) for k := 0; k < scopeLog.LogRecords().Len(); k++ { log := scopeLog.LogRecords().At(k) logAttrs := log.Attributes() for _, md := range sm.logMetricDefs { filteredLogAttrs, ok := md.FilterAttributes(logAttrs) if !ok { continue } // The transform context is created from original attributes so that the // OTTL expressions are also applied on the original attributes. tCtx := ottllog.NewTransformContext(log, scopeLog.Scope(), resourceLog.Resource(), scopeLog, resourceLog) if md.Conditions != nil { match, err := md.Conditions.Eval(ctx, tCtx) if err != nil { return fmt.Errorf("failed to evaluate conditions: %w", err) } if !match { sm.logger.Debug("condition not matched, skipping", zap.String("name", md.Key.Name)) continue } } filteredResAttrs := md.FilterResourceAttributes(resourceAttrs, sm.collectorInstanceInfo) if err := aggregator.Aggregate(ctx, tCtx, md, filteredResAttrs, filteredLogAttrs, 1); err != nil { return err } } } } } aggregator.Finalize(sm.logMetricDefs) return sm.next.ConsumeMetrics(ctx, processedMetrics) }