in connector/signaltometricsconnector/connector.go [91:197]
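// ConsumeMetrics generates new metrics from the data points of incoming
// metrics according to the configured data point metric definitions and
// forwards the generated metrics to the next consumer.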
func (sm *signalToMetrics) ConsumeMetrics(ctx context.Context, m pmetric.Metrics) error {
	if len(sm.dpMetricDefs) == 0 {
		return nil
	}
	processedMetrics := pmetric.NewMetrics()
	processedMetrics.ResourceMetrics().EnsureCapacity(m.ResourceMetrics().Len())
	aggregator := aggregator.NewAggregator[ottldatapoint.TransformContext](processedMetrics)
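	// Walk every resource -> scope -> metric and apply each configured data
	// point metric definition to the metric's data points.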
	for i := 0; i < m.ResourceMetrics().Len(); i++ {
		resourceMetric := m.ResourceMetrics().At(i)
		resourceAttrs := resourceMetric.Resource().Attributes()
		for j := 0; j < resourceMetric.ScopeMetrics().Len(); j++ {
			scopeMetric := resourceMetric.ScopeMetrics().At(j)
			for k := 0; k < scopeMetric.Metrics().Len(); k++ {
				metrics := scopeMetric.Metrics()
				metric := metrics.At(k)
				for _, md := range sm.dpMetricDefs {
					filteredResAttrs := md.FilterResourceAttributes(resourceAttrs, sm.collectorInstanceInfo)
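					// aggregate builds an OTTL transform context for a single data point,
					// evaluates the definition's conditions (if any), and hands matching
					// points to the aggregator.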
					aggregate := func(dp any, dpAttrs pcommon.Map) error {
						// The transform context is created from original attributes so that the
						// OTTL expressions are also applied on the original attributes.
						tCtx := ottldatapoint.NewTransformContext(dp, metric, metrics, scopeMetric.Scope(), resourceMetric.Resource(), scopeMetric, resourceMetric)
						if md.Conditions != nil {
							match, err := md.Conditions.Eval(ctx, tCtx)
							if err != nil {
								return fmt.Errorf("failed to evaluate conditions: %w", err)
							}
							if !match {
								sm.logger.Debug("condition not matched, skipping", zap.String("name", md.Key.Name))
								return nil
							}
						}
						return aggregator.Aggregate(ctx, tCtx, md, filteredResAttrs, dpAttrs, 1)
					}
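					// Each metric type keeps its data points in a type-specific slice, so
					// the iteration is repeated per type.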
					//exhaustive:enforce
					switch metric.Type() {
					case pmetric.MetricTypeGauge:
						dps := metric.Gauge().DataPoints()
						for l := 0; l < dps.Len(); l++ {
							dp := dps.At(l)
							filteredDPAttrs, ok := md.FilterAttributes(dp.Attributes())
							if !ok {
								continue
							}
							if err := aggregate(dp, filteredDPAttrs); err != nil {
								return err
							}
						}
					case pmetric.MetricTypeSum:
						dps := metric.Sum().DataPoints()
						for l := 0; l < dps.Len(); l++ {
							dp := dps.At(l)
							filteredDPAttrs, ok := md.FilterAttributes(dp.Attributes())
							if !ok {
								continue
							}
							if err := aggregate(dp, filteredDPAttrs); err != nil {
								return err
							}
						}
					case pmetric.MetricTypeSummary:
						dps := metric.Summary().DataPoints()
						for l := 0; l < dps.Len(); l++ {
							dp := dps.At(l)
							filteredDPAttrs, ok := md.FilterAttributes(dp.Attributes())
							if !ok {
								continue
							}
							if err := aggregate(dp, filteredDPAttrs); err != nil {
								return err
							}
						}
					case pmetric.MetricTypeHistogram:
						dps := metric.Histogram().DataPoints()
						for l := 0; l < dps.Len(); l++ {
							dp := dps.At(l)
							filteredDPAttrs, ok := md.FilterAttributes(dp.Attributes())
							if !ok {
								continue
							}
							if err := aggregate(dp, filteredDPAttrs); err != nil {
								return err
							}
						}
					case pmetric.MetricTypeExponentialHistogram:
						dps := metric.ExponentialHistogram().DataPoints()
						for l := 0; l < dps.Len(); l++ {
							dp := dps.At(l)
							filteredDPAttrs, ok := md.FilterAttributes(dp.Attributes())
							if !ok {
								continue
							}
							if err := aggregate(dp, filteredDPAttrs); err != nil {
								return err
							}
						}
					case pmetric.MetricTypeEmpty:
						continue
					}
				}
			}
		}
	}
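	// Flush the aggregated values into processedMetrics and hand the generated
	// metrics to the downstream consumer.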
	aggregator.Finalize(sm.dpMetricDefs)
	return sm.next.ConsumeMetrics(ctx, processedMetrics)
}
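
// The five switch cases above share an identical per-data-point loop. Below is a
// minimal sketch of how such loops could be factored out with a generic helper.
// forEachDataPoint is hypothetical and not part of this file; the type argument
// may need to be spelled out at the call site, e.g.
// forEachDataPoint[pmetric.NumberDataPoint](metric.Gauge().DataPoints(), ...).
func forEachDataPoint[T any](dps interface {
	Len() int
	At(int) T
}, fn func(T) error) error {
	for i := 0; i < dps.Len(); i++ {
		// Stop at the first error, mirroring the early returns in the switch above.
		if err := fn(dps.At(i)); err != nil {
			return err
		}
	}
	return nil
}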