exporter/datadogexporter/metrics_exporter.go (210 lines of code) (raw):

// Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 package datadogexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/datadogexporter" import ( "bytes" "context" "errors" "fmt" "io" "net/http" "sync" "time" "github.com/DataDog/datadog-agent/pkg/trace/config" "github.com/DataDog/datadog-api-client-go/v2/api/datadogV2" "github.com/DataDog/opentelemetry-mapping-go/pkg/inframetadata" "github.com/DataDog/opentelemetry-mapping-go/pkg/otlp/attributes" "github.com/DataDog/opentelemetry-mapping-go/pkg/otlp/attributes/source" otlpmetrics "github.com/DataDog/opentelemetry-mapping-go/pkg/otlp/metrics" "go.opentelemetry.io/collector/exporter" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/pmetric" "go.uber.org/zap" zorkian "gopkg.in/zorkian/go-datadog-api.v2" "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/datadogexporter/internal/metrics" "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/datadogexporter/internal/metrics/sketches" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/datadog/clientutil" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/datadog/hostmetadata" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/datadog/scrub" pkgdatadog "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/datadog" datadogconfig "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/datadog/config" ) type metricsExporter struct { params exporter.Settings cfg *datadogconfig.Config agntConfig *config.AgentConfig ctx context.Context client *zorkian.Client metricsAPI *datadogV2.MetricsApi tr *otlpmetrics.Translator scrubber scrub.Scrubber retrier *clientutil.Retrier onceMetadata *sync.Once sourceProvider source.Provider metadataReporter *inframetadata.Reporter // getPushTime returns a Unix time in nanoseconds, representing the time pushing metrics. // It will be overwritten in tests. getPushTime func() uint64 gatewayUsage *attributes.GatewayUsage } func newMetricsExporter( ctx context.Context, params exporter.Settings, cfg *datadogconfig.Config, agntConfig *config.AgentConfig, onceMetadata *sync.Once, attrsTranslator *attributes.Translator, sourceProvider source.Provider, metadataReporter *inframetadata.Reporter, statsOut chan []byte, gatewayUsage *attributes.GatewayUsage, ) (*metricsExporter, error) { options := cfg.Metrics.ToTranslatorOpts() options = append(options, otlpmetrics.WithFallbackSourceProvider(sourceProvider)) options = append(options, otlpmetrics.WithStatsOut(statsOut)) if pkgdatadog.MetricRemappingDisabledFeatureGate.IsEnabled() { params.Logger.Warn("Metric remapping is disabled in the Datadog exporter. OpenTelemetry metrics must be mapped to Datadog semantics before metrics are exported to Datadog (ex: via a processor).") } else { options = append(options, otlpmetrics.WithRemapping()) } tr, err := otlpmetrics.NewTranslator(params.TelemetrySettings, attrsTranslator, options...) if err != nil { return nil, err } scrubber := scrub.NewScrubber() exporter := &metricsExporter{ params: params, cfg: cfg, ctx: ctx, agntConfig: agntConfig, tr: tr, scrubber: scrubber, retrier: clientutil.NewRetrier(params.Logger, cfg.BackOffConfig, scrubber), onceMetadata: onceMetadata, sourceProvider: sourceProvider, getPushTime: func() uint64 { return uint64(time.Now().UTC().UnixNano()) }, metadataReporter: metadataReporter, gatewayUsage: gatewayUsage, } errchan := make(chan error) if isMetricExportV2Enabled() { apiClient := clientutil.CreateAPIClient( params.BuildInfo, cfg.Metrics.Endpoint, cfg.ClientConfig) go func() { errchan <- clientutil.ValidateAPIKey(ctx, string(cfg.API.Key), params.Logger, apiClient) }() exporter.metricsAPI = datadogV2.NewMetricsApi(apiClient) } else { client := clientutil.CreateZorkianClient(string(cfg.API.Key), cfg.Metrics.Endpoint) client.ExtraHeader["User-Agent"] = clientutil.UserAgent(params.BuildInfo) client.HttpClient = clientutil.NewHTTPClient(cfg.ClientConfig) go func() { errchan <- clientutil.ValidateAPIKeyZorkian(params.Logger, client) }() exporter.client = client } if cfg.API.FailOnInvalidKey { err = <-errchan if err != nil { return nil, err } } return exporter, nil } func (exp *metricsExporter) pushSketches(ctx context.Context, sl sketches.SketchSeriesList) error { payload, err := sl.Marshal() if err != nil { return fmt.Errorf("failed to marshal sketches: %w", err) } req, err := http.NewRequestWithContext(ctx, http.MethodPost, exp.cfg.Metrics.Endpoint+sketches.SketchSeriesEndpoint, bytes.NewBuffer(payload), ) if err != nil { return fmt.Errorf("failed to build sketches HTTP request: %w", err) } clientutil.SetDDHeaders(req.Header, exp.params.BuildInfo, string(exp.cfg.API.Key)) clientutil.SetExtraHeaders(req.Header, clientutil.ProtobufHeaders) var resp *http.Response if isMetricExportV2Enabled() { resp, err = exp.metricsAPI.Client.Cfg.HTTPClient.Do(req) } else { resp, err = exp.client.HttpClient.Do(req) } if err != nil { return clientutil.WrapError(fmt.Errorf("failed to do sketches HTTP request: %w", err), resp) } defer resp.Body.Close() // We must read the full response body from the http request to ensure that connections can be // properly re-used. https://pkg.go.dev/net/http#Client.Do _, err = io.Copy(io.Discard, resp.Body) if err != nil { return clientutil.WrapError(fmt.Errorf("failed to read response body from sketches HTTP request: %w", err), resp) } if resp.StatusCode >= 400 { return clientutil.WrapError(fmt.Errorf("error when sending payload to %s: %s", sketches.SketchSeriesEndpoint, resp.Status), resp) } return nil } func (exp *metricsExporter) PushMetricsDataScrubbed(ctx context.Context, md pmetric.Metrics) error { return exp.scrubber.Scrub(exp.PushMetricsData(ctx, md)) } func (exp *metricsExporter) PushMetricsData(ctx context.Context, md pmetric.Metrics) error { if exp.cfg.HostMetadata.Enabled { // Start host metadata with resource attributes from // the first payload. exp.onceMetadata.Do(func() { attrs := pcommon.NewMap() if md.ResourceMetrics().Len() > 0 { attrs = md.ResourceMetrics().At(0).Resource().Attributes() } go hostmetadata.RunPusher(exp.ctx, exp.params, newMetadataConfigfromConfig(exp.cfg), exp.sourceProvider, attrs, exp.metadataReporter) }) // Consume resources for host metadata for i := 0; i < md.ResourceMetrics().Len(); i++ { res := md.ResourceMetrics().At(i).Resource() consumeResource(exp.metadataReporter, res, exp.params.Logger) } } var consumer otlpmetrics.Consumer if isMetricExportV2Enabled() { consumer = metrics.NewConsumer(exp.gatewayUsage) } else { consumer = metrics.NewZorkianConsumer() } metadata, err := exp.tr.MapMetrics(ctx, md, consumer, exp.gatewayUsage) if err != nil { return fmt.Errorf("failed to map metrics: %w", err) } src, err := exp.sourceProvider.Source(ctx) if err != nil { return err } var tags []string if src.Kind == source.AWSECSFargateKind { tags = append(tags, exp.cfg.HostMetadata.Tags...) } var sl sketches.SketchSeriesList var errs []error if isMetricExportV2Enabled() { var ms []datadogV2.MetricSeries ms, sl = consumer.(*metrics.Consumer).All(exp.getPushTime(), exp.params.BuildInfo, tags, metadata) if len(ms) > 0 { exp.params.Logger.Debug("exporting native Datadog payload", zap.Any("metric", ms)) _, experr := exp.retrier.DoWithRetries(ctx, func(context.Context) error { ctx = clientutil.GetRequestContext(ctx, string(exp.cfg.API.Key)) _, httpresp, merr := exp.metricsAPI.SubmitMetrics(ctx, datadogV2.MetricPayload{Series: ms}, *clientutil.GZipSubmitMetricsOptionalParameters) return clientutil.WrapError(merr, httpresp) }) errs = append(errs, experr) } } else { var ms []zorkian.Metric ms, sl = consumer.(*metrics.ZorkianConsumer).All(exp.getPushTime(), exp.params.BuildInfo, tags) if len(ms) > 0 { exp.params.Logger.Debug("exporting Zorkian Datadog payload", zap.Any("metric", ms)) _, experr := exp.retrier.DoWithRetries(ctx, func(context.Context) error { return exp.client.PostMetrics(ms) }) errs = append(errs, experr) } } if len(sl) > 0 { exp.params.Logger.Debug("exporting sketches payload", zap.Any("sketches", sl)) _, experr := exp.retrier.DoWithRetries(ctx, func(ctx context.Context) error { return exp.pushSketches(ctx, sl) }) errs = append(errs, experr) } return errors.Join(errs...) }