func()

in pkg/export/transform.go [386:495]


// buildDistribution consumes samples from the front of the batch that belong to the histogram
// named metric and folds them into the distributions cached in b.dists until one of them
// reports itself complete.
//
// Samples are visited in order:
//   - a sample without a cached series entry is counted as discarded and consumed;
//   - the first sample whose metric name is not a histogram series of metric ends the scan and is
//     left in the returned tail;
//   - every other sample is consumed and applied to the cached distribution keyed by the hash of
//     its cumulative proto.
//
// The second parameter is unused.
//
// It returns the distribution built for the first histogram that became complete (nil if none
// did), the reset timestamp recorded from that histogram's count series, the samples that were not
// consumed, and an error. The error is non-nil if building the distribution failed, or if the
// batch was non-empty but not a single sample could be consumed; in the latter case the first
// sample is dropped from the returned tail. An empty batch returns (nil, 0, samples, nil).
func (b *sampleBuilder) buildDistribution(
	metric string,
	_ labels.Labels,
	samples []record.RefSample,
	exemplars map[storage.SeriesRef]record.RefExemplar,
	externalLabels labels.Labels,
	metadata MetadataFunc,
) (*distribution_pb.Distribution, int64, []record.RefSample, error) {
	// Nothing to do for an empty batch. Without this guard the zero-consumed handling at the end
	// of the function would index samples[0] and panic.
	if len(samples) == 0 {
		return nil, 0, samples, nil
	}

	// The Prometheus/OpenMetrics exposition format does not require all histogram series for a single distribution
	// to be grouped together. But it does require that all series for a histogram metric in general are grouped
	// together and that buckets for a single histogram are specified in order.
	// Thus, we build a cache and conclude a histogram complete once we've seen its _sum series and its +Inf bucket
	// series. We return for the first histogram where this condition is fulfilled.
	consumed := 0
Loop:
	for _, s := range samples {
		e, ok := b.series.get(s, externalLabels, metadata)
		if !ok {
			// The sample is dropped, but still counts as consumed so it is not handed back to the caller.
			consumed++
			prometheusSamplesDiscarded.WithLabelValues("no-cache-series-found").Inc()
			discardExemplarIncIfExists(storage.SeriesRef(s.Ref), exemplars, "no-cache-series-found")
			continue
		}
		name := e.lset.Get(labels.MetricName)
		// Abort if the series is not for the intended histogram metric. All series for it must be grouped
		// together so we can rely on no further relevant series are in the batch.
		// The sample is deliberately not counted as consumed, so it stays in the returned tail.
		if !isHistogramSeries(metric, name) {
			break
		}
		consumed++

		// Create or update the cached distribution for the given histogram series
		dist, ok := b.dists[e.protos.cumulative.hash]
		if !ok {
			dist = getDistribution()
			// The first sample seen for a distribution fixes the timestamp all later samples must match.
			dist.timestamp = s.T
			b.dists[e.protos.cumulative.hash] = dist
		}
		// If there are diverging timestamps within a single batch, the histogram is not valid.
		if s.T != dist.timestamp {
			dist.skip = true
			prometheusSamplesDiscarded.WithLabelValues("mismatching-histogram-timestamps").Inc()
			discardExemplarIncIfExists(storage.SeriesRef(s.Ref), exemplars, "mismatching-histogram-timestamps")
			continue
		}

		rt, v, ok := b.series.getResetAdjusted(storage.SeriesRef(s.Ref), s.T, s.V)
		// If a series appeared for the first time, we won't get a valid reset timestamp yet.
		// This may happen if the histogram is entirely new or if new series appeared through bucket changes.
		// We skip the entire distribution sample in this case.
		if !ok {
			dist.skip = true
			continue
		}

		// All series can in principle have a NaN value (staleness NaNs already filtered).
		// We permit this for sum and count as we handle it explicitly when building the distribution.
		// For buckets there's no sensible way to handle it however and we discard those bucket samples.
		// The suffix is whatever follows the base metric name in the series name.
		switch metricSuffix(name[len(metric):]) {
		case metricSuffixSum:
			dist.hasSum, dist.sum = true, v

		case metricSuffixCount:
			dist.hasCount, dist.count = true, v
			// We take the count series as the authoritative source for the overall reset timestamp.
			dist.resetTimestamp = rt

		case metricSuffixBucket:
			bound, err := strconv.ParseFloat(e.lset.Get(labels.BucketLabel), 64)
			if err != nil {
				prometheusSamplesDiscarded.WithLabelValues("malformed-bucket-le-label").Inc()
				discardExemplarIncIfExists(storage.SeriesRef(s.Ref), exemplars, "malformed-bucket-le-label")
				continue
			}
			if math.IsNaN(v) {
				prometheusSamplesDiscarded.WithLabelValues("NaN-bucket-value").Inc()
				discardExemplarIncIfExists(storage.SeriesRef(s.Ref), exemplars, "NaN-bucket-value")
				continue
			}
			// Handle cases where +Inf bucket is out-of-order by not overwriting on the last-consumed bucket.
			if !dist.hasInfBucket {
				dist.hasInfBucket = math.IsInf(bound, 1)
			}
			// Bounds, values and (when present) exemplars are appended in the order buckets are seen.
			dist.bounds = append(dist.bounds, bound)
			dist.values = append(dist.values, int64(v))
			if exemplar, ok := exemplars[storage.SeriesRef(s.Ref)]; ok {
				dist.exemplars = append(dist.exemplars, exemplar)
			}

		default:
			// Unrecognized suffix: stop scanning the batch altogether.
			break Loop
		}

		if !dist.complete() {
			continue
		}
		// First complete distribution wins; everything after the consumed prefix goes back to the caller.
		dp, err := dist.build(e.lset)
		if err != nil {
			return nil, 0, samples[consumed:], err
		}
		return dp, dist.resetTimestamp, samples[consumed:], nil
	}
	if consumed == 0 {
		// The very first sample was not a series of this histogram. Drop it and report an error;
		// presumably this keeps the caller from being handed the same batch again.
		prometheusSamplesDiscarded.WithLabelValues("zero-histogram-samples-processed").Inc()
		discardExemplarIncIfExists(storage.SeriesRef(samples[0].Ref), exemplars, "zero-histogram-samples-processed")
		return nil, 0, samples[1:], errors.New("no sample consumed for histogram")
	}
	// Batch ended without completing a further distribution
	return nil, 0, samples[consumed:], nil
}