in exporter/collector/internal/normalization/standard_normalizer.go [148:209]
// NormalizeHistogramDataPoint normalizes a histogram data point against the
// start point cached for identifier, detecting resets along the way.
//
// It returns true when the (possibly modified) point should be kept, and
// false when the point should be dropped. The point is modified in place:
// on a detected reset its start timestamp is rewritten, and otherwise it is
// passed to subtractHistogramDataPoint together with the cached start point.
//
// Two caches are maintained per identifier: startCache holds the point that
// later points are normalized against, and previousCache holds the most
// recent raw (un-normalized) point, which is used to detect resets.
func (s *standardNormalizer) NormalizeHistogramDataPoint(point pmetric.HistogramDataPoint, identifier uint64) bool {
	start, hasStart := s.startCache.GetHistogramDataPoint(identifier)
	if !hasStart {
		if point.StartTimestamp() == 0 || !point.StartTimestamp().AsTime().Before(point.Timestamp().AsTime()) {
			// This is the first time we've seen this metric, or we received
			// an explicit reset point as described in
			// https://github.com/open-telemetry/opentelemetry-specification/blob/9555f9594c7ffe5dc333b53da5e0f880026cead1/specification/metrics/datamodel.md#resets-and-gaps
			// Record it in history and drop the point.
			s.startCache.SetHistogramDataPoint(identifier, point)
			s.previousCache.SetHistogramDataPoint(identifier, point)
			return false
		}
		// No normalization required, since we haven't cached anything, and the start TS is non-zero.
		return true
	}

	// The bucket boundaries changed, so we can't normalize points anymore.
	// Treat this as a reset.
	if !bucketBoundariesEqual(point.ExplicitBounds(), start.ExplicitBounds()) {
		s.startCache.SetHistogramDataPoint(identifier, point)
		s.previousCache.SetHistogramDataPoint(identifier, point)
		return false
	}

	previous, hasPrevious := s.previousCache.GetHistogramDataPoint(identifier)
	if !hasPrevious {
		// This should never happen, but fall-back to the start point if we
		// don't find a previous point
		previous = start
	}
	if !point.StartTimestamp().AsTime().Before(point.Timestamp().AsTime()) ||
		(point.StartTimestamp() == 0 && lessThanHistogramDataPoint(point, previous)) {
		// This is a reset point, but we have seen this timeseries before, so we know the reset happened in the time period since the last point.
		// Assume the reset occurred at T - 1 ms, and leave the value untouched.
		// NOTE(review): this unsigned subtraction wraps around if the point's
		// timestamp is less than 1ms after the epoch (e.g. unset).
		point.SetStartTimestamp(pcommon.Timestamp(uint64(point.Timestamp()) - uint64(time.Millisecond)))
		s.previousCache.SetHistogramDataPoint(identifier, point)
		// For subsequent points, we don't want to modify the value, but we do
		// want to make the start timestamp match the point we write here.
		// Store a point with the same timestamps, but zero value to achieve
		// that behavior.
		zeroPoint := pmetric.NewHistogramDataPoint()
		zeroPoint.SetTimestamp(point.StartTimestamp())
		point.ExplicitBounds().CopyTo(zeroPoint.ExplicitBounds())
		zeroPoint.BucketCounts().FromRaw(make([]uint64, point.BucketCounts().Len()))
		s.startCache.SetHistogramDataPoint(identifier, zeroPoint)
		return true
	}
	if !start.Timestamp().AsTime().Before(point.Timestamp().AsTime()) {
		// We found a cached start timestamp that wouldn't produce a valid point.
		// Drop it and log.
		s.log.Info(
			"data point being processed older than last recorded reset, will not be emitted",
			zap.String("lastRecordedReset", start.Timestamp().String()),
			zap.String("dataPoint", point.Timestamp().String()),
		)
		return false
	}

	// There was no reset, so normalize the point against the start point.
	//
	// The previous cache must hold the raw point, not the normalized one: the
	// reset check above compares the next raw point against it, and comparing
	// against already-subtracted values could miss a real reset. Cache a copy
	// taken before the subtraction, so that rewriting point in place cannot
	// affect the cached value.
	rawPoint := pmetric.NewHistogramDataPoint()
	point.CopyTo(rawPoint)
	s.previousCache.SetHistogramDataPoint(identifier, rawPoint)
	subtractHistogramDataPoint(point, start)
	return true
}