exporter/elasticsearchexporter/config.go (270 lines of code) (raw):

// Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 package elasticsearchexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter" import ( "encoding/base64" "errors" "fmt" "net/url" "os" "slices" "strings" "time" "go.opentelemetry.io/collector/config/configcompression" "go.opentelemetry.io/collector/config/confighttp" "go.opentelemetry.io/collector/config/configopaque" "go.opentelemetry.io/collector/confmap" "go.opentelemetry.io/collector/exporter/exporterhelper" "go.uber.org/zap" ) // Config defines configuration for Elastic exporter. type Config struct { QueueSettings exporterhelper.QueueBatchConfig `mapstructure:"sending_queue"` // Endpoints holds the Elasticsearch URLs the exporter should send events to. // // This setting is required if CloudID is not set and if the // ELASTICSEARCH_URL environment variable is not set. Endpoints []string `mapstructure:"endpoints"` // CloudID holds the cloud ID to identify the Elastic Cloud cluster to send events to. // https://www.elastic.co/guide/en/cloud/current/ec-cloud-id.html // // This setting is required if no URL is configured. CloudID string `mapstructure:"cloudid"` // NumWorkers configures the number of workers publishing bulk requests. NumWorkers int `mapstructure:"num_workers"` // LogsIndex configures the static index used for document routing for logs. // It should be empty if dynamic document routing is preferred. LogsIndex string `mapstructure:"logs_index"` LogsDynamicIndex DynamicIndexSetting `mapstructure:"logs_dynamic_index"` // MetricsIndex configures the static index used for document routing for metrics. // It should be empty if dynamic document routing is preferred. MetricsIndex string `mapstructure:"metrics_index"` MetricsDynamicIndex DynamicIndexSetting `mapstructure:"metrics_dynamic_index"` // TracesIndex configures the static index used for document routing for metrics. // It should be empty if dynamic document routing is preferred. TracesIndex string `mapstructure:"traces_index"` TracesDynamicIndex DynamicIndexSetting `mapstructure:"traces_dynamic_index"` // LogsDynamicID configures whether log record attribute `elasticsearch.document_id` is set as the document ID in ES. LogsDynamicID DynamicIDSettings `mapstructure:"logs_dynamic_id"` // LogsDynamicPipeline configures whether log record attribute `elasticsearch.document_pipeline` is set as the document ingest pipeline for ES. LogsDynamicPipeline DynamicPipelineSettings `mapstructure:"logs_dynamic_pipeline"` // Pipeline configures the ingest node pipeline name that should be used to process the // events. // // https://www.elastic.co/guide/en/elasticsearch/reference/current/ingest.html Pipeline string `mapstructure:"pipeline"` confighttp.ClientConfig `mapstructure:",squash"` Authentication AuthenticationSettings `mapstructure:",squash"` Discovery DiscoverySettings `mapstructure:"discover"` Retry RetrySettings `mapstructure:"retry"` Flush FlushSettings `mapstructure:"flush"` Mapping MappingsSettings `mapstructure:"mapping"` LogstashFormat LogstashFormatSettings `mapstructure:"logstash_format"` // TelemetrySettings contains settings useful for testing/debugging purposes // This is experimental and may change at any time. TelemetrySettings `mapstructure:"telemetry"` // Batcher holds configuration for batching requests based on timeout // and size-based thresholds. // // Batcher is unused by default, in which case Flush will be used. // If Batcher.Enabled is non-nil (i.e. batcher::enabled is specified), // then the Flush will be ignored even if Batcher.Enabled is false. Batcher BatcherConfig `mapstructure:"batcher"` } // BatcherConfig holds configuration for exporterbatcher. // // This is a slightly modified version of exporterbatcher.Config, // to enable tri-state Enabled: unset, false, true. type BatcherConfig struct { exporterhelper.BatcherConfig `mapstructure:",squash"` // enabledSet tracks whether Enabled has been specified. // If enabledSet is false, the exporter will perform its // own buffering. enabledSet bool `mapstructure:"-"` } func (c *BatcherConfig) Unmarshal(conf *confmap.Conf) error { if err := conf.Unmarshal(c); err != nil { return err } c.enabledSet = conf.IsSet("enabled") return nil } type TelemetrySettings struct { LogRequestBody bool `mapstructure:"log_request_body"` LogResponseBody bool `mapstructure:"log_response_body"` } type LogstashFormatSettings struct { Enabled bool `mapstructure:"enabled"` PrefixSeparator string `mapstructure:"prefix_separator"` DateFormat string `mapstructure:"date_format"` } type DynamicIndexSetting struct { // Enabled enables dynamic index routing. // // Deprecated: [v0.122.0] This config is now ignored. Dynamic index routing is always done by default. Enabled bool `mapstructure:"enabled"` } type DynamicIDSettings struct { Enabled bool `mapstructure:"enabled"` } type DynamicPipelineSettings struct { Enabled bool `mapstructure:"enabled"` } // AuthenticationSettings defines user authentication related settings. type AuthenticationSettings struct { // User is used to configure HTTP Basic Authentication. User string `mapstructure:"user"` // Password is used to configure HTTP Basic Authentication. Password configopaque.String `mapstructure:"password"` // APIKey is used to configure ApiKey based Authentication. // // https://www.elastic.co/guide/en/elasticsearch/reference/current/security-api-create-api-key.html APIKey configopaque.String `mapstructure:"api_key"` } // DiscoverySettings defines Elasticsearch node discovery related settings. // The exporter will check Elasticsearch regularly for available nodes // and updates the list of hosts if discovery is enabled. Newly discovered // nodes will automatically be used for load balancing. // // DiscoverySettings should not be enabled when operating Elasticsearch behind a proxy // or load balancer. // // https://www.elastic.co/blog/elasticsearch-sniffing-best-practices-what-when-why-how type DiscoverySettings struct { // OnStart, if set, instructs the exporter to look for available Elasticsearch // nodes the first time the exporter connects to the cluster. OnStart bool `mapstructure:"on_start"` // Interval instructs the exporter to renew the list of Elasticsearch URLs // with the given interval. URLs will not be updated if Interval is <=0. Interval time.Duration `mapstructure:"interval"` } // FlushSettings defines settings for configuring the write buffer flushing // policy in the Elasticsearch exporter. The exporter sends a bulk request with // all events already serialized into the send-buffer. type FlushSettings struct { // Bytes sets the send buffer flushing limit. Bytes int `mapstructure:"bytes"` // Interval configures the max age of a document in the send buffer. Interval time.Duration `mapstructure:"interval"` } // RetrySettings defines settings for the HTTP request retries in the Elasticsearch exporter. // Failed sends are retried with exponential backoff. type RetrySettings struct { // Enabled allows users to disable retry without having to comment out all settings. Enabled bool `mapstructure:"enabled"` // MaxRequests configures how often an HTTP request is attempted before it is assumed to be failed. // Deprecated: use MaxRetries instead. MaxRequests int `mapstructure:"max_requests"` // MaxRetries configures how many times an HTTP request is retried. MaxRetries int `mapstructure:"max_retries"` // InitialInterval configures the initial waiting time if a request failed. InitialInterval time.Duration `mapstructure:"initial_interval"` // MaxInterval configures the max waiting time if consecutive requests failed. MaxInterval time.Duration `mapstructure:"max_interval"` // RetryOnStatus configures the status codes that trigger request or document level retries. RetryOnStatus []int `mapstructure:"retry_on_status"` } type MappingsSettings struct { // Mode configures the default document mapping mode. // // The mode may be overridden by the client metadata key // X-Elastic-Mapping-Mode, if specified. Mode string `mapstructure:"mode"` // AllowedModes controls the allowed document mapping modes // specified through X-Elastic-Mapping-Mode client metadata. // // If unspecified, all mapping modes are allowed. AllowedModes []string `mapstructure:"allowed_modes"` } type MappingMode int // Enum values for MappingMode. const ( MappingNone MappingMode = iota MappingECS MappingOTel MappingRaw MappingBodyMap // NumMappingModes remain last, it is used for sizing arrays. NumMappingModes ) func (m MappingMode) String() string { switch m { case MappingNone: return "none" case MappingECS: return "ecs" case MappingOTel: return "otel" case MappingRaw: return "raw" case MappingBodyMap: return "bodymap" } return "" } var ( errConfigEndpointRequired = errors.New("exactly one of [endpoint, endpoints, cloudid] must be specified") errConfigEmptyEndpoint = errors.New("endpoint must not be empty") ) const defaultElasticsearchEnvName = "ELASTICSEARCH_URL" // Validate validates the elasticsearch server configuration. func (cfg *Config) Validate() error { endpoints, err := cfg.endpoints() if err != nil { return err } for _, endpoint := range endpoints { if err := validateEndpoint(endpoint); err != nil { return fmt.Errorf("invalid endpoint %q: %w", endpoint, err) } } canonicalAllowedModes := make([]string, len(cfg.Mapping.AllowedModes)) for i, name := range cfg.Mapping.AllowedModes { canonicalName := canonicalMappingModeName(name) if _, ok := canonicalMappingModes[canonicalName]; !ok { return fmt.Errorf("unknown allowed mapping mode name %q", name) } canonicalAllowedModes[i] = canonicalName } if !slices.Contains(canonicalAllowedModes, canonicalMappingModeName(cfg.Mapping.Mode)) { return fmt.Errorf("invalid or disallowed default mapping mode %q", cfg.Mapping.Mode) } if cfg.Compression != "none" && cfg.Compression != configcompression.TypeGzip { return errors.New("compression must be one of [none, gzip]") } if cfg.Retry.MaxRequests != 0 && cfg.Retry.MaxRetries != 0 { return errors.New("must not specify both retry::max_requests and retry::max_retries") } if cfg.Retry.MaxRequests < 0 { return errors.New("retry::max_requests should be non-negative") } if cfg.Retry.MaxRetries < 0 { return errors.New("retry::max_retries should be non-negative") } if cfg.LogsIndex != "" && cfg.LogsDynamicIndex.Enabled { return errors.New("must not specify both logs_index and logs_dynamic_index; logs_index should be empty unless all documents should be sent to the same index") } if cfg.MetricsIndex != "" && cfg.MetricsDynamicIndex.Enabled { return errors.New("must not specify both metrics_index and metrics_dynamic_index; metrics_index should be empty unless all documents should be sent to the same index") } if cfg.TracesIndex != "" && cfg.TracesDynamicIndex.Enabled { return errors.New("must not specify both traces_index and traces_dynamic_index; traces_index should be empty unless all documents should be sent to the same index") } return nil } // allowedMappingModes returns a map from canonical mapping mode names to MappingModes. func (cfg *Config) allowedMappingModes() map[string]MappingMode { modes := make(map[string]MappingMode) for _, name := range cfg.Mapping.AllowedModes { canonical := canonicalMappingModeName(name) modes[canonical] = canonicalMappingModes[canonical] } return modes } var canonicalMappingModes = map[string]MappingMode{ MappingNone.String(): MappingNone, MappingRaw.String(): MappingRaw, MappingECS.String(): MappingECS, MappingOTel.String(): MappingOTel, MappingBodyMap.String(): MappingBodyMap, } func canonicalMappingModeName(name string) string { lower := strings.ToLower(name) switch lower { case "", "no": // aliases for "none" return "none" default: return lower } } func (cfg *Config) endpoints() ([]string, error) { // Exactly one of endpoint, endpoints, or cloudid must be configured. // If none are set, then $ELASTICSEARCH_URL may be specified instead. var endpoints []string var numEndpointConfigs int if cfg.Endpoint != "" { numEndpointConfigs++ endpoints = []string{cfg.Endpoint} } if len(cfg.Endpoints) > 0 { numEndpointConfigs++ endpoints = cfg.Endpoints } if cfg.CloudID != "" { numEndpointConfigs++ u, err := parseCloudID(cfg.CloudID) if err != nil { return nil, err } endpoints = []string{u.String()} } if numEndpointConfigs == 0 { if v := os.Getenv(defaultElasticsearchEnvName); v != "" { numEndpointConfigs++ endpoints = strings.Split(v, ",") for i, endpoint := range endpoints { endpoints[i] = strings.TrimSpace(endpoint) } } } if numEndpointConfigs != 1 { return nil, errConfigEndpointRequired } return endpoints, nil } func validateEndpoint(endpoint string) error { if endpoint == "" { return errConfigEmptyEndpoint } u, err := url.Parse(endpoint) if err != nil { return err } switch u.Scheme { case "http", "https": default: return fmt.Errorf(`invalid scheme %q, expected "http" or "https"`, u.Scheme) } return nil } // Based on "addrFromCloudID" in go-elasticsearch. func parseCloudID(input string) (*url.URL, error) { _, after, ok := strings.Cut(input, ":") if !ok { return nil, fmt.Errorf("invalid CloudID %q", input) } decoded, err := base64.StdEncoding.DecodeString(after) if err != nil { return nil, err } before, after, ok := strings.Cut(string(decoded), "$") if !ok { return nil, fmt.Errorf("invalid decoded CloudID %q", string(decoded)) } return url.Parse(fmt.Sprintf("https://%s.%s", after, before)) } func handleDeprecatedConfig(cfg *Config, logger *zap.Logger) { if cfg.Retry.MaxRequests != 0 { cfg.Retry.MaxRetries = cfg.Retry.MaxRequests - 1 // Do not set cfg.Retry.Enabled = false if cfg.Retry.MaxRequest = 1 to avoid breaking change on behavior logger.Warn("retry::max_requests has been deprecated, and will be removed in a future version. Use retry::max_retries instead.") } if cfg.LogsDynamicIndex.Enabled { logger.Warn("logs_dynamic_index::enabled has been deprecated, and will be removed in a future version. It is now a no-op. Dynamic document routing is now the default. See Elasticsearch Exporter README.") } if cfg.MetricsDynamicIndex.Enabled { logger.Warn("metrics_dynamic_index::enabled has been deprecated, and will be removed in a future version. It is now a no-op. Dynamic document routing is now the default. See Elasticsearch Exporter README.") } if cfg.TracesDynamicIndex.Enabled { logger.Warn("traces_dynamic_index::enabled has been deprecated, and will be removed in a future version. It is now a no-op. Dynamic document routing is now the default. See Elasticsearch Exporter README.") } }