pkg/export/export.go

// Copyright 2020 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//	https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package export

import (
	"bytes"
	"context"
	"errors"
	"fmt"
	"math"
	"os"
	"os/exec"
	"reflect"
	"runtime/debug"
	"strings"
	"sync"
	"testing"
	"time"

	monitoring "cloud.google.com/go/monitoring/apiv3/v2"
	monitoring_pb "cloud.google.com/go/monitoring/apiv3/v2/monitoringpb"
	"github.com/go-kit/log"
	"github.com/go-kit/log/level"
	gax "github.com/googleapis/gax-go/v2"
	grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/prometheus/config"
	"github.com/prometheus/prometheus/model/labels"
	"github.com/prometheus/prometheus/model/textparse"
	"github.com/prometheus/prometheus/promql/parser"
	"github.com/prometheus/prometheus/storage"
	"github.com/prometheus/prometheus/tsdb/record"
	"golang.org/x/oauth2/google"
	"google.golang.org/api/option"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
	"google.golang.org/grpc/encoding/gzip"
)

var (
	samplesExported = prometheus.NewCounter(prometheus.CounterOpts{
		Name: "gcm_export_samples_exported_total",
		Help: "Number of samples exported at scrape time.",
	})
	exemplarsExported = prometheus.NewCounter(prometheus.CounterOpts{
		Name: "gcm_export_exemplars_exported_total",
		Help: "Number of exemplars exported at scrape time.",
	})
	samplesDropped = prometheus.NewCounterVec(prometheus.CounterOpts{
		Name: "gcm_export_samples_dropped_total",
		Help: "Number of exported samples that were intentionally dropped.",
	}, []string{"reason"})
	exemplarsDropped = prometheus.NewCounterVec(prometheus.CounterOpts{
		Name: "gcm_export_exemplars_dropped_total",
		Help: "Number of exported exemplars that were intentionally dropped.",
	}, []string{"reason"})
	samplesSent = prometheus.NewCounter(prometheus.CounterOpts{
		Name: "gcm_export_samples_sent_total",
		Help: "Number of exported samples sent to GCM.",
	})
	samplesSendErrors = prometheus.NewCounterVec(prometheus.CounterOpts{
		Name: "gcm_export_samples_send_errors_total",
		Help: "Number of errors encountered while sending samples to GCM.",
	}, []string{"project_id"})
	sendIterations = prometheus.NewCounter(prometheus.CounterOpts{
		Name: "gcm_export_send_iterations_total",
		Help: "Number of processing iterations of the sample export send handler.",
	})
	shardProcess = prometheus.NewCounter(prometheus.CounterOpts{
		Name: "gcm_export_shard_process_total",
		Help: "Number of shard retrievals.",
	})
	shardProcessPending = prometheus.NewCounter(prometheus.CounterOpts{
		Name: "gcm_export_shard_process_pending_total",
		Help: "Number of shard retrievals with an empty result.",
	})
	shardProcessSamplesTaken = prometheus.NewHistogram(prometheus.HistogramOpts{
		Name: "gcm_export_shard_process_samples_taken",
		Help: "Number of samples taken when processing a shard.",
		// Limit buckets to 200, which is the real-world batch size for GCM.
		Buckets: []float64{1, 2, 5, 10, 20, 50, 100, 150, 200},
	})
	pendingRequests = prometheus.NewGauge(prometheus.GaugeOpts{
		Name: "gcm_export_pending_requests",
		Help: "Number of in-flight requests to GCM.",
	})
	projectsPerBatch = prometheus.NewHistogram(prometheus.HistogramOpts{
		Name:    "gcm_export_projects_per_batch",
		Help:    "Number of different projects in a batch that's being sent.",
		Buckets: []float64{1, 2, 4, 8, 16, 32, 64, 128, 256, 512, 1024},
	})
	samplesPerRPCBatch = prometheus.NewHistogram(prometheus.HistogramOpts{
		Name: "gcm_export_samples_per_rpc_batch",
		Help: "Number of samples that ended up in a single RPC batch.",
		// Limit buckets to 200, which is the real-world batch size for GCM.
		Buckets: []float64{1, 2, 5, 10, 20, 50, 100, 150, 200},
	})

	ErrLocationGlobal = errors.New("location must be set to a named Google Cloud " +
		"region and cannot be set to \"global\". please choose the " +
		"Google Cloud region that is physically nearest to your cluster. " +
		"see https://www.cloudinfrastructuremap.com/")
)

type metricServiceClient interface {
	Close() error
	CreateTimeSeries(context.Context, *monitoring_pb.CreateTimeSeriesRequest, ...gax.CallOption) error
}

// Exporter converts Prometheus samples into Cloud Monitoring samples and exports them.
type Exporter struct {
	logger log.Logger
	ctx    context.Context
	opts   ExporterOpts

	metricClient metricServiceClient
	seriesCache  *seriesCache
	shards       []*shard

	// Channel for signaling that there may be more work items to
	// be processed.
	nextc chan struct{}

	// The external labels may be updated asynchronously by configuration changes
	// and must be locked with mtx.
	mtx            sync.RWMutex
	externalLabels labels.Labels

	// A set of metrics for which we defaulted the metadata to untyped and have
	// issued a warning about that.
	warnedUntypedMetrics map[string]struct{}

	// A lease on a time range for which the exporter sends sample data.
	// It is checked for on each batch provided to the Export method.
	// If unset, data is always sent.
	lease Lease

	// Used to construct a new metric client when options change, or at initialization. It
	// is exposed as a variable so that unit tests may change the constructor.
	newMetricClient func(ctx context.Context, opts ExporterOpts) (metricServiceClient, error)
}

const (
	// DefaultShardCount represents the number of shards by which series are bucketed.
	DefaultShardCount = 1024
	// DefaultShardBufferSize represents the buffer size for each individual shard.
	// Each element in the buffer (queue) consists of a sample and its hash.
	DefaultShardBufferSize = 2048

	// BatchSizeMax represents the maximum number of samples to pack into a batch sent to GCM.
	BatchSizeMax = 200

	// Time after which an accumulating batch is flushed to GCM. This avoids data being
	// held indefinitely if not enough new data flows in to fill up the batch.
	batchDelayMax = 50 * time.Millisecond

	// Prefix for GCM metric types.
	MetricTypePrefix = "prometheus.googleapis.com"
)

// Supported gRPC compression formats.
const (
	CompressionNone = "none"
	CompressionGZIP = "gzip"
)

// ExporterOpts holds options for an exporter.
type ExporterOpts struct {
	// Whether to disable exporting of metrics.
	Disable bool
	// GCM API endpoint to send metric data to.
	Endpoint string
	// Compression format to use for gRPC requests.
	Compression string
	// Credentials file for authentication with the GCM API.
	CredentialsFile string
	// CredentialsFromJSON represents the content of a credentials file for
	// authentication with the GCM API. CredentialsFile has priority over this.
	CredentialsFromJSON []byte
	// Disable authentication (for debugging purposes).
	DisableAuth bool

	// A user agent product string added to the regular user agent.
	// See: https://www.rfc-editor.org/rfc/rfc7231#section-5.5.3
	UserAgentProduct string
	// A string added as a suffix to the regular user agent.
	UserAgentMode string
	// The environment from which calls to the GCM API are made.
	UserAgentEnv string

	// Default monitored resource fields set on exported data.
	ProjectID string
	Location  string
	Cluster   string

	// A list of metric matchers. Only Prometheus time series satisfying at
	// least one of the matchers are exported.
	// This option matches the semantics of the Prometheus federation match[]
	// parameter.
	Matchers Matchers

	// Prefix under which metrics are written to GCM.
	MetricTypePrefix string

	// Request URL and body for generating an alternative GCE token source.
	// This allows metrics to be exported to an alternative project.
	TokenURL  string
	TokenBody string

	// The project ID of an alternative project for quota attribution.
	QuotaProject string

	// Efficiency holds exporter options that allow fine-tuning of internal
	// data structure sizes. Only for advanced users. No compatibility
	// guarantee (might change in the future).
	Efficiency EfficiencyOpts
}

// DefaultUnsetFields defaults any zero-valued fields.
func (opts *ExporterOpts) DefaultUnsetFields() {
	if opts.Efficiency.BatchSize == 0 {
		opts.Efficiency.BatchSize = BatchSizeMax
	}
	if opts.Efficiency.ShardCount == 0 {
		opts.Efficiency.ShardCount = DefaultShardCount
	}
	if opts.Efficiency.ShardBufferSize == 0 {
		opts.Efficiency.ShardBufferSize = DefaultShardBufferSize
	}
	if opts.Endpoint == "" {
		opts.Endpoint = "monitoring.googleapis.com:443"
	}
	if opts.Compression == "" {
		opts.Compression = CompressionNone
	}
	if opts.MetricTypePrefix == "" {
		opts.MetricTypePrefix = MetricTypePrefix
	}
	if opts.UserAgentMode == "" {
		opts.UserAgentMode = "unspecified"
	}
}

// Validate returns an error for invalid option combinations.
func (opts *ExporterOpts) Validate() error {
	if opts.Efficiency.BatchSize > BatchSizeMax {
		return fmt.Errorf("maximum supported batch size is %d, got %d", BatchSizeMax, opts.Efficiency.BatchSize)
	}
	return nil
}
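// exampleOpts is a hedged usage sketch added for illustration (not part of the
// original file). It shows the intended call order for ExporterOpts: set the
// required monitored-resource defaults, fill the remaining fields with
// DefaultUnsetFields, then Validate. The project, location, and cluster values
// are hypothetical.
func exampleOpts() (ExporterOpts, error) {
	opts := ExporterOpts{
		ProjectID: "example-project", // hypothetical project ID
		Location:  "us-central1",     // must be a named region, never "global"
		Cluster:   "example-cluster",
	}
	// Fills Endpoint, Compression, MetricTypePrefix, and the efficiency knobs.
	opts.DefaultUnsetFields()
	// Rejects, e.g., batch sizes above BatchSizeMax.
	if err := opts.Validate(); err != nil {
		return ExporterOpts{}, err
	}
	return opts, nil
}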
// EfficiencyOpts represents exporter options that allow fine-tuning of
// internal data structure sizes. Only for advanced users. No compatibility
// guarantee (might change in the future).
type EfficiencyOpts struct {
	// BatchSize controls the maximum batch size to use when sending data to the
	// GCM API. Defaults to BatchSizeMax when 0. BatchSizeMax is also the maximum
	// value this field can have due to the GCM quota for write request size.
	// See https://cloud.google.com/monitoring/quotas?hl=en#custom_metrics_quotas.
	BatchSize uint
	// ShardCount controls the number of shards. Refer to the Exporter.Run
	// documentation to learn more about the algorithm. Defaults to
	// DefaultShardCount when 0.
	ShardCount uint
	// ShardBufferSize controls the buffer size for each individual shard. Each
	// element in the buffer (queue) consists of a sample and its hash. Refer to
	// the Exporter.Run documentation to learn more about the algorithm.
	// Defaults to DefaultShardBufferSize when 0.
	ShardBufferSize uint
}

// NopExporter returns a permanently inactive exporter.
func NopExporter() *Exporter {
	return &Exporter{
		opts: ExporterOpts{
			Disable: true,
		},
	}
}

// Lease determines a currently owned time range.
type Lease interface {
	// Range informs whether the caller currently holds the lease and for what time range.
	// The range is inclusive.
	Range() (start, end time.Time, ok bool)
	// Run starts background processing until the context is cancelled.
	Run(context.Context)
	// OnLeaderChange sets a callback that is invoked when the lease leader changes.
	// Must be called before Run.
	OnLeaderChange(func())
}

// NopLease returns a lease that disables leasing.
func NopLease() Lease { return &alwaysLease{} }

// alwaysLease is a lease that is always held.
type alwaysLease struct{}

func (alwaysLease) Range() (start time.Time, end time.Time, ok bool) {
	return time.UnixMilli(math.MinInt64), time.UnixMilli(math.MaxInt64), true
}

func (alwaysLease) Run(ctx context.Context) {
	<-ctx.Done()
}

func (alwaysLease) OnLeaderChange(func()) {
	// We never lose the lease as it's always owned.
}
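// fixedLease is a hedged sketch of a custom Lease implementation (not part of
// the original file): it owns a single static time range and never changes
// leader. It only illustrates the interface contract above; a real HA setup
// would back this with an elected, renewable lease instead.
type fixedLease struct {
	start, end time.Time
}

var _ Lease = fixedLease{}

func (l fixedLease) Range() (start, end time.Time, ok bool) {
	return l.start, l.end, true
}

func (l fixedLease) Run(ctx context.Context) {
	<-ctx.Done()
}

func (l fixedLease) OnLeaderChange(func()) {
	// The leader never changes for a static range, so the callback is never invoked.
}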
func defaultNewMetricClient(ctx context.Context, opts ExporterOpts) (metricServiceClient, error) {
	version, err := Version()
	if err != nil {
		return nil, fmt.Errorf("unable to fetch user agent version: %w", err)
	}
	// Identify the user agent for all gRPC requests.
	ua := strings.TrimSpace(fmt.Sprintf("%s/%s %s (env:%s;mode:%s)",
		ClientName, version, opts.UserAgentProduct, opts.UserAgentEnv, opts.UserAgentMode))
	clientOpts := []option.ClientOption{
		option.WithGRPCDialOption(grpc.WithUnaryInterceptor(grpc_prometheus.UnaryClientInterceptor)),
		option.WithUserAgent(ua),
	}
	if opts.Endpoint != "" {
		clientOpts = append(clientOpts, option.WithEndpoint(opts.Endpoint))
	}
	// Disable auth when the exporter is disabled because we don't want a panic when default
	// credentials are not found.
	if opts.DisableAuth || opts.Disable {
		clientOpts = append(clientOpts,
			option.WithoutAuthentication(),
			option.WithGRPCDialOption(grpc.WithTransportCredentials(insecure.NewCredentials())),
		)
	} else if opts.CredentialsFile == "" && len(opts.CredentialsFromJSON) == 0 {
		// If no credentials are found, gRPC panics, so we check manually.
		_, err := google.FindDefaultCredentials(ctx)
		if err != nil {
			return nil, err
		}
	}
	if opts.CredentialsFile != "" {
		clientOpts = append(clientOpts, option.WithCredentialsFile(opts.CredentialsFile))
	} else if len(opts.CredentialsFromJSON) > 0 {
		clientOpts = append(clientOpts, option.WithCredentialsJSON(opts.CredentialsFromJSON))
	}
	if opts.TokenURL != "" && opts.TokenBody != "" {
		tokenSource := NewAltTokenSource(opts.TokenURL, opts.TokenBody)
		clientOpts = append(clientOpts, option.WithTokenSource(tokenSource))
	}
	if opts.QuotaProject != "" {
		clientOpts = append(clientOpts, option.WithQuotaProject(opts.QuotaProject))
	}
	client, err := monitoring.NewMetricClient(ctx, clientOpts...)
	if err != nil {
		return nil, err
	}
	if opts.Compression == CompressionGZIP {
		client.CallOptions.CreateTimeSeries = append(client.CallOptions.CreateTimeSeries,
			gax.WithGRPCOptions(grpc.UseCompressor(gzip.Name)))
	}
	return client, nil
}

// New returns a new Cloud Monitoring Exporter.
func New(ctx context.Context, logger log.Logger, reg prometheus.Registerer, opts ExporterOpts, lease Lease) (*Exporter, error) {
	grpc_prometheus.EnableClientHandlingTimeHistogram(
		grpc_prometheus.WithHistogramBuckets([]float64{0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1, 2.5, 5, 10, 15, 20, 30, 40, 50, 60}),
	)
	if logger == nil {
		logger = log.NewNopLogger()
	}
	if reg != nil {
		reg.MustRegister(
			prometheusSamplesDiscarded,
			samplesExported,
			samplesDropped,
			samplesSent,
			samplesSendErrors,
			sendIterations,
			shardProcess,
			shardProcessPending,
			shardProcessSamplesTaken,
			pendingRequests,
			projectsPerBatch,
			samplesPerRPCBatch,
		)
	}
	if err := opts.Validate(); err != nil {
		return nil, err
	}
	if lease == nil {
		lease = NopLease()
	}
	metricClient, err := defaultNewMetricClient(ctx, opts)
	if err != nil {
		return nil, fmt.Errorf("create metric client: %w", err)
	}

	e := &Exporter{
		logger:               logger,
		ctx:                  ctx,
		opts:                 opts,
		metricClient:         metricClient,
		seriesCache:          newSeriesCache(logger, reg, opts.MetricTypePrefix, opts.Matchers),
		externalLabels:       createLabelSet(&config.Config{}, &opts),
		newMetricClient:      defaultNewMetricClient,
		nextc:                make(chan struct{}, 1),
		shards:               make([]*shard, opts.Efficiency.ShardCount),
		warnedUntypedMetrics: map[string]struct{}{},
		lease:                lease,
	}
	// Whenever the lease is lost, clear the series cache so we don't start from
	// out-of-range reset timestamps when we gain the lease again.
	lease.OnLeaderChange(e.seriesCache.clear)

	for i := range e.shards {
		e.shards[i] = newShard(opts.Efficiency.ShardBufferSize)
	}
	return e, nil
}

// The target label keys used for the Prometheus monitored resource.
const (
	KeyProjectID = "project_id"
	KeyLocation  = "location"
	KeyCluster   = "cluster"
	KeyNamespace = "namespace"
	KeyJob       = "job"
	KeyInstance  = "instance"
)

// ApplyConfig updates the exporter state to the given configuration. The given ExporterOpts,
// if non-nil, is applied to the exporter, potentially recreating the metric client. It must be
// defaulted and validated by the caller.
func (e *Exporter) ApplyConfig(cfg *config.Config, opts *ExporterOpts) (err error) {
	// Note: We don't expect the NopExporter to call this. Only the config reloader calls it.
	e.mtx.Lock()
	defer e.mtx.Unlock()

	// Don't recreate the metric client each time. If the metric client is recreated, it has to
	// potentially redo the TCP handshake. With HTTP/2, TCP connections are kept alive for a small
	// amount of time to reduce load when multiple requests are made to the same server in
	// succession. In our case, we might send a GCM API call every 50ms in the worst case, which is
	// highly likely to benefit from the persistent TCP connection.
	optsChanged := false
	if opts != nil {
		optsChanged = !reflect.DeepEqual(e.opts, *opts)
		if optsChanged {
			e.opts = *opts
		}
	}

	lset := createLabelSet(cfg, &e.opts)
	labelsChanged := !labels.Equal(e.externalLabels, lset)

	// We don't need to validate if there are no scrape configs or rules, i.e. at startup.
	hasScrapeConfigs := len(cfg.ScrapeConfigs) != 0 || len(cfg.ScrapeConfigFiles) != 0
	hasRules := len(cfg.RuleFiles) != 0
	if hasScrapeConfigs || hasRules {
		if err := validateLabelSet(lset); err != nil {
			return err
		}
	}

	// If changed, or we're calling this for the first time, we need to recreate the client.
	if optsChanged {
		e.metricClient, err = e.newMetricClient(e.ctx, e.opts)
		if err != nil {
			return fmt.Errorf("create metric client: %w", err)
		}
	}
	if labelsChanged {
		e.externalLabels = lset
		// New external labels possibly invalidate the cached series conversions.
		e.seriesCache.forceRefresh()
	}
	return nil
}
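// exampleNew is a hedged wiring sketch (not part of the original file) showing
// how New and ApplyConfig are expected to be used together at startup: an empty
// Prometheus config is sufficient initially, since external-label validation
// only kicks in once scrape configs or rule files are present. Note that real
// credentials (or DisableAuth) are required for the client construction inside
// New to succeed.
func exampleNew(ctx context.Context) (*Exporter, error) {
	opts, err := exampleOpts()
	if err != nil {
		return nil, err
	}
	e, err := New(ctx, log.NewNopLogger(), prometheus.NewRegistry(), opts, NopLease())
	if err != nil {
		return nil, err
	}
	if err := e.ApplyConfig(&config.Config{}, nil); err != nil {
		return nil, err
	}
	return e, nil
}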
func createLabelSet(cfg *config.Config, opts *ExporterOpts) labels.Labels {
	// If project_id, location, or cluster were set through the external_labels in the config
	// file, these values take precedence. If they are unset, the flag value, which defaults
	// to an environment-specific value on GCE/GKE, is used.
	builder := labels.NewBuilder(cfg.GlobalConfig.ExternalLabels)
	if !cfg.GlobalConfig.ExternalLabels.Has(KeyProjectID) {
		builder.Set(KeyProjectID, opts.ProjectID)
	}
	if !cfg.GlobalConfig.ExternalLabels.Has(KeyLocation) {
		builder.Set(KeyLocation, opts.Location)
	}
	if !cfg.GlobalConfig.ExternalLabels.Has(KeyCluster) {
		builder.Set(KeyCluster, opts.Cluster)
	}
	return builder.Labels()
}

func validateLabelSet(lset labels.Labels) error {
	// We expect location and project ID to be set. They are effectively only a default
	// however as they may be overridden by metric labels.
	if lset.Get(KeyProjectID) == "" {
		return fmt.Errorf("no label %q set via external labels or flag", KeyProjectID)
	}
	// In production scenarios, "location" should most likely never be overridden as it
	// means crossing failure domains. Instead, each location should run a replica of the
	// evaluator with the same rules.
	switch loc := lset.Get(KeyLocation); loc {
	case "":
		return fmt.Errorf("no label %q set via external labels or flag", KeyLocation)
	case "global":
		return ErrLocationGlobal
	default:
		return nil
	}
}
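// exampleLabelPrecedence is a hedged sketch (not part of the original file) of
// the precedence rule documented in createLabelSet: external_labels from the
// Prometheus configuration win over the flag-provided ExporterOpts defaults.
// All label values here are illustrative.
func exampleLabelPrecedence() string {
	cfg := &config.Config{}
	cfg.GlobalConfig.ExternalLabels = labels.FromStrings(KeyProjectID, "config-project")
	opts := ExporterOpts{ProjectID: "flag-project", Location: "us-central1"}

	lset := createLabelSet(cfg, &opts)
	// Yields "config-project": the config label wins, while location falls
	// back to the flag value since it is absent from external_labels.
	return lset.Get(KeyProjectID)
}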
// SetLabelsByIDFunc injects a function that can be used to retrieve a label set
// based on a series ID we got through exported sample records.
// Must be called before any call to Export is made.
func (e *Exporter) SetLabelsByIDFunc(f func(storage.SeriesRef) labels.Labels) {
	if e.seriesCache == nil {
		// We don't have a cache in a nop exporter, so we skip.
		return
	}
	if e.seriesCache.getLabelsByRef != nil {
		panic("SetLabelsByIDFunc must only be called once")
	}
	e.seriesCache.getLabelsByRef = f
}

// Export enqueues the samples and exemplars to be written to Cloud Monitoring.
func (e *Exporter) Export(metadata MetadataFunc, batch []record.RefSample, exemplarMap map[storage.SeriesRef]record.RefExemplar) {
	// Whether we're sending data or not, count the batch of samples exported by
	// Prometheus from the appender commit.
	batchSize := len(batch)
	samplesExported.Add(float64(batchSize))

	if e.opts.Disable {
		return
	}
	metadata = e.wrapMetadata(metadata)

	e.mtx.Lock()
	externalLabels := e.externalLabels
	start, end, ok := e.lease.Range()
	e.mtx.Unlock()

	if !ok {
		exemplarsDropped.WithLabelValues("not-in-ha-range").Add(float64(len(exemplarMap)))
		samplesDropped.WithLabelValues("not-in-ha-range").Add(float64(batchSize))
		return
	}

	builder := newSampleBuilder(e.seriesCache)
	defer builder.close()

	exemplarsExported.Add(float64(len(exemplarMap)))

	for len(batch) > 0 {
		var (
			samples []hashedSeries
			err     error
		)
		samples, batch, err = builder.next(metadata, externalLabels, batch, exemplarMap)
		if err != nil {
			//nolint:errcheck
			level.Debug(e.logger).Log("msg", "building sample failed", "err", err)
			continue
		}
		for _, s := range samples {
			// Only enqueue samples within our HA range.
			if sampleInRange(s.proto, start, end) {
				e.enqueue(s.hash, s.proto)
			} else {
				// Hashed series protos should only ever have one point. If this is
				// a distribution, increase exemplarsDropped if there are exemplars.
				if dist := s.proto.Points[0].Value.GetDistributionValue(); dist != nil {
					exemplarsDropped.WithLabelValues("not-in-ha-range").Add(float64(len(dist.GetExemplars())))
				}
				samplesDropped.WithLabelValues("not-in-ha-range").Inc()
			}
		}
	}
	// Signal that new data is available.
	e.triggerNext()
}

func sampleInRange(sample *monitoring_pb.TimeSeries, start, end time.Time) bool {
	// A sample has exactly one point in the time series. The start timestamp may be unset for gauges.
	if s := sample.Points[0].Interval.StartTime; s != nil && s.AsTime().Before(start) {
		return false
	}
	if sample.Points[0].Interval.EndTime.AsTime().After(end) {
		return false
	}
	return true
}

func (e *Exporter) enqueue(hash uint64, sample *monitoring_pb.TimeSeries) {
	idx := hash % uint64(len(e.shards))
	e.shards[idx].enqueue(hash, sample)
}

func (e *Exporter) triggerNext() {
	select {
	case e.nextc <- struct{}{}:
	default:
	}
}
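// exampleExport is a hedged sketch (not part of the original file) of the
// Export call shape: the label lookup must be wired exactly once before the
// first Export, and a nil MetadataFunc makes wrapMetadata assume gauges (the
// rule-evaluation path). The sample values and the lookup are stand-ins for
// what the TSDB head would provide.
func exampleExport(e *Exporter, lookup func(storage.SeriesRef) labels.Labels) {
	e.SetLabelsByIDFunc(lookup) // must precede the first Export call

	samples := []record.RefSample{{Ref: 1, T: time.Now().UnixMilli(), V: 42}}
	e.Export(nil, samples, nil) // nil exemplar map: no exemplars in this batch
}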
const (
	// ClientName is used to identify the user agent.
	ClientName = "prometheus-engine-export"
	// mainModuleVersion is the version of the main module. Align with the git tag.
	// TODO(TheSpiritXIII): Remove with https://github.com/golang/go/issues/50603
	mainModuleVersion = "v0.15.0-rc.7" // x-release-please-version
	// mainModuleName is the name of the main module. Align with go.mod.
	mainModuleName = "github.com/GoogleCloudPlatform/prometheus-engine"
)

// Version is used in the user agent. The version is automatically detected if
// this function is imported as a library. However, the version is statically
// set if this function is used in a binary in prometheus-engine due to Go
// restrictions. While testing, the static version is validated for correctness.
func Version() (string, error) {
	if testing.Testing() {
		// TODO(TheSpiritXIII): After https://github.com/golang/go/issues/50603 just return an empty
		// string here. For now, use the opportunity to confirm that the static version is correct.
		// We manually get the closest git tag if the user is running the unit test locally, but
		// fall back to the GIT_TAG environment variable in case the user is running the test via
		// Docker (like `make test` does by default).
		if testTag, found := os.LookupEnv("TEST_TAG"); !found || testTag == "false" {
			return mainModuleVersion, nil
		}
		cmd := exec.Command("git", "describe", "--tags", "--abbrev=0")
		var stdout bytes.Buffer
		cmd.Stdout = &stdout
		version := ""
		if err := cmd.Run(); err != nil {
			version = strings.TrimSpace(os.Getenv("GIT_TAG"))
			if version == "" {
				return "", errors.New("unable to detect git tag, please set GIT_TAG env variable")
			}
		} else {
			version = strings.TrimSpace(stdout.String())
		}
		return version, nil
	}

	// TODO(TheSpiritXIII): Due to https://github.com/golang/go/issues/50603 we must use a static
	// string for the main module (when we import this function locally for binaries).
	bi, ok := debug.ReadBuildInfo()
	if !ok {
		return "", errors.New("unable to retrieve build info")
	}
	if bi.Main.Path == mainModuleName {
		return mainModuleVersion, nil
	}
	var exportDep *debug.Module
	for _, dep := range bi.Deps {
		if dep.Path == mainModuleName {
			exportDep = dep
			break
		}
	}
	if exportDep == nil {
		return "", fmt.Errorf("unable to find module %q %v", mainModuleName, bi.Deps)
	}
	return exportDep.Version, nil
}

// Run sends exported samples to Google Cloud Monitoring. Must be called at most once.
//
// Run starts a loop that gathers samples and sends them to GCM.
//
// Samples must not arrive at the GCM API out of order. To ensure that, there
// must be at most one in-flight request per series. Tracking every series
// individually would also require a separate queue per series. This would come
// with a lot of overhead and implementation complexity.
// Instead, we shard the series space and maintain one queue per shard. For every shard
// we ensure that there is at most one in-flight request.
//
// One solution would be to have a separate send loop per shard that reads from
// the queue, accumulates a batch, and sends it to the GCM API. The drawback is that one
// has to get the number of shards right. Too low, and samples per shard cannot be sent
// fast enough. Too high, and batches do not fill up, potentially sending new requests
// for every sample.
// As a result, fine-tuning at startup but also at runtime would be necessary to
// respond to changing load patterns and latency of the API.
//
// We largely avoid this issue by filling up batches from multiple shards. Under high load,
// a batch contains samples from fewer shards; under low load, from more.
// The per-shard overhead is minimal and thus a high number can be picked, which allows us
// to cover a large range of potential throughput and latency combinations without requiring
// user configuration or, even worse, runtime changes to the shard number.
func (e *Exporter) Run() error {
	// Note: We don't expect the NopExporter to call this. Only the main binary calls this.
	defer e.close()
	go e.seriesCache.run(e.ctx)
	go e.lease.Run(e.ctx)

	timer := time.NewTimer(batchDelayMax)
	stopTimer := func() {
		if !timer.Stop() {
			select {
			case <-timer.C:
			default:
			}
		}
	}
	defer stopTimer()

	e.mtx.RLock()
	opts := e.opts
	e.mtx.RUnlock()
	curBatch := newBatch(e.logger, opts.Efficiency.ShardCount, opts.Efficiency.BatchSize)

	// Send the currently accumulated batch to GCM asynchronously.
	send := func() {
		e.mtx.RLock()
		opts := e.opts
		sendFunc := e.metricClient.CreateTimeSeries
		e.mtx.RUnlock()

		// Send the batch and, once it completes, trigger next to process remaining data in the
		// shards that were part of the batch. This ensures that if we didn't take all samples
		// from a shard when filling the batch, we'll come back for them and any queue build-up
		// gets sent eventually.
		go func(ctx context.Context, b *batch) {
			if !opts.Disable {
				b.send(ctx, sendFunc)
			}
			// We could only trigger if we didn't fully empty the shards in this batch.
			// Benchmarking showed no beneficial impact of this optimization.
			e.triggerNext()
		}(e.ctx, curBatch)

		// Reset state for the new batch.
		stopTimer()
		timer.Reset(batchDelayMax)
		curBatch = newBatch(e.logger, opts.Efficiency.ShardCount, opts.Efficiency.BatchSize)
	}

	for {
		select {
		// NOTE(freinartz): we will terminate once the context is cancelled and not flush remaining
		// buffered data. In-flight requests will be aborted as well.
		// This is fine once we persist data submitted via Export() but for now there may be some
		// data loss on shutdown.
		case <-e.ctx.Done():
			return nil
		// This case is activated for each new sample that arrives.
		case <-e.nextc:
			sendIterations.Inc()

			// Drain shards to fill up the batch.
			//
			// If the shard count is high given the overall throughput, a lot of shards may
			// be packed into the same batch. A slow request will then block all those shards
			// from further parallel sends.
			// If this becomes a problem (especially when we grow the maximum batch size), consider
			// adding a heuristic to send partial batches in favor of limiting the number of
			// shards they span.
			for _, shard := range e.shards {
				shard.fill(curBatch)
				if curBatch.full() {
					send()
				}
			}
		// Flush a batch that has been pending for too long.
		case <-timer.C:
			if !curBatch.empty() {
				send()
			} else {
				timer.Reset(batchDelayMax)
			}
		}
	}
}
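// exampleShardAssignment is a hedged restatement (not part of the original
// file) of the ordering argument in the Run documentation: a series hash always
// maps to the same shard (mirroring Exporter.enqueue), and since each shard has
// at most one in-flight request, points of a single series cannot overtake each
// other on their way to the GCM API.
func exampleShardAssignment(seriesHash uint64, shardCount uint) uint64 {
	return seriesHash % uint64(shardCount)
}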
func (e *Exporter) close() {
	e.mtx.Lock()
	defer e.mtx.Unlock()

	if err := e.metricClient.Close(); err != nil {
		_ = e.logger.Log("msg", "error closing metric client", "err", err)
	}
	e.metricClient = nil
}

// ctxKey is a dedicated type for keys of context-embedded values propagated
// with the scrape context.
type ctxKey int

// Valid ctxKey values.
const (
	ctxKeyMetadata ctxKey = iota + 1
)

// WithMetadataFunc stores mf in the context.
func WithMetadataFunc(ctx context.Context, mf MetadataFunc) context.Context {
	return context.WithValue(ctx, ctxKeyMetadata, mf)
}

// MetadataFuncFromContext extracts a MetadataFunc from ctx.
func MetadataFuncFromContext(ctx context.Context) (MetadataFunc, bool) {
	mf, ok := ctx.Value(ctxKeyMetadata).(MetadataFunc)
	return mf, ok
}

// MetricMetadata is a copy of MetricMetadata in Prometheus's scrape package.
// It is copied to break a dependency cycle.
type MetricMetadata struct {
	Metric string
	Type   textparse.MetricType
	Help   string
	Unit   string
}

// MetadataFunc gets metadata for a specific metric name.
type MetadataFunc func(metric string) (MetricMetadata, bool)
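// exampleMetadataFunc is a hedged sketch (not part of the original file): a
// MetadataFunc backed by a static map, standing in for the scrape-cache lookup
// that Prometheus normally provides. The metric name and help text are
// illustrative.
func exampleMetadataFunc() MetadataFunc {
	static := map[string]MetricMetadata{
		"http_requests_total": {
			Metric: "http_requests_total",
			Type:   textparse.MetricTypeCounter,
			Help:   "Total number of HTTP requests.",
		},
	}
	return func(metric string) (MetricMetadata, bool) {
		md, ok := static[metric]
		return md, ok
	}
}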
func (e *Exporter) wrapMetadata(f MetadataFunc) MetadataFunc {
	// Metadata is nil for metrics ingested through recording or alerting rules.
	// Unless the rule literally does no processing at all, this always means the
	// resulting data is a gauge.
	// This makes it safe to assume a gauge type here in the absence of any other
	// metadata.
	// In the future we might want to propagate the rule definition and add it as
	// help text here to easily understand what produced the metric.
	if f == nil {
		f = gaugeMetadata
	}
	// Ensure that we always cover synthetic scrape metrics and, in doubt, fall
	// back to untyped metrics. The wrapping order is important!
	f = withScrapeMetricMetadata(f)
	f = e.withUntypedDefaultMetadata(f)
	return f
}

// gaugeMetadata is a MetadataFunc that always returns the gauge type.
// Help and Unit are left empty.
func gaugeMetadata(metric string) (MetricMetadata, bool) {
	return MetricMetadata{
		Metric: metric,
		Type:   textparse.MetricTypeGauge,
	}, true
}

// untypedMetadata is a MetadataFunc that always returns the untyped/unknown type.
// Help and Unit are left empty.
func untypedMetadata(metric string) (MetricMetadata, bool) {
	return MetricMetadata{
		Metric: metric,
		Type:   textparse.MetricTypeUnknown,
	}, true
}

// Metrics Prometheus writes at scrape time for which no metadata is exposed.
var internalMetricMetadata = map[string]MetricMetadata{
	"up": {
		Metric: "up",
		Type:   textparse.MetricTypeGauge,
		Help:   "Up indicates whether the last target scrape was successful.",
	},
	"scrape_samples_scraped": {
		Metric: "scrape_samples_scraped",
		Type:   textparse.MetricTypeGauge,
		Help:   "How many samples were scraped during the last successful scrape.",
	},
	"scrape_duration_seconds": {
		Metric: "scrape_duration_seconds",
		Type:   textparse.MetricTypeGauge,
		Help:   "Duration of the last scrape.",
	},
	"scrape_samples_post_metric_relabeling": {
		Metric: "scrape_samples_post_metric_relabeling",
		Type:   textparse.MetricTypeGauge,
		Help:   "How many samples were ingested after relabeling.",
	},
	"scrape_series_added": {
		Metric: "scrape_series_added",
		Type:   textparse.MetricTypeGauge,
		Help:   "Number of new series added in the last scrape.",
	},
}

// withScrapeMetricMetadata wraps a MetadataFunc and additionally returns metadata
// about Prometheus's synthetic scrape-time metrics.
func withScrapeMetricMetadata(f MetadataFunc) MetadataFunc {
	return func(metric string) (MetricMetadata, bool) {
		md, ok := internalMetricMetadata[metric]
		if ok {
			return md, true
		}
		return f(metric)
	}
}

// withUntypedDefaultMetadata returns a MetadataFunc that returns the untyped
// type if no metadata is found through f.
// It logs a warning once per metric name where a default to untyped happened,
// as this is generally undesirable.
//
// For Prometheus this primarily handles cases where metric relabeling is used to
// create new metric names on the fly, for which no metadata is known.
// This allows ingesting this data in a best-effort manner.
func (e *Exporter) withUntypedDefaultMetadata(f MetadataFunc) MetadataFunc {
	return func(metric string) (MetricMetadata, bool) {
		md, ok := f(metric)
		if ok {
			return md, true
		}
		// The metric name may contain suffixes (_sum, _bucket, _count), which need to be stripped
		// to find the matching metadata. Before we can assume that no metadata exists, we have
		// to verify that the base name is not found either.
		// Our transformation logic applies the same lookup sequence. Without this step
		// we'd incorrectly return the untyped metadata for all those sub-series.
		if baseName, _, ok := splitMetricSuffix(metric); ok {
			if _, ok := f(baseName); ok {
				// There is metadata for the underlying metric, so return false and let the
				// conversion logic do its thing.
				return MetricMetadata{}, false
			}
		}
		// We only log a message the first time for each metric. We check this against a global cache
		// as the total number of unique observed names is generally negligible.
		e.mtx.Lock()
		defer e.mtx.Unlock()

		if _, ok := e.warnedUntypedMetrics[metric]; !ok {
			//nolint:errcheck
			level.Warn(e.logger).Log("msg", "no metadata found, defaulting to untyped metric", "metric_name", metric)
			e.warnedUntypedMetrics[metric] = struct{}{}
		}
		return untypedMetadata(metric)
	}
}

// batch accumulates a batch of samples to be sent to GCM. Once the batch is full
// it must be sent and cannot be used anymore after that.
type batch struct {
	logger  log.Logger
	maxSize uint

	m       map[string][]*monitoring_pb.TimeSeries
	shards  []*shard
	oneFull bool
	total   int
}

func newBatch(logger log.Logger, shardsCount uint, maxSize uint) *batch {
	if logger == nil {
		logger = log.NewNopLogger()
	}
	return &batch{
		logger:  logger,
		maxSize: maxSize,
		m:       make(map[string][]*monitoring_pb.TimeSeries, 1),
		shards:  make([]*shard, 0, shardsCount/2),
	}
}

func (b *batch) addShard(s *shard) {
	b.shards = append(b.shards, s)
}

// add a new sample to the batch. Must only be called after full() returned false.
func (b *batch) add(s *monitoring_pb.TimeSeries) {
	pid := s.Resource.Labels[KeyProjectID]

	l, ok := b.m[pid]
	if !ok {
		l = make([]*monitoring_pb.TimeSeries, 0, b.maxSize)
	}
	l = append(l, s)
	b.m[pid] = l

	if len(l) == cap(l) {
		b.oneFull = true
	}
	b.total++
}

// full returns whether the batch is full. Being full means that add() must not be called again
// and it guarantees that at most one request per project with at most maxSize samples is made.
func (b *batch) full() bool {
	// We determine that a batch is full if at least one project's batch is full.
	//
	// TODO(freinartz): We could add further conditions here, like the total number of projects
	// or samples, so we don't accumulate too many requests that block the shards that
	// contributed to the batch. However, this may in turn result in too many small requests
	// in flight.
	// Another option is to limit the number of shards contributing to a single batch.
	return b.oneFull
}

// empty returns true if the batch contains no samples.
func (b *batch) empty() bool {
	return b.total == 0
}

// send the accumulated samples to their respective projects. It returns once all
// requests have completed and notifies the pending shards.
func (b *batch) send(
	ctx context.Context,
	sendOne func(context.Context, *monitoring_pb.CreateTimeSeriesRequest, ...gax.CallOption) error,
) {
	// Set a timeout so slow requests in the batch do not block overall progress indefinitely.
	sendCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
	defer cancel()

	projectsPerBatch.Observe(float64(len(b.m)))

	var wg sync.WaitGroup
	for pid, l := range b.m {
		wg.Add(1)
		go func(pid string, l []*monitoring_pb.TimeSeries) {
			defer wg.Done()

			pendingRequests.Inc()
			defer pendingRequests.Dec()

			samplesPerRPCBatch.Observe(float64(len(l)))

			// We do not retry any requests due to the risk of producing a backlog
			// that cannot be worked down, especially if large amounts of clients try to do so.
			err := sendOne(sendCtx, &monitoring_pb.CreateTimeSeriesRequest{
				Name:       fmt.Sprintf("projects/%s", pid),
				TimeSeries: l,
			})
			if err != nil {
				//nolint:errcheck
				level.Error(b.logger).Log("msg", "send batch", "size", len(l), "err", err)
				samplesSendErrors.WithLabelValues(pid).Inc()
			}
			samplesSent.Add(float64(len(l)))
		}(pid, l)
	}
	wg.Wait()

	for _, s := range b.shards {
		s.notifyDone()
	}
}
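// exampleBatch is a hedged sketch (not part of the original file) of the batch
// lifecycle used by Run: add samples until full reports true, then send. Each
// series must already carry its monitored resource including the project_id
// label, since add groups by project. The no-op sender stands in for
// metricClient.CreateTimeSeries.
func exampleBatch(ctx context.Context, series []*monitoring_pb.TimeSeries) {
	b := newBatch(nil, DefaultShardCount, BatchSizeMax)
	for _, s := range series {
		if b.full() {
			break // a full batch must be sent and replaced, never appended to
		}
		b.add(s)
	}
	noop := func(context.Context, *monitoring_pb.CreateTimeSeriesRequest, ...gax.CallOption) error {
		return nil
	}
	b.send(ctx, noop) // blocks until all per-project requests complete
}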
// Matchers holds a list of metric selectors that can be set as a flag.
type Matchers []labels.Selector

func (m *Matchers) String() string {
	return fmt.Sprintf("%v", []labels.Selector(*m))
}

func (m *Matchers) Set(s string) error {
	if s == "" {
		return nil
	}
	ms, err := parser.ParseMetricSelector(s)
	if err != nil {
		return fmt.Errorf("invalid metric matcher %q: %w", s, err)
	}
	*m = append(*m, ms)
	return nil
}

func (m *Matchers) IsCumulative() bool {
	return true
}

func (m *Matchers) Matches(lset labels.Labels) bool {
	if len(*m) == 0 {
		return true
	}
	for _, sel := range *m {
		if sel.Matches(lset) {
			return true
		}
	}
	return false
}
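// exampleMatchers is a hedged sketch (not part of the original file) of the
// Matchers flag type: Set parses a federation-style selector, and Matches
// reports whether any selector matches a label set. The selector and labels
// are illustrative.
func exampleMatchers() (bool, error) {
	var m Matchers
	if err := m.Set(`{job="kubelet"}`); err != nil {
		return false, err
	}
	return m.Matches(labels.FromStrings("job", "kubelet", "instance", "node-1")), nil
}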