in components/otelopscol/processor/normalizesumsprocessor/processor.go [106:159]
func (nsp *NormalizeSumsProcessor) processSumDataPoint(dp pmetric.NumberDataPoint, resource pcommon.Resource, metric pmetric.Metric, ndps pmetric.NumberDataPointSlice) {
metricIdentifier := dataPointIdentifier(resource, metric, dp.Attributes())
start := nsp.history[metricIdentifier]
// If this is the first time we've observed this unique metric,
// record it as the start point and do not report this data point
if start == nil {
newDP := pmetric.NewNumberDataPoint()
dp.CopyTo(newDP)
newDP2 := pmetric.NewNumberDataPoint()
newDP.CopyTo(newDP2)
newStart := startPoint{
start: newDP,
last: newDP2,
}
nsp.history[metricIdentifier] = &newStart
return
}
// If this data is older than the start point, we can't meaningfully report this point
// TODO - consider resetting on two subsequent data points older than current start timestamp.
// This could signify a permanent clock change.
if dp.Timestamp() <= start.start.Timestamp() {
nsp.logger.Info(
"data point being processed older than last recorded reset, will not be emitted",
zap.String("lastRecordedReset", start.start.Timestamp().String()),
zap.String("dataPoint", dp.Timestamp().String()),
)
return
}
// If data has rolled over or the counter has been restarted for
// any other reason, grab a new start point and do not report this data
if (dp.ValueType() == pmetric.NumberDataPointValueTypeInt && dp.IntValue() < start.last.IntValue()) || (dp.ValueType() == pmetric.NumberDataPointValueTypeDouble && dp.DoubleValue() < start.last.DoubleValue()) {
dp.CopyTo(start.start)
dp.CopyTo(start.last)
return
}
dp.CopyTo(start.last)
newDP := ndps.AppendEmpty()
dp.CopyTo(newDP)
switch dp.ValueType() {
case pmetric.NumberDataPointValueTypeInt:
newDP.SetIntValue(dp.IntValue() - start.start.IntValue())
case pmetric.NumberDataPointValueTypeDouble:
newDP.SetDoubleValue(dp.DoubleValue() - start.start.DoubleValue())
}
newDP.SetStartTimestamp(start.start.Timestamp())
}