func()

in exporter/collector/internal/normalization/standard_normalizer.go [247:297]


// NormalizeNumberDataPoint processes point against the start point cached for
// identifier and reports whether the point should be kept (true) or dropped
// (false).
//
// The point may be modified in place: when a reset is detected its start
// timestamp is rewritten, and otherwise it is handed to
// subtractNumberDataPoint together with the cached start point. Both the start
// cache and the previous-point cache may be updated as a side effect.
func (s *standardNormalizer) NormalizeNumberDataPoint(point pmetric.NumberDataPoint, identifier uint64) bool {
	pointTime := point.Timestamp().AsTime()
	// True when the start timestamp is not strictly before the timestamp.
	// The reset-handling link below treats such a point as an explicit reset.
	startNotBeforeEnd := !point.StartTimestamp().AsTime().Before(pointTime)

	start, hasStart := s.startCache.GetNumberDataPoint(identifier)
	if !hasStart {
		if point.StartTimestamp() != 0 && !startNotBeforeEnd {
			// Nothing is cached and the point already carries a non-zero
			// start timestamp that precedes its timestamp: pass it through
			// unchanged.
			return true
		}
		// First sighting of this series, or an explicit reset point, see
		// https://github.com/open-telemetry/opentelemetry-specification/blob/9555f9594c7ffe5dc333b53da5e0f880026cead1/specification/metrics/datamodel.md#resets-and-gaps
		// Remember it as both the start and the previous point, then drop it.
		s.startCache.SetNumberDataPoint(identifier, point)
		s.previousCache.SetNumberDataPoint(identifier, point)
		return false
	}

	previous, hasPrevious := s.previousCache.GetNumberDataPoint(identifier)
	if !hasPrevious {
		// Not expected to happen; use the start point as a stand-in.
		previous = start
	}

	switch {
	case startNotBeforeEnd || (point.StartTimestamp() == 0 && lessThanNumberDataPoint(point, previous)):
		// Reset on a series we already track, so the reset happened some time
		// after the last point. Pretend it occurred 1 ms before this point's
		// timestamp and keep the value as is.
		// NOTE(review): this is unsigned arithmetic, so a timestamp below one
		// millisecond would wrap around — confirm such input cannot occur.
		point.SetStartTimestamp(pcommon.Timestamp(uint64(point.Timestamp()) - uint64(time.Millisecond)))
		s.previousCache.SetNumberDataPoint(identifier, point)
		// Later points should keep their values but share the start timestamp
		// written above, so the new cached start point carries only that
		// timestamp and leaves its value unset.
		marker := pmetric.NewNumberDataPoint()
		marker.SetTimestamp(point.StartTimestamp())
		s.startCache.SetNumberDataPoint(identifier, marker)
		return true

	case !start.Timestamp().AsTime().Before(pointTime):
		// The cached start is not older than this point, so normalizing
		// against it would not yield a valid point. Log and drop.
		s.log.Info(
			"data point being processed older than last recorded reset, will not be emitted",
			zap.String("lastRecordedReset", start.Timestamp().String()),
			zap.String("dataPoint", point.Timestamp().String()),
		)
		return false
	}

	// No reset: normalize against the cached start point and record the
	// result as the most recent point.
	subtractNumberDataPoint(point, start)
	s.previousCache.SetNumberDataPoint(identifier, point)
	return true
}