func()

in exporter/datadogexporter/factory.go [256:420]


func (f *factory) createMetricsExporter(
	ctx context.Context,
	set exporter.Settings,
	c component.Config,
) (exporter.Metrics, error) {
	cfg := checkAndCastConfig(c, set.Logger)
	hostProvider, err := f.SourceProvider(set.TelemetrySettings, cfg.Hostname, cfg.HostMetadata.GetSourceTimeout())
	if err != nil {
		return nil, fmt.Errorf("failed to build hostname provider: %w", err)
	}

	// cancel() runs on shutdown
	ctx, cancel := context.WithCancel(ctx)

	attrsTranslator, err := f.AttributesTranslator(set.TelemetrySettings)
	if err != nil {
		cancel()
		return nil, fmt.Errorf("failed to build attributes translator: %w", err)
	}
	var wg sync.WaitGroup // waits for consumeStatsPayload to exit

	acfg, err := newTraceAgentConfig(ctx, set, cfg, hostProvider, attrsTranslator)
	if err != nil {
		cancel()
		return nil, err
	}
	metricsClient := metricsclient.InitializeMetricClient(set.MeterProvider, metricsclient.ExporterSourceTag)
	timingReporter := timing.New(metricsClient)
	statsWriter := writer.NewStatsWriter(acfg, telemetry.NewNoopCollector(), metricsClient, timingReporter)

	set.Logger.Debug("Starting Datadog Trace-Agent StatsWriter")
	go statsWriter.Run()

	statsIn := make(chan []byte, 1000)
	statsv := set.BuildInfo.Command + set.BuildInfo.Version
	f.consumeStatsPayload(ctx, &wg, statsIn, statsWriter, statsv, acfg.AgentVersion, set.Logger)

	var pushMetricsFn consumer.ConsumeMetricsFunc

	pcfg := newMetadataConfigfromConfig(cfg)
	// Don't start a `Reporter` if host metadata is disabled.
	var metadataReporter *inframetadata.Reporter
	if cfg.HostMetadata.Enabled {
		metadataReporter, err = f.Reporter(set, pcfg)
		if err != nil {
			cancel()
			return nil, fmt.Errorf("failed to build host metadata reporter: %w", err)
		}
	}

	switch {
	case cfg.OnlyMetadata:
		pushMetricsFn = func(_ context.Context, md pmetric.Metrics) error {
			// only sending metadata use only metrics
			f.onceMetadata.Do(func() {
				attrs := pcommon.NewMap()
				if md.ResourceMetrics().Len() > 0 {
					attrs = md.ResourceMetrics().At(0).Resource().Attributes()
				}
				go hostmetadata.RunPusher(ctx, set, pcfg, hostProvider, attrs, metadataReporter)
			})

			// Consume resources for host metadata
			for i := 0; i < md.ResourceMetrics().Len(); i++ {
				res := md.ResourceMetrics().At(i).Resource()
				consumeResource(metadataReporter, res, set.Logger)
			}
			return nil
		}
	case isMetricExportSerializerEnabled():
		errchan := make(chan error)
		apiClient := clientutil.CreateAPIClient(
			set.BuildInfo,
			cfg.Metrics.Endpoint,
			cfg.ClientConfig)
		go func() { errchan <- clientutil.ValidateAPIKey(ctx, string(cfg.API.Key), set.Logger, apiClient) }()
		if cfg.API.FailOnInvalidKey {
			err = <-errchan
			if err != nil {
				cancel()  // first cancel context
				wg.Wait() // then wait for shutdown
				return nil, err
			}
		}
		// Start the hostmetadata pusher once.
		// It sends the hostmetadata for the host where the collector is running.
		if cfg.HostMetadata.Enabled {
			if cfg.HostMetadata.HostnameSource == datadogconfig.HostnameSourceFirstResource {
				set.Logger.Warn("first_resource has no effect when serializer exporter is used for exporting metrics")
			}
			f.onceMetadata.Do(func() {
				attrs := pcommon.NewMap()
				go hostmetadata.RunPusher(ctx, set, pcfg, hostProvider, attrs, metadataReporter)
			})
		}
		set.Logger.Info("Using Datadog serializerexporter for metric export")
		sf := serializerexporter.NewFactoryForOSSExporter(metadata.Type, statsIn)
		ex := &serializerexporter.ExporterConfig{
			Metrics: serializerexporter.MetricsConfig{
				Metrics: cfg.Metrics,
			},
			TimeoutConfig: exporterhelper.TimeoutConfig{
				Timeout: cfg.Timeout,
			},
			QueueConfig: cfg.QueueSettings,
			API:         cfg.API,
			HostProvider: func(ctx context.Context) (string, error) {
				h, err2 := hostProvider.Source(ctx)
				if err2 != nil {
					return "", err2
				}
				return h.Identifier, nil
			},
			ShutdownFunc: func(context.Context) error {
				cancel()  // first cancel context
				wg.Wait() // then wait for shutdown
				f.StopReporter()
				statsWriter.Stop()
				if statsIn != nil {
					close(statsIn)
				}
				return nil
			},
			HostMetadata: cfg.HostMetadata,
		}
		return sf.CreateMetrics(ctx, set, ex)
	default:
		exp, metricsErr := newMetricsExporter(ctx, set, cfg, acfg, &f.onceMetadata, attrsTranslator, hostProvider, metadataReporter, statsIn, f.gatewayUsage)
		if metricsErr != nil {
			cancel()  // first cancel context
			wg.Wait() // then wait for shutdown
			return nil, metricsErr
		}
		pushMetricsFn = exp.PushMetricsDataScrubbed
	}

	exporter, err := exporterhelper.NewMetrics(
		ctx,
		set,
		cfg,
		pushMetricsFn,
		// explicitly disable since we rely on http.Client timeout logic.
		exporterhelper.WithTimeout(exporterhelper.TimeoutConfig{Timeout: 0 * time.Second}),
		// We use our own custom mechanism for retries, since we hit several endpoints.
		exporterhelper.WithRetry(configretry.BackOffConfig{Enabled: false}),
		// The metrics remapping code mutates data
		exporterhelper.WithCapabilities(consumer.Capabilities{MutatesData: true}),
		exporterhelper.WithQueue(cfg.QueueSettings),
		exporterhelper.WithShutdown(func(context.Context) error {
			cancel()  // first cancel context
			wg.Wait() // then wait for shutdown
			f.StopReporter()
			statsWriter.Stop()
			if statsIn != nil {
				close(statsIn)
			}
			return nil
		}),
	)
	if err != nil {
		return nil, err
	}
	return resourcetotelemetry.WrapMetricsExporter(
		resourcetotelemetry.Settings{Enabled: cfg.Metrics.ExporterConfig.ResourceAttributesAsTags}, exporter), nil
}