config/configelasticsearch/configclient.go (143 lines of code) (raw):

// Licensed to Elasticsearch B.V. under one or more contributor // license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright // ownership. Elasticsearch B.V. licenses this file to you under // the Apache License, Version 2.0 (the "License"); you may // not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. package configelasticsearch import ( "encoding/base64" "errors" "fmt" "net/http" "net/url" "os" "strings" "time" "go.opentelemetry.io/collector/config/configcompression" "go.opentelemetry.io/collector/config/confighttp" ) const defaultElasticsearchEnvName = "ELASTICSEARCH_URL" var ( errConfigEndpointRequired = errors.New("exactly one of [endpoint, endpoints, cloudid] must be specified") errConfigEmptyEndpoint = errors.New("endpoint must not be empty") ) // NewDefaultClientConfig returns ClientConfig type object with // the default values of 'MaxIdleConns' and 'IdleConnTimeout', as well as [http.DefaultTransport] values. // Other config options are not added as they are initialized with 'zero value' by GoLang as default. // We encourage to use this function to create an object of ClientConfig. func NewDefaultClientConfig() ClientConfig { // The default values are taken from the values of 'DefaultTransport' of 'http' package. defaultHTTPClientConfig := confighttp.NewDefaultClientConfig() defaultHTTPClientConfig.Timeout = 90 * time.Second defaultHTTPClientConfig.Compression = configcompression.TypeGzip return ClientConfig{ ClientConfig: defaultHTTPClientConfig, TelemetrySettings: TelemetrySettings{ LogRequestBody: false, LogResponseBody: false, }, Retry: RetrySettings{ Enabled: true, MaxRetries: 0, // default is set in exporter code InitialInterval: 100 * time.Millisecond, MaxInterval: 1 * time.Minute, RetryOnStatus: []int{ http.StatusTooManyRequests, }, }, } } type ClientConfig struct { confighttp.ClientConfig `mapstructure:",squash"` // CloudID holds the cloud ID to identify the Elastic Cloud cluster to send events to. // https://www.elastic.co/guide/en/cloud/current/ec-cloud-id.html // // This setting is required if no URL is configured. CloudID string `mapstructure:"cloudid"` // ELASTICSEARCH_URL environment variable is not set. Endpoints []string `mapstructure:"endpoints"` Retry RetrySettings `mapstructure:"retry"` Discovery DiscoverySettings `mapstructure:"discover"` // TelemetrySettings contains settings useful for testing/debugging purposes // This is experimental and may change at any time. TelemetrySettings `mapstructure:"telemetry"` } type TelemetrySettings struct { LogRequestBody bool `mapstructure:"log_request_body"` LogResponseBody bool `mapstructure:"log_response_body"` } // RetrySettings defines settings for the HTTP request retries in the Elasticsearch exporter. // Failed sends are retried with exponential backoff. type RetrySettings struct { // RetryOnStatus configures the status codes that trigger request or document level retries. RetryOnStatus []int `mapstructure:"retry_on_status"` // MaxRetries configures how many times an HTTP request is retried. MaxRetries int `mapstructure:"max_retries"` // InitialInterval configures the initial waiting time if a request failed. InitialInterval time.Duration `mapstructure:"initial_interval"` // MaxInterval configures the max waiting time if consecutive requests failed. MaxInterval time.Duration `mapstructure:"max_interval"` // Enabled allows users to disable retry without having to comment out all settings. Enabled bool `mapstructure:"enabled"` } // DiscoverySettings defines Elasticsearch node discovery related settings. // The exporter will check Elasticsearch regularly for available nodes // and updates the list of hosts if discovery is enabled. Newly discovered // nodes will automatically be used for load balancing. // // DiscoverySettings should not be enabled when operating Elasticsearch behind a proxy // or load balancer. // // https://www.elastic.co/blog/elasticsearch-sniffing-best-practices-what-when-why-how type DiscoverySettings struct { // OnStart, if set, instructs the exporter to look for available Elasticsearch // nodes the first time the exporter connects to the cluster. OnStart bool `mapstructure:"on_start"` // Interval instructs the exporter to renew the list of Elasticsearch URLs // with the given interval. URLs will not be updated if Interval is <=0. Interval time.Duration `mapstructure:"interval"` } // Validate checks the receiver configuration is valid. func (cfg *ClientConfig) Validate() error { endpoints, err := cfg.endpoints() if err != nil { return err } for _, endpoint := range endpoints { if err := validateEndpoint(endpoint); err != nil { return fmt.Errorf("invalid endpoint %q: %w", endpoint, err) } } if cfg.Compression != "none" && cfg.Compression != configcompression.TypeGzip { return errors.New("compression must be one of [none, gzip]") } if cfg.Retry.MaxRetries < 0 { return errors.New("retry::max_requests should be non-negative") } return cfg.ClientConfig.Validate() } func validateEndpoint(endpoint string) error { if endpoint == "" { return errConfigEmptyEndpoint } u, err := url.Parse(endpoint) if err != nil { return err } switch u.Scheme { case "http", "https": default: return fmt.Errorf(`invalid scheme %q, expected "http" or "https"`, u.Scheme) } return nil } func (cfg *ClientConfig) endpoints() ([]string, error) { // Exactly one of endpoint, endpoints, or cloudid must be configured. // If none are set, then $ELASTICSEARCH_URL may be specified instead. var endpoints []string var numEndpointConfigs int if cfg.Endpoint != "" { numEndpointConfigs++ endpoints = []string{cfg.Endpoint} } if len(cfg.Endpoints) > 0 { numEndpointConfigs++ endpoints = cfg.Endpoints } if cfg.CloudID != "" { numEndpointConfigs++ u, err := parseCloudID(cfg.CloudID) if err != nil { return nil, err } endpoints = []string{u.String()} } if numEndpointConfigs == 0 { if v := os.Getenv(defaultElasticsearchEnvName); v != "" { numEndpointConfigs++ endpoints = strings.Split(v, ",") for i, endpoint := range endpoints { endpoints[i] = strings.TrimSpace(endpoint) } } } if numEndpointConfigs != 1 { return nil, errConfigEndpointRequired } return endpoints, nil } // Based on "addrFromCloudID" in go-elasticsearch. func parseCloudID(input string) (*url.URL, error) { _, after, ok := strings.Cut(input, ":") if !ok { return nil, fmt.Errorf("invalid CloudID %q", input) } decoded, err := base64.StdEncoding.DecodeString(after) if err != nil { return nil, err } before, after, ok := strings.Cut(string(decoded), "$") if !ok { return nil, fmt.Errorf("invalid decoded CloudID %q", string(decoded)) } return url.Parse(fmt.Sprintf("https://%s.%s", after, before)) }