in exporter/datadogexporter/factory.go [256:420]
func (f *factory) createMetricsExporter(
ctx context.Context,
set exporter.Settings,
c component.Config,
) (exporter.Metrics, error) {
cfg := checkAndCastConfig(c, set.Logger)
hostProvider, err := f.SourceProvider(set.TelemetrySettings, cfg.Hostname, cfg.HostMetadata.GetSourceTimeout())
if err != nil {
return nil, fmt.Errorf("failed to build hostname provider: %w", err)
}
// cancel() runs on shutdown
ctx, cancel := context.WithCancel(ctx)
attrsTranslator, err := f.AttributesTranslator(set.TelemetrySettings)
if err != nil {
cancel()
return nil, fmt.Errorf("failed to build attributes translator: %w", err)
}
var wg sync.WaitGroup // waits for consumeStatsPayload to exit
acfg, err := newTraceAgentConfig(ctx, set, cfg, hostProvider, attrsTranslator)
if err != nil {
cancel()
return nil, err
}
metricsClient := metricsclient.InitializeMetricClient(set.MeterProvider, metricsclient.ExporterSourceTag)
timingReporter := timing.New(metricsClient)
statsWriter := writer.NewStatsWriter(acfg, telemetry.NewNoopCollector(), metricsClient, timingReporter)
set.Logger.Debug("Starting Datadog Trace-Agent StatsWriter")
go statsWriter.Run()
statsIn := make(chan []byte, 1000)
statsv := set.BuildInfo.Command + set.BuildInfo.Version
f.consumeStatsPayload(ctx, &wg, statsIn, statsWriter, statsv, acfg.AgentVersion, set.Logger)
var pushMetricsFn consumer.ConsumeMetricsFunc
pcfg := newMetadataConfigfromConfig(cfg)
// Don't start a `Reporter` if host metadata is disabled.
var metadataReporter *inframetadata.Reporter
if cfg.HostMetadata.Enabled {
metadataReporter, err = f.Reporter(set, pcfg)
if err != nil {
cancel()
return nil, fmt.Errorf("failed to build host metadata reporter: %w", err)
}
}
switch {
case cfg.OnlyMetadata:
pushMetricsFn = func(_ context.Context, md pmetric.Metrics) error {
// only sending metadata use only metrics
f.onceMetadata.Do(func() {
attrs := pcommon.NewMap()
if md.ResourceMetrics().Len() > 0 {
attrs = md.ResourceMetrics().At(0).Resource().Attributes()
}
go hostmetadata.RunPusher(ctx, set, pcfg, hostProvider, attrs, metadataReporter)
})
// Consume resources for host metadata
for i := 0; i < md.ResourceMetrics().Len(); i++ {
res := md.ResourceMetrics().At(i).Resource()
consumeResource(metadataReporter, res, set.Logger)
}
return nil
}
case isMetricExportSerializerEnabled():
errchan := make(chan error)
apiClient := clientutil.CreateAPIClient(
set.BuildInfo,
cfg.Metrics.Endpoint,
cfg.ClientConfig)
go func() { errchan <- clientutil.ValidateAPIKey(ctx, string(cfg.API.Key), set.Logger, apiClient) }()
if cfg.API.FailOnInvalidKey {
err = <-errchan
if err != nil {
cancel() // first cancel context
wg.Wait() // then wait for shutdown
return nil, err
}
}
// Start the hostmetadata pusher once.
// It sends the hostmetadata for the host where the collector is running.
if cfg.HostMetadata.Enabled {
if cfg.HostMetadata.HostnameSource == datadogconfig.HostnameSourceFirstResource {
set.Logger.Warn("first_resource has no effect when serializer exporter is used for exporting metrics")
}
f.onceMetadata.Do(func() {
attrs := pcommon.NewMap()
go hostmetadata.RunPusher(ctx, set, pcfg, hostProvider, attrs, metadataReporter)
})
}
set.Logger.Info("Using Datadog serializerexporter for metric export")
sf := serializerexporter.NewFactoryForOSSExporter(metadata.Type, statsIn)
ex := &serializerexporter.ExporterConfig{
Metrics: serializerexporter.MetricsConfig{
Metrics: cfg.Metrics,
},
TimeoutConfig: exporterhelper.TimeoutConfig{
Timeout: cfg.Timeout,
},
QueueConfig: cfg.QueueSettings,
API: cfg.API,
HostProvider: func(ctx context.Context) (string, error) {
h, err2 := hostProvider.Source(ctx)
if err2 != nil {
return "", err2
}
return h.Identifier, nil
},
ShutdownFunc: func(context.Context) error {
cancel() // first cancel context
wg.Wait() // then wait for shutdown
f.StopReporter()
statsWriter.Stop()
if statsIn != nil {
close(statsIn)
}
return nil
},
HostMetadata: cfg.HostMetadata,
}
return sf.CreateMetrics(ctx, set, ex)
default:
exp, metricsErr := newMetricsExporter(ctx, set, cfg, acfg, &f.onceMetadata, attrsTranslator, hostProvider, metadataReporter, statsIn, f.gatewayUsage)
if metricsErr != nil {
cancel() // first cancel context
wg.Wait() // then wait for shutdown
return nil, metricsErr
}
pushMetricsFn = exp.PushMetricsDataScrubbed
}
exporter, err := exporterhelper.NewMetrics(
ctx,
set,
cfg,
pushMetricsFn,
// explicitly disable since we rely on http.Client timeout logic.
exporterhelper.WithTimeout(exporterhelper.TimeoutConfig{Timeout: 0 * time.Second}),
// We use our own custom mechanism for retries, since we hit several endpoints.
exporterhelper.WithRetry(configretry.BackOffConfig{Enabled: false}),
// The metrics remapping code mutates data
exporterhelper.WithCapabilities(consumer.Capabilities{MutatesData: true}),
exporterhelper.WithQueue(cfg.QueueSettings),
exporterhelper.WithShutdown(func(context.Context) error {
cancel() // first cancel context
wg.Wait() // then wait for shutdown
f.StopReporter()
statsWriter.Stop()
if statsIn != nil {
close(statsIn)
}
return nil
}),
)
if err != nil {
return nil, err
}
return resourcetotelemetry.WrapMetricsExporter(
resourcetotelemetry.Settings{Enabled: cfg.Metrics.ExporterConfig.ResourceAttributesAsTags}, exporter), nil
}