connector/signaltometricsconnector/internal/model/model.go (205 lines of code) (raw):

// Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 package model // import "github.com/open-telemetry/opentelemetry-collector-contrib/connector/signaltometricsconnector/internal/model" import ( "errors" "fmt" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/pdata/pcommon" "github.com/open-telemetry/opentelemetry-collector-contrib/connector/signaltometricsconnector/config" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl" ) type AttributeKeyValue struct { Key string DefaultValue pcommon.Value } type MetricKey struct { Name string Description string } type ExplicitHistogram[K any] struct { Buckets []float64 Count *ottl.ValueExpression[K] Value *ottl.ValueExpression[K] } func (h *ExplicitHistogram[K]) fromConfig( mi *config.Histogram, parser ottl.Parser[K], ) error { if mi == nil { return nil } var err error h.Buckets = mi.Buckets if mi.Count != "" { h.Count, err = parser.ParseValueExpression(mi.Count) if err != nil { return fmt.Errorf("failed to parse count OTTL expression for explicit histogram: %w", err) } } h.Value, err = parser.ParseValueExpression(mi.Value) if err != nil { return fmt.Errorf("failed to parse value statement for explicit histogram: %w", err) } return nil } type ExponentialHistogram[K any] struct { MaxSize int32 Count *ottl.ValueExpression[K] Value *ottl.ValueExpression[K] } func (h *ExponentialHistogram[K]) fromConfig( mi *config.ExponentialHistogram, parser ottl.Parser[K], ) error { if mi == nil { return nil } var err error h.MaxSize = mi.MaxSize if mi.Count != "" { h.Count, err = parser.ParseValueExpression(mi.Count) if err != nil { return fmt.Errorf("failed to parse count OTTL expression for exponential histogram: %w", err) } } h.Value, err = parser.ParseValueExpression(mi.Value) if err != nil { return fmt.Errorf("failed to parse value OTTL expression for exponential histogram: %w", err) } return nil } type Sum[K any] struct { Value *ottl.ValueExpression[K] } func (s *Sum[K]) fromConfig( mi *config.Sum, parser ottl.Parser[K], ) error { if mi == nil { return nil } var err error s.Value, err = parser.ParseValueExpression(mi.Value) if err != nil { return fmt.Errorf("failed to parse value OTTL expression for sum: %w", err) } return nil } type MetricDef[K any] struct { Key MetricKey Unit string IncludeResourceAttributes []AttributeKeyValue Attributes []AttributeKeyValue Conditions *ottl.ConditionSequence[K] ExponentialHistogram *ExponentialHistogram[K] ExplicitHistogram *ExplicitHistogram[K] Sum *Sum[K] } func (md *MetricDef[K]) FromMetricInfo( mi config.MetricInfo, parser ottl.Parser[K], telemetrySettings component.TelemetrySettings, ) error { md.Key.Name = mi.Name md.Key.Description = mi.Description md.Unit = mi.Unit var err error md.IncludeResourceAttributes, err = parseAttributeConfigs(mi.IncludeResourceAttributes) if err != nil { return fmt.Errorf("failed to parse include resource attribute config: %w", err) } md.Attributes, err = parseAttributeConfigs(mi.Attributes) if err != nil { return fmt.Errorf("failed to parse attribute config: %w", err) } if len(mi.Conditions) > 0 { conditions, err := parser.ParseConditions(mi.Conditions) if err != nil { return fmt.Errorf("failed to parse OTTL conditions: %w", err) } condSeq := ottl.NewConditionSequence( conditions, telemetrySettings, ottl.WithLogicOperation[K](ottl.Or), ) md.Conditions = &condSeq } if mi.Histogram != nil { md.ExplicitHistogram = new(ExplicitHistogram[K]) if err := md.ExplicitHistogram.fromConfig(mi.Histogram, parser); err != nil { return fmt.Errorf("failed to parse histogram config: %w", err) } } if mi.ExponentialHistogram != nil { md.ExponentialHistogram = new(ExponentialHistogram[K]) if err := md.ExponentialHistogram.fromConfig(mi.ExponentialHistogram, parser); err != nil { return fmt.Errorf("failed to parse histogram config: %w", err) } } if mi.Sum != nil { md.Sum = new(Sum[K]) if err := md.Sum.fromConfig(mi.Sum, parser); err != nil { return fmt.Errorf("failed to parse sum config: %w", err) } } return nil } // FilterResourceAttributes filters resource attributes based on the // `IncludeResourceAttributes` list for the metric definition. Resource // attributes are only filtered if the list is specified, otherwise all the // resource attributes are used for creating the metrics from the metric // definition. func (md *MetricDef[K]) FilterResourceAttributes( attrs pcommon.Map, collectorInfo CollectorInstanceInfo, ) pcommon.Map { var filteredAttributes pcommon.Map switch { case len(md.IncludeResourceAttributes) == 0: filteredAttributes = pcommon.NewMap() filteredAttributes.EnsureCapacity(attrs.Len() + collectorInfo.Size()) attrs.CopyTo(filteredAttributes) default: expectedLen := len(md.IncludeResourceAttributes) + collectorInfo.Size() filteredAttributes = filterAttributes(attrs, md.IncludeResourceAttributes, expectedLen) } collectorInfo.Copy(filteredAttributes) return filteredAttributes } // FilterAttributes filters event attributes (datapoint, logrecord, spans) // based on the `Attributes` selected for the metric definition. If no // attributes are selected then an empty `pcommon.Map` is returned. Note // that, this filtering differs from resource attribute filtering as // in attribute filtering if any of the configured attributes is not present // in the data being processed then that metric definition is not processed. // The method returns a bool signaling if the filter was successful and metric // should be processed. If the bool value is false then the returned map // should not be used. func (md *MetricDef[K]) FilterAttributes(attrs pcommon.Map) (pcommon.Map, bool) { // Figure out if all the attributes are available, saves allocation for _, filter := range md.Attributes { if filter.DefaultValue.Type() != pcommon.ValueTypeEmpty { // will always add an attribute continue } if _, ok := attrs.Get(filter.Key); !ok { return pcommon.Map{}, false } } return filterAttributes(attrs, md.Attributes, len(md.Attributes)), true } func filterAttributes(attrs pcommon.Map, filters []AttributeKeyValue, expectedLen int) pcommon.Map { filteredAttrs := pcommon.NewMap() filteredAttrs.EnsureCapacity(expectedLen) for _, filter := range filters { if attr, ok := attrs.Get(filter.Key); ok { attr.CopyTo(filteredAttrs.PutEmpty(filter.Key)) continue } if filter.DefaultValue.Type() != pcommon.ValueTypeEmpty { filter.DefaultValue.CopyTo(filteredAttrs.PutEmpty(filter.Key)) } } return filteredAttrs } func parseAttributeConfigs(cfgs []config.Attribute) ([]AttributeKeyValue, error) { var errs []error kvs := make([]AttributeKeyValue, len(cfgs)) for i, attr := range cfgs { val := pcommon.NewValueEmpty() if err := val.FromRaw(attr.DefaultValue); err != nil { errs = append(errs, err) } kvs[i] = AttributeKeyValue{ Key: attr.Key, DefaultValue: val, } } if len(errs) > 0 { return nil, errors.Join(errs...) } return kvs, nil }