in exporter/signalfxexporter/internal/translation/translator.go [402:546]
// TranslateDataPoints applies every rule in mp.rules, in order, to
// sfxDataPoints and returns the resulting datapoints.
//
// Datapoints are modified in place through their pointers, and the backing
// array of sfxDataPoints may be reused (for example, ActionDropMetrics compacts
// it in place), so callers should use only the returned slice afterwards.
// Datapoints appended by one rule (copies, calculated or aggregated metrics)
// are visible to all subsequent rules.
//
// logger is forwarded to the helpers that convert values and calculate new
// metrics.
func (mp *MetricTranslator) TranslateDataPoints(logger *zap.Logger, sfxDataPoints []*sfxpb.DataPoint) []*sfxpb.DataPoint {
	processedDataPoints := sfxDataPoints
	for _, tr := range mp.rules {
		switch tr.Action {
		case ActionRenameDimensionKeys:
			for _, dp := range processedDataPoints {
				// An empty MetricNames set means the rule applies to every metric.
				if len(tr.MetricNames) > 0 && !tr.MetricNames[dp.Metric] {
					continue
				}
				for _, d := range dp.Dimensions {
					if newKey, ok := tr.Mapping[d.Key]; ok {
						d.Key = newKey
					}
				}
			}
		case ActionRenameMetrics:
			// Build the extra dimensions once so that every datapoint touched by
			// this rule receives them in the same order (map iteration order is
			// not stable between loops).
			additionalDimensions := make([]*sfxpb.Dimension, 0, len(tr.AddDimensions))
			for k, v := range tr.AddDimensions {
				additionalDimensions = append(additionalDimensions, &sfxpb.Dimension{Key: k, Value: v})
			}
			for _, dp := range processedDataPoints {
				newKey, ok := tr.Mapping[dp.Metric]
				if !ok {
					continue
				}
				dp.Metric = newKey
				if tr.CopyDimensions != nil {
					// The range expression is evaluated once, so the dimensions
					// appended below are not revisited by this loop.
					for _, d := range dp.Dimensions {
						if k, ok := tr.CopyDimensions[d.Key]; ok {
							dp.Dimensions = append(dp.Dimensions, &sfxpb.Dimension{Key: k, Value: d.Value})
						}
					}
				}
				// Give each datapoint its own Dimension values. Sharing one pointer
				// between datapoints would let a later in-place edit (such as
				// ActionRenameDimensionKeys restricted to a single metric) leak
				// into unrelated datapoints.
				for _, ad := range additionalDimensions {
					dp.Dimensions = append(dp.Dimensions, &sfxpb.Dimension{Key: ad.Key, Value: ad.Value})
				}
			}
		case ActionMultiplyInt:
			for _, dp := range processedDataPoints {
				if multiplier, ok := tr.ScaleFactorsInt[dp.Metric]; ok {
					// Only integer-valued datapoints are scaled.
					v := dp.GetValue().IntValue
					if v != nil {
						*v *= multiplier
					}
				}
			}
		case ActionDivideInt:
			for _, dp := range processedDataPoints {
				// Integer division by zero panics at run time; a zero divisor is
				// presumably rejected during rule validation (not visible here),
				// but skip it defensively and leave the value unchanged.
				if divisor, ok := tr.ScaleFactorsInt[dp.Metric]; ok && divisor != 0 {
					v := dp.GetValue().IntValue
					if v != nil {
						*v /= divisor
					}
				}
			}
		case ActionMultiplyFloat:
			for _, dp := range processedDataPoints {
				if multiplier, ok := tr.ScaleFactorsFloat[dp.Metric]; ok {
					// Only double-valued datapoints are scaled.
					v := dp.GetValue().DoubleValue
					if v != nil {
						*v *= multiplier
					}
				}
			}
		case ActionCopyMetrics:
			// The range expression is evaluated once, so copies appended here are
			// not themselves considered for copying by this rule.
			for _, dp := range processedDataPoints {
				if newMetric, ok := tr.Mapping[dp.Metric]; ok {
					// copyMetric may return nil, in which case nothing is added.
					newDataPoint := copyMetric(tr, dp, newMetric)
					if newDataPoint != nil {
						processedDataPoints = append(processedDataPoints, newDataPoint)
					}
				}
			}
		case ActionSplitMetric:
			for _, dp := range processedDataPoints {
				if tr.MetricName == dp.Metric {
					splitMetric(dp, tr.DimensionKey, tr.Mapping)
				}
			}
		case ActionConvertValues:
			for _, dp := range processedDataPoints {
				if newType, ok := tr.TypesMapping[dp.Metric]; ok {
					convertMetricValue(logger, dp, newType)
				}
			}
		case ActionCalculateNewMetric:
			pairs := calcNewMetricInputPairs(processedDataPoints, tr)
			for _, pair := range pairs {
				// A nil result means no new datapoint was produced for this pair.
				newPt := calculateNewMetric(logger, pair[0], pair[1], tr)
				if newPt == nil {
					continue
				}
				processedDataPoints = append(processedDataPoints, newPt)
			}
		case ActionAggregateMetric:
			// NOTE: Based on the usage of TranslateDataPoints we can assume that the
			// datapoints batch []*sfxpb.DataPoint represents only one metric and all
			// the datapoints can be aggregated together.
			var dpsToAggregate []*sfxpb.DataPoint
			var otherDps []*sfxpb.DataPoint
			for i, dp := range processedDataPoints {
				if dp.Metric == tr.MetricName {
					// Allocate lazily, sized for the datapoints that remain.
					if dpsToAggregate == nil {
						dpsToAggregate = make([]*sfxpb.DataPoint, 0, len(processedDataPoints)-i)
					}
					dpsToAggregate = append(dpsToAggregate, dp)
				} else {
					if otherDps == nil {
						otherDps = make([]*sfxpb.DataPoint, 0, len(processedDataPoints)-i)
					}
					// This slice can contain additional datapoints from a different
					// metric, for example ones copied in an earlier translation step.
					otherDps = append(otherDps, dp)
				}
			}
			aggregatedDps := aggregateDatapoints(dpsToAggregate, tr.WithoutDimensions, tr.AggregationMethod)
			// Untouched datapoints come first, followed by the aggregated ones.
			processedDataPoints = otherDps
			processedDataPoints = append(processedDataPoints, aggregatedDps...)
		case ActionDropMetrics:
			// Filter in place, reusing the existing backing array: the write index
			// never overtakes the read index, so no unread element is overwritten.
			kept := processedDataPoints[:0]
			for _, dp := range processedDataPoints {
				if !tr.MetricNames[dp.Metric] {
					kept = append(kept, dp)
				}
			}
			processedDataPoints = kept
		case ActionDeltaMetric:
			processedDataPoints = mp.deltaTranslator.translate(processedDataPoints, tr)
		case ActionDropDimensions:
			for _, dp := range processedDataPoints {
				dropDimensions(dp, tr)
			}
		}
	}
	return processedDataPoints
}