internal/pkg/bulk/opt.go (161 lines of code) (raw):

// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one // or more contributor license agreements. Licensed under the Elastic License; // you may not use this file except in compliance with the Elastic License. package bulk import ( "context" "strconv" "time" "github.com/rs/zerolog" "go.elastic.co/apm/v2" "github.com/elastic/fleet-server/v7/internal/pkg/build" "github.com/elastic/fleet-server/v7/internal/pkg/config" ) //----- // Transaction options type optionsT struct { Refresh bool RetryOnConflict string Indices []string WaitForCheckpoints []int64 IgnoreUnavailable bool spanLink *apm.SpanLink } type Opt func(*optionsT) func WithRefresh() Opt { return func(opt *optionsT) { opt.Refresh = true } } func WithIgnoreUnavailble() Opt { return func(opt *optionsT) { opt.IgnoreUnavailable = true } } func WithRetryOnConflict(n int) Opt { return func(opt *optionsT) { opt.RetryOnConflict = strconv.Itoa(n) } } // WithIndex sets the index when searching func WithIndex(idx string) Opt { return func(opt *optionsT) { opt.Indices = append(opt.Indices, idx) } } // WithWaitForCheckpoints will set the checkpoints parameters // Applicable to _fleet_msearch, wait_for_checkpoints parameters func WithWaitForCheckpoints(checkpoints []int64) Opt { return func(opt *optionsT) { opt.WaitForCheckpoints = checkpoints } } func withAPMLinkedContext(ctx context.Context) Opt { return func(opt *optionsT) { trace := apm.TransactionFromContext(ctx) if trace == nil { return } tCtx := trace.TraceContext() opt.spanLink = &apm.SpanLink{ Trace: tCtx.Trace, Span: tCtx.Span, } } } //----- // Bulk API options type bulkOptT struct { flushInterval time.Duration flushThresholdCnt int flushThresholdSz int maxPending int blockQueueSz int apikeyMaxParallel int apikeyMaxReqSize int policyTokens []config.PolicyToken bi build.Info } type BulkOpt func(*bulkOptT) // WithFlushInterval sets the interval on which any pending transactions will be flushed to bulker func WithFlushInterval(d time.Duration) BulkOpt { return func(opt *bulkOptT) { opt.flushInterval = d } } // WithFlushThresholdCount sets count of pending transactions that will force flush before interval func WithFlushThresholdCount(cnt int) BulkOpt { return func(opt *bulkOptT) { opt.flushThresholdCnt = cnt } } // WithFlushThresholdSize sets the cummulative size in bytes of pending transactions that will force flush before interval func WithFlushThresholdSize(sz int) BulkOpt { return func(opt *bulkOptT) { opt.flushThresholdSz = sz } } // WithMaxPending sets the number of elastic transactions pending response func WithMaxPending(max int) BulkOpt { return func(opt *bulkOptT) { opt.maxPending = max } } // WithBlockQueueSize sets the size of the internal block queue (ie. channel) func WithBlockQueueSize(sz int) BulkOpt { return func(opt *bulkOptT) { opt.blockQueueSz = sz } } // WithAPIKeyMaxParallel sets the number of api key operations outstanding func WithAPIKeyMaxParallel(max int) BulkOpt { return func(opt *bulkOptT) { opt.apikeyMaxParallel = max } } // WithAPIKeyMaxRequestSize sets the maximum size of the request body. Default 100MB func WithAPIKeyMaxRequestSize(maxBytes int) BulkOpt { return func(opt *bulkOptT) { if opt.apikeyMaxReqSize > 0 { opt.apikeyMaxReqSize = maxBytes } } } // WithStaticPolicyTokens sets the static policy tokens. Default is empty func WithPolicyTokens(tokens []config.PolicyToken) BulkOpt { return func(opt *bulkOptT) { opt.policyTokens = tokens } } func WithBi(bi build.Info) BulkOpt { return func(opt *bulkOptT) { opt.bi = bi } } func parseBulkOpts(opts ...BulkOpt) bulkOptT { bopt := bulkOptT{ flushInterval: defaultFlushInterval, flushThresholdCnt: defaultFlushThresholdCnt, flushThresholdSz: defaultFlushThresholdSz, maxPending: defaultMaxPending, apikeyMaxParallel: defaultAPIKeyMaxParallel, blockQueueSz: defaultBlockQueueSz, apikeyMaxReqSize: defaultApikeyMaxReqSize, policyTokens: []config.PolicyToken{}, // default is empty } for _, f := range opts { f(&bopt) } return bopt } func (o *bulkOptT) MarshalZerologObject(e *zerolog.Event) { e.Dur("flushInterval", o.flushInterval) e.Int("flushThresholdCnt", o.flushThresholdCnt) e.Int("flushThresholdSz", o.flushThresholdSz) e.Int("maxPending", o.maxPending) e.Int("blockQueueSz", o.blockQueueSz) e.Int("apikeyMaxParallel", o.apikeyMaxParallel) e.Int("apikeyMaxReqSize", o.apikeyMaxReqSize) } // BulkOptsFromCfg transforms config to a slize of BulkOpt // used to bridge to configuration subsystem func BulkOptsFromCfg(cfg *config.Config) []BulkOpt { bulkCfg := cfg.Inputs[0].Server.Bulk // Attempt to slice the max number of connections to leave room for the bulk flush queues maxKeyParallel := cfg.Output.Elasticsearch.MaxConnPerHost if cfg.Output.Elasticsearch.MaxConnPerHost > bulkCfg.FlushMaxPending { maxKeyParallel = cfg.Output.Elasticsearch.MaxConnPerHost - bulkCfg.FlushMaxPending } policyTokens := []config.PolicyToken{} if cfg.Inputs[0].Server.StaticPolicyTokens.Enabled { policyTokens = cfg.Inputs[0].Server.StaticPolicyTokens.PolicyTokens } return []BulkOpt{ WithFlushInterval(bulkCfg.FlushInterval), WithFlushThresholdCount(bulkCfg.FlushThresholdCount), WithFlushThresholdSize(bulkCfg.FlushThresholdSize), WithMaxPending(bulkCfg.FlushMaxPending), WithAPIKeyMaxParallel(maxKeyParallel), WithAPIKeyMaxRequestSize(cfg.Output.Elasticsearch.MaxContentLength), WithPolicyTokens(policyTokens), } }