internal/beatcmd/reloader.go (204 lines of code) (raw):

// Licensed to Elasticsearch B.V. under one or more contributor // license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright // ownership. Elasticsearch B.V. licenses this file to you under // the Apache License, Version 2.0 (the "License"); you may // not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. package beatcmd import ( "context" "errors" "fmt" "sync" "go.elastic.co/apm/module/apmotel/v2" "go.opentelemetry.io/otel/metric" "go.opentelemetry.io/otel/trace" "golang.org/x/sync/errgroup" "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/cfgfile" "github.com/elastic/beats/v7/libbeat/common/reload" "github.com/elastic/elastic-agent-libs/config" "github.com/elastic/elastic-agent-libs/logp" ) // NewRunnerFunc is a function type that constructs a new Runner with the given // parameters. type NewRunnerFunc func(RunnerParams) (Runner, error) // RunnerParams holds parameters that will be passed by Reloader // to its NewRunnerFunc. type RunnerParams struct { // Config holds the full, raw, configuration, including apm-server.* // and output.* attributes. Config *config.C // Info holds information about the APM Server ("beat", for historical // reasons) process. Info beat.Info // Logger holds a logger to use for logging throughout the APM Server. Logger *logp.Logger // TracerProvider holds a trace.TracerProvider that can be used for // creating traces. TracerProvider trace.TracerProvider // MeterProvider holds a metric.MeterProvider that can be used for // creating metrics. The same MeterProvider is expected to be used // for each instance of the Runner, to ensure counter metrics are // not reset. // // NOTE(axw) metrics registered through this provider are used for // feeding into both Elastic APM (if enabled) and the libbeat // monitoring framework. For the latter, only gauge and counter // metrics are supported, and attributes (dimensions) are ignored. MeterProvider metric.MeterProvider MetricsGatherer *apmotel.Gatherer } // Runner is an interface returned by NewRunnerFunc. type Runner interface { // Run runs until its context is cancelled. Run(context.Context) error } // NewReloader returns a new Reloader which creates Runners using the provided // beat.Info and NewRunnerFunc. func NewReloader(info beat.Info, registry *reload.Registry, newRunner NewRunnerFunc, meterProvider metric.MeterProvider, metricGatherer *apmotel.Gatherer, tracerProvider trace.TracerProvider) (*Reloader, error) { r := &Reloader{ info: info, logger: info.Logger, newRunner: newRunner, stopped: make(chan struct{}), tracerProvider: tracerProvider, meterProvider: meterProvider, metricGatherer: metricGatherer, } if err := registry.RegisterList(reload.InputRegName, reloadableListFunc(r.reloadInputs)); err != nil { return nil, fmt.Errorf("failed to register inputs reloader: %w", err) } if err := registry.Register(reload.OutputRegName, reload.ReloadableFunc(r.reloadOutput)); err != nil { return nil, fmt.Errorf("failed to register output reloader: %w", err) } if err := registry.Register(reload.APMRegName, reload.ReloadableFunc(r.reloadAPMTracing)); err != nil { return nil, fmt.Errorf("failed to register apm tracing reloader: %w", err) } return r, nil } // Reloader responds to libbeat configuration changes by calling the given // NewRunnerFunc to create a new Runner, and then stopping any existing one. type Reloader struct { info beat.Info logger *logp.Logger newRunner NewRunnerFunc tracerProvider trace.TracerProvider meterProvider metric.MeterProvider metricGatherer *apmotel.Gatherer runner Runner stopRunner func() error mu sync.Mutex inputConfig *config.C outputConfig *config.C apmTracingConfig *config.C stopped chan struct{} } // Run runs the Reloader, blocking until ctx is cancelled or a fatal error occurs. // // Run must be called once and only once. func (r *Reloader) Run(ctx context.Context) error { defer close(r.stopped) <-ctx.Done() r.mu.Lock() defer r.mu.Unlock() if r.runner == nil { return nil } return r.stopRunner() } // reloadInput (re)loads input configuration. // It returns a *multierror.MultiError as libbeat manager error handling is tightly coupled // with its own reloadable list implementation in libbeat/cfgfile/list.go. // // Note: reloadInputs may be called before the Reloader is running. func (r *Reloader) reloadInputs(configs []*reload.ConfigWithMeta) error { if n := len(configs); n != 1 { var errs []error for _, cfg := range configs { unitErr := cfgfile.UnitError{ Err: fmt.Errorf("only 1 input supported, got %d", n), UnitID: cfg.InputUnitID, } errs = append(errs, unitErr) } return errors.Join(errs...) } r.mu.Lock() defer r.mu.Unlock() cfg := configs[0].Config // Input configuration is expected to have a monotonically // increasing revision number. revision, err := cfg.Int("revision", -1) if err != nil { return errors.Join( cfgfile.UnitError{ Err: fmt.Errorf("failed to extract input config revision: %w", err), UnitID: configs[0].InputUnitID, }, ) } if err := r.reload(cfg, r.outputConfig, r.apmTracingConfig); err != nil { return errors.Join( cfgfile.UnitError{ Err: fmt.Errorf("failed to load input config: %w", err), UnitID: configs[0].InputUnitID, }, ) } r.inputConfig = cfg r.logger.With(logp.Int64("revision", revision)).Info("loaded input config") return nil } // reloadOutput (re)loads output configuration. // // Note: reloadOutput may be called before the Reloader is running. func (r *Reloader) reloadOutput(cfg *reload.ConfigWithMeta) error { r.mu.Lock() defer r.mu.Unlock() if err := r.reload(r.inputConfig, cfg.Config, r.apmTracingConfig); err != nil { return fmt.Errorf("failed to load output config: %w", err) } r.outputConfig = cfg.Config r.logger.Info("loaded output config") return nil } // reloadAPMTracing (re)loads apm tracing configuration. func (r *Reloader) reloadAPMTracing(cfg *reload.ConfigWithMeta) error { r.mu.Lock() defer r.mu.Unlock() var c *config.C if cfg != nil { c = cfg.Config } if err := r.reload(r.inputConfig, r.outputConfig, c); err != nil { return fmt.Errorf("failed to load apm tracing config: %w", err) } r.apmTracingConfig = c r.logger.Info("loaded apm tracing config") return nil } func (r *Reloader) reload(inputConfig, outputConfig, apmTracingConfig *config.C) error { var outputNamespace config.Namespace if outputConfig != nil { if err := outputConfig.Unpack(&outputNamespace); err != nil { return err } } if inputConfig == nil || !outputNamespace.IsSet() { // Wait until both input and output have been received. // apm tracing config is not mandatory so not waiting for it return nil } select { case <-r.stopped: // The process is shutting down: ignore reloads. return nil default: } wrappedOutputConfig := config.MustNewConfigFrom(map[string]interface{}{ "output": outputConfig, }) var wrappedApmTracingConfig *config.C // apmTracingConfig is nil when disabled if apmTracingConfig != nil { c, err := apmTracingConfig.Child("elastic", -1) if err != nil { return fmt.Errorf("APM tracing config for elastic not found") } // set enabled manually as APMConfig doesn't contain it. c.SetBool("enabled", -1, true) wrappedApmTracingConfig = config.MustNewConfigFrom(map[string]interface{}{ "instrumentation": c, }) } else { // empty instrumentation config wrappedApmTracingConfig = config.NewConfig() } mergedConfig, err := config.MergeConfigs(inputConfig, wrappedOutputConfig, wrappedApmTracingConfig) if err != nil { return err } // Create a new runner. We separate creation from starting to // allow the runner to perform initialisations that must run // synchronously. newRunner, err := r.newRunner(RunnerParams{ Config: mergedConfig, Info: r.info, Logger: r.logger, TracerProvider: r.tracerProvider, MeterProvider: r.meterProvider, MetricsGatherer: r.metricGatherer, }) if err != nil { return err } // Start the new runner. var g errgroup.Group ctx, cancel := context.WithCancel(context.Background()) g.Go(func() error { if err := newRunner.Run(ctx); err != nil && !errors.Is(err, context.Canceled) { r.logger.With(logp.Error(err)).Error("runner returned with an error") return err } return nil }) stopRunner := func() error { cancel() return g.Wait() } // Stop any existing runner. if r.runner != nil { _ = r.stopRunner() // logged above } r.runner = newRunner r.stopRunner = stopRunner return nil } type reloadableListFunc func(config []*reload.ConfigWithMeta) error func (f reloadableListFunc) Reload(configs []*reload.ConfigWithMeta) error { return f(configs) }