pkg/export/series_cache.go (377 lines of code) (raw):
// Copyright 2020 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package export
import (
"context"
"encoding/binary"
"errors"
"fmt"
"hash"
"hash/fnv"
"math/rand"
"strings"
"sync"
"time"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/textparse"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb/record"
monitoring_pb "cloud.google.com/go/monitoring/apiv3/v2/monitoringpb"
metric_pb "google.golang.org/genproto/googleapis/api/metric"
monitoredres_pb "google.golang.org/genproto/googleapis/api/monitoredres"
)
// seriesCache holds a mapping from series reference to label set.
// It can garbage collect obsolete entries based on the most recent WAL checkpoint.
// Implements seriesGetter.
type seriesCache struct {
	logger log.Logger
	// Injectable clock, so tests can control time-based eviction.
	now  func() time.Time
	pool *pool
	// Guards access to the entries and intervals maps and the lastRefresh
	// field of individual cache entries.
	mtx sync.Mutex
	// Map from series reference to various cached information about it.
	entries map[storage.SeriesRef]*seriesCacheEntry
	// Function to retrieve a label set for a series reference number.
	// Returns nil if the reference is no longer valid.
	getLabelsByRef func(storage.SeriesRef) labels.Labels
	// A list of metric selectors. Exported Prometheus series are discarded if they
	// don't match at least one of the matchers.
	// If the matchers are empty, all series pass.
	matchers Matchers
	// Prefix under which metrics are written to GCM.
	metricTypePrefix string
}
// seriesCacheEntry is the cached conversion state for a single Prometheus series.
type seriesCacheEntry struct {
	// The uniquely identifying set of labels for the series.
	lset labels.Labels
	// Metadata for the metric of the series.
	metadata MetricMetadata
	// A pre-populated time protobuf to be sent to the GCM API. It can
	// be shallow-copied and populated with point values to avoid excessive
	// allocations for each datapoint exported for the series.
	protos cachedProtos
	// The well-known Prometheus metric name suffix if any.
	suffix metricSuffix
	// Timestamp after which to refresh the cached state.
	nextRefresh int64
	// Unix timestamp at which we last used the entry.
	lastUsed int64
	// Whether the series is dropped from exporting.
	dropped bool
	// Tracked counter reset state for conversion to GCM cumulatives
	// (see getResetAdjusted).
	hasReset       bool
	resetValue     float64
	lastValue      float64
	resetTimestamp int64
}
// hashedSeries pairs a time series protobuf with a precomputed hash of its
// identifying fields (see hashSeries) so the hash isn't recomputed per sample.
type hashedSeries struct {
	hash  uint64
	proto *monitoring_pb.TimeSeries
}
// cachedProtos holds the pre-built GCM series protos for a cache entry.
// Depending on the metric type, a gauge variant, a cumulative variant, or
// both may be populated.
type cachedProtos struct {
	gauge, cumulative hashedSeries
}
// empty reports whether neither a gauge nor a cumulative proto is cached.
func (cp *cachedProtos) empty() bool {
	hasAny := cp.gauge.proto != nil || cp.cumulative.proto != nil
	return !hasAny
}
const (
	// refreshInterval is the target average period after which cached series
	// state is repopulated.
	refreshInterval = 10 * time.Minute
	// refreshJitter is the total width of the random offset applied around
	// refreshInterval when scheduling the next refresh.
	refreshJitter = 10 * time.Minute
)
// valid returns true if the Prometheus series can be converted to a GCM series.
func (e *seriesCacheEntry) valid() bool {
	if e.lset.IsEmpty() {
		return false
	}
	// A dropped entry is considered valid so callers can recognize it and
	// skip it; otherwise at least one converted proto must exist.
	return e.dropped || !e.protos.empty()
}
// shouldRefresh returns true if the cached state should be refreshed.
func (e *seriesCacheEntry) shouldRefresh() bool {
	// Matchers cannot be changed at runtime and are applied to the local time
	// series labels without external labels. Thus the dropped status can never
	// change at runtime, so dropped entries never need a refresh.
	if e.dropped {
		return false
	}
	return time.Now().Unix() > e.nextRefresh
}
// setNextRefresh determines a timestamp for the next refresh.
func (e *seriesCacheEntry) setNextRefresh() {
	// Spread refresh timestamps randomly around the target interval so a bulk of
	// simultaneously created series is not invalidated all at once, which would
	// cause CPU and allocation spikes.
	offset := time.Duration((rand.Float64() - 0.5) * float64(refreshJitter))
	e.nextRefresh = time.Now().Add(refreshInterval + offset).Unix()
}
// newSeriesCache constructs a seriesCache that writes metrics under the given
// type prefix and drops series not matching any of the matchers.
// A nil logger is replaced with a no-op logger.
func newSeriesCache(
	logger log.Logger,
	reg prometheus.Registerer,
	metricTypePrefix string,
	matchers Matchers,
) *seriesCache {
	if logger == nil {
		logger = log.NewNopLogger()
	}
	c := &seriesCache{
		logger:           logger,
		now:              time.Now,
		pool:             newPool(reg),
		entries:          make(map[storage.SeriesRef]*seriesCacheEntry),
		matchers:         matchers,
		metricTypePrefix: metricTypePrefix,
	}
	return c
}
// run periodically garbage-collects stale cache entries until ctx is canceled.
func (c *seriesCache) run(ctx context.Context) {
	tick := time.NewTicker(10 * time.Minute)
	defer tick.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-tick.C:
			// Evict entries unused for at least one GC interval.
			if err := c.garbageCollect(10 * time.Minute); err != nil {
				//nolint:errcheck
				level.Error(c.logger).Log("msg", "garbage collection failed", "err", err)
			}
		}
	}
}
// forceRefresh forces all series to be reconstructed on the next sample. This will not
// invalidate counter reset state.
func (c *seriesCache) forceRefresh() {
	c.mtx.Lock()
	defer c.mtx.Unlock()
	// A zero refresh timestamp is always in the past, so each entry is
	// repopulated the next time it is used.
	for _, entry := range c.entries {
		entry.nextRefresh = 0
	}
}
// clear the entire cache state.
func (c *seriesCache) clear() {
	c.mtx.Lock()
	defer c.mtx.Unlock()
	// Return interned protos to the pool before dropping each entry.
	for ref, e := range c.entries {
		c.pool.release(e.protos.gauge.proto)
		c.pool.release(e.protos.cumulative.proto)
		delete(c.entries, ref)
	}
}
// garbageCollect drops obsolete cache entries that have not been updated for
// the given delay duration.
func (c *seriesCache) garbageCollect(delay time.Duration) error {
	c.mtx.Lock()
	defer c.mtx.Unlock()
	start := c.now()
	// Drop all series that haven't been used within the delay window.
	//
	// Alternatively, we could call getLabelsByRef on each series and discard it if the
	// result is nil. While more reliable in only evicting entries that will never come back,
	// it may mean stale entries sit around for up to 3 hours.
	// Since we can always re-populate cache entries, this is not worth it as it may blow
	// up our memory usage in high-churn environments.
	threshold := start.Add(-delay).Unix()
	purged := 0
	for ref, entry := range c.entries {
		if entry.lastUsed < threshold {
			// Return pooled protos before dropping the entry.
			c.pool.release(entry.protos.gauge.proto)
			c.pool.release(entry.protos.cumulative.proto)
			delete(c.entries, ref)
			purged++
		}
	}
	//nolint:errcheck
	level.Info(c.logger).Log("msg", "garbage collection completed", "took", time.Since(start), "seriesPurged", purged)
	return nil
}
// get a cache entry for the given series reference. The passed timestamp indicates when data was
// last seen for the entry.
// If the series cannot be converted the returned boolean is false.
func (c *seriesCache) get(s record.RefSample, externalLabels labels.Labels, metadata MetadataFunc) (*seriesCacheEntry, bool) {
	c.mtx.Lock()
	defer c.mtx.Unlock()
	ref := storage.SeriesRef(s.Ref)
	entry, found := c.entries[ref]
	if !found {
		entry = &seriesCacheEntry{}
		c.entries[ref] = entry
	}
	if entry.shouldRefresh() {
		// A failed populate leaves the entry invalid; we still schedule the next
		// refresh so we don't retry on every single sample.
		if err := c.populate(ref, entry, externalLabels, metadata); err != nil {
			//nolint:errcheck
			level.Debug(c.logger).Log("msg", "populating series failed", "ref", s.Ref, "err", err)
		}
		entry.setNextRefresh()
	}
	// Store millisecond sample timestamp in seconds.
	entry.lastUsed = s.T / 1000
	return entry, entry.valid()
}
// getResetAdjusted takes a sample for a referenced series and returns
// its reset timestamp and adjusted value.
// If the last return argument is false, the sample should be dropped.
func (c *seriesCache) getResetAdjusted(ref storage.SeriesRef, t int64, v float64) (int64, float64, bool) {
	c.mtx.Lock()
	e, ok := c.entries[ref]
	c.mtx.Unlock()
	if !ok {
		// Unknown series reference: nothing to adjust against.
		return 0, 0, false
	}
	hasReset := e.hasReset
	e.hasReset = true
	if !hasReset {
		// First sample for this series: record it as the reset baseline.
		e.resetTimestamp = t
		e.resetValue = v
		// If we just initialized the reset timestamp, this sample should be skipped.
		// We don't know the window over which the current cumulative value was built up over.
		// The next sample for will be considered from this point onwards.
		return 0, 0, false
	} else if t <= e.resetTimestamp {
		// Otherwise if the current sample's time was already processed, drop sample.
		// Keeping the sample is not desirable because it results in:
		// - (at best) performing excessive API write calls with redundant data
		// - sending API bad requests in the form of zero-ranged sample intervals
		// - attempting to update a previous point, resulting in an error response.
		//
		// Note: this will only omit duplicates of the initial "reset" sample.
		// Omitting duplicates of all incoming samples would require
		// more sophisticated state management.
		return 0, 0, false
	}
	if v < e.lastValue {
		// A decrease in the cumulative value signals a counter reset.
		// If the series was reset, set the reset timestamp to be one millisecond
		// before the timestamp of the current sample.
		// We don't know the true reset time but this ensures the range is non-zero
		// while unlikely to conflict with any previous sample.
		e.resetValue = 0
		e.resetTimestamp = t - 1
	}
	e.lastValue = v
	// Report the value as the delta accumulated since the tracked reset point.
	return e.resetTimestamp, v - e.resetValue, true
}
// getMetricType creates a GCM metric type from the Prometheus metric name and a type suffix.
// Optionally, a secondary type suffix may be provided for series for which a Prometheus type
// may be written as different GCM series.
// The general rule is that if the primary suffix is ambiguous about whether the specific series
// is to be treated as a counter or gauge at query time, the secondarySuffix is set to "counter"
// for the counter variant, and left empty for the gauge variant.
func (c *seriesCache) getMetricType(name string, suffix, secondarySuffix gcmMetricSuffix) string {
	mtype := c.metricTypePrefix + "/" + name + "/" + string(suffix)
	if secondarySuffix != gcmMetricSuffixNone {
		mtype += ":" + string(secondarySuffix)
	}
	return mtype
}
// Metric name suffixes used by various Prometheus metric types.
type metricSuffix string

const (
	// metricSuffixNone marks a plain metric name without a well-known suffix.
	metricSuffixNone   metricSuffix = ""
	metricSuffixTotal  metricSuffix = "_total"
	metricSuffixBucket metricSuffix = "_bucket"
	metricSuffixSum    metricSuffix = "_sum"
	metricSuffixCount  metricSuffix = "_count"
)
// Suffixes appended to GCM metric types. They are equivalent to the respective
// Prometheus types but we redefine them here to ensure they don't unexpectedly change
// by updating a Prometheus library.
type gcmMetricSuffix string

const (
	gcmMetricSuffixNone      gcmMetricSuffix = ""
	gcmMetricSuffixUnknown   gcmMetricSuffix = "unknown"
	gcmMetricSuffixGauge     gcmMetricSuffix = "gauge"
	gcmMetricSuffixCounter   gcmMetricSuffix = "counter"
	gcmMetricSuffixHistogram gcmMetricSuffix = "histogram"
	gcmMetricSuffixSummary   gcmMetricSuffix = "summary"
)
// Maximum number of labels allowed on GCM series. Series exceeding this are
// rejected during populate.
const maxLabelCount = 100
// populate cached state for the given entry: resolves the label set, applies
// the matchers, extracts the monitored resource, looks up metric metadata, and
// pre-builds the GCM series protos for the metric's type.
func (c *seriesCache) populate(ref storage.SeriesRef, entry *seriesCacheEntry, externalLabels labels.Labels, getMetadata MetadataFunc) error {
	if entry.lset.IsEmpty() {
		entry.lset = c.getLabelsByRef(ref)
		if entry.lset.IsEmpty() {
			return errors.New("series reference invalid")
		}
		// Matchers are applied once; dropped status never changes afterwards
		// (see shouldRefresh).
		entry.dropped = !c.matchers.Matches(entry.lset)
	}
	if entry.dropped {
		return nil
	}
	// Break the series into resource and metric labels.
	resource, metricLabels, err := extractResource(externalLabels, entry.lset)
	if err != nil {
		return fmt.Errorf("extracting resource for series %s failed: %w", entry.lset, err)
	}
	// Remove the __name__ label as it becomes the metric type in the GCM time series.
	metricLabelsBuilder := labels.NewBuilder(metricLabels)
	metricLabelsBuilder.Del(labels.MetricName)
	metricLabels = metricLabelsBuilder.Labels()
	// Drop series with too many labels.
	// TODO: remove once field limit is lifted in the GCM API.
	if metricLabels.Len() > maxLabelCount {
		return fmt.Errorf("metric labels %s exceed the limit of %d", metricLabels, maxLabelCount)
	}
	var (
		metricName     = entry.lset.Get("__name__")
		baseMetricName = metricName
		suffix         metricSuffix
	)
	metadata, ok := getMetadata(metricName)
	if !ok {
		// The full name didn't turn anything up. Check again in case it's a summary
		// or histogram without the metric name suffix. If the underlying target
		// returned the OpenMetrics format, counter metadata is also stored with the
		// _total suffix stripped.
		var ok bool
		if baseMetricName, suffix, ok = splitMetricSuffix(metricName); ok {
			metadata, ok = getMetadata(baseMetricName)
		}
		if !ok {
			return fmt.Errorf("no metadata found for metric name %q", metricName)
		}
	}
	// Handle label modifications for histograms early so we don't build the label map twice.
	// We have to remove the 'le' label which defines the bucket boundary.
	if metadata.Type == textparse.MetricTypeHistogram {
		metricLabelsBuilder := labels.NewBuilder(metricLabels)
		metricLabelsBuilder.Del(labels.BucketLabel)
		metricLabels = metricLabelsBuilder.Labels()
	}
	// newSeries builds a GCM series proto with the shared resource and metric
	// labels and computes its identity hash once.
	newSeries := func(mtype string, kind metric_pb.MetricDescriptor_MetricKind, vtype metric_pb.MetricDescriptor_ValueType) hashedSeries {
		s := &monitoring_pb.TimeSeries{
			Resource:   resource,
			Metric:     &metric_pb.Metric{Type: mtype, Labels: metricLabels.Map()},
			MetricKind: kind,
			ValueType:  vtype,
		}
		return hashedSeries{hash: hashSeries(s), proto: s}
	}
	var protos cachedProtos
	switch metadata.Type {
	case textparse.MetricTypeCounter:
		protos.cumulative = newSeries(
			c.getMetricType(metricName, gcmMetricSuffixCounter, gcmMetricSuffixNone),
			metric_pb.MetricDescriptor_CUMULATIVE,
			metric_pb.MetricDescriptor_DOUBLE)
	case textparse.MetricTypeGauge:
		protos.gauge = newSeries(
			c.getMetricType(metricName, gcmMetricSuffixGauge, gcmMetricSuffixNone),
			metric_pb.MetricDescriptor_GAUGE,
			metric_pb.MetricDescriptor_DOUBLE)
	case textparse.MetricTypeUnknown:
		// Unknown-typed metrics are exported as both a gauge and a cumulative
		// variant; the consumer decides at query time.
		protos.gauge = newSeries(
			c.getMetricType(metricName, gcmMetricSuffixUnknown, gcmMetricSuffixNone),
			metric_pb.MetricDescriptor_GAUGE,
			metric_pb.MetricDescriptor_DOUBLE)
		protos.cumulative = newSeries(
			c.getMetricType(metricName, gcmMetricSuffixUnknown, gcmMetricSuffixCounter),
			metric_pb.MetricDescriptor_CUMULATIVE,
			metric_pb.MetricDescriptor_DOUBLE)
	case textparse.MetricTypeSummary:
		switch suffix {
		case metricSuffixSum:
			protos.cumulative = newSeries(
				c.getMetricType(metricName, gcmMetricSuffixSummary, gcmMetricSuffixCounter),
				metric_pb.MetricDescriptor_CUMULATIVE,
				metric_pb.MetricDescriptor_DOUBLE)
		case metricSuffixCount:
			protos.cumulative = newSeries(
				c.getMetricType(metricName, gcmMetricSuffixSummary, gcmMetricSuffixNone),
				metric_pb.MetricDescriptor_CUMULATIVE,
				metric_pb.MetricDescriptor_DOUBLE)
		case metricSuffixNone: // Actual quantiles.
			protos.gauge = newSeries(
				c.getMetricType(metricName, gcmMetricSuffixSummary, gcmMetricSuffixNone),
				metric_pb.MetricDescriptor_GAUGE,
				metric_pb.MetricDescriptor_DOUBLE)
		default:
			return fmt.Errorf("unexpected metric name suffix %q for metric %q", suffix, metricName)
		}
	case textparse.MetricTypeHistogram:
		protos.cumulative = newSeries(
			c.getMetricType(baseMetricName, gcmMetricSuffixHistogram, gcmMetricSuffixNone),
			metric_pb.MetricDescriptor_CUMULATIVE,
			metric_pb.MetricDescriptor_DISTRIBUTION)
	default:
		return fmt.Errorf("unexpected metric type %s for metric %q", metadata.Type, metricName)
	}
	// Swap the old protos for the new ones in the pool before storing them.
	c.pool.release(entry.protos.gauge.proto)
	c.pool.release(entry.protos.cumulative.proto)
	c.pool.intern(protos.gauge.proto)
	c.pool.intern(protos.cumulative.proto)
	entry.protos = protos
	entry.metadata = metadata
	entry.suffix = suffix
	return nil
}
// extractResource returns the monitored resource, the entry labels, and whether the operation succeeded
// for the provided external labels and Prometheus series labels.
// The returned entry labels are a subset of `lset` without the labels that were used as resource labels.
func extractResource(externalLabels, lset labels.Labels) (*monitoredres_pb.MonitoredResource, labels.Labels, error) {
	// Prometheus allows to configure external labels, which are attached when exporting data out of
	// the instance to disambiguate data across instances. For us they generally include 'project_id',
	// 'location' and 'cluster'.
	// Per Prometheus semantics external labels are merged into lset, while lset takes precedence on
	// label name collisions.
	//
	// This can be problematic as it violates hierarchical precedence. Especially 'project_id'
	// or 'location' being overwritten from a metric label could likely fill in an invalid value.
	// A sensible solution could be to adopt Prometheus collision resolution for target and metric
	// labels, in which colliding metric label keys are prefixed with 'exported_'.
	//
	// However, the semantics are also right and important for recording rules, where one would generally
	// want to retain original resource fields of the metrics but would want to default to a 'project_id' and
	// 'location' for rules which aggregated away the original fields.
	// For example a recording of 'sum(up)' would still need a fallback 'project_id' and 'location' to
	// be stored in.
	//
	// Thus we stick with the upstream semantics and consider how to address unintended consequences if
	// and when they come up.
	builder := labels.NewBuilder(lset)
	// Merge external labels in; series labels win on collision.
	externalLabels.Range(func(l labels.Label) {
		if !lset.Has(l.Name) {
			builder.Set(l.Name, l.Value)
		}
	})
	lset = builder.Labels()
	// Ensure project_id and location are set but leave validating of the values to the API.
	if lset.Get(KeyProjectID) == "" {
		return nil, labels.EmptyLabels(), fmt.Errorf("missing required resource field %q", KeyProjectID)
	}
	if lset.Get(KeyLocation) == "" {
		return nil, labels.EmptyLabels(), fmt.Errorf("missing required resource field %q", KeyLocation)
	}
	// Transfer resource fields from label set onto the resource. If they are not set,
	// the respective field is set to an empty string. This explicitly is a valid value
	// in Cloud Monitoring and not the same as being unset.
	mres := &monitoredres_pb.MonitoredResource{
		Type: "prometheus_target",
		Labels: map[string]string{
			KeyProjectID: lset.Get(KeyProjectID),
			KeyLocation:  lset.Get(KeyLocation),
			KeyCluster:   lset.Get(KeyCluster),
			KeyNamespace: lset.Get(KeyNamespace),
			KeyJob:       lset.Get(KeyJob),
			KeyInstance:  lset.Get(KeyInstance),
		},
	}
	// Remove the resource fields from the metric labels; the builder already
	// contains the merged label set from above.
	builder.Del(KeyProjectID)
	builder.Del(KeyLocation)
	builder.Del(KeyCluster)
	builder.Del(KeyNamespace)
	builder.Del(KeyJob)
	builder.Del(KeyInstance)
	return mres, builder.Labels(), nil
}
// splitMetricSuffix strips a well-known Prometheus metric name suffix from name.
// It returns the remaining prefix, the detected suffix, and whether any of the
// known suffixes matched. On no match, name is returned unchanged with
// metricSuffixNone.
func splitMetricSuffix(name string) (prefix string, suffix metricSuffix, ok bool) {
	// Check order is preserved from the original implementation.
	known := []metricSuffix{metricSuffixTotal, metricSuffixBucket, metricSuffixCount, metricSuffixSum}
	for _, s := range known {
		if strings.HasSuffix(name, string(s)) {
			return strings.TrimSuffix(name, string(s)), s, true
		}
	}
	return name, metricSuffixNone, false
}
// hashSeries computes an FNV-1a hash over the identifying fields of the series:
// resource type and labels, metric type and labels, metric kind, and value type.
func hashSeries(s *monitoring_pb.TimeSeries) uint64 {
	hasher := fnv.New64a()
	hasher.Write([]byte(s.Resource.Type))
	hashLabels(hasher, s.Resource.Labels)
	hasher.Write([]byte(s.Metric.Type))
	hashLabels(hasher, s.Metric.Labels)
	// Writes to an in-memory hash cannot fail; errors are intentionally ignored.
	//nolint:errcheck
	binary.Write(hasher, binary.LittleEndian, s.MetricKind)
	//nolint:errcheck
	binary.Write(hasher, binary.LittleEndian, s.ValueType)
	return hasher.Sum64()
}
// hashLabels folds the given label map into h in a deterministic order,
// separating each name and value with a 0xff byte.
func hashLabels(h hash.Hash, lset map[string]string) {
	separator := []byte{'\xff'}
	// Map iteration is randomized. We thus convert the labels to sorted slices
	// with labels.FromMap before hashing.
	labels.FromMap(lset).Range(func(l labels.Label) {
		h.Write(separator)
		h.Write([]byte(l.Name))
		h.Write(separator)
		h.Write([]byte(l.Value))
	})
	h.Write(separator)
}