func()

in exporter/collector/internal/normalization/standard_normalizer.go [148:209]


// NormalizeHistogramDataPoint normalizes point against the start point cached
// for identifier. The point may be modified in place (its start timestamp is
// rewritten, and the cached start point is subtracted from it).
//
// It returns true if the point should be emitted and false if it should be
// dropped. A point is dropped when:
//   - nothing is cached yet and the point has a zero start timestamp or a start
//     timestamp that is not before its timestamp (it is recorded as the start
//     point instead),
//   - its explicit bucket boundaries differ from the cached start point (it is
//     recorded as the new start point), or
//   - its timestamp is not after the cached start point's timestamp.
func (s *standardNormalizer) NormalizeHistogramDataPoint(point pmetric.HistogramDataPoint, identifier uint64) bool {
	start, hasStart := s.startCache.GetHistogramDataPoint(identifier)
	if !hasStart {
		if point.StartTimestamp() == 0 || !point.StartTimestamp().AsTime().Before(point.Timestamp().AsTime()) {
			// This is the first time we've seen this metric, or we received
			// an explicit reset point as described in
			// https://github.com/open-telemetry/opentelemetry-specification/blob/9555f9594c7ffe5dc333b53da5e0f880026cead1/specification/metrics/datamodel.md#resets-and-gaps
			// Record it in history and drop the point.
			s.startCache.SetHistogramDataPoint(identifier, point)
			s.previousCache.SetHistogramDataPoint(identifier, point)
			return false
		}
		// No normalization required, since we haven't cached anything, and the start TS is non-zero.
		return true
	}

	// The bucket boundaries changed, so we can't normalize points anymore.
	// Treat this as a reset.
	if !bucketBoundariesEqual(point.ExplicitBounds(), start.ExplicitBounds()) {
		s.startCache.SetHistogramDataPoint(identifier, point)
		s.previousCache.SetHistogramDataPoint(identifier, point)
		return false
	}

	previous, hasPrevious := s.previousCache.GetHistogramDataPoint(identifier)
	if !hasPrevious {
		// This should never happen, but fall-back to the start point if we
		// don't find a previous point
		previous = start
	}
	if !point.StartTimestamp().AsTime().Before(point.Timestamp().AsTime()) ||
		(point.StartTimestamp() == 0 && lessThanHistogramDataPoint(point, previous)) {
		// This is a reset point, but we have seen this timeseries before, so we know the reset happened in the time period since the last point.
		// Assume the reset occurred at T - 1 ms, and leave the value untouched.
		point.SetStartTimestamp(pcommon.Timestamp(uint64(point.Timestamp()) - uint64(time.Millisecond)))
		s.previousCache.SetHistogramDataPoint(identifier, point)
		// For subsequent points, we don't want to modify the value, but we do
		// want to make the start timestamp match the point we write here.
		// Store a point with the same timestamps, but zero value to achieve
		// that behavior.
		zeroPoint := pmetric.NewHistogramDataPoint()
		zeroPoint.SetTimestamp(point.StartTimestamp())
		point.ExplicitBounds().CopyTo(zeroPoint.ExplicitBounds())
		zeroPoint.BucketCounts().FromRaw(make([]uint64, point.BucketCounts().Len()))
		s.startCache.SetHistogramDataPoint(identifier, zeroPoint)
		return true
	}
	if !start.Timestamp().AsTime().Before(point.Timestamp().AsTime()) {
		// We found a cached start timestamp that wouldn't produce a valid point.
		// Drop it and log.
		s.log.Info(
			"data point being processed older than last recorded reset, will not be emitted",
			zap.String("lastRecordedReset", start.Timestamp().String()),
			zap.String("dataPoint", point.Timestamp().String()),
		)
		return false
	}
	// There was no reset. Record the point as the previous point BEFORE
	// normalizing it: the reset check above compares the raw incoming point
	// against the previous point, so the previous cache must hold raw
	// (un-normalized) values, as it does on every other path in this function.
	s.previousCache.SetHistogramDataPoint(identifier, point)
	// Normalize the point against the start point.
	subtractHistogramDataPoint(point, start)
	return true
}