internal/pkg/config/output.go (224 lines of code) (raw):

// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one // or more contributor license agreements. Licensed under the Elastic License; // you may not use this file except in compliance with the Elastic License. package config import ( "bytes" "context" "fmt" "net" "net/http" "net/url" "os" "regexp" "strconv" "strings" "time" urlutil "github.com/elastic/elastic-agent-libs/kibana" "github.com/elastic/elastic-agent-libs/transport/httpcommon" "github.com/elastic/elastic-agent-libs/transport/tlscommon" "github.com/elastic/go-elasticsearch/v8" "github.com/rs/zerolog" ) // The timeout would be driven by the server for long poll. // Giving it some sane long value. const httpTransportLongPollTimeout = 10 * time.Minute const schemeHTTP = "http" var hasScheme = regexp.MustCompile(`^([a-z][a-z0-9+\-.]*)://`) // Output is the output configuration to elasticsearch. type Output struct { Elasticsearch Elasticsearch `config:"elasticsearch"` Extra map[string]interface{} `config:",inline"` } // Elasticsearch is the configuration for elasticsearch. type Elasticsearch struct { Protocol string `config:"protocol"` Hosts []string `config:"hosts"` Path string `config:"path"` Headers map[string]string `config:"headers"` ServiceToken string `config:"service_token"` ServiceTokenPath string `config:"service_token_path"` ProxyURL string `config:"proxy_url"` ProxyDisable bool `config:"proxy_disable"` ProxyHeaders map[string]string `config:"proxy_headers"` TLS *tlscommon.Config `config:"ssl"` MaxRetries int `config:"max_retries"` MaxConnPerHost int `config:"max_conn_per_host"` Timeout time.Duration `config:"timeout"` MaxContentLength int `config:"max_content_length"` } // InitDefaults initializes the defaults for the configuration. func (c *Elasticsearch) InitDefaults() { c.Protocol = schemeHTTP c.Hosts = []string{"localhost:9200"} c.Timeout = 90 * time.Second c.MaxRetries = 3 c.MaxConnPerHost = 128 c.MaxContentLength = 100 * 1024 * 1024 } // Validate ensures that the configuration is valid. func (c *Elasticsearch) Validate() error { if c.ProxyURL != "" && !c.ProxyDisable { if _, err := urlutil.ParseURL(c.ProxyURL); err != nil { return err } } if c.TLS != nil && c.TLS.IsEnabled() { _, err := tlscommon.LoadTLSConfig(c.TLS) if err != nil { return err } } return nil } // ToESConfig converts the configuration object into the config for the elasticsearch client. func (c *Elasticsearch) ToESConfig(longPoll bool) (elasticsearch.Config, error) { // build the addresses addrs := make([]string, len(c.Hosts)) for i, host := range c.Hosts { addr, err := makeURL(c.Protocol, c.Path, host, 9200) if err != nil { return elasticsearch.Config{}, err } addrs[i] = addr } // build the transport from the config httpTransport := &http.Transport{ DialContext: (&net.Dialer{ Timeout: 10 * time.Second, KeepAlive: 30 * time.Second, }).DialContext, TLSHandshakeTimeout: 10 * time.Second, DisableKeepAlives: false, DisableCompression: false, MaxIdleConns: 100, MaxIdleConnsPerHost: 32, MaxConnsPerHost: c.MaxConnPerHost, IdleConnTimeout: 60 * time.Second, ResponseHeaderTimeout: c.Timeout, ExpectContinueTimeout: 1 * time.Second, } disableRetry := false if longPoll { httpTransport.IdleConnTimeout = httpTransportLongPollTimeout httpTransport.ResponseHeaderTimeout = httpTransportLongPollTimeout // no retries for long poll monitoring disableRetry = true } if c.TLS != nil && c.TLS.IsEnabled() { tls, err := tlscommon.LoadTLSConfig(c.TLS) if err != nil { return elasticsearch.Config{}, err } httpTransport.TLSClientConfig = tls.ToConfig() } if !c.ProxyDisable { if c.ProxyURL != "" { proxyURL, err := urlutil.ParseURL(c.ProxyURL) if err != nil { return elasticsearch.Config{}, err } httpTransport.Proxy = http.ProxyURL(proxyURL) } else { httpTransport.Proxy = http.ProxyFromEnvironment } var proxyHeaders http.Header if len(c.ProxyHeaders) > 0 { proxyHeaders = make(http.Header, len(c.ProxyHeaders)) for k, v := range c.ProxyHeaders { proxyHeaders.Add(k, v) } } httpTransport.ProxyConnectHeader = proxyHeaders } h := http.Header{} for key, val := range c.Headers { h.Set(key, val) } // Set special header "X-elastic-product-origin" for .fleet-* indices based on the latest conversation with ES team // This eliminates the warning while accessing the system index h.Set("X-elastic-product-origin", "fleet") serviceToken := c.ServiceToken if c.ServiceToken == "" && c.ServiceTokenPath != "" { p, err := os.ReadFile(c.ServiceTokenPath) if err != nil { return elasticsearch.Config{}, fmt.Errorf("unable to read service_token_path: %w", err) } serviceToken = string(p) } return elasticsearch.Config{ Addresses: addrs, ServiceToken: serviceToken, Header: h, Transport: httpTransport, MaxRetries: c.MaxRetries, DisableRetry: disableRetry, }, nil } // Validate validates that only elasticsearch is defined on the output. func (c *Output) Validate() error { if c.Extra == nil { return nil } _, ok := c.Extra["elasticsearch"] if (!ok && len(c.Extra) > 0) || (ok && len(c.Extra) > 1) { return fmt.Errorf("can only contain elasticsearch key") } // clear Extra because its valid (only used for validation) c.Extra = nil return nil } func makeURL(defaultScheme string, defaultPath string, rawURL string, defaultPort int) (string, error) { if defaultScheme == "" { defaultScheme = schemeHTTP } if !hasScheme.MatchString(rawURL) { rawURL = fmt.Sprintf("%v://%v", defaultScheme, rawURL) } addr, err := url.Parse(rawURL) if err != nil { return "", err } scheme := addr.Scheme host := addr.Host port := strconv.Itoa(defaultPort) if host == "" { host = "localhost" } else { // split host and optional port if splitHost, splitPort, err := net.SplitHostPort(host); err == nil { host = splitHost port = splitPort } // Check if ipv6 if strings.Count(host, ":") > 1 && strings.Count(host, "]") == 0 { host = "[" + host + "]" } } // Assign default path if not set if addr.Path == "" { addr.Path = defaultPath } // reconstruct url addr.Scheme = scheme addr.Host = host + ":" + port return addr.String(), nil } func (c *Elasticsearch) DiagRequests(ctx context.Context) []byte { pURL, err := httpcommon.NewProxyURIFromString(c.ProxyURL) if err != nil { zerolog.Ctx(ctx).Warn().Err(err).Msg("Unable to transform proxy_url to url.URL") } settings := httpcommon.HTTPTransportSettings{ TLS: c.TLS, Timeout: c.Timeout, Proxy: httpcommon.HTTPClientProxySettings{ Disable: c.ProxyDisable, URL: pURL, Headers: httpcommon.ProxyHeaders(c.ProxyHeaders), }, } headers := http.Header{} for k, v := range c.Headers { headers.Set(k, v) } reqs := make([]*http.Request, 0, len(c.Hosts)) var res bytes.Buffer for _, host := range c.Hosts { hostURL, err := makeURL(c.Protocol, "", host, 9200) if err != nil { zerolog.Ctx(ctx).Warn().Err(err).Str("host", host).Msg("Unable to transform host to url.URL") res.WriteString(fmt.Sprintf("Unable to transform host %q to url.URL: %v\n", host, err)) continue } req, err := http.NewRequestWithContext(ctx, http.MethodGet, hostURL, nil) if err != nil { zerolog.Ctx(ctx).Warn().Err(err).Str("host", host).Msg("Unable to create request to host") res.WriteString(fmt.Sprintf("Unable to create request to host %q: %v\n", host, err)) continue } req.Header = headers.Clone() reqs = append(reqs, req) } res.Write(settings.DiagRequests(reqs)()) return res.Bytes() }