exporter/elasticsearchexporter/factory.go (174 lines of code) (raw):
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
//go:generate mdatagen metadata.yaml
package elasticsearchexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter"
import (
"compress/gzip"
"context"
"maps"
"net/http"
"slices"
"time"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configcompression"
"go.opentelemetry.io/collector/config/confighttp"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/exporter/exporterhelper"
"go.opentelemetry.io/collector/exporter/exporterhelper/xexporterhelper"
"go.opentelemetry.io/collector/exporter/xexporter"
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter/internal/metadata"
)
var defaultBatcherMinSizeItems = int64(5000)
// NewFactory creates a factory for Elastic exporter.
func NewFactory() exporter.Factory {
return xexporter.NewFactory(
metadata.Type,
createDefaultConfig,
xexporter.WithLogs(createLogsExporter, metadata.LogsStability),
xexporter.WithMetrics(createMetricsExporter, metadata.MetricsStability),
xexporter.WithTraces(createTracesExporter, metadata.TracesStability),
xexporter.WithProfiles(createProfilesExporter, metadata.ProfilesStability),
)
}
func createDefaultConfig() component.Config {
qs := exporterhelper.NewDefaultQueueConfig()
qs.Enabled = false
httpClientConfig := confighttp.NewDefaultClientConfig()
httpClientConfig.Timeout = 90 * time.Second
httpClientConfig.Compression = configcompression.TypeGzip
httpClientConfig.CompressionParams.Level = gzip.BestSpeed
return &Config{
QueueSettings: qs,
ClientConfig: httpClientConfig,
LogsDynamicID: DynamicIDSettings{
Enabled: false,
},
LogsDynamicPipeline: DynamicPipelineSettings{
Enabled: false,
},
Retry: RetrySettings{
Enabled: true,
MaxRetries: 0, // default is set in exporter code
InitialInterval: 100 * time.Millisecond,
MaxInterval: 1 * time.Minute,
RetryOnStatus: []int{
http.StatusTooManyRequests,
},
},
Mapping: MappingsSettings{
Mode: "otel",
AllowedModes: slices.Sorted(maps.Keys(canonicalMappingModes)),
},
LogstashFormat: LogstashFormatSettings{
Enabled: false,
PrefixSeparator: "-",
DateFormat: "%Y.%m.%d",
},
TelemetrySettings: TelemetrySettings{
LogRequestBody: false,
LogResponseBody: false,
},
Batcher: BatcherConfig{
BatcherConfig: exporterhelper.BatcherConfig{ //nolint:staticcheck
FlushTimeout: 30 * time.Second,
SizeConfig: exporterhelper.SizeConfig{ //nolint:staticcheck
Sizer: exporterhelper.RequestSizerTypeItems,
MinSize: defaultBatcherMinSizeItems,
},
},
},
Flush: FlushSettings{
Bytes: 5e+6,
Interval: 30 * time.Second,
},
}
}
// createLogsExporter creates a new exporter for logs.
//
// Logs are directly indexed into Elasticsearch.
func createLogsExporter(
ctx context.Context,
set exporter.Settings,
cfg component.Config,
) (exporter.Logs, error) {
cf := cfg.(*Config)
handleDeprecatedConfig(cf, set.Logger)
exporter, err := newExporter(cf, set, cf.LogsIndex)
if err != nil {
return nil, err
}
return exporterhelper.NewLogs(
ctx,
set,
cfg,
exporter.pushLogsData,
exporterhelperOptions(cf, exporter.Start, exporter.Shutdown)...,
)
}
func createMetricsExporter(
ctx context.Context,
set exporter.Settings,
cfg component.Config,
) (exporter.Metrics, error) {
cf := cfg.(*Config)
handleDeprecatedConfig(cf, set.Logger)
exporter, err := newExporter(cf, set, cf.MetricsIndex)
if err != nil {
return nil, err
}
return exporterhelper.NewMetrics(
ctx,
set,
cfg,
exporter.pushMetricsData,
exporterhelperOptions(cf, exporter.Start, exporter.Shutdown)...,
)
}
func createTracesExporter(ctx context.Context,
set exporter.Settings,
cfg component.Config,
) (exporter.Traces, error) {
cf := cfg.(*Config)
handleDeprecatedConfig(cf, set.Logger)
exporter, err := newExporter(cf, set, cf.TracesIndex)
if err != nil {
return nil, err
}
return exporterhelper.NewTraces(
ctx,
set,
cfg,
exporter.pushTraceData,
exporterhelperOptions(cf, exporter.Start, exporter.Shutdown)...,
)
}
// createProfilesExporter creates a new exporter for profiles.
//
// Profiles are directly indexed into Elasticsearch.
func createProfilesExporter(
ctx context.Context,
set exporter.Settings,
cfg component.Config,
) (xexporter.Profiles, error) {
cf := cfg.(*Config)
handleDeprecatedConfig(cf, set.Logger)
exporter, err := newExporter(cf, set, "")
if err != nil {
return nil, err
}
return xexporterhelper.NewProfilesExporter(
ctx,
set,
cfg,
exporter.pushProfilesData,
exporterhelperOptions(cf, exporter.Start, exporter.Shutdown)...,
)
}
func exporterhelperOptions(
cfg *Config,
start component.StartFunc,
shutdown component.ShutdownFunc,
) []exporterhelper.Option {
opts := []exporterhelper.Option{
exporterhelper.WithCapabilities(consumer.Capabilities{MutatesData: false}),
exporterhelper.WithStart(start),
exporterhelper.WithShutdown(shutdown),
exporterhelper.WithQueue(cfg.QueueSettings),
}
if cfg.Batcher.enabledSet {
opts = append(opts, exporterhelper.WithBatcher(cfg.Batcher.BatcherConfig)) //nolint:staticcheck
// Effectively disable timeout_sender because timeout is enforced in bulk indexer.
//
// We keep timeout_sender enabled in the async mode (Batcher.Enabled == nil),
// to ensure sending data to the background workers will not block indefinitely.
opts = append(opts, exporterhelper.WithTimeout(exporterhelper.TimeoutConfig{Timeout: 0}))
}
return opts
}