config/configelasticsearch/esclient.go (126 lines of code) (raw):
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package configelasticsearch
import (
"compress/gzip"
"context"
"io"
"net/http"
"time"
"github.com/cenkalti/backoff/v4"
"github.com/elastic/elastic-transport-go/v8/elastictransport"
"github.com/elastic/go-elasticsearch/v8"
"go.opentelemetry.io/collector/component"
traceSdk "go.opentelemetry.io/otel/sdk/trace"
"go.uber.org/zap"
)
const defaultMaxRetries = 2
// clientLogger implements the estransport.Logger interface
// that is required by the Elasticsearch client for logging.
type clientLogger struct {
*zap.Logger
logRequestBody bool
logResponseBody bool
}
// LogRoundTrip should not modify the request or response, except for consuming and closing the body.
// Implementations have to check for nil values in request and response.
func (cl *clientLogger) LogRoundTrip(requ *http.Request, resp *http.Response, clientErr error, _ time.Time, dur time.Duration) error {
zl := cl.Logger
var fields []zap.Field
if cl.logRequestBody && requ != nil && requ.Body != nil {
body := requ.Body
if requ.Header.Get("Content-Encoding") == "gzip" {
if r, err := gzip.NewReader(body); err == nil {
defer r.Close()
body = r
}
}
if b, err := io.ReadAll(body); err == nil {
fields = append(fields, zap.ByteString("request_body", b))
}
}
if cl.logResponseBody && resp != nil && resp.Body != nil {
if b, err := io.ReadAll(resp.Body); err == nil {
fields = append(fields, zap.ByteString("response_body", b))
}
}
switch {
case clientErr == nil && resp != nil:
fields = append(
fields,
zap.String("path", requ.URL.Path),
zap.String("method", requ.Method),
zap.Duration("duration", dur),
zap.String("status", resp.Status),
)
zl.Debug("Request roundtrip completed.", fields...)
case clientErr != nil:
fields = append(
fields,
zap.NamedError("reason", clientErr),
)
zl.Debug("Request failed.", fields...)
}
return nil
}
// RequestBodyEnabled makes the client pass a copy of request body to the logger.
func (cl *clientLogger) RequestBodyEnabled() bool {
return cl.logRequestBody
}
// ResponseBodyEnabled makes the client pass a copy of response body to the logger.
func (cl *clientLogger) ResponseBodyEnabled() bool {
return cl.logResponseBody
}
// user_agent should be added with the confighttp client
func (cfg *ClientConfig) ToClient(
ctx context.Context,
host component.Host,
telemetry component.TelemetrySettings,
) (*elasticsearch.Client, error) {
httpClient, err := cfg.ClientConfig.ToClient(ctx, host, telemetry)
if err != nil {
return nil, err
}
// endpoints converts Config.Endpoints, Config.CloudID,
// and Config.ClientConfig.Endpoint to a list of addresses.
endpoints, err := cfg.endpoints()
if err != nil {
return nil, err
}
esLogger := clientLogger{
Logger: telemetry.Logger,
logRequestBody: cfg.TelemetrySettings.LogRequestBody,
logResponseBody: cfg.TelemetrySettings.LogResponseBody,
}
maxRetries := defaultMaxRetries
if cfg.Retry.MaxRetries != 0 {
maxRetries = cfg.Retry.MaxRetries
}
return elasticsearch.NewClient(elasticsearch.Config{
Transport: httpClient.Transport,
// configure connection setup
Addresses: endpoints,
// configure retry behavior
RetryOnStatus: cfg.Retry.RetryOnStatus,
DisableRetry: !cfg.Retry.Enabled,
// RetryOnError: retryOnError, // should be used from esclient version 8 onwards
MaxRetries: maxRetries,
RetryBackoff: createElasticsearchBackoffFunc(&cfg.Retry),
// configure sniffing
DiscoverNodesOnStart: cfg.Discovery.OnStart,
DiscoverNodesInterval: cfg.Discovery.Interval,
// configure internal metrics reporting and logging
EnableMetrics: false, // TODO
EnableDebugLogger: false, // TODO
Logger: &esLogger,
Instrumentation: func() elastictransport.Instrumentation {
// only set tracing if enabled and not the noop tracer
// The actual implementation of the no-op tracer (tracer.noopNoContextTracer)
// is not exported by the OpenTelemetry collector, so we cannot directly check for it.
// Instead, we assert if the default Go SDK Tracer provider is used: https://github.com/open-telemetry/opentelemetry-collector/blob/v0.118.0/service/telemetry/tracer.go#L66
// TODO: use tracer.Enabled() once available in the Go SDK:https: //github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/api.md#enabled
if _, ok := telemetry.TracerProvider.(*traceSdk.TracerProvider); ok {
return elasticsearch.NewOpenTelemetryInstrumentation(telemetry.TracerProvider, false)
}
return nil
}(),
})
}
func createElasticsearchBackoffFunc(config *RetrySettings) func(int) time.Duration {
if !config.Enabled {
return nil
}
expBackoff := backoff.NewExponentialBackOff()
if config.InitialInterval > 0 {
expBackoff.InitialInterval = config.InitialInterval
}
if config.MaxInterval > 0 {
expBackoff.MaxInterval = config.MaxInterval
}
expBackoff.Reset()
return func(attempts int) time.Duration {
if attempts == 1 {
expBackoff.Reset()
}
return expBackoff.NextBackOff()
}
}