exporter/signalfxexporter/factory.go (160 lines of code) (raw):

// Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 package signalfxexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/signalfxexporter" import ( "context" "errors" "fmt" "time" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/config/confighttp" "go.opentelemetry.io/collector/config/configretry" "go.opentelemetry.io/collector/exporter" "go.opentelemetry.io/collector/exporter/exporterhelper" "go.uber.org/zap" "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/signalfxexporter/internal/correlation" "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/signalfxexporter/internal/metadata" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/splunk" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/batchperresourceattr" ) const ( defaultHTTPTimeout = time.Second * 10 defaultHTTP2ReadIdleTimeout = time.Second * 10 defaultHTTP2PingTimeout = time.Second * 10 defaultMaxConns = 100 defaultDimMaxBuffered = 10000 defaultDimSendDelay = 10 * time.Second defaultDimMaxConnsPerHost = 20 defaultDimMaxIdleConns = 20 defaultDimMaxIdleConnsPerHost = 20 ) // NewFactory creates a factory for SignalFx exporter. func NewFactory() exporter.Factory { return exporter.NewFactory( metadata.Type, createDefaultConfig, exporter.WithMetrics(createMetricsExporter, metadata.MetricsStability), exporter.WithLogs(createLogsExporter, metadata.LogsStability), exporter.WithTraces(createTracesExporter, metadata.TracesStability), ) } func createDefaultConfig() component.Config { maxConnCount := defaultMaxConns idleConnTimeout := 30 * time.Second timeout := 10 * time.Second clientConfig := confighttp.NewDefaultClientConfig() clientConfig.Timeout = defaultHTTPTimeout clientConfig.MaxIdleConns = maxConnCount clientConfig.MaxIdleConnsPerHost = maxConnCount clientConfig.IdleConnTimeout = idleConnTimeout clientConfig.HTTP2ReadIdleTimeout = defaultHTTP2ReadIdleTimeout clientConfig.HTTP2PingTimeout = defaultHTTP2PingTimeout return &Config{ BackOffConfig: configretry.NewDefaultBackOffConfig(), QueueSettings: exporterhelper.NewDefaultQueueConfig(), ClientConfig: clientConfig, AccessTokenPassthroughConfig: splunk.AccessTokenPassthroughConfig{ AccessTokenPassthrough: true, }, DeltaTranslationTTL: 3600, Correlation: correlation.DefaultConfig(), NonAlphanumericDimensionChars: "_-.", DimensionClient: DimensionClientConfig{ SendDelay: defaultDimSendDelay, MaxBuffered: defaultDimMaxBuffered, MaxConnsPerHost: defaultDimMaxConnsPerHost, MaxIdleConns: defaultDimMaxIdleConns, MaxIdleConnsPerHost: defaultDimMaxIdleConnsPerHost, IdleConnTimeout: idleConnTimeout, Timeout: timeout, }, } } func createTracesExporter( ctx context.Context, set exporter.Settings, eCfg component.Config, ) (exporter.Traces, error) { cfg := eCfg.(*Config) corrCfg := cfg.Correlation if corrCfg.Endpoint == "" { apiURL, err := cfg.getAPIURL() if err != nil { return nil, fmt.Errorf("unable to create API URL: %w", err) } corrCfg.Endpoint = apiURL.String() } if cfg.AccessToken == "" { return nil, errors.New("access_token is required") } set.Logger.Info("Correlation tracking enabled", zap.String("endpoint", corrCfg.Endpoint)) tracker := correlation.NewTracker(corrCfg, cfg.AccessToken, set) return exporterhelper.NewTraces( ctx, set, cfg, tracker.ProcessTraces, exporterhelper.WithStart(tracker.Start), exporterhelper.WithShutdown(tracker.Shutdown)) } func createMetricsExporter( ctx context.Context, set exporter.Settings, config component.Config, ) (exporter.Metrics, error) { cfg := config.(*Config) exp, err := newSignalFxExporter(cfg, set) if err != nil { return nil, err } me, err := exporterhelper.NewMetrics( ctx, set, cfg, exp.pushMetrics, // explicitly disable since we rely on http.Client timeout logic. exporterhelper.WithTimeout(exporterhelper.TimeoutConfig{Timeout: 0}), exporterhelper.WithRetry(cfg.BackOffConfig), exporterhelper.WithQueue(cfg.QueueSettings), exporterhelper.WithStart(exp.start), exporterhelper.WithShutdown(exp.shutdown)) if err != nil { return nil, err } // If AccessTokenPassthrough enabled, split the incoming Metrics data by splunk.SFxAccessTokenLabel, // this ensures that we get batches of data for the same token when pushing to the backend. if cfg.AccessTokenPassthrough { me = &baseMetricsExporter{ Component: me, Metrics: batchperresourceattr.NewBatchPerResourceMetrics(splunk.SFxAccessTokenLabel, me), } } return &signalfMetadataExporter{ Metrics: me, exporter: exp, }, nil } func createLogsExporter( ctx context.Context, set exporter.Settings, cfg component.Config, ) (exporter.Logs, error) { expCfg := cfg.(*Config) exp, err := newEventExporter(expCfg, set) if err != nil { return nil, err } le, err := exporterhelper.NewLogs( ctx, set, cfg, exp.pushLogs, // explicitly disable since we rely on http.Client timeout logic. exporterhelper.WithTimeout(exporterhelper.TimeoutConfig{Timeout: 0}), exporterhelper.WithRetry(expCfg.BackOffConfig), exporterhelper.WithQueue(expCfg.QueueSettings), exporterhelper.WithStart(exp.startLogs)) if err != nil { return nil, err } // If AccessTokenPassthrough enabled, split the incoming Metrics data by splunk.SFxAccessTokenLabel, // this ensures that we get batches of data for the same token when pushing to the backend. if expCfg.AccessTokenPassthrough { le = &baseLogsExporter{ Component: le, Logs: batchperresourceattr.NewBatchPerResourceLogs(splunk.SFxAccessTokenLabel, le), } } return le, nil }