processor/lsmintervalprocessor/internal/merger/datapoints.go (123 lines of code) (raw):

// Licensed to Elasticsearch B.V. under one or more contributor // license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright // ownership. Elasticsearch B.V. licenses this file to you under // the Apache License, Version 2.0 (the "License"); you may // not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. package merger // import "github.com/elastic/opentelemetry-collector-components/processor/lsmintervalprocessor/internal/merger" import ( "errors" "fmt" "github.com/elastic/opentelemetry-collector-components/processor/lsmintervalprocessor/internal/data" "github.com/elastic/opentelemetry-collector-components/processor/lsmintervalprocessor/internal/identity" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/pmetric" ) type dataPointSlice[DP dataPoint[DP]] interface { Len() int At(i int) DP AppendEmpty() DP } type dataPoint[Self any] interface { pmetric.NumberDataPoint | pmetric.SummaryDataPoint | pmetric.HistogramDataPoint | pmetric.ExponentialHistogramDataPoint Timestamp() pcommon.Timestamp SetTimestamp(pcommon.Timestamp) Attributes() pcommon.Map CopyTo(dest Self) } func mergeDataPoints[DPS dataPointSlice[DP], DP dataPoint[DP]]( from DPS, toMetricID identity.Metric, toMetric pdataMetric, addDP func(identity.Metric, pdataMetric, DP) (DP, bool), temporality pmetric.AggregationTemporality, maxExponentialHistogramBuckets int, ) error { switch temporality { case pmetric.AggregationTemporalityCumulative: return mergeCumulative(from, toMetricID, toMetric, addDP) case pmetric.AggregationTemporalityDelta: return mergeDelta(from, toMetricID, toMetric, addDP, maxExponentialHistogramBuckets) default: return fmt.Errorf("unsupported aggregation temporality: %s", temporality) } } type addDPFunc[DP dataPoint[DP]] func(identity.Metric, pdataMetric, DP) (DP, bool) func mergeCumulative[DPS dataPointSlice[DP], DP dataPoint[DP]]( from DPS, toMetricID identity.Metric, toMetric pdataMetric, addDP addDPFunc[DP], ) error { for i := 0; i < from.Len(); i++ { fromDP := from.At(i) toDP, exists := addDP(toMetricID, toMetric, fromDP) if exists && fromDP.Timestamp() > toDP.Timestamp() { fromDP.CopyTo(toDP) } } return nil } func mergeDelta[DPS dataPointSlice[DP], DP dataPoint[DP]]( from DPS, toMetricID identity.Metric, toMetric pdataMetric, addDP addDPFunc[DP], maxExponentialHistogramBuckets int, ) error { var errs []error for i := 0; i < from.Len(); i++ { fromDP := from.At(i) var err error if toDP, exists := addDP(toMetricID, toMetric, fromDP); exists { switch fromDP := any(fromDP).(type) { case pmetric.NumberDataPoint: err = mergeDeltaSumDP(fromDP, any(toDP).(pmetric.NumberDataPoint)) case pmetric.HistogramDataPoint: err = mergeDeltaHistogramDP(fromDP, any(toDP).(pmetric.HistogramDataPoint)) case pmetric.ExponentialHistogramDataPoint: err = mergeDeltaExponentialHistogramDP( fromDP, any(toDP).(pmetric.ExponentialHistogramDataPoint), maxExponentialHistogramBuckets, ) } toDP.SetTimestamp(fromDP.Timestamp()) } if err != nil { errs = append(errs, err) } } if len(errs) > 0 { return fmt.Errorf("failed to merge delta datapoints: %w", errors.Join(errs...)) } return nil } func mergeDeltaSumDP(from, to pmetric.NumberDataPoint) error { if err := (data.Adder{}).Numbers(to, from); err != nil { return fmt.Errorf("failed to merge sum datapoint: %w", err) } return nil } func mergeDeltaHistogramDP(from, to pmetric.HistogramDataPoint) error { if from.Count() == 0 { return nil } if to.Count() == 0 { from.CopyTo(to) return nil } if err := (data.Adder{}).Histograms(to, from); err != nil { return fmt.Errorf("failed to merge histogram datapoint: %w", err) } return nil } func mergeDeltaExponentialHistogramDP( from, to pmetric.ExponentialHistogramDataPoint, maxBuckets int, ) error { if from.Count() == 0 { return nil } if to.Count() == 0 { from.CopyTo(to) return nil } if err := data.NewAdder(maxBuckets).Exponential(to, from); err != nil { return fmt.Errorf("failed to merge exponential histogram datapoint: %w", err) } return nil }