in pkg/export/transform.go [93:192]
// next converts the sample at the head of samples into Cloud Monitoring time
// series and returns them together with the samples that were not consumed.
//
// The returned series slice holds at most two entries: one built from the
// cached gauge proto and one built from the cached cumulative proto. When the
// series cache entry carries both, the sample is written as both.
//
// No series and a nil error are returned when the head sample is skipped,
// which happens when:
//
//   - samples is empty,
//   - the sample value is a staleness marker,
//   - the series cache has no entry for the sample, or
//   - the cache entry is marked as dropped.
//
// A cumulative series may also yield no point; see the comment ahead of the
// final append below. A non-nil error is only returned when building a
// histogram distribution fails.
func (b *sampleBuilder) next(metadata MetadataFunc, externalLabels labels.Labels, samples []record.RefSample, exemplars map[storage.SeriesRef]record.RefExemplar) ([]hashedSeries, []record.RefSample, error) {
	// Nothing to consume: hand the input back untouched instead of panicking
	// on samples[0] below.
	if len(samples) == 0 {
		return nil, samples, nil
	}
	sample := samples[0]
	tailSamples := samples[1:]
	ref := storage.SeriesRef(sample.Ref)

	// Staleness markers are currently not supported by Cloud Monitoring.
	if value.IsStaleNaN(sample.V) {
		prometheusSamplesDiscarded.WithLabelValues("staleness-marker").Inc()
		discardExemplarIncIfExists(ref, exemplars, "staleness-marker")
		return nil, tailSamples, nil
	}

	entry, ok := b.series.get(sample, externalLabels, metadata)
	if !ok {
		prometheusSamplesDiscarded.WithLabelValues("no-cache-series-found").Inc()
		discardExemplarIncIfExists(ref, exemplars, "no-cache-series-found")
		return nil, tailSamples, nil
	}
	if entry.dropped {
		return nil, tailSamples, nil
	}

	// At most one gauge and one cumulative series are produced per call.
	result := make([]hashedSeries, 0, 2)

	// Shallow copy the cached series protos and populate them with a point. Only histograms
	// need a special case, for other Prometheus types we apply generic gauge/cumulative logic
	// based on the type determined in the series cache.
	// If both are set, we double-write the series as a gauge and a cumulative.
	if g := entry.protos.gauge; g.proto != nil {
		//nolint:govet
		ts := *g.proto
		ts.Points = []*monitoring_pb.Point{{
			Interval: &monitoring_pb.TimeInterval{
				EndTime: getTimestamp(sample.T),
			},
			Value: &monitoring_pb.TypedValue{
				Value: &monitoring_pb.TypedValue_DoubleValue{DoubleValue: sample.V},
			},
		}}
		result = append(result, hashedSeries{hash: g.hash, proto: &ts})
	}
	if c := entry.protos.cumulative; c.proto != nil {
		var (
			// pointValue stays nil when no point can be emitted for this sample.
			// It is deliberately not called "value" so that it does not shadow
			// the imported value package used above.
			pointValue     *monitoring_pb.TypedValue
			resetTimestamp int64
		)
		if entry.metadata.Type == textparse.MetricTypeHistogram {
			// Consume a set of series as a single distribution sample.
			// We pass in the original lset for matching since Prometheus's target label must
			// be the same as well.
			//
			// The full samples slice (including the head sample) is passed in, and
			// tailSamples is replaced by whatever buildDistribution leaves unconsumed.
			var dist *distribution_pb.Distribution
			var err error
			dist, resetTimestamp, tailSamples, err = b.buildDistribution(
				entry.metadata.Metric,
				entry.lset,
				samples,
				exemplars,
				externalLabels,
				metadata,
			)
			if err != nil {
				return nil, tailSamples, err
			}
			if dist != nil {
				pointValue = &monitoring_pb.TypedValue{
					Value: &monitoring_pb.TypedValue_DistributionValue{DistributionValue: dist},
				}
			}
		} else {
			// A regular counter series.
			var adjusted float64
			resetTimestamp, adjusted, ok = b.series.getResetAdjusted(ref, sample.T, sample.V)
			if ok {
				pointValue = &monitoring_pb.TypedValue{
					Value: &monitoring_pb.TypedValue_DoubleValue{DoubleValue: adjusted},
				}
				discardExemplarIncIfExists(ref, exemplars, "counters-unsupported")
			}
		}
		// We may not have produced a value if:
		//
		//   1. It was the first sample of a cumulative and we only initialized the reset timestamp with it.
		//   2. We could not observe all necessary series to build a full distribution sample.
		if pointValue != nil {
			//nolint:govet
			ts := *c.proto
			ts.Points = []*monitoring_pb.Point{{
				Interval: &monitoring_pb.TimeInterval{
					StartTime: getTimestamp(resetTimestamp),
					EndTime:   getTimestamp(sample.T),
				},
				Value: pointValue,
			}}
			result = append(result, hashedSeries{hash: c.hash, proto: &ts})
		}
	}
	return result, tailSamples, nil
}