pkg/stanza/fileconsumer/config.go (222 lines of code) (raw):

// Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 //go:generate mdatagen metadata.yaml package fileconsumer // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer" import ( "bufio" "errors" "fmt" "runtime" "time" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/featuregate" "go.uber.org/zap" "golang.org/x/text/encoding" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/textutils" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/attrs" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/emit" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/fingerprint" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/header" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/metadata" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/reader" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/scanner" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/matcher" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/split" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/trim" ) const ( defaultMaxConcurrentFiles = 1024 defaultEncoding = "utf-8" defaultPollInterval = 200 * time.Millisecond ) var allowFileDeletion = featuregate.GlobalRegistry().MustRegister( "filelog.allowFileDeletion", featuregate.StageAlpha, featuregate.WithRegisterDescription("When enabled, allows usage of the `delete_after_read` setting."), featuregate.WithRegisterReferenceURL("https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/16314"), ) var AllowHeaderMetadataParsing = featuregate.GlobalRegistry().MustRegister( "filelog.allowHeaderMetadataParsing", featuregate.StageBeta, featuregate.WithRegisterDescription("When enabled, allows usage of the `header` setting."), featuregate.WithRegisterReferenceURL("https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/18198"), ) // NewConfig creates a new input config with default values func NewConfig() *Config { return &Config{ PollInterval: defaultPollInterval, MaxConcurrentFiles: defaultMaxConcurrentFiles, StartAt: "end", FingerprintSize: fingerprint.DefaultSize, InitialBufferSize: scanner.DefaultBufferSize, MaxLogSize: reader.DefaultMaxLogSize, Encoding: defaultEncoding, FlushPeriod: reader.DefaultFlushPeriod, Resolver: attrs.Resolver{ IncludeFileName: true, }, } } // Config is the configuration of a file input operator type Config struct { matcher.Criteria `mapstructure:",squash"` attrs.Resolver `mapstructure:",squash"` PollInterval time.Duration `mapstructure:"poll_interval,omitempty"` MaxConcurrentFiles int `mapstructure:"max_concurrent_files,omitempty"` MaxBatches int `mapstructure:"max_batches,omitempty"` StartAt string `mapstructure:"start_at,omitempty"` FingerprintSize helper.ByteSize `mapstructure:"fingerprint_size,omitempty"` InitialBufferSize helper.ByteSize `mapstructure:"initial_buffer_size,omitempty"` MaxLogSize helper.ByteSize `mapstructure:"max_log_size,omitempty"` Encoding string `mapstructure:"encoding,omitempty"` SplitConfig split.Config `mapstructure:"multiline,omitempty"` TrimConfig trim.Config `mapstructure:",squash,omitempty"` FlushPeriod time.Duration `mapstructure:"force_flush_period,omitempty"` Header *HeaderConfig `mapstructure:"header,omitempty"` DeleteAfterRead bool `mapstructure:"delete_after_read,omitempty"` IncludeFileRecordNumber bool `mapstructure:"include_file_record_number,omitempty"` Compression string `mapstructure:"compression,omitempty"` PollsToArchive int `mapstructure:"-"` // TODO: activate this config once archiving is set up AcquireFSLock bool `mapstructure:"acquire_fs_lock,omitempty"` } type HeaderConfig struct { Pattern string `mapstructure:"pattern"` MetadataOperators []operator.Config `mapstructure:"metadata_operators"` } func (c Config) Build(set component.TelemetrySettings, emit emit.Callback, opts ...Option) (*Manager, error) { if err := c.validate(); err != nil { return nil, err } if emit == nil { return nil, fmt.Errorf("must provide emit function") } o := new(options) for _, opt := range opts { opt(o) } enc, err := textutils.LookupEncoding(c.Encoding) if err != nil { return nil, fmt.Errorf("failed to find encoding: %w", err) } splitFunc := o.splitFunc if splitFunc == nil { splitFunc, err = c.SplitConfig.Func(enc, false, int(c.MaxLogSize)) if err != nil { return nil, err } } trimFunc := trim.Nop if enc != encoding.Nop { trimFunc = c.TrimConfig.Func() } var startAtBeginning bool switch c.StartAt { case "beginning": startAtBeginning = true case "end": startAtBeginning = false default: return nil, fmt.Errorf("invalid start_at location '%s'", c.StartAt) } var hCfg *header.Config if c.Header != nil { hCfg, err = header.NewConfig(set, c.Header.Pattern, c.Header.MetadataOperators, enc) if err != nil { return nil, fmt.Errorf("failed to build header config: %w", err) } } fileMatcher, err := matcher.New(c.Criteria) if err != nil { return nil, err } set.Logger = set.Logger.With(zap.String("component", "fileconsumer")) readerFactory := reader.Factory{ TelemetrySettings: set, FromBeginning: startAtBeginning, FingerprintSize: int(c.FingerprintSize), InitialBufferSize: int(c.InitialBufferSize), MaxLogSize: int(c.MaxLogSize), Encoding: enc, SplitFunc: splitFunc, TrimFunc: trimFunc, FlushTimeout: c.FlushPeriod, EmitFunc: emit, Attributes: c.Resolver, HeaderConfig: hCfg, DeleteAtEOF: c.DeleteAfterRead, IncludeFileRecordNumber: c.IncludeFileRecordNumber, Compression: c.Compression, AcquireFSLock: c.AcquireFSLock, } telemetryBuilder, err := metadata.NewTelemetryBuilder(set) if err != nil { return nil, err } return &Manager{ set: set, readerFactory: readerFactory, fileMatcher: fileMatcher, pollInterval: c.PollInterval, maxBatchFiles: c.MaxConcurrentFiles / 2, maxBatches: c.MaxBatches, telemetryBuilder: telemetryBuilder, noTracking: o.noTracking, }, nil } func (c Config) validate() error { if _, err := matcher.New(c.Criteria); err != nil { return err } if c.FingerprintSize < fingerprint.MinSize { return fmt.Errorf("'fingerprint_size' must be at least %d bytes", fingerprint.MinSize) } if c.MaxLogSize <= 0 { return fmt.Errorf("'max_log_size' must be positive") } if c.MaxConcurrentFiles < 1 { return fmt.Errorf("'max_concurrent_files' must be positive") } if c.MaxBatches < 0 { return errors.New("'max_batches' must not be negative") } enc, err := textutils.LookupEncoding(c.Encoding) if err != nil { return err } if c.DeleteAfterRead { if !allowFileDeletion.IsEnabled() { return fmt.Errorf("'delete_after_read' requires feature gate '%s'", allowFileDeletion.ID()) } if c.StartAt == "end" { return fmt.Errorf("'delete_after_read' cannot be used with 'start_at: end'") } } if c.Header != nil { if !AllowHeaderMetadataParsing.IsEnabled() { return fmt.Errorf("'header' requires feature gate '%s'", AllowHeaderMetadataParsing.ID()) } if c.StartAt == "end" { return fmt.Errorf("'header' cannot be specified with 'start_at: end'") } set := component.TelemetrySettings{Logger: zap.NewNop()} if _, errConfig := header.NewConfig(set, c.Header.Pattern, c.Header.MetadataOperators, enc); errConfig != nil { return fmt.Errorf("invalid config for 'header': %w", errConfig) } } if runtime.GOOS == "windows" && (c.IncludeFileOwnerName || c.IncludeFileOwnerGroupName) { return fmt.Errorf("'include_file_owner_name' or 'include_file_owner_group_name' it's not supported for windows: %w", err) } return nil } type options struct { splitFunc bufio.SplitFunc noTracking bool } type Option func(*options) // WithSplitFunc overrides the split func which is normally built from other settings on the config func WithSplitFunc(f bufio.SplitFunc) Option { return func(o *options) { o.splitFunc = f } } // WithNoTracking forces the readerFactory to not keep track of files in memory. When used, the reader will // read from the beginning of each file every time it is polled. func WithNoTracking() Option { return func(o *options) { o.noTracking = true } }