exporter/prometheusremotewriteexporter/exporter.go

// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package prometheusremotewriteexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/prometheusremotewriteexporter"

import (
	"bytes"
	"context"
	"errors"
	"fmt"
	"io"
	"math"
	"net/http"
	"net/url"
	"strings"
	"sync"

	"github.com/cenkalti/backoff/v4"
	"github.com/gogo/protobuf/proto"
	"github.com/golang/snappy"
	"github.com/prometheus/prometheus/prompb"
	"go.opentelemetry.io/collector/component"
	"go.opentelemetry.io/collector/config/confighttp"
	"go.opentelemetry.io/collector/config/configretry"
	"go.opentelemetry.io/collector/consumer/consumererror"
	"go.opentelemetry.io/collector/exporter"
	"go.opentelemetry.io/collector/pdata/pmetric"
	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/metric"
	"go.uber.org/multierr"
	"go.uber.org/zap"

	"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/prometheusremotewriteexporter/internal/metadata"
	prometheustranslator "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/prometheus"
	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/prometheusremotewrite"
)

type prwTelemetry interface {
	recordTranslationFailure(ctx context.Context)
	recordTranslatedTimeSeries(ctx context.Context, numTS int)
	recordRemoteWriteSentBatch(ctx context.Context)
	setNumberConsumer(ctx context.Context, n int64)
}

type prwTelemetryOtel struct {
	telemetryBuilder *metadata.TelemetryBuilder
	otelAttrs        []attribute.KeyValue
}

func (p *prwTelemetryOtel) setNumberConsumer(ctx context.Context, n int64) {
	p.telemetryBuilder.ExporterPrometheusremotewriteConsumers.Add(ctx, n, metric.WithAttributes(p.otelAttrs...))
}

func (p *prwTelemetryOtel) recordRemoteWriteSentBatch(ctx context.Context) {
	p.telemetryBuilder.ExporterPrometheusremotewriteSentBatches.Add(ctx, 1, metric.WithAttributes(p.otelAttrs...))
}

func (p *prwTelemetryOtel) recordTranslationFailure(ctx context.Context) {
	p.telemetryBuilder.ExporterPrometheusremotewriteFailedTranslations.Add(ctx, 1, metric.WithAttributes(p.otelAttrs...))
}

func (p *prwTelemetryOtel) recordTranslatedTimeSeries(ctx context.Context, numTS int) {
	p.telemetryBuilder.ExporterPrometheusremotewriteTranslatedTimeSeries.Add(ctx, int64(numTS), metric.WithAttributes(p.otelAttrs...))
}

type buffer struct {
	protobuf *proto.Buffer
	snappy   []byte
}

// A reusable buffer pool for serializing protobufs and compressing them with Snappy.
var bufferPool = sync.Pool{
	New: func() any {
		return &buffer{
			protobuf: proto.NewBuffer(nil),
			snappy:   nil,
		}
	},
}

// prwExporter converts OTLP metrics to Prometheus remote write TimeSeries and sends them to a remote endpoint.
type prwExporter struct {
	endpointURL       *url.URL
	client            *http.Client
	wg                *sync.WaitGroup
	closeChan         chan struct{}
	concurrency       int
	userAgentHeader   string
	maxBatchSizeBytes int
	clientSettings    *confighttp.ClientConfig
	settings          component.TelemetrySettings
	retrySettings     configretry.BackOffConfig
	retryOnHTTP429    bool
	wal               *prweWAL
	exporterSettings  prometheusremotewrite.Settings
	telemetry         prwTelemetry

	// When concurrency is enabled, concurrent goroutines would potentially
	// fight over the same batchState object. To avoid this, we use a pool
	// to provide each goroutine with its own state.
	batchStatePool sync.Pool
}
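// Note: bufferPool (above) and batchStatePool exist for the same reason:
// export runs up to prwe.concurrency goroutines at once, and pooling gives
// each worker its own scratch space without allocating on every request.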
func newPRWTelemetry(set exporter.Settings, endpointURL *url.URL) (prwTelemetry, error) {
	telemetryBuilder, err := metadata.NewTelemetryBuilder(set.TelemetrySettings)
	if err != nil {
		return nil, err
	}

	return &prwTelemetryOtel{
		telemetryBuilder: telemetryBuilder,
		otelAttrs: []attribute.KeyValue{
			attribute.String("exporter", set.ID.String()),
			attribute.String("endpoint", endpointURL.String()),
		},
	}, nil
}

// newPRWExporter initializes a new prwExporter instance and sets fields accordingly.
func newPRWExporter(cfg *Config, set exporter.Settings) (*prwExporter, error) {
	sanitizedLabels, err := validateAndSanitizeExternalLabels(cfg)
	if err != nil {
		return nil, err
	}

	endpointURL, err := url.ParseRequestURI(cfg.ClientConfig.Endpoint)
	if err != nil {
		return nil, errors.New("invalid endpoint")
	}

	telemetry, err := newPRWTelemetry(set, endpointURL)
	if err != nil {
		return nil, err
	}

	userAgentHeader := fmt.Sprintf("%s/%s", strings.ReplaceAll(strings.ToLower(set.BuildInfo.Description), " ", "-"), set.BuildInfo.Version)

	// MaxBatchRequestParallelism, when set, takes precedence over the
	// feature-gated defaults below.
	concurrency := 5
	if !enableMultipleWorkersFeatureGate.IsEnabled() {
		concurrency = cfg.RemoteWriteQueue.NumConsumers
	}
	if cfg.MaxBatchRequestParallelism != nil {
		concurrency = *cfg.MaxBatchRequestParallelism
	}

	// Set the desired number of consumers as a metric for the exporter.
	telemetry.setNumberConsumer(context.Background(), int64(concurrency))

	prwe := &prwExporter{
		endpointURL:       endpointURL,
		wg:                new(sync.WaitGroup),
		closeChan:         make(chan struct{}),
		userAgentHeader:   userAgentHeader,
		maxBatchSizeBytes: cfg.MaxBatchSizeBytes,
		concurrency:       concurrency,
		clientSettings:    &cfg.ClientConfig,
		settings:          set.TelemetrySettings,
		retrySettings:     cfg.BackOffConfig,
		retryOnHTTP429:    retryOn429FeatureGate.IsEnabled(),
		exporterSettings: prometheusremotewrite.Settings{
			Namespace:         cfg.Namespace,
			ExternalLabels:    sanitizedLabels,
			DisableTargetInfo: !cfg.TargetInfo.Enabled,
			AddMetricSuffixes: cfg.AddMetricSuffixes,
			SendMetadata:      cfg.SendMetadata,
		},
		telemetry:      telemetry,
		batchStatePool: sync.Pool{New: func() any { return newBatchTimeServicesState() }},
	}

	prwe.wal = newWAL(cfg.WAL, prwe.export)
	return prwe, nil
}

// Start creates the Prometheus client.
func (prwe *prwExporter) Start(ctx context.Context, host component.Host) (err error) {
	prwe.client, err = prwe.clientSettings.ToClient(ctx, host, prwe.settings)
	if err != nil {
		return err
	}
	return prwe.turnOnWALIfEnabled(contextWithLogger(ctx, prwe.settings.Logger.Named("prw.wal")))
}

func (prwe *prwExporter) shutdownWALIfEnabled() error {
	if !prwe.walEnabled() {
		return nil
	}
	return prwe.wal.stop()
}

// Shutdown stops the exporter from accepting incoming calls (which then return an error)
// and waits for in-flight export operations to finish before returning.
func (prwe *prwExporter) Shutdown(context.Context) error {
	select {
	case <-prwe.closeChan:
	default:
		close(prwe.closeChan)
	}
	err := prwe.shutdownWALIfEnabled()
	prwe.wg.Wait()
	return err
}
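// The select-with-default above makes Shutdown safe to call more than once:
// closeChan is closed only if it has not been closed already, so a repeated
// call falls through instead of panicking on a double close.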
// PushMetrics converts metrics to Prometheus remote write TimeSeries and sends them to the remote endpoint. It
// maintains a map of TimeSeries, validates and handles each individual metric, adding the converted TimeSeries to
// the map, and finally exports the map.
func (prwe *prwExporter) PushMetrics(ctx context.Context, md pmetric.Metrics) error {
	prwe.wg.Add(1)
	defer prwe.wg.Done()

	select {
	case <-prwe.closeChan:
		return errors.New("shutdown has been called")
	default:
		tsMap, err := prometheusremotewrite.FromMetrics(md, prwe.exporterSettings)
		if err != nil {
			prwe.telemetry.recordTranslationFailure(ctx)
			prwe.settings.Logger.Debug("failed to translate metrics, exporting remaining metrics", zap.Error(err), zap.Int("translated", len(tsMap)))
		}

		prwe.telemetry.recordTranslatedTimeSeries(ctx, len(tsMap))

		var m []*prompb.MetricMetadata
		if prwe.exporterSettings.SendMetadata {
			m = prometheusremotewrite.OtelMetricsToMetadata(md, prwe.exporterSettings.AddMetricSuffixes)
		}

		// Call export even if there was a conversion error, since there may be points that were successfully converted.
		return prwe.handleExport(ctx, tsMap, m)
	}
}

func validateAndSanitizeExternalLabels(cfg *Config) (map[string]string, error) {
	sanitizedLabels := make(map[string]string)
	for key, value := range cfg.ExternalLabels {
		if key == "" || value == "" {
			return nil, fmt.Errorf("prometheus remote write: external labels configuration contains an empty key or value")
		}
		sanitizedLabels[prometheustranslator.NormalizeLabel(key)] = value
	}

	return sanitizedLabels, nil
}

func (prwe *prwExporter) handleExport(ctx context.Context, tsMap map[string]*prompb.TimeSeries, m []*prompb.MetricMetadata) error {
	// There are no metrics to export, so return.
	if len(tsMap) == 0 {
		return nil
	}

	state := prwe.batchStatePool.Get().(*batchTimeSeriesState)
	defer prwe.batchStatePool.Put(state)
	// Call the helper function to convert and batch the tsMap into the desired format.
	requests, err := batchTimeSeries(tsMap, prwe.maxBatchSizeBytes, m, state)
	if err != nil {
		return err
	}
	if !prwe.walEnabled() {
		// The WAL is disabled, so perform a direct export.
		return prwe.export(ctx, requests)
	}

	// Otherwise the WAL is enabled: just persist the requests to the WAL,
	// and they will be exported to the RemoteWrite endpoint by another goroutine.
	if err = prwe.wal.persistToWAL(requests); err != nil {
		return consumererror.NewPermanent(err)
	}
	return nil
}
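// When the WAL is enabled, handleExport returns as soon as the requests are
// persisted; the WAL's run loop (started from Start via turnOnWALIfEnabled)
// drains them to the remote endpoint asynchronously through prwe.export,
// which was registered as the WAL's export callback in newPRWExporter.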
// export sends Snappy-compressed WriteRequests containing TimeSeries to a remote write endpoint, in order.
func (prwe *prwExporter) export(ctx context.Context, requests []*prompb.WriteRequest) error {
	input := make(chan *prompb.WriteRequest, len(requests))
	for _, request := range requests {
		input <- request
	}
	close(input)

	var wg sync.WaitGroup

	concurrencyLimit := int(math.Min(float64(prwe.concurrency), float64(len(requests))))
	wg.Add(concurrencyLimit) // used to wait for workers to finish

	var mu sync.Mutex
	var errs error
	// Run up to concurrencyLimit workers until there are no more requests
	// to execute in the input channel.
	for i := 0; i < concurrencyLimit; i++ {
		go func() {
			defer wg.Done()
			for {
				select {
				case <-ctx.Done(): // Check first to ensure that the context wasn't cancelled.
					return
				case request, ok := <-input:
					if !ok {
						return
					}
					if errExecute := prwe.execute(ctx, request); errExecute != nil {
						mu.Lock()
						errs = multierr.Append(errs, consumererror.NewPermanent(errExecute))
						mu.Unlock()
					}
				}
			}
		}()
	}
	wg.Wait()

	return errs
}
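// Every failed request is wrapped in consumererror.NewPermanent before being
// aggregated, so the collector's queued-retry machinery does not re-send
// batches that execute has already handled, including any backoff retries
// execute performs itself.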
func (prwe *prwExporter) execute(ctx context.Context, writeReq *prompb.WriteRequest) error {
	buf := bufferPool.Get().(*buffer)
	buf.protobuf.Reset()
	defer bufferPool.Put(buf)

	// Use proto.Marshal to convert the WriteRequest into a byte array.
	errMarshal := buf.protobuf.Marshal(writeReq)
	if errMarshal != nil {
		return consumererror.NewPermanent(errMarshal)
	}
	// If we don't pass a buffer large enough, the Snappy Encode function will not use it
	// and will allocate a new buffer instead. Manually grow the buffer to make sure
	// Snappy uses it, so we can re-use it afterwards.
	maxCompressedLen := snappy.MaxEncodedLen(len(buf.protobuf.Bytes()))
	if maxCompressedLen > len(buf.snappy) {
		if cap(buf.snappy) < maxCompressedLen {
			buf.snappy = make([]byte, maxCompressedLen)
		} else {
			buf.snappy = buf.snappy[:maxCompressedLen]
		}
	}
	compressedData := snappy.Encode(buf.snappy, buf.protobuf.Bytes())

	// executeFunc can be used for both backoff and non-backoff scenarios.
	executeFunc := func() error {
		// Check for a component-level timeout before sending, so retries do not
		// continue to run after the context is done.
		select {
		case <-ctx.Done():
			return backoff.Permanent(ctx.Err())
		default:
			// continue
		}

		// Create the HTTP POST request to send to the endpoint.
		req, err := http.NewRequestWithContext(ctx, http.MethodPost, prwe.endpointURL.String(), bytes.NewReader(compressedData))
		if err != nil {
			return backoff.Permanent(consumererror.NewPermanent(err))
		}

		// Add the necessary headers specified by:
		// https://cortexmetrics.io/docs/apis/#remote-api
		req.Header.Add("Content-Encoding", "snappy")
		req.Header.Set("Content-Type", "application/x-protobuf")
		req.Header.Set("X-Prometheus-Remote-Write-Version", "0.1.0")
		req.Header.Set("User-Agent", prwe.userAgentHeader)

		resp, err := prwe.client.Do(req)
		prwe.telemetry.recordRemoteWriteSentBatch(ctx)
		if err != nil {
			return err
		}
		defer func() {
			_, _ = io.Copy(io.Discard, resp.Body)
			resp.Body.Close()
		}()

		// 2xx status codes are considered a success.
		// 5xx errors are recoverable and the exporter should retry.
		// Reference for the different behavior according to status code:
		// https://github.com/prometheus/prometheus/pull/2552/files#diff-ae8db9d16d8057358e49d694522e7186
		if resp.StatusCode >= 200 && resp.StatusCode < 300 {
			return nil
		}

		body, err := io.ReadAll(io.LimitReader(resp.Body, 256))
		rerr := fmt.Errorf("remote write returned HTTP status %v; err = %w: %s", resp.Status, err, body)
		if resp.StatusCode >= 500 && resp.StatusCode < 600 {
			return rerr
		}

		// 429 errors are recoverable and the exporter should retry if RetryOnHTTP429 is enabled.
		// Reference: https://github.com/prometheus/prometheus/pull/12677
		if prwe.retryOnHTTP429 && resp.StatusCode == http.StatusTooManyRequests {
			return rerr
		}

		return backoff.Permanent(consumererror.NewPermanent(rerr))
	}

	var err error
	if prwe.retrySettings.Enabled {
		// Use the BackOff instance to retry the func with exponential backoff.
		err = backoff.Retry(executeFunc, &backoff.ExponentialBackOff{
			InitialInterval:     prwe.retrySettings.InitialInterval,
			RandomizationFactor: prwe.retrySettings.RandomizationFactor,
			Multiplier:          prwe.retrySettings.Multiplier,
			MaxInterval:         prwe.retrySettings.MaxInterval,
			MaxElapsedTime:      prwe.retrySettings.MaxElapsedTime,
			Stop:                backoff.Stop,
			Clock:               backoff.SystemClock,
		})
	} else {
		err = executeFunc()
	}

	if err != nil {
		return consumererror.NewPermanent(err)
	}
	return err
}

func (prwe *prwExporter) walEnabled() bool {
	return prwe.wal != nil
}

// turnOnWALIfEnabled ties the WAL's lifetime to closeChan: closing the channel
// (in Shutdown) cancels cancelCtx, which stops wal.run.
func (prwe *prwExporter) turnOnWALIfEnabled(ctx context.Context) error {
	if !prwe.walEnabled() {
		return nil
	}
	cancelCtx, cancel := context.WithCancel(ctx)
	go func() {
		<-prwe.closeChan
		cancel()
	}()
	return prwe.wal.run(cancelCtx)
}