func()

in exporter/signalfxexporter/internal/translation/translator.go [402:546]


// TranslateDataPoints applies every rule in mp.rules, in order, to the given
// datapoints and returns the resulting slice.
//
// Datapoints are modified in place, and some rules (for example
// ActionDropMetrics) write into the backing array of the slice they operate
// on, which initially is sfxDataPoints itself. Callers should therefore use
// only the returned slice after the call.
//
// logger is forwarded to the helpers that take one (value conversion and new
// metric calculation).
func (mp *MetricTranslator) TranslateDataPoints(logger *zap.Logger, sfxDataPoints []*sfxpb.DataPoint) []*sfxpb.DataPoint {
	processedDataPoints := sfxDataPoints

	for _, tr := range mp.rules {
		switch tr.Action {
		case ActionRenameDimensionKeys:
			// Rename dimension keys according to tr.Mapping. An empty
			// tr.MetricNames means the rule applies to every metric.
			for _, dp := range processedDataPoints {
				if len(tr.MetricNames) > 0 && !tr.MetricNames[dp.Metric] {
					continue
				}
				for _, d := range dp.Dimensions {
					if newKey, ok := tr.Mapping[d.Key]; ok {
						d.Key = newKey
					}
				}
			}
		case ActionRenameMetrics:
			// Collect the configured extra dimensions once per rule so that
			// every renamed datapoint receives them in the same order (map
			// iteration order is not stable). Ranging over a nil map is a
			// no-op, so no explicit nil check is needed.
			var additionalDimensions []*sfxpb.Dimension
			for k, v := range tr.AddDimensions {
				additionalDimensions = append(additionalDimensions, &sfxpb.Dimension{Key: k, Value: v})
			}

			for _, dp := range processedDataPoints {
				newKey, ok := tr.Mapping[dp.Metric]
				if !ok {
					continue
				}
				dp.Metric = newKey
				if tr.CopyDimensions != nil {
					// The range expression is evaluated once, so dimensions
					// appended inside the loop are not visited again.
					for _, d := range dp.Dimensions {
						if k, ok := tr.CopyDimensions[d.Key]; ok {
							dp.Dimensions = append(dp.Dimensions, &sfxpb.Dimension{Key: k, Value: d.Value})
						}
					}
				}
				// Give each datapoint its own Dimension objects. Other rules
				// (e.g. ActionRenameDimensionKeys) mutate dimensions in place,
				// so sharing one pointer between datapoints would let a change
				// scoped to one metric leak into every other datapoint that
				// received the same added dimension.
				for _, d := range additionalDimensions {
					dp.Dimensions = append(dp.Dimensions, &sfxpb.Dimension{Key: d.Key, Value: d.Value})
				}
			}
		case ActionMultiplyInt:
			// Scale integer values in place; datapoints without an integer
			// value are left untouched.
			for _, dp := range processedDataPoints {
				if multiplier, ok := tr.ScaleFactorsInt[dp.Metric]; ok {
					v := dp.GetValue().IntValue
					if v != nil {
						*v *= multiplier
					}
				}
			}
		case ActionDivideInt:
			// NOTE(review): integer division by zero panics at run time;
			// presumably rule validation rejects zero values in
			// ScaleFactorsInt for this action — confirm.
			for _, dp := range processedDataPoints {
				if divisor, ok := tr.ScaleFactorsInt[dp.Metric]; ok {
					v := dp.GetValue().IntValue
					if v != nil {
						*v /= divisor
					}
				}
			}
		case ActionMultiplyFloat:
			// Scale double values in place; datapoints without a double value
			// are left untouched.
			for _, dp := range processedDataPoints {
				if multiplier, ok := tr.ScaleFactorsFloat[dp.Metric]; ok {
					v := dp.GetValue().DoubleValue
					if v != nil {
						*v *= multiplier
					}
				}
			}
		case ActionCopyMetrics:
			// The range expression is evaluated once before the loop starts,
			// so copies appended here are not themselves revisited by this
			// rule.
			for _, dp := range processedDataPoints {
				if newMetric, ok := tr.Mapping[dp.Metric]; ok {
					newDataPoint := copyMetric(tr, dp, newMetric)
					if newDataPoint != nil {
						processedDataPoints = append(processedDataPoints, newDataPoint)
					}
				}
			}
		case ActionSplitMetric:
			for _, dp := range processedDataPoints {
				if tr.MetricName == dp.Metric {
					splitMetric(dp, tr.DimensionKey, tr.Mapping)
				}
			}
		case ActionConvertValues:
			for _, dp := range processedDataPoints {
				if newType, ok := tr.TypesMapping[dp.Metric]; ok {
					convertMetricValue(logger, dp, newType)
				}
			}
		case ActionCalculateNewMetric:
			// Each input pair may yield one new datapoint; nil results are
			// skipped.
			pairs := calcNewMetricInputPairs(processedDataPoints, tr)
			for _, pair := range pairs {
				newPt := calculateNewMetric(logger, pair[0], pair[1], tr)
				if newPt == nil {
					continue
				}
				processedDataPoints = append(processedDataPoints, newPt)
			}

		case ActionAggregateMetric:
			// NOTE: Based on the usage of TranslateDataPoints we can assume that the datapoints batch []*sfxpb.DataPoint
			// represents only one metric and all the datapoints can be aggregated together.
			var dpsToAggregate []*sfxpb.DataPoint
			var otherDps []*sfxpb.DataPoint
			for i, dp := range processedDataPoints {
				if dp.Metric == tr.MetricName {
					if dpsToAggregate == nil {
						// Allocate lazily, sized for the remaining datapoints.
						dpsToAggregate = make([]*sfxpb.DataPoint, 0, len(processedDataPoints)-i)
					}
					dpsToAggregate = append(dpsToAggregate, dp)
				} else {
					if otherDps == nil {
						otherDps = make([]*sfxpb.DataPoint, 0, len(processedDataPoints)-i)
					}
					// This slice can contain additional datapoints from a different metric
					// for example copied in a translation step before
					otherDps = append(otherDps, dp)
				}
			}
			// The result is the untouched datapoints followed by the
			// aggregated ones.
			aggregatedDps := aggregateDatapoints(dpsToAggregate, tr.WithoutDimensions, tr.AggregationMethod)
			processedDataPoints = otherDps
			processedDataPoints = append(processedDataPoints, aggregatedDps...)

		case ActionDropMetrics:
			// Compact the slice in place: surviving datapoints are shifted
			// towards the front and the slice is then truncated.
			resultSliceLen := 0
			for i, dp := range processedDataPoints {
				if match := tr.MetricNames[dp.Metric]; !match {
					if resultSliceLen < i {
						processedDataPoints[resultSliceLen] = dp
					}
					resultSliceLen++
				}
			}
			processedDataPoints = processedDataPoints[:resultSliceLen]

		case ActionDeltaMetric:
			processedDataPoints = mp.deltaTranslator.translate(processedDataPoints, tr)

		case ActionDropDimensions:
			for _, dp := range processedDataPoints {
				dropDimensions(dp, tr)
			}
		}
	}

	return processedDataPoints
}