connector/signaltometricsconnector/internal/aggregator/aggregator.go (270 lines of code) (raw):

// Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 package aggregator // import "github.com/open-telemetry/opentelemetry-collector-contrib/connector/signaltometricsconnector/internal/aggregator" import ( "context" "fmt" "time" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/pmetric" "github.com/open-telemetry/opentelemetry-collector-contrib/connector/signaltometricsconnector/internal/metadata" "github.com/open-telemetry/opentelemetry-collector-contrib/connector/signaltometricsconnector/internal/model" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil" ) // Aggregator provides a single interface to update all metrics // datastructures. The required datastructure is selected using // the metric definition. type Aggregator[K any] struct { result pmetric.Metrics // smLookup maps resourceID against scope metrics since the aggregator // always produces a single scope. smLookup map[[16]byte]pmetric.ScopeMetrics valueCounts map[model.MetricKey]map[[16]byte]map[[16]byte]*valueCountDP sums map[model.MetricKey]map[[16]byte]map[[16]byte]*sumDP timestamp time.Time } // NewAggregator creates a new instance of aggregator. func NewAggregator[K any](metrics pmetric.Metrics) *Aggregator[K] { return &Aggregator[K]{ result: metrics, smLookup: make(map[[16]byte]pmetric.ScopeMetrics), valueCounts: make(map[model.MetricKey]map[[16]byte]map[[16]byte]*valueCountDP), sums: make(map[model.MetricKey]map[[16]byte]map[[16]byte]*sumDP), timestamp: time.Now(), } } func (a *Aggregator[K]) Aggregate( ctx context.Context, tCtx K, md model.MetricDef[K], resAttrs, srcAttrs pcommon.Map, defaultCount int64, ) error { switch { case md.ExponentialHistogram != nil: val, count, err := getValueCount( ctx, tCtx, md.ExponentialHistogram.Value, md.ExponentialHistogram.Count, defaultCount, ) if err != nil { return err } return a.aggregateValueCount(md, resAttrs, srcAttrs, val, count) case md.ExplicitHistogram != nil: val, count, err := getValueCount( ctx, tCtx, md.ExplicitHistogram.Value, md.ExplicitHistogram.Count, defaultCount, ) if err != nil { return err } return a.aggregateValueCount(md, resAttrs, srcAttrs, val, count) case md.Sum != nil: raw, err := md.Sum.Value.Eval(ctx, tCtx) if err != nil { return fmt.Errorf("failed to execute OTTL value for sum: %w", err) } switch v := raw.(type) { case int64: return a.aggregateInt(md, resAttrs, srcAttrs, v) case float64: return a.aggregateDouble(md, resAttrs, srcAttrs, v) default: return fmt.Errorf( "failed to parse sum OTTL value of type %T into int64 or float64: %v", v, v, ) } } return nil } // Finalize finalizes the aggregations performed by the aggregator so far into // the pmetric.Metrics used to create this instance of the aggregator. Finalize // should be called once per aggregator instance and the aggregator instance // should not be used after Finalize is called. func (a *Aggregator[K]) Finalize(mds []model.MetricDef[K]) { for _, md := range mds { for resID, dpMap := range a.valueCounts[md.Key] { metrics := a.smLookup[resID].Metrics() var ( destExpHist pmetric.ExponentialHistogram destExplicitHist pmetric.Histogram ) switch { case md.ExponentialHistogram != nil: destMetric := metrics.AppendEmpty() destMetric.SetName(md.Key.Name) destMetric.SetDescription(md.Key.Description) destMetric.SetUnit(md.Unit) destExpHist = destMetric.SetEmptyExponentialHistogram() destExpHist.SetAggregationTemporality(pmetric.AggregationTemporalityDelta) destExpHist.DataPoints().EnsureCapacity(len(dpMap)) case md.ExplicitHistogram != nil: destMetric := metrics.AppendEmpty() destMetric.SetName(md.Key.Name) destMetric.SetDescription(md.Key.Description) destMetric.SetUnit(md.Unit) destExplicitHist = destMetric.SetEmptyHistogram() destExplicitHist.SetAggregationTemporality(pmetric.AggregationTemporalityDelta) destExplicitHist.DataPoints().EnsureCapacity(len(dpMap)) } for _, dp := range dpMap { dp.Copy( a.timestamp, destExpHist, destExplicitHist, ) } } for resID, dpMap := range a.sums[md.Key] { if md.Sum == nil { continue } metrics := a.smLookup[resID].Metrics() destMetric := metrics.AppendEmpty() destMetric.SetName(md.Key.Name) destMetric.SetDescription(md.Key.Description) destMetric.SetUnit(md.Unit) destCounter := destMetric.SetEmptySum() destCounter.SetAggregationTemporality(pmetric.AggregationTemporalityDelta) destCounter.DataPoints().EnsureCapacity(len(dpMap)) for _, dp := range dpMap { dp.Copy(a.timestamp, destCounter.DataPoints().AppendEmpty()) } } // If there are two metric defined with the same key required by metricKey // then they will be aggregated within the same metric and produced // together. Deleting the key ensures this while preventing duplicates. delete(a.valueCounts, md.Key) delete(a.sums, md.Key) } } func (a *Aggregator[K]) aggregateInt( md model.MetricDef[K], resAttrs, srcAttrs pcommon.Map, v int64, ) error { resID := a.getResourceID(resAttrs) attrID := pdatautil.MapHash(srcAttrs) if _, ok := a.sums[md.Key]; !ok { a.sums[md.Key] = make(map[[16]byte]map[[16]byte]*sumDP) } if _, ok := a.sums[md.Key][resID]; !ok { a.sums[md.Key][resID] = make(map[[16]byte]*sumDP) } if _, ok := a.sums[md.Key][resID][attrID]; !ok { a.sums[md.Key][resID][attrID] = newSumDP(srcAttrs, false) } a.sums[md.Key][resID][attrID].AggregateInt(v) return nil } func (a *Aggregator[K]) aggregateDouble( md model.MetricDef[K], resAttrs, srcAttrs pcommon.Map, v float64, ) error { resID := a.getResourceID(resAttrs) attrID := pdatautil.MapHash(srcAttrs) if _, ok := a.sums[md.Key]; !ok { a.sums[md.Key] = make(map[[16]byte]map[[16]byte]*sumDP) } if _, ok := a.sums[md.Key][resID]; !ok { a.sums[md.Key][resID] = make(map[[16]byte]*sumDP) } if _, ok := a.sums[md.Key][resID][attrID]; !ok { a.sums[md.Key][resID][attrID] = newSumDP(srcAttrs, true) } a.sums[md.Key][resID][attrID].AggregateDouble(v) return nil } func (a *Aggregator[K]) aggregateValueCount( md model.MetricDef[K], resAttrs, srcAttrs pcommon.Map, value float64, count int64, ) error { if count == 0 { // Nothing to record as count is zero return nil } resID := a.getResourceID(resAttrs) attrID := pdatautil.MapHash(srcAttrs) if _, ok := a.valueCounts[md.Key]; !ok { a.valueCounts[md.Key] = make(map[[16]byte]map[[16]byte]*valueCountDP) } if _, ok := a.valueCounts[md.Key][resID]; !ok { a.valueCounts[md.Key][resID] = make(map[[16]byte]*valueCountDP) } if _, ok := a.valueCounts[md.Key][resID][attrID]; !ok { a.valueCounts[md.Key][resID][attrID] = newValueCountDP(md, srcAttrs) } a.valueCounts[md.Key][resID][attrID].Aggregate(value, count) return nil } func (a *Aggregator[K]) getResourceID(resourceAttrs pcommon.Map) [16]byte { resID := pdatautil.MapHash(resourceAttrs) if _, ok := a.smLookup[resID]; !ok { destResourceMetric := a.result.ResourceMetrics().AppendEmpty() destResAttrs := destResourceMetric.Resource().Attributes() destResAttrs.EnsureCapacity(resourceAttrs.Len() + 1) resourceAttrs.CopyTo(destResAttrs) destScopeMetric := destResourceMetric.ScopeMetrics().AppendEmpty() destScopeMetric.Scope().SetName(metadata.ScopeName) a.smLookup[resID] = destScopeMetric } return resID } // getValueCount evaluates OTTL to get count and value respectively. Count is // optional and defaults to the default count if the OTTL statement for count // is missing. Value is required and returns an error if OTTL statement for // value is missing. func getValueCount[K any]( ctx context.Context, tCtx K, valueExpr, countExpr *ottl.ValueExpression[K], defaultCount int64, ) (float64, int64, error) { val, err := getDoubleFromOTTL(ctx, tCtx, valueExpr) if err != nil { return 0, 0, fmt.Errorf("failed to get value from OTTL: %w", err) } count := defaultCount if countExpr != nil { count, err = getIntFromOTTL(ctx, tCtx, countExpr) if err != nil { return 0, 0, fmt.Errorf("failed to get count from OTTL: %w", err) } } return val, count, nil } func getIntFromOTTL[K any]( ctx context.Context, tCtx K, s *ottl.ValueExpression[K], ) (int64, error) { if s == nil { return 0, nil } raw, err := s.Eval(ctx, tCtx) if err != nil { return 0, err } switch v := raw.(type) { case int64: return v, nil case float64: return int64(v), nil default: return 0, fmt.Errorf( "failed to parse int OTTL value, expression returned value of type %T: %v", v, v, ) } } func getDoubleFromOTTL[K any]( ctx context.Context, tCtx K, s *ottl.ValueExpression[K], ) (float64, error) { if s == nil { return 0, nil } raw, err := s.Eval(ctx, tCtx) if err != nil { return 0, err } switch v := raw.(type) { case float64: return v, nil case int64: return float64(v), nil default: return 0, fmt.Errorf( "failed to parse double OTTL value, expression returned value of type %T: %v", v, v, ) } }