config.go (126 lines of code) (raw):

// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//	http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package docappender

import (
	"errors"
	"fmt"
	"time"

	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/metric"
	"go.opentelemetry.io/otel/trace"
	"go.uber.org/zap"

	"github.com/elastic/elastic-transport-go/v8/elastictransport"
)

// Value is a tri-state option: Unset (the zero value), True, or False.
// It is used where "not specified" must be distinguishable from an
// explicit true/false (see Config.IncludeSourceOnError).
type Value int

const (
	// Unset indicates the option was not explicitly specified.
	Unset Value = iota
	// True explicitly enables the option.
	True
	// False explicitly disables the option.
	False
)

// Config holds configuration for Appender.
type Config struct {
	// Logger holds an optional Logger to use for logging indexing requests.
	//
	// All Elasticsearch errors will be logged at error level, so in cases
	// where the indexer is used for high throughput indexing, it is
	// recommended that a rate-limited logger is used.
	//
	// If Logger is nil, logging will be disabled.
	Logger *zap.Logger

	// TracerProvider holds an optional otel TracerProvider for tracing
	// flush requests.
	//
	// If TracerProvider is nil, requests will not be traced.
	// To use this provider Tracer must be nil.
	TracerProvider trace.TracerProvider

	// CompressionLevel holds the gzip compression level, from 0 (gzip.NoCompression)
	// to 9 (gzip.BestCompression). Higher values provide greater compression, at a
	// greater cost of CPU. The special value -1 (gzip.DefaultCompression) selects the
	// default compression level.
	CompressionLevel int

	// MaxRequests holds the maximum number of bulk index requests to execute concurrently.
	// The maximum memory usage of Appender is thus approximately MaxRequests*FlushBytes.
	//
	// If MaxRequests is less than or equal to zero, the default of 10 will be used.
	MaxRequests int

	// BulkIndexerPool holds an optional pool that is used for creating new BulkIndexers.
	// If not set/nil, a new BulkIndexerPool will be created with MaxRequests as the
	// guaranteed, local and total maximum number of indexers.
	//
	// A BulkIndexerPool may be shared between multiple Appender instances. Each has its
	// own unique ID to guarantee per Appender limits.
	//
	// For more information, see [NewBulkIndexerPool].
	BulkIndexerPool *BulkIndexerPool

	// MaxDocumentRetries holds the maximum number of document retries.
	MaxDocumentRetries int

	// RetryOnDocumentStatus holds the document level statuses that will trigger a document retry.
	//
	// If RetryOnDocumentStatus is empty or nil, the default of [429] will be used.
	RetryOnDocumentStatus []int

	// FlushBytes holds the flush threshold in bytes. If Compression is enabled,
	// the number of documents that can be buffered will be greater.
	//
	// If FlushBytes is zero, the default of 1MB will be used.
	FlushBytes int

	// FlushInterval holds the flush threshold as a duration.
	//
	// If FlushInterval is zero, the default of 30 seconds will be used.
	FlushInterval time.Duration

	// FlushTimeout holds the flush timeout as a duration.
	//
	// If FlushTimeout is zero, no timeout will be used.
	FlushTimeout time.Duration

	// DocumentBufferSize sets the number of documents that can be buffered before
	// they are stored in the active indexer buffer.
	//
	// If DocumentBufferSize is zero, the default 1024 will be used.
	DocumentBufferSize int

	// Pipeline holds the ingest pipeline ID.
	//
	// If Pipeline is empty, no ingest pipeline will be specified in the Bulk request.
	Pipeline string

	// RequireDataStream, if set to true, an index will be created only if a
	// matching index template is found and it contains a data stream template.
	// When true, `require_data_stream=true` is set in the bulk request.
	// When false or not set, `require_data_stream` is not set in the bulk request.
	// Which could cause a classic index to be created if no data stream template
	// matches the index in the request.
	//
	// RequireDataStream is disabled by default.
	RequireDataStream bool

	// IncludeSourceOnError, if set to True, the response body of a Bulk Index request
	// might contain the part of source document on error.
	// If Unset the error reason will be dropped.
	// Requires Elasticsearch 8.18+ if value is True or False.
	// WARNING: if set to True, user is responsible for sanitizing the error as it may contain
	// sensitive data.
	//
	// IncludeSourceOnError is Unset by default.
	IncludeSourceOnError Value

	// PopulateFailedDocsInput controls whether each BulkIndexerResponseItem.Input
	// in BulkIndexerResponseStat.FailedDocs is populated with the input of the item,
	// which includes the action line and the document line.
	//
	// WARNING: this is provided for testing and debugging only.
	// Use with caution as it may expose sensitive data; any clients
	// of go-docappender enabling this should relay this warning to
	// their users. Setting this will also add memory overhead.
	PopulateFailedDocsInput bool

	// Scaling configuration for the docappender.
	//
	// If unspecified, scaling is enabled by default.
	Scaling ScalingConfig

	// MeterProvider holds the OTel MeterProvider to be used to create and
	// record appender metrics.
	//
	// If unset, the global OTel MeterProvider will be used; if that is unset,
	// no metrics will be recorded.
	MeterProvider metric.MeterProvider

	// MetricAttributes holds any extra attributes to set in the recorded
	// metrics.
	MetricAttributes attribute.Set
}

// DefaultConfig returns a copy of cfg with any zero values set to their
// default values.
func DefaultConfig(cl elastictransport.Interface, cfg Config) Config {
	if cfg.MaxRequests <= 0 {
		cfg.MaxRequests = 10
	}
	if cfg.FlushBytes <= 0 {
		cfg.FlushBytes = 1 * 1024 * 1024 // 1MB
	}
	if cfg.FlushInterval <= 0 {
		cfg.FlushInterval = 30 * time.Second
	}
	if cfg.DocumentBufferSize <= 0 {
		cfg.DocumentBufferSize = 1024
	}
	// Scaling defaults are only applied when scaling is enabled.
	if !cfg.Scaling.Disabled {
		if cfg.Scaling.ScaleDown.Threshold == 0 {
			cfg.Scaling.ScaleDown.Threshold = 30
		}
		if cfg.Scaling.ScaleDown.CoolDown <= 0 {
			cfg.Scaling.ScaleDown.CoolDown = 30 * time.Second
		}
		if cfg.Scaling.ScaleUp.Threshold == 0 {
			cfg.Scaling.ScaleUp.Threshold = 60
		}
		if cfg.Scaling.ScaleUp.CoolDown <= 0 {
			cfg.Scaling.ScaleUp.CoolDown = time.Minute
		}
		if cfg.Scaling.IdleInterval <= 0 {
			cfg.Scaling.IdleInterval = 30 * time.Second
		}
		if cfg.Scaling.ActiveRatio <= 0 {
			cfg.Scaling.ActiveRatio = 0.25
		}
	}
	if cfg.Logger == nil {
		cfg.Logger = zap.NewNop()
	}
	// When no pool is supplied, create one sized so that MaxRequests is the
	// guaranteed, local and total maximum number of indexers.
	if cfg.BulkIndexerPool == nil {
		cfg.BulkIndexerPool = NewBulkIndexerPool(
			cfg.MaxRequests, cfg.MaxRequests, cfg.MaxRequests,
			BulkIndexerConfigFrom(cl, cfg),
		)
	}
	return cfg
}

// ScalingConfig holds the docappender autoscaling configuration.
type ScalingConfig struct {
	// Disabled toggles active indexer scaling on.
	//
	// It is enabled by default.
	Disabled bool

	// ActiveRatio defines the threshold for (potential) active indexers to
	// GOMAXPROCS. The higher the number, the more potential active indexers
	// there will be actively pulling from the BulkIndexerItem channel.
	// For example, when ActiveRatio:1 and GOMAXPROCS:2, there can be a max
	// of 2 active indexers, or 1 per GOMAXPROCS. When ActiveRatio:0.5 and
	// GOMAXPROCS:2, the maximum number of active indexers is 1.
	// The value must be between 0 and 1.
	//
	// It defaults to 0.25.
	ActiveRatio float64

	// ScaleDown configures the Threshold and CoolDown for the scale down
	// action. In order to scale down an active indexer, the Threshold has
	// to be met after the CoolDown has elapsed. Scale down will only take
	// place if there are more than 1 active indexer.
	// Active indexers will be destroyed when they aren't needed anymore,
	// when enough timed flushes (FlushInterval) are performed by an active
	// indexer (controlled by Threshold), or when an active indexer is idle
	// for (IdleInterval * Threshold) as long as CoolDown allows it.
	//
	// When unset, the default of 30 is used for Threshold, and 30 seconds for
	// CoolDown.
	ScaleDown ScaleActionConfig

	// ScaleUp configures the Threshold and CoolDown for the scale up action.
	//
	// In order for a scale up to occur, the Threshold has to be met after
	// the CoolDown has elapsed. By default, a single active indexer is created
	// which actively pulls items from the internal buffered queue. When enough
	// full flushes (FlushBytes) are performed by an active indexer (controlled
	// by Threshold), a new active indexer will be created until GOMAXPROCS / 4
	// is reached (25% of CPU capacity) if the CoolDown allows it.
	//
	// When unspecified, the default of 60 is used for Threshold, and 60 seconds
	// for CoolDown.
	ScaleUp ScaleActionConfig

	// IdleInterval defines how long an active indexer performs an inactivity
	// check. The ScaleDown.Threshold and ScaleDown.CoolDown needs to be met
	// for an active indexer to be destroyed.
	//
	// When unspecified, the default of 30 seconds will be used.
	IdleInterval time.Duration
}

// ScaleActionConfig holds the configuration for a scaling action.
type ScaleActionConfig struct {
	// Threshold is the number of consecutive times a scale up/down condition
	// has to happen for the scaling action will be triggered.
	Threshold uint

	// CoolDown is the amount of time needed to elapse between scaling actions
	// to trigger it.
	CoolDown time.Duration
}

// BulkIndexerConfig holds configuration for BulkIndexer.
type BulkIndexerConfig struct {
	// Client holds the Elasticsearch client.
	Client elastictransport.Interface

	// MaxDocumentRetries holds the maximum number of document retries.
	MaxDocumentRetries int

	// RetryOnDocumentStatus holds the document level statuses that will trigger a document retry.
	//
	// If RetryOnDocumentStatus is empty or nil, the default of [429] will be used.
	RetryOnDocumentStatus []int

	// CompressionLevel holds the gzip compression level, from 0 (gzip.NoCompression)
	// to 9 (gzip.BestCompression). Higher values provide greater compression, at a
	// greater cost of CPU. The special value -1 (gzip.DefaultCompression) selects the
	// default compression level.
	CompressionLevel int

	// Pipeline holds the ingest pipeline ID.
	//
	// If Pipeline is empty, no ingest pipeline will be specified in the Bulk request.
	Pipeline string

	// RequireDataStream, if set to true, an index will be created only if a
	// matching index template is found and it contains a data stream template.
	// When true, `require_data_stream=true` is set in the bulk request.
	// When false or not set, `require_data_stream` is not set in the bulk request.
	// Which could cause a classic index to be created if no data stream template
	// matches the index in the request.
	//
	// RequireDataStream is disabled by default.
	RequireDataStream bool

	// IncludeSourceOnError, if set to True, the response body of a Bulk Index request
	// might contain the part of source document on error.
	// If Unset the error reason will be dropped.
	// Requires Elasticsearch 8.18+ if value is True or False.
	// WARNING: if set to True, user is responsible for sanitizing the error as it may contain
	// sensitive data.
	//
	// IncludeSourceOnError is Unset by default.
	IncludeSourceOnError Value

	// PopulateFailedDocsInput controls whether each BulkIndexerResponseItem.Input
	// in BulkIndexerResponseStat.FailedDocs is populated with the input of the item,
	// which includes the action line and the document line.
	//
	// WARNING: this is provided for testing and debugging only.
	// Use with caution as it may expose sensitive data; any clients
	// of go-docappender enabling this should relay this warning to
	// their users. Setting this will also add memory overhead.
	PopulateFailedDocsInput bool
}

// Validate checks the configuration for errors.
func (cfg BulkIndexerConfig) Validate() error {
	var errs []error
	if cfg.Client == nil {
		errs = append(errs, fmt.Errorf("bulk_indexer: client is required"))
	}
	// Valid gzip levels span [-1, 9]; see CompressionLevel field docs.
	if cfg.CompressionLevel < -1 || cfg.CompressionLevel > 9 {
		errs = append(errs, fmt.Errorf("expected CompressionLevel in range [-1,9], got %d",
			cfg.CompressionLevel,
		))
	}
	return errors.Join(errs...)
}

// BulkIndexerConfigFrom creates a BulkIndexerConfig from the provided Config,
// with additional information included as necessary.
func BulkIndexerConfigFrom(cl elastictransport.Interface, cfg Config) BulkIndexerConfig {
	return BulkIndexerConfig{
		Client:                  cl,
		MaxDocumentRetries:      cfg.MaxDocumentRetries,
		RetryOnDocumentStatus:   cfg.RetryOnDocumentStatus,
		CompressionLevel:        cfg.CompressionLevel,
		Pipeline:                cfg.Pipeline,
		RequireDataStream:       cfg.RequireDataStream,
		IncludeSourceOnError:    cfg.IncludeSourceOnError,
		PopulateFailedDocsInput: cfg.PopulateFailedDocsInput,
	}
}