// Excerpt: (*sampleBuilder).next from pkg/export/transform.go [lines 93:192].

// next consumes the leading sample of samples (or, for histograms, a whole
// group of sibling series) and converts it into zero, one, or two hashedSeries
// ready for export. It returns the produced series, the samples that remain
// unprocessed, and any error raised while assembling a distribution sample.
func (b *sampleBuilder) next(metadata MetadataFunc, externalLabels labels.Labels, samples []record.RefSample, exemplars map[storage.SeriesRef]record.RefExemplar) ([]hashedSeries, []record.RefSample, error) {
	head, rest := samples[0], samples[1:]
	ref := storage.SeriesRef(head.Ref)

	// Cloud Monitoring has no notion of staleness markers; drop them outright.
	if value.IsStaleNaN(head.V) {
		prometheusSamplesDiscarded.WithLabelValues("staleness-marker").Inc()
		discardExemplarIncIfExists(ref, exemplars, "staleness-marker")
		return nil, rest, nil
	}

	cached, found := b.series.get(head, externalLabels, metadata)
	switch {
	case !found:
		prometheusSamplesDiscarded.WithLabelValues("no-cache-series-found").Inc()
		discardExemplarIncIfExists(ref, exemplars, "no-cache-series-found")
		return nil, rest, nil
	case cached.dropped:
		// The cache decided this series is not to be exported.
		return nil, rest, nil
	}

	out := make([]hashedSeries, 0, 2)

	// The cached series protos are shallow-copied and a single point is
	// attached. Only histograms need special treatment; every other Prometheus
	// type follows the generic gauge/cumulative logic chosen by the series
	// cache. When both protos are present, the series is written twice —
	// once as a gauge and once as a cumulative.
	if g := cached.protos.gauge; g.proto != nil {
		//nolint:govet
		gaugeTS := *g.proto

		gaugeTS.Points = []*monitoring_pb.Point{{
			Interval: &monitoring_pb.TimeInterval{
				EndTime: getTimestamp(head.T),
			},
			Value: &monitoring_pb.TypedValue{
				Value: &monitoring_pb.TypedValue_DoubleValue{DoubleValue: head.V},
			},
		}}
		out = append(out, hashedSeries{hash: g.hash, proto: &gaugeTS})
	}
	if c := cached.protos.cumulative; c.proto != nil {
		var (
			point   *monitoring_pb.TypedValue
			resetAt int64
		)
		if cached.metadata.Type == textparse.MetricTypeHistogram {
			// A histogram consumes a whole set of series as one distribution
			// sample. The original label set is passed along so that
			// buildDistribution can match sibling series belonging to the
			// same Prometheus target.
			var (
				dist *distribution_pb.Distribution
				err  error
			)
			dist, resetAt, rest, err = b.buildDistribution(
				cached.metadata.Metric,
				cached.lset,
				samples,
				exemplars,
				externalLabels,
				metadata,
			)
			if err != nil {
				return nil, rest, err
			}
			if dist != nil {
				point = &monitoring_pb.TypedValue{
					Value: &monitoring_pb.TypedValue_DistributionValue{DistributionValue: dist},
				}
			}
		} else {
			// A plain counter series: adjust for counter resets first.
			if t, v, ok := b.series.getResetAdjusted(ref, head.T, head.V); ok {
				resetAt = t
				point = &monitoring_pb.TypedValue{
					Value: &monitoring_pb.TypedValue_DoubleValue{DoubleValue: v},
				}
				discardExemplarIncIfExists(ref, exemplars, "counters-unsupported")
			}
		}
		// point stays nil when either:
		//
		//   1. this was the first sample of a cumulative, which only seeds
		//      the reset timestamp; or
		//   2. not all series required for a complete distribution sample
		//      have been observed yet.
		if point != nil {
			//nolint:govet
			cumTS := *c.proto

			cumTS.Points = []*monitoring_pb.Point{{
				Interval: &monitoring_pb.TimeInterval{
					StartTime: getTimestamp(resetAt),
					EndTime:   getTimestamp(head.T),
				},
				Value: point,
			}}
			out = append(out, hashedSeries{hash: c.hash, proto: &cumTS})
		}
	}
	return out, rest, nil
}