func()

in connector/signaltometricsconnector/connector.go [91:197]


// ConsumeMetrics walks every data point of every metric in m and offers it to
// each configured data point metric definition. Data points that pass a
// definition's attribute filter and (optional) conditions are handed to a
// fresh aggregator; once all input has been visited the aggregator is
// finalized and the resulting metrics are forwarded to the next consumer.
//
// When no data point metric definitions are configured the call returns nil
// immediately and the next consumer is not invoked.
//
// An error is returned as soon as condition evaluation or aggregation fails
// for any data point; otherwise the result of the next consumer is returned.
func (sm *signalToMetrics) ConsumeMetrics(ctx context.Context, m pmetric.Metrics) error {
	if len(sm.dpMetricDefs) == 0 {
		return nil
	}

	out := pmetric.NewMetrics()
	resourceMetrics := m.ResourceMetrics()
	out.ResourceMetrics().EnsureCapacity(resourceMetrics.Len())
	agg := aggregator.NewAggregator[ottldatapoint.TransformContext](out)

	for ri := 0; ri < resourceMetrics.Len(); ri++ {
		rm := resourceMetrics.At(ri)
		resAttrs := rm.Resource().Attributes()
		scopeMetrics := rm.ScopeMetrics()

		for si := 0; si < scopeMetrics.Len(); si++ {
			scoped := scopeMetrics.At(si)
			metricSlice := scoped.Metrics()

			for mi := 0; mi < metricSlice.Len(); mi++ {
				metric := metricSlice.At(mi)

				for _, md := range sm.dpMetricDefs {
					resFiltered := md.FilterResourceAttributes(resAttrs, sm.collectorInstanceInfo)

					// handle runs a single data point through the current
					// definition: attribute filter first, then conditions,
					// then aggregation. A data point rejected by the filter
					// or the conditions is skipped without error.
					handle := func(dp any, attrs pcommon.Map) error {
						dpFiltered, ok := md.FilterAttributes(attrs)
						if !ok {
							return nil
						}

						// The transform context wraps the original (unfiltered)
						// data so OTTL expressions see the original attributes.
						tCtx := ottldatapoint.NewTransformContext(dp, metric, metricSlice, scoped.Scope(), rm.Resource(), scoped, rm)
						if md.Conditions != nil {
							matched, err := md.Conditions.Eval(ctx, tCtx)
							if err != nil {
								return fmt.Errorf("failed to evaluate conditions: %w", err)
							}
							if !matched {
								sm.logger.Debug("condition not matched, skipping", zap.String("name", md.Key.Name))
								return nil
							}
						}
						return agg.Aggregate(ctx, tCtx, md, resFiltered, dpFiltered, 1)
					}

					// Each metric type exposes a differently typed data point
					// slice, so every case iterates its own slice.
					//exhaustive:enforce
					switch metric.Type() {
					case pmetric.MetricTypeGauge:
						points := metric.Gauge().DataPoints()
						for pi := 0; pi < points.Len(); pi++ {
							point := points.At(pi)
							if err := handle(point, point.Attributes()); err != nil {
								return err
							}
						}
					case pmetric.MetricTypeSum:
						points := metric.Sum().DataPoints()
						for pi := 0; pi < points.Len(); pi++ {
							point := points.At(pi)
							if err := handle(point, point.Attributes()); err != nil {
								return err
							}
						}
					case pmetric.MetricTypeSummary:
						points := metric.Summary().DataPoints()
						for pi := 0; pi < points.Len(); pi++ {
							point := points.At(pi)
							if err := handle(point, point.Attributes()); err != nil {
								return err
							}
						}
					case pmetric.MetricTypeHistogram:
						points := metric.Histogram().DataPoints()
						for pi := 0; pi < points.Len(); pi++ {
							point := points.At(pi)
							if err := handle(point, point.Attributes()); err != nil {
								return err
							}
						}
					case pmetric.MetricTypeExponentialHistogram:
						points := metric.ExponentialHistogram().DataPoints()
						for pi := 0; pi < points.Len(); pi++ {
							point := points.At(pi)
							if err := handle(point, point.Attributes()); err != nil {
								return err
							}
						}
					case pmetric.MetricTypeEmpty:
						// No data points to process for an empty metric.
					}
				}
			}
		}
	}

	agg.Finalize(sm.dpMetricDefs)
	return sm.next.ConsumeMetrics(ctx, out)
}