connector/sumconnector/sum.go (126 lines of code) (raw):

// Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 package sumconnector // import "github.com/open-telemetry/opentelemetry-collector-contrib/connector/sumconnector" import ( "context" "errors" "fmt" "strconv" "time" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/pmetric" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil" ) var noAttributes = [16]byte{} func newSummer[K any](metricDefs map[string]metricDef[K]) *summer[K] { return &summer[K]{ metricDefs: metricDefs, sums: make(map[string]map[[16]byte]*attrSummer, len(metricDefs)), timestamp: time.Now(), } } type summer[K any] struct { metricDefs map[string]metricDef[K] sums map[string]map[[16]byte]*attrSummer timestamp time.Time } type attrSummer struct { attrs pcommon.Map sum float64 } func (c *summer[K]) update(ctx context.Context, attrs pcommon.Map, tCtx K) error { var multiError error for name, md := range c.metricDefs { sourceAttribute := md.sourceAttr sumAttrs := pcommon.NewMap() var sumVal float64 // Get source attribute value if sourceAttrVal, ok := attrs.Get(sourceAttribute); ok { switch { case sourceAttrVal.Str() != "": sumVal, _ = strconv.ParseFloat(sourceAttrVal.Str(), 64) case sourceAttrVal.Double() != 0: sumVal = sourceAttrVal.Double() case sourceAttrVal.Int() != 0: sumVal = float64(sourceAttrVal.Int()) } } // Get attribute values to include otherwise use default value for _, attr := range md.attrs { if attrVal, ok := attrs.Get(attr.Key); ok { switch { case attrVal.Str() != "": sumAttrs.PutStr(attr.Key, attrVal.Str()) case attrVal.Double() != 0: sumAttrs.PutStr(attr.Key, fmt.Sprintf("%v", attrVal.Double())) case attrVal.Int() != 0: sumAttrs.PutStr(attr.Key, fmt.Sprintf("%v", attrVal.Int())) } } else if attr.DefaultValue != nil { switch v := attr.DefaultValue.(type) { case string: if v != "" { sumAttrs.PutStr(attr.Key, v) } case int: if v != 0 { sumAttrs.PutInt(attr.Key, int64(v)) } case float64: if v != 0 { sumAttrs.PutDouble(attr.Key, float64(v)) } } } } // Missing necessary attributes if sumAttrs.Len() != len(md.attrs) { continue } // Perform condition matching or not if md.condition == nil { multiError = errors.Join(multiError, c.increment(name, sumVal, sumAttrs)) continue } if match, err := md.condition.Eval(ctx, tCtx); err != nil { multiError = errors.Join(multiError, err) } else if match { multiError = errors.Join(multiError, c.increment(name, sumVal, sumAttrs)) } } return multiError } func (c *summer[K]) increment(metricName string, sumVal float64, attrs pcommon.Map) error { if _, ok := c.sums[metricName]; !ok { c.sums[metricName] = make(map[[16]byte]*attrSummer) } key := noAttributes if attrs.Len() > 0 { key = pdatautil.MapHash(attrs) } if _, ok := c.sums[metricName][key]; !ok { c.sums[metricName][key] = &attrSummer{attrs: attrs} } for strings := range c.sums[metricName][key].attrs.AsRaw() { if _, ok := c.sums[metricName][key].attrs.Get(strings); ok { c.sums[metricName][key].sum += sumVal } } if attrs.Len() == 0 { c.sums[metricName][key].sum += sumVal } return nil } func (c *summer[K]) appendMetricsTo(metricSlice pmetric.MetricSlice) { for name, md := range c.metricDefs { if len(c.sums[name]) == 0 { continue } sumMetric := metricSlice.AppendEmpty() sumMetric.SetName(name) sumMetric.SetDescription(md.desc) sum := sumMetric.SetEmptySum() // The delta value is always positive, so a value accumulated downstream is monotonic sum.SetIsMonotonic(true) sum.SetAggregationTemporality(pmetric.AggregationTemporalityDelta) for _, dpSum := range c.sums[name] { dp := sum.DataPoints().AppendEmpty() dpSum.attrs.CopyTo(dp.Attributes()) dp.SetDoubleValue(dpSum.sum) dp.SetTimestamp(pcommon.NewTimestampFromTime(c.timestamp)) } } }