exporter/datadogexporter/factory.go (524 lines of code) (raw):
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package datadogexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/datadogexporter"
import (
"context"
"fmt"
"runtime"
"sync"
"time"
"github.com/DataDog/datadog-agent/comp/otelcol/logsagentpipeline"
"github.com/DataDog/datadog-agent/comp/otelcol/otlp/components/exporter/serializerexporter"
"github.com/DataDog/datadog-agent/comp/otelcol/otlp/components/metricsclient"
pb "github.com/DataDog/datadog-agent/pkg/proto/pbgo/trace"
"github.com/DataDog/datadog-agent/pkg/trace/agent"
"github.com/DataDog/datadog-agent/pkg/trace/telemetry"
"github.com/DataDog/datadog-agent/pkg/trace/timing"
"github.com/DataDog/datadog-agent/pkg/trace/writer"
"github.com/DataDog/opentelemetry-mapping-go/pkg/inframetadata"
"github.com/DataDog/opentelemetry-mapping-go/pkg/otlp/attributes"
"github.com/DataDog/opentelemetry-mapping-go/pkg/otlp/attributes/source"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/confighttp"
"go.opentelemetry.io/collector/config/configretry"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/exporter/exporterhelper"
"go.opentelemetry.io/collector/featuregate"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/pdata/ptrace"
"go.uber.org/zap"
"google.golang.org/protobuf/proto"
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/datadogexporter/internal/metadata"
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/datadog/clientutil"
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/datadog/hostmetadata"
datadogconfig "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/datadog/config"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/resourcetotelemetry"
)
var logsAgentExporterFeatureGate = featuregate.GlobalRegistry().MustRegister(
"exporter.datadogexporter.UseLogsAgentExporter",
featuregate.StageBeta,
featuregate.WithRegisterDescription("When enabled, datadogexporter uses the Datadog agent logs pipeline for exporting logs."),
featuregate.WithRegisterFromVersion("v0.100.0"),
)
var metricExportNativeClientFeatureGate = featuregate.GlobalRegistry().MustRegister(
"exporter.datadogexporter.metricexportnativeclient",
featuregate.StageBeta,
featuregate.WithRegisterDescription("When enabled, metric export in datadogexporter uses native Datadog client APIs instead of Zorkian APIs."),
)
// noAPMStatsFeatureGate causes the trace consumer to skip APM stats computation.
var noAPMStatsFeatureGate = featuregate.GlobalRegistry().MustRegister(
"exporter.datadogexporter.DisableAPMStats",
featuregate.StageBeta,
featuregate.WithRegisterDescription("Datadog Exporter will not compute APM Stats"),
)
var metricExportSerializerClientFeatureGate = featuregate.GlobalRegistry().MustRegister(
"exporter.datadogexporter.metricexportserializerclient",
featuregate.StageBeta,
featuregate.WithRegisterDescription("When enabled, metric export in datadogexporter uses the serializer exporter from the Datadog Agent."),
)
// isMetricExportV2Enabled returns true if metric export in datadogexporter uses native Datadog client APIs, false if it uses Zorkian APIs
func isMetricExportV2Enabled() bool {
return metricExportNativeClientFeatureGate.IsEnabled()
}
func isLogsAgentExporterEnabled() bool {
return logsAgentExporterFeatureGate.IsEnabled()
}
func isMetricExportSerializerEnabled() bool {
return metricExportSerializerClientFeatureGate.IsEnabled()
}
// enableNativeMetricExport switches metric export to call native Datadog APIs instead of Zorkian APIs.
func enableNativeMetricExport() error {
if err := featuregate.GlobalRegistry().Set(metricExportSerializerClientFeatureGate.ID(), false); err != nil {
return err
}
return featuregate.GlobalRegistry().Set(metricExportNativeClientFeatureGate.ID(), true)
}
// enableZorkianMetricExport switches metric export to call Zorkian APIs instead of native Datadog APIs.
func enableZorkianMetricExport() error {
if err := featuregate.GlobalRegistry().Set(metricExportSerializerClientFeatureGate.ID(), false); err != nil {
return err
}
return featuregate.GlobalRegistry().Set(metricExportNativeClientFeatureGate.ID(), false)
}
func consumeResource(metadataReporter *inframetadata.Reporter, res pcommon.Resource, logger *zap.Logger) {
if err := metadataReporter.ConsumeResource(res); err != nil {
logger.Warn("failed to consume resource for host metadata", zap.Error(err), zap.Any("resource", res))
}
}
func enableMetricExportSerializer() error {
return featuregate.GlobalRegistry().Set(metricExportSerializerClientFeatureGate.ID(), true)
}
type factory struct {
onceMetadata sync.Once
onceProvider sync.Once
sourceProvider source.Provider
providerErr error
onceReporter sync.Once
onceStopReporter sync.Once
reporter *inframetadata.Reporter
reporterErr error
onceAttributesTranslator sync.Once
attributesTranslator *attributes.Translator
attributesErr error
registry *featuregate.Registry
gatewayUsage *attributes.GatewayUsage
}
func (f *factory) SourceProvider(set component.TelemetrySettings, configHostname string, timeout time.Duration) (source.Provider, error) {
f.onceProvider.Do(func() {
f.sourceProvider, f.providerErr = hostmetadata.GetSourceProvider(set, configHostname, timeout)
})
return f.sourceProvider, f.providerErr
}
func (f *factory) AttributesTranslator(set component.TelemetrySettings) (*attributes.Translator, error) {
f.onceAttributesTranslator.Do(func() {
f.attributesTranslator, f.attributesErr = attributes.NewTranslator(set)
})
return f.attributesTranslator, f.attributesErr
}
// Reporter builds and returns an *inframetadata.Reporter.
func (f *factory) Reporter(params exporter.Settings, pcfg hostmetadata.PusherConfig) (*inframetadata.Reporter, error) {
f.onceReporter.Do(func() {
pusher := hostmetadata.NewPusher(params, pcfg)
f.reporter, f.reporterErr = inframetadata.NewReporter(params.Logger, pusher, pcfg.ReporterPeriod)
if f.reporterErr == nil {
go func() {
if err := f.reporter.Run(context.Background()); err != nil {
params.Logger.Error("Host metadata reporter failed at runtime", zap.Error(err))
}
}()
}
})
return f.reporter, f.reporterErr
}
// StopReporter stops the host metadata reporter.
func (f *factory) StopReporter() {
// Use onceReporter or wait until it is done
f.onceReporter.Do(func() {})
// Stop the reporter
f.onceStopReporter.Do(func() {
if f.reporterErr == nil && f.reporter != nil {
f.reporter.Stop()
}
})
}
func (f *factory) TraceAgent(ctx context.Context, wg *sync.WaitGroup, params exporter.Settings, cfg *datadogconfig.Config, sourceProvider source.Provider, attrsTranslator *attributes.Translator) (*agent.Agent, error) {
agnt, err := newTraceAgent(ctx, params, cfg, sourceProvider, metricsclient.InitializeMetricClient(params.MeterProvider, metricsclient.ExporterSourceTag), attrsTranslator)
if err != nil {
return nil, err
}
wg.Add(1)
go func() {
defer wg.Done()
agnt.Run()
}()
return agnt, nil
}
func newFactoryWithRegistry(registry *featuregate.Registry) exporter.Factory {
f := &factory{registry: registry, gatewayUsage: attributes.NewGatewayUsage()}
return exporter.NewFactory(
metadata.Type,
f.createDefaultConfig,
exporter.WithMetrics(f.createMetricsExporter, metadata.MetricsStability),
exporter.WithTraces(f.createTracesExporter, metadata.TracesStability),
exporter.WithLogs(f.createLogsExporter, metadata.LogsStability),
)
}
// NewFactory creates a Datadog exporter factory
func NewFactory() exporter.Factory {
return newFactoryWithRegistry(featuregate.GlobalRegistry())
}
func defaultClientConfig() confighttp.ClientConfig {
client := confighttp.NewDefaultClientConfig()
client.Timeout = 15 * time.Second
return client
}
// createDefaultConfig creates the default exporter configuration
func (f *factory) createDefaultConfig() component.Config {
return datadogconfig.CreateDefaultConfig()
}
// checkAndCastConfig checks the configuration type and its warnings, and casts it to
// the Datadog Config struct.
func checkAndCastConfig(c component.Config, logger *zap.Logger) *datadogconfig.Config {
cfg, ok := c.(*datadogconfig.Config)
if !ok {
panic("programming error: config structure is not of type *datadogconfig.Config")
}
cfg.LogWarnings(logger)
return cfg
}
func (f *factory) consumeStatsPayload(ctx context.Context, wg *sync.WaitGroup, statsIn <-chan []byte, statsWriter *writer.DatadogStatsWriter, tracerVersion string, agentVersion string, logger *zap.Logger) {
for i := 0; i < runtime.NumCPU(); i++ {
wg.Add(1)
go func() {
defer wg.Done()
for {
select {
case <-ctx.Done():
return
case msg := <-statsIn:
sp := &pb.StatsPayload{}
err := proto.Unmarshal(msg, sp)
if err != nil {
logger.Error("failed to unmarshal stats payload", zap.Error(err))
continue
}
for _, csp := range sp.Stats {
if csp.TracerVersion == "" {
csp.TracerVersion = tracerVersion
}
}
// The DD Connector doesn't set the agent version, so we'll set it here
sp.AgentVersion = agentVersion
statsWriter.Write(sp)
}
}
}()
}
}
// createMetricsExporter creates a metrics exporter based on this config.
func (f *factory) createMetricsExporter(
ctx context.Context,
set exporter.Settings,
c component.Config,
) (exporter.Metrics, error) {
cfg := checkAndCastConfig(c, set.Logger)
hostProvider, err := f.SourceProvider(set.TelemetrySettings, cfg.Hostname, cfg.HostMetadata.GetSourceTimeout())
if err != nil {
return nil, fmt.Errorf("failed to build hostname provider: %w", err)
}
// cancel() runs on shutdown
ctx, cancel := context.WithCancel(ctx)
attrsTranslator, err := f.AttributesTranslator(set.TelemetrySettings)
if err != nil {
cancel()
return nil, fmt.Errorf("failed to build attributes translator: %w", err)
}
var wg sync.WaitGroup // waits for consumeStatsPayload to exit
acfg, err := newTraceAgentConfig(ctx, set, cfg, hostProvider, attrsTranslator)
if err != nil {
cancel()
return nil, err
}
metricsClient := metricsclient.InitializeMetricClient(set.MeterProvider, metricsclient.ExporterSourceTag)
timingReporter := timing.New(metricsClient)
statsWriter := writer.NewStatsWriter(acfg, telemetry.NewNoopCollector(), metricsClient, timingReporter)
set.Logger.Debug("Starting Datadog Trace-Agent StatsWriter")
go statsWriter.Run()
statsIn := make(chan []byte, 1000)
statsv := set.BuildInfo.Command + set.BuildInfo.Version
f.consumeStatsPayload(ctx, &wg, statsIn, statsWriter, statsv, acfg.AgentVersion, set.Logger)
var pushMetricsFn consumer.ConsumeMetricsFunc
pcfg := newMetadataConfigfromConfig(cfg)
// Don't start a `Reporter` if host metadata is disabled.
var metadataReporter *inframetadata.Reporter
if cfg.HostMetadata.Enabled {
metadataReporter, err = f.Reporter(set, pcfg)
if err != nil {
cancel()
return nil, fmt.Errorf("failed to build host metadata reporter: %w", err)
}
}
switch {
case cfg.OnlyMetadata:
pushMetricsFn = func(_ context.Context, md pmetric.Metrics) error {
// only sending metadata use only metrics
f.onceMetadata.Do(func() {
attrs := pcommon.NewMap()
if md.ResourceMetrics().Len() > 0 {
attrs = md.ResourceMetrics().At(0).Resource().Attributes()
}
go hostmetadata.RunPusher(ctx, set, pcfg, hostProvider, attrs, metadataReporter)
})
// Consume resources for host metadata
for i := 0; i < md.ResourceMetrics().Len(); i++ {
res := md.ResourceMetrics().At(i).Resource()
consumeResource(metadataReporter, res, set.Logger)
}
return nil
}
case isMetricExportSerializerEnabled():
errchan := make(chan error)
apiClient := clientutil.CreateAPIClient(
set.BuildInfo,
cfg.Metrics.Endpoint,
cfg.ClientConfig)
go func() { errchan <- clientutil.ValidateAPIKey(ctx, string(cfg.API.Key), set.Logger, apiClient) }()
if cfg.API.FailOnInvalidKey {
err = <-errchan
if err != nil {
cancel() // first cancel context
wg.Wait() // then wait for shutdown
return nil, err
}
}
// Start the hostmetadata pusher once.
// It sends the hostmetadata for the host where the collector is running.
if cfg.HostMetadata.Enabled {
if cfg.HostMetadata.HostnameSource == datadogconfig.HostnameSourceFirstResource {
set.Logger.Warn("first_resource has no effect when serializer exporter is used for exporting metrics")
}
f.onceMetadata.Do(func() {
attrs := pcommon.NewMap()
go hostmetadata.RunPusher(ctx, set, pcfg, hostProvider, attrs, metadataReporter)
})
}
set.Logger.Info("Using Datadog serializerexporter for metric export")
sf := serializerexporter.NewFactoryForOSSExporter(metadata.Type, statsIn)
ex := &serializerexporter.ExporterConfig{
Metrics: serializerexporter.MetricsConfig{
Metrics: cfg.Metrics,
},
TimeoutConfig: exporterhelper.TimeoutConfig{
Timeout: cfg.Timeout,
},
QueueConfig: cfg.QueueSettings,
API: cfg.API,
HostProvider: func(ctx context.Context) (string, error) {
h, err2 := hostProvider.Source(ctx)
if err2 != nil {
return "", err2
}
return h.Identifier, nil
},
ShutdownFunc: func(context.Context) error {
cancel() // first cancel context
wg.Wait() // then wait for shutdown
f.StopReporter()
statsWriter.Stop()
if statsIn != nil {
close(statsIn)
}
return nil
},
HostMetadata: cfg.HostMetadata,
}
return sf.CreateMetrics(ctx, set, ex)
default:
exp, metricsErr := newMetricsExporter(ctx, set, cfg, acfg, &f.onceMetadata, attrsTranslator, hostProvider, metadataReporter, statsIn, f.gatewayUsage)
if metricsErr != nil {
cancel() // first cancel context
wg.Wait() // then wait for shutdown
return nil, metricsErr
}
pushMetricsFn = exp.PushMetricsDataScrubbed
}
exporter, err := exporterhelper.NewMetrics(
ctx,
set,
cfg,
pushMetricsFn,
// explicitly disable since we rely on http.Client timeout logic.
exporterhelper.WithTimeout(exporterhelper.TimeoutConfig{Timeout: 0 * time.Second}),
// We use our own custom mechanism for retries, since we hit several endpoints.
exporterhelper.WithRetry(configretry.BackOffConfig{Enabled: false}),
// The metrics remapping code mutates data
exporterhelper.WithCapabilities(consumer.Capabilities{MutatesData: true}),
exporterhelper.WithQueue(cfg.QueueSettings),
exporterhelper.WithShutdown(func(context.Context) error {
cancel() // first cancel context
wg.Wait() // then wait for shutdown
f.StopReporter()
statsWriter.Stop()
if statsIn != nil {
close(statsIn)
}
return nil
}),
)
if err != nil {
return nil, err
}
return resourcetotelemetry.WrapMetricsExporter(
resourcetotelemetry.Settings{Enabled: cfg.Metrics.ExporterConfig.ResourceAttributesAsTags}, exporter), nil
}
// createTracesExporter creates a trace exporter based on this config.
func (f *factory) createTracesExporter(
ctx context.Context,
set exporter.Settings,
c component.Config,
) (exporter.Traces, error) {
cfg := checkAndCastConfig(c, set.Logger)
if noAPMStatsFeatureGate.IsEnabled() {
set.Logger.Info(
"Trace metrics are now disabled in the Datadog Exporter by default. To continue receiving Trace Metrics, configure the Datadog Connector or disable the feature gate.",
zap.String("documentation", "https://docs.datadoghq.com/opentelemetry/guide/migration/"),
zap.String("feature gate ID", noAPMStatsFeatureGate.ID()),
)
}
var (
pusher consumer.ConsumeTracesFunc
stop component.ShutdownFunc
wg sync.WaitGroup // waits for agent to exit
)
hostProvider, err := f.SourceProvider(set.TelemetrySettings, cfg.Hostname, cfg.HostMetadata.GetSourceTimeout())
if err != nil {
return nil, fmt.Errorf("failed to build hostname provider: %w", err)
}
ctx, cancel := context.WithCancel(ctx)
// cancel() runs on shutdown
attrsTranslator, err := f.AttributesTranslator(set.TelemetrySettings)
if err != nil {
cancel()
return nil, fmt.Errorf("failed to build attributes translator: %w", err)
}
traceagent, err := f.TraceAgent(ctx, &wg, set, cfg, hostProvider, attrsTranslator)
if err != nil {
cancel()
return nil, fmt.Errorf("failed to start trace-agent: %w", err)
}
pcfg := newMetadataConfigfromConfig(cfg)
// Don't start a `Reporter` if host metadata is disabled.
var metadataReporter *inframetadata.Reporter
if cfg.HostMetadata.Enabled {
metadataReporter, err = f.Reporter(set, pcfg)
if err != nil {
cancel()
return nil, fmt.Errorf("failed to build host metadata reporter: %w", err)
}
}
if cfg.OnlyMetadata {
// only host metadata needs to be sent, once.
pusher = func(_ context.Context, td ptrace.Traces) error {
f.onceMetadata.Do(func() {
attrs := pcommon.NewMap()
if td.ResourceSpans().Len() > 0 {
attrs = td.ResourceSpans().At(0).Resource().Attributes()
}
go hostmetadata.RunPusher(ctx, set, pcfg, hostProvider, attrs, metadataReporter)
})
// Consume resources for host metadata
for i := 0; i < td.ResourceSpans().Len(); i++ {
res := td.ResourceSpans().At(i).Resource()
consumeResource(metadataReporter, res, set.Logger)
}
return nil
}
stop = func(context.Context) error {
cancel()
f.StopReporter()
return nil
}
} else {
tracex, err2 := newTracesExporter(ctx, set, cfg, &f.onceMetadata, hostProvider, traceagent, metadataReporter, f.gatewayUsage)
if err2 != nil {
cancel()
wg.Wait() // then wait for shutdown
return nil, err2
}
pusher = tracex.consumeTraces
stop = func(context.Context) error {
cancel() // first cancel context
f.StopReporter()
return nil
}
}
return exporterhelper.NewTraces(
ctx,
set,
cfg,
pusher,
// explicitly disable since we rely on http.Client timeout logic.
exporterhelper.WithTimeout(exporterhelper.TimeoutConfig{Timeout: 0 * time.Second}),
// We don't do retries on traces because of deduping concerns on APM Events.
exporterhelper.WithRetry(configretry.BackOffConfig{Enabled: false}),
exporterhelper.WithQueue(cfg.QueueSettings),
exporterhelper.WithShutdown(stop),
)
}
// createLogsExporter creates a logs exporter based on the config.
func (f *factory) createLogsExporter(
ctx context.Context,
set exporter.Settings,
c component.Config,
) (exporter.Logs, error) {
cfg := checkAndCastConfig(c, set.Logger)
if cfg.Logs.DumpPayloads && isLogsAgentExporterEnabled() {
set.Logger.Warn("logs::dump_payloads is not valid when the exporter.datadogexporter.UseLogsAgentExporter feature gate is enabled")
}
if cfg.Logs.UseCompression && !isLogsAgentExporterEnabled() {
set.Logger.Warn("logs::use_compression is not valid when the exporter.datadogexporter.UseLogsAgentExporter feature gate is disabled")
}
if cfg.Logs.CompressionLevel != 0 && !isLogsAgentExporterEnabled() {
set.Logger.Warn("logs::compression_level is not valid when the exporter.datadogexporter.UseLogsAgentExporter feature gate is disabled")
}
if cfg.Logs.BatchWait != 0 && !isLogsAgentExporterEnabled() {
set.Logger.Warn("logs::batch_wait is not valid when the exporter.datadogexporter.UseLogsAgentExporter feature gate is disabled")
}
var pusher consumer.ConsumeLogsFunc
var logsAgent logsagentpipeline.LogsAgent
hostProvider, err := f.SourceProvider(set.TelemetrySettings, cfg.Hostname, cfg.HostMetadata.GetSourceTimeout())
if err != nil {
return nil, fmt.Errorf("failed to build hostname provider: %w", err)
}
ctx, cancel := context.WithCancel(ctx)
// cancel() runs on shutdown
pcfg := newMetadataConfigfromConfig(cfg)
// Don't start a `Reporter` if host metadata is disabled.
var metadataReporter *inframetadata.Reporter
if cfg.HostMetadata.Enabled {
metadataReporter, err = f.Reporter(set, pcfg)
if err != nil {
cancel()
return nil, fmt.Errorf("failed to build host metadata reporter: %w", err)
}
}
attributesTranslator, err := f.AttributesTranslator(set.TelemetrySettings)
if err != nil {
cancel()
return nil, fmt.Errorf("failed to build attributes translator: %w", err)
}
switch {
case cfg.OnlyMetadata:
// only host metadata needs to be sent, once.
pusher = func(_ context.Context, td plog.Logs) error {
f.onceMetadata.Do(func() {
attrs := pcommon.NewMap()
go hostmetadata.RunPusher(ctx, set, pcfg, hostProvider, attrs, metadataReporter)
})
for i := 0; i < td.ResourceLogs().Len(); i++ {
res := td.ResourceLogs().At(i).Resource()
consumeResource(metadataReporter, res, set.Logger)
}
return nil
}
case isLogsAgentExporterEnabled():
la, exp, err := newLogsAgentExporter(ctx, set, cfg, hostProvider, f.gatewayUsage)
if err != nil {
cancel()
return nil, err
}
logsAgent = la
pusher = exp.ConsumeLogs
default:
exp, err := newLogsExporter(ctx, set, cfg, &f.onceMetadata, attributesTranslator, hostProvider, metadataReporter, f.gatewayUsage)
if err != nil {
cancel()
return nil, err
}
pusher = exp.consumeLogs
}
return exporterhelper.NewLogs(
ctx,
set,
cfg,
pusher,
// explicitly disable since we rely on http.Client timeout logic.
exporterhelper.WithTimeout(exporterhelper.TimeoutConfig{Timeout: 0 * time.Second}),
exporterhelper.WithRetry(cfg.BackOffConfig),
exporterhelper.WithQueue(cfg.QueueSettings),
exporterhelper.WithShutdown(func(context.Context) error {
cancel()
f.StopReporter()
if logsAgent != nil {
return logsAgent.Stop(ctx)
}
return nil
}),
)
}