func()

in components/otelopscol/processor/normalizesumsprocessor/processor.go [106:159]


// processSumDataPoint rebases dp against the start point recorded for its
// series and, when the point is reportable, appends the rebased copy to ndps.
//
// The series is keyed by dataPointIdentifier(resource, metric, dp.Attributes()).
// Nothing is appended to ndps in the following cases:
//   - The series has no entry in nsp.history yet: copies of dp are stored as
//     both the start and the last point.
//   - dp's timestamp is not later than the recorded start's timestamp: an
//     info-level message is logged and the history is left untouched.
//   - dp's value is lower than the last recorded value: this is treated as a
//     counter reset and dp overwrites both the start and the last point.
//
// Otherwise dp is stored as the last point and a copy of it is appended to
// ndps with its value reduced by the start point's value and its start
// timestamp set to the start point's timestamp.
func (nsp *NormalizeSumsProcessor) processSumDataPoint(dp pmetric.NumberDataPoint, resource pcommon.Resource, metric pmetric.Metric, ndps pmetric.NumberDataPointSlice) {
	key := dataPointIdentifier(resource, metric, dp.Attributes())

	tracked := nsp.history[key]
	if tracked == nil {
		// First sighting of this series: remember it as the baseline and
		// emit nothing. Two independent copies are kept so that later updates
		// to "last" never touch "start".
		baseline := pmetric.NewNumberDataPoint()
		latest := pmetric.NewNumberDataPoint()
		dp.CopyTo(baseline)
		dp.CopyTo(latest)

		nsp.history[key] = &startPoint{
			start: baseline,
			last:  latest,
		}
		return
	}

	// A point that does not come after the recorded start cannot be reported
	// meaningfully relative to it.
	// TODO - consider resetting on two subsequent data points older than current start timestamp.
	// This could signify a permanent clock change.
	resetTime := tracked.start.Timestamp()
	if dp.Timestamp() <= resetTime {
		nsp.logger.Info(
			"data point being processed older than last recorded reset, will not be emitted",
			zap.String("lastRecordedReset", resetTime.String()),
			zap.String("dataPoint", dp.Timestamp().String()),
		)
		return
	}

	valueType := dp.ValueType()

	// A value lower than the previous observation means the counter rolled
	// over or was restarted. Points of any other value type never count as a
	// reset.
	wentBackwards := false
	switch valueType {
	case pmetric.NumberDataPointValueTypeInt:
		wentBackwards = dp.IntValue() < tracked.last.IntValue()
	case pmetric.NumberDataPointValueTypeDouble:
		wentBackwards = dp.DoubleValue() < tracked.last.DoubleValue()
	}
	if wentBackwards {
		// Adopt this point as the new baseline and emit nothing.
		dp.CopyTo(tracked.start)
		dp.CopyTo(tracked.last)
		return
	}

	dp.CopyTo(tracked.last)

	// Emit a copy whose value is expressed relative to the baseline.
	rebased := ndps.AppendEmpty()
	dp.CopyTo(rebased)
	switch valueType {
	case pmetric.NumberDataPointValueTypeInt:
		rebased.SetIntValue(dp.IntValue() - tracked.start.IntValue())
	case pmetric.NumberDataPointValueTypeDouble:
		rebased.SetDoubleValue(dp.DoubleValue() - tracked.start.DoubleValue())
	}
	rebased.SetStartTimestamp(resetTime)
}