exporter/datadogexporter/traces_exporter.go (237 lines of code) (raw):
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package datadogexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/datadogexporter"
import (
"context"
"fmt"
"net/http"
"sync"
"time"
gzip "github.com/DataDog/datadog-agent/comp/trace/compression/impl-gzip"
"github.com/DataDog/datadog-agent/pkg/trace/agent"
traceconfig "github.com/DataDog/datadog-agent/pkg/trace/config"
tracelog "github.com/DataDog/datadog-agent/pkg/trace/log"
"github.com/DataDog/datadog-agent/pkg/trace/telemetry"
"github.com/DataDog/datadog-api-client-go/v2/api/datadogV2"
"github.com/DataDog/datadog-go/v5/statsd"
"github.com/DataDog/opentelemetry-mapping-go/pkg/inframetadata"
"github.com/DataDog/opentelemetry-mapping-go/pkg/otlp/attributes"
"github.com/DataDog/opentelemetry-mapping-go/pkg/otlp/attributes/source"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/featuregate"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/ptrace"
"go.uber.org/zap"
zorkian "gopkg.in/zorkian/go-datadog-api.v2"
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/datadogexporter/internal/metrics"
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/datadog/clientutil"
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/datadog/hostmetadata"
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/datadog/scrub"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/datadog"
datadogconfig "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/datadog/config"
)
var traceCustomHTTPFeatureGate = featuregate.GlobalRegistry().MustRegister(
"exporter.datadogexporter.TraceExportUseCustomHTTPClient",
featuregate.StageAlpha,
featuregate.WithRegisterDescription("When enabled, trace export uses the HTTP client from the exporter HTTP configs"),
featuregate.WithRegisterFromVersion("v0.105.0"),
)
type traceExporter struct {
params exporter.Settings
cfg *datadogconfig.Config
ctx context.Context // ctx triggers shutdown upon cancellation
client *zorkian.Client // client sends running metrics to backend & performs API validation
metricsAPI *datadogV2.MetricsApi // client sends running metrics to backend
scrubber scrub.Scrubber // scrubber scrubs sensitive information from error messages
onceMetadata *sync.Once // onceMetadata ensures that metadata is sent only once across all exporters
agent *agent.Agent // agent processes incoming traces
sourceProvider source.Provider // is able to source the origin of a trace (hostname, container, etc)
metadataReporter *inframetadata.Reporter // reports host metadata from resource attributes and metrics
retrier *clientutil.Retrier // retrier handles retries on requests
gatewayUsage *attributes.GatewayUsage // gatewayUsage stores the gateway usage metrics
}
func newTracesExporter(
ctx context.Context,
params exporter.Settings,
cfg *datadogconfig.Config,
onceMetadata *sync.Once,
sourceProvider source.Provider,
agent *agent.Agent,
metadataReporter *inframetadata.Reporter,
gatewayUsage *attributes.GatewayUsage,
) (*traceExporter, error) {
scrubber := scrub.NewScrubber()
exp := &traceExporter{
params: params,
cfg: cfg,
ctx: ctx,
agent: agent,
onceMetadata: onceMetadata,
scrubber: scrubber,
sourceProvider: sourceProvider,
retrier: clientutil.NewRetrier(params.Logger, cfg.BackOffConfig, scrubber),
metadataReporter: metadataReporter,
gatewayUsage: gatewayUsage,
}
// client to send running metric to the backend & perform API key validation
errchan := make(chan error)
if isMetricExportV2Enabled() {
apiClient := clientutil.CreateAPIClient(
params.BuildInfo,
cfg.Metrics.Endpoint,
cfg.ClientConfig)
go func() { errchan <- clientutil.ValidateAPIKey(ctx, string(cfg.API.Key), params.Logger, apiClient) }()
exp.metricsAPI = datadogV2.NewMetricsApi(apiClient)
} else {
client := clientutil.CreateZorkianClient(string(cfg.API.Key), cfg.Metrics.Endpoint)
go func() { errchan <- clientutil.ValidateAPIKeyZorkian(params.Logger, client) }()
exp.client = client
}
if cfg.API.FailOnInvalidKey {
if err := <-errchan; err != nil {
return nil, err
}
}
return exp, nil
}
var _ consumer.ConsumeTracesFunc = (*traceExporter)(nil).consumeTraces
// headerComputedStats specifies the HTTP header which indicates whether APM stats
// have already been computed for a payload.
const headerComputedStats = "Datadog-Client-Computed-Stats"
func (exp *traceExporter) consumeTraces(
ctx context.Context,
td ptrace.Traces,
) (err error) {
defer func() { err = exp.scrubber.Scrub(err) }()
if exp.cfg.HostMetadata.Enabled {
// start host metadata with resource attributes from
// the first payload.
exp.onceMetadata.Do(func() {
attrs := pcommon.NewMap()
if td.ResourceSpans().Len() > 0 {
attrs = td.ResourceSpans().At(0).Resource().Attributes()
}
go hostmetadata.RunPusher(exp.ctx, exp.params, newMetadataConfigfromConfig(exp.cfg), exp.sourceProvider, attrs, exp.metadataReporter)
})
// Consume resources for host metadata
for i := 0; i < td.ResourceSpans().Len(); i++ {
res := td.ResourceSpans().At(i).Resource()
consumeResource(exp.metadataReporter, res, exp.params.Logger)
}
}
rspans := td.ResourceSpans()
hosts := make(map[string]struct{})
tags := make(map[string]struct{})
header := make(http.Header)
if noAPMStatsFeatureGate.IsEnabled() {
header[headerComputedStats] = []string{"true"}
}
for i := 0; i < rspans.Len(); i++ {
rspan := rspans.At(i)
src := exp.agent.OTLPReceiver.ReceiveResourceSpans(ctx, rspan, header, exp.gatewayUsage)
switch src.Kind {
case source.HostnameKind:
hosts[src.Identifier] = struct{}{}
case source.AWSECSFargateKind:
tags[src.Tag()] = struct{}{}
case source.InvalidKind:
}
}
exp.exportUsageMetrics(ctx, hosts, tags)
return nil
}
func (exp *traceExporter) exportUsageMetrics(ctx context.Context, hosts map[string]struct{}, tags map[string]struct{}) {
now := pcommon.NewTimestampFromTime(time.Now())
buildTags := metrics.TagsFromBuildInfo(exp.params.BuildInfo)
var err error
if isMetricExportV2Enabled() {
series := make([]datadogV2.MetricSeries, 0, len(hosts)+len(tags))
timestamp := uint64(now)
for host := range hosts {
series = append(series, metrics.DefaultMetrics("traces", host, timestamp, buildTags)...)
if exp.gatewayUsage != nil {
series = append(series, metrics.GatewayUsageGauge(timestamp, host, buildTags, exp.gatewayUsage))
}
}
for tag := range tags {
ms := metrics.DefaultMetrics("traces", "", timestamp, buildTags)
for i := range ms {
ms[i].Tags = append(ms[i].Tags, tag)
}
series = append(series, ms...)
}
_, err = exp.retrier.DoWithRetries(ctx, func(context.Context) error {
ctx2 := clientutil.GetRequestContext(ctx, string(exp.cfg.API.Key))
_, httpresp, merr := exp.metricsAPI.SubmitMetrics(ctx2, datadogV2.MetricPayload{Series: series}, *clientutil.GZipSubmitMetricsOptionalParameters)
return clientutil.WrapError(merr, httpresp)
})
} else {
series := make([]zorkian.Metric, 0, len(hosts)+len(tags))
for host := range hosts {
series = append(series, metrics.DefaultZorkianMetrics("traces", host, uint64(now), exp.params.BuildInfo)...)
}
for tag := range tags {
ms := metrics.DefaultZorkianMetrics("traces", "", uint64(now), exp.params.BuildInfo)
for i := range ms {
ms[i].Tags = append(ms[i].Tags, tag)
}
series = append(series, ms...)
}
_, err = exp.retrier.DoWithRetries(ctx, func(context.Context) error {
return exp.client.PostMetrics(series)
})
}
if err != nil {
exp.params.Logger.Error("Error posting hostname/tags series", zap.Error(err))
}
}
func newTraceAgent(ctx context.Context, params exporter.Settings, cfg *datadogconfig.Config, sourceProvider source.Provider, metricsClient statsd.ClientInterface, attrsTranslator *attributes.Translator) (*agent.Agent, error) {
acfg, err := newTraceAgentConfig(ctx, params, cfg, sourceProvider, attrsTranslator)
if err != nil {
return nil, err
}
return agent.NewAgent(ctx, acfg, telemetry.NewNoopCollector(), metricsClient, gzip.NewComponent()), nil
}
func newTraceAgentConfig(ctx context.Context, params exporter.Settings, cfg *datadogconfig.Config, sourceProvider source.Provider, attrsTranslator *attributes.Translator) (*traceconfig.AgentConfig, error) {
acfg := traceconfig.New()
src, err := sourceProvider.Source(ctx)
if err != nil {
return nil, err
}
if src.Kind == source.HostnameKind {
acfg.Hostname = src.Identifier
}
acfg.OTLPReceiver.AttributesTranslator = attrsTranslator
acfg.OTLPReceiver.SpanNameRemappings = cfg.Traces.SpanNameRemappings
acfg.OTLPReceiver.SpanNameAsResourceName = cfg.Traces.SpanNameAsResourceName
acfg.Endpoints[0].APIKey = string(cfg.API.Key)
acfg.Ignore["resource"] = cfg.Traces.IgnoreResources
acfg.ReceiverEnabled = false // disable HTTP receiver
acfg.AgentVersion = fmt.Sprintf("datadogexporter-%s-%s", params.BuildInfo.Command, params.BuildInfo.Version)
acfg.SkipSSLValidation = cfg.TLSSetting.InsecureSkipVerify
acfg.ComputeStatsBySpanKind = cfg.Traces.ComputeStatsBySpanKind
acfg.PeerTagsAggregation = cfg.Traces.PeerTagsAggregation
acfg.PeerTags = cfg.Traces.PeerTags
acfg.MaxSenderRetries = 4
if traceCustomHTTPFeatureGate.IsEnabled() {
params.Logger.Info("Experimental feature: datadog exporter trace export uses a custom HTTP client from the exporter HTTP configs")
acfg.HTTPClientFunc = func() *http.Client {
return clientutil.NewHTTPClient(cfg.ClientConfig)
}
}
if datadog.OperationAndResourceNameV2FeatureGate.IsEnabled() {
acfg.Features["enable_operation_and_resource_name_logic_v2"] = struct{}{}
} else {
params.Logger.Info("Please enable feature gate datadog.EnableOperationAndResourceNameV2 for improved operation and resource name logic. This feature will be enabled by default in the future - if you have Datadog monitors or alerts set on operation/resource names, you may need to migrate them to the new convention.")
}
if v := cfg.Traces.GetFlushInterval(); v > 0 {
acfg.TraceWriter.FlushPeriodSeconds = v
}
if v := cfg.Traces.TraceBuffer; v > 0 {
acfg.TraceBuffer = v
}
if addr := cfg.Traces.Endpoint; addr != "" {
acfg.Endpoints[0].Host = addr
}
if cfg.Traces.ComputeTopLevelBySpanKind {
acfg.Features["enable_otlp_compute_top_level_by_span_kind"] = struct{}{}
}
if !datadog.ReceiveResourceSpansV2FeatureGate.IsEnabled() {
acfg.Features["disable_receive_resource_spans_v2"] = struct{}{}
}
tracelog.SetLogger(&datadog.Zaplogger{Logger: params.Logger}) // TODO: This shouldn't be a singleton
return acfg, nil
}