in pkg/export/transform.go [386:495]
// buildDistribution consumes the leading run of series in samples that belong to the histogram
// metric named by metric and tries to assemble one complete distribution from them.
//
// It returns, in order:
//   - the distribution built for the first histogram that became complete, or nil if none did,
//   - the reset timestamp taken from that histogram's _count series (0 if no distribution is returned),
//   - the samples that were not consumed by this call,
//   - an error if building the distribution failed or if not a single sample could be consumed.
//
// Partially seen histograms are kept in b.dists, keyed by the hash of the series' cumulative proto,
// so that series of the same histogram are accumulated into the same entry.
//
// An empty samples slice is returned unchanged with no distribution and no error.
func (b *sampleBuilder) buildDistribution(
	metric string,
	_ labels.Labels,
	samples []record.RefSample,
	exemplars map[storage.SeriesRef]record.RefExemplar,
	externalLabels labels.Labels,
	metadata MetadataFunc,
) (*distribution_pb.Distribution, int64, []record.RefSample, error) {
	// Nothing to process. Guard explicitly because the "nothing consumed" path below indexes
	// samples[0] and would panic on an empty batch.
	if len(samples) == 0 {
		return nil, 0, samples, nil
	}
	// The Prometheus/OpenMetrics exposition format does not require all histogram series for a single distribution
	// to be grouped together. But it does require that all series for a histogram metric in general are grouped
	// together and that buckets for a single histogram are specified in order.
	// Thus, we build a cache and conclude a histogram complete once we've seen its _sum series and its +Inf bucket
	// series. We return for the first histogram where this condition is fulfilled.
	consumed := 0
Loop:
	for _, s := range samples {
		e, ok := b.series.get(s, externalLabels, metadata)
		if !ok {
			// No cache entry for this series: count the sample (and its exemplar, if any) as
			// discarded and move past it.
			consumed++
			prometheusSamplesDiscarded.WithLabelValues("no-cache-series-found").Inc()
			discardExemplarIncIfExists(storage.SeriesRef(s.Ref), exemplars, "no-cache-series-found")
			continue
		}
		name := e.lset.Get(labels.MetricName)
		// Abort if the series is not for the intended histogram metric. All series for it must be grouped
		// together, so we can rely on there being no further relevant series in the batch.
		if !isHistogramSeries(metric, name) {
			break
		}
		consumed++

		// Create or update the cached distribution for the given histogram series.
		dist, ok := b.dists[e.protos.cumulative.hash]
		if !ok {
			dist = getDistribution()
			dist.timestamp = s.T
			b.dists[e.protos.cumulative.hash] = dist
		}
		// If there are diverging timestamps within a single batch, the histogram is not valid.
		if s.T != dist.timestamp {
			dist.skip = true
			prometheusSamplesDiscarded.WithLabelValues("mismatching-histogram-timestamps").Inc()
			discardExemplarIncIfExists(storage.SeriesRef(s.Ref), exemplars, "mismatching-histogram-timestamps")
			continue
		}
		rt, v, ok := b.series.getResetAdjusted(storage.SeriesRef(s.Ref), s.T, s.V)
		// If a series appeared for the first time, we won't get a valid reset timestamp yet.
		// This may happen if the histogram is entirely new or if new series appeared through bucket changes.
		// We skip the entire distribution sample in this case.
		if !ok {
			dist.skip = true
			continue
		}
		// All series can in principle have a NaN value (staleness NaNs already filtered).
		// We permit this for sum and count as we handle it explicitly when building the distribution.
		// For buckets there is no sensible way to handle it, however, so we discard those bucket samples.
		switch metricSuffix(name[len(metric):]) {
		case metricSuffixSum:
			dist.hasSum, dist.sum = true, v

		case metricSuffixCount:
			dist.hasCount, dist.count = true, v
			// We take the count series as the authoritative source for the overall reset timestamp.
			dist.resetTimestamp = rt

		case metricSuffixBucket:
			bound, err := strconv.ParseFloat(e.lset.Get(labels.BucketLabel), 64)
			if err != nil {
				prometheusSamplesDiscarded.WithLabelValues("malformed-bucket-le-label").Inc()
				discardExemplarIncIfExists(storage.SeriesRef(s.Ref), exemplars, "malformed-bucket-le-label")
				continue
			}
			if math.IsNaN(v) {
				prometheusSamplesDiscarded.WithLabelValues("NaN-bucket-value").Inc()
				discardExemplarIncIfExists(storage.SeriesRef(s.Ref), exemplars, "NaN-bucket-value")
				continue
			}
			// Handle cases where +Inf bucket is out-of-order by not overwriting on the last-consumed bucket.
			if !dist.hasInfBucket {
				dist.hasInfBucket = math.IsInf(bound, 1)
			}
			dist.bounds = append(dist.bounds, bound)
			dist.values = append(dist.values, int64(v))
			if exemplar, ok := exemplars[storage.SeriesRef(s.Ref)]; ok {
				dist.exemplars = append(dist.exemplars, exemplar)
			}

		default:
			// Unrecognized suffix: stop processing the batch here.
			break Loop
		}
		if !dist.complete() {
			continue
		}
		// The histogram is complete: build it and hand back everything after the consumed prefix.
		dp, err := dist.build(e.lset)
		if err != nil {
			return nil, 0, samples[consumed:], err
		}
		return dp, dist.resetTimestamp, samples[consumed:], nil
	}
	if consumed == 0 {
		// Not a single sample was consumed. Drop the first sample and report an error; the
		// returned tail is one element shorter than the input.
		prometheusSamplesDiscarded.WithLabelValues("zero-histogram-samples-processed").Inc()
		discardExemplarIncIfExists(storage.SeriesRef(samples[0].Ref), exemplars, "zero-histogram-samples-processed")
		return nil, 0, samples[1:], errors.New("no sample consumed for histogram")
	}
	// Batch ended without completing a further distribution.
	return nil, 0, samples[consumed:], nil
}