x-pack/libbeat/management/managerV2.go (712 lines of code) (raw):

// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package management

import (
	"context"
	"errors"
	"fmt"
	"os"
	"os/signal"
	"sync"
	"syscall"
	"time"

	"go.uber.org/zap/zapcore"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
	gproto "google.golang.org/protobuf/proto"
	"gopkg.in/yaml.v2"

	"github.com/elastic/beats/v7/libbeat/cfgfile"
	"github.com/elastic/beats/v7/libbeat/common"
	"github.com/elastic/beats/v7/libbeat/common/reload"
	"github.com/elastic/beats/v7/libbeat/features"
	lbmanagement "github.com/elastic/beats/v7/libbeat/management"
	"github.com/elastic/beats/v7/libbeat/management/status"
	"github.com/elastic/beats/v7/libbeat/version"
	"github.com/elastic/elastic-agent-client/v7/pkg/client"
	"github.com/elastic/elastic-agent-client/v7/pkg/proto"
	conf "github.com/elastic/elastic-agent-libs/config"
	"github.com/elastic/elastic-agent-libs/logp"
)

// diagnosticHandler is a wrapper type that's a bit of a hack, the compiler won't let us send the raw unit struct,
// since there's a type disagreement with the `client.DiagnosticHook` argument, and due to licensing issues we can't
// import the agent client types into the reloader.
type diagnosticHandler struct {
	log    *logp.Logger
	client *agentUnit
}

// Register forwards a diagnostic hook registration to the wrapped unit.
// The hook is skipped (with a warning) when the unit pointer is nil.
func (handler diagnosticHandler) Register(name string, description string, filename string, contentType string, callback func() []byte) {
	handler.log.Infof("registering callback with %s", name)
	// paranoid checking
	if handler.client != nil {
		handler.client.RegisterDiagnosticHook(name, description, filename, contentType, callback)
	} else {
		handler.log.Warnf("client handler for diag callback %s is nil", name)
	}
}

// unitKey is used to identify a unique unit in a map
// the `ID` of a unit in itself is not unique without its type, only `Type` + `ID` is unique
type unitKey struct {
	Type client.UnitType
	ID   string
}

// BeatV2Manager is the main type for tracing V2-related config updates
type BeatV2Manager struct {
	config   *Config
	registry *reload.Registry
	client   client.V2
	logger   *logp.Logger

	// handles client errors
	errCanceller context.CancelFunc

	// track individual units given to us by the V2 API
	mx          sync.Mutex
	units       map[unitKey]*agentUnit
	actions     []client.Action
	forceReload bool

	// status is reported as a whole for every unit sent to this component
	// hopefully this can be improved in the future to be separated per unit
	status  status.Status
	message string
	payload map[string]interface{}

	// stop callback must be registered by libbeat, as with the V1 callback
	stopFunc           func()
	stopOnOutputReload bool
	stopOnEmptyUnits   bool
	stopMut            sync.Mutex
	beatStop           sync.Once

	// sync channel for shutting down the manager after we get a stop from
	// either the agent or the beat
	stopChan chan struct{}

	isRunning bool

	// set with the last applied output config
	// allows tracking if the configuration actually changed and if the
	// beat needs to restart if stopOnOutputReload is set
	lastOutputCfg *proto.UnitExpectedConfig

	// set with the last applied input configs
	lastInputCfgs map[string]*proto.UnitExpectedConfig

	// set with the last applied APM config
	lastAPMCfg *proto.APMConfig

	// used for the debug callback to report as-running config
	lastBeatOutputCfg   *reload.ConfigWithMeta
	lastBeatInputCfgs   []*reload.ConfigWithMeta
	lastBeatFeaturesCfg *conf.C
	lastBeatAPMCfg      *reload.ConfigWithMeta

	// changeDebounce is the debounce time for a configuration change
	changeDebounce time.Duration

	// forceReloadDebounce is the time the manager will wait before
	// trying to reload the configuration after an input not finished error
	// happens
	forceReloadDebounce time.Duration
}

// ================================
// Optionals
// ================================

// WithStopOnEmptyUnits enables stopping the beat when agent sends no units.
func WithStopOnEmptyUnits(m *BeatV2Manager) {
	m.stopOnEmptyUnits = true
}

// WithChangeDebounce sets the changeDebounce value
func WithChangeDebounce(d time.Duration) func(b *BeatV2Manager) {
	return func(b *BeatV2Manager) {
		b.changeDebounce = d
	}
}

// WithForceReloadDebounce sets the forceReloadDebounce value
func WithForceReloadDebounce(d time.Duration) func(b *BeatV2Manager) {
	return func(b *BeatV2Manager) {
		b.forceReloadDebounce = d
	}
}

// ================================
// Init Functions
// ================================

// Register the agent manager, so that calls to lbmanagement.NewManager will
// invoke NewV2AgentManager when linked with x-pack.
func init() {
	lbmanagement.SetManagerFactory(NewV2AgentManager)
}

// NewV2AgentManager returns a remote config manager for the agent V2 protocol.
// This is registered as the manager factory in init() so that calls to
// lbmanagement.NewManager will be forwarded here.
func NewV2AgentManager(config *conf.C, registry *reload.Registry) (lbmanagement.Manager, error) {
	logger := logp.NewLogger(lbmanagement.DebugK).Named("V2-manager")
	c := DefaultConfig()
	if config.Enabled() {
		if err := config.Unpack(&c); err != nil {
			return nil, fmt.Errorf("parsing fleet management settings: %w", err)
		}
	}

	versionInfo := client.VersionInfo{
		Name:      "beat-v2-client",
		BuildHash: version.Commit(),
		Meta: map[string]string{
			"commit":     version.Commit(),
			"build_time": version.BuildTime().String(),
		},
	}

	var agentClient client.V2
	var err error
	if c.InsecureGRPCURLForTesting != "" && c.Enabled {
		// Insecure for testing Elastic-Agent-Client initialisation
		logger.Info("Using INSECURE GRPC connection, this should be only used for testing!")
		agentClient = client.NewV2(c.InsecureGRPCURLForTesting,
			"", // Insecure connection for test, no token needed
			versionInfo,
			client.WithGRPCDialOptions(grpc.WithTransportCredentials(insecure.NewCredentials())))
	} else {
		// Normal Elastic-Agent-Client initialisation: the agent hands the beat
		// its connection details over stdin at startup.
		agentClient, _, err = client.NewV2FromReader(os.Stdin, versionInfo)
		if err != nil {
			return nil, fmt.Errorf("error reading control config from agent: %w", err)
		}
	}

	// officially running under the elastic-agent; we set the publisher pipeline
	// to inform it that we are running under elastic-agent (used to ensure "Publish event: "
	// debug log messages are only outputted when running in trace mode
	lbmanagement.SetUnderAgent(true)

	return NewV2AgentManagerWithClient(c, registry, agentClient)
}

// NewV2AgentManagerWithClient actually creates the manager instance used by the rest of the beats.
func NewV2AgentManagerWithClient(config *Config, registry *reload.Registry, agentClient client.V2, opts ...func(*BeatV2Manager)) (lbmanagement.Manager, error) {
	log := logp.NewLogger(lbmanagement.DebugK)
	if config.RestartOnOutputChange {
		log.Infof("Output reload is enabled, the beat will restart as needed on change of output config")
	}
	m := &BeatV2Manager{
		stopOnOutputReload: config.RestartOnOutputChange,
		config:             config,
		logger:             log.Named("V2-manager"),
		registry:           registry,
		units:              make(map[unitKey]*agentUnit),
		status:             status.Running,
		message:            "Healthy",
		stopChan:           make(chan struct{}, 1),
		changeDebounce:     time.Second,
		// forceReloadDebounce is greater than changeDebounce because it is only
		// used when an input has not reached its finished state, this means some events
		// still need to be acked by the acker, hence the longer we wait the more likely
		// for the input to have reached its finished state.
		forceReloadDebounce: time.Second * 10,
	}

	if config.Enabled {
		m.client = agentClient
	}
	for _, o := range opts {
		o(m)
	}
	return m, nil
}

// ================================
// Beats central management interface implementation
// ================================

// AgentInfo returns the information of the connected agent, or the zero value
// when the agent has not provided it yet.
func (cm *BeatV2Manager) AgentInfo() client.AgentInfo {
	if cm.client.AgentInfo() == nil {
		return client.AgentInfo{}
	}
	return *cm.client.AgentInfo()
}

// RegisterDiagnosticHook will register a diagnostic callback function when elastic-agent asks for a diagnostics dump
func (cm *BeatV2Manager) RegisterDiagnosticHook(name string, description string, filename string, contentType string, hook client.DiagnosticHook) {
	cm.client.RegisterDiagnosticHook(name, description, filename, contentType, hook)
}

// UpdateStatus updates the manager with the current status for the beat.
func (cm *BeatV2Manager) UpdateStatus(status status.Status, msg string) {
	cm.mx.Lock()
	defer cm.mx.Unlock()

	cm.status = status
	cm.message = msg
	cm.updateStatuses()
}

// Enabled returns true if config management is enabled.
func (cm *BeatV2Manager) Enabled() bool {
	return cm.config.Enabled
}

// SetStopCallback sets the callback to run when the manager want to shut down the beats gracefully.
func (cm *BeatV2Manager) SetStopCallback(stopFunc func()) {
	cm.stopMut.Lock()
	defer cm.stopMut.Unlock()
	cm.stopFunc = stopFunc
}

// Start the config manager.
func (cm *BeatV2Manager) Start() error { if !cm.Enabled() { return fmt.Errorf("V2 Manager is disabled") } if cm.errCanceller != nil { cm.errCanceller() cm.errCanceller = nil } ctx := context.Background() err := cm.client.Start(ctx) if err != nil { return fmt.Errorf("error starting connection to client") } ctx, canceller := context.WithCancel(ctx) cm.errCanceller = canceller go cm.watchErrChan(ctx) cm.client.RegisterDiagnosticHook( "beat-rendered-config", "the rendered config used by the beat", "beat-rendered-config.yml", "application/yaml", cm.handleDebugYaml) go cm.unitListen() cm.isRunning = true return nil } // Stop stops the current Manager and close the connection to Elastic Agent. func (cm *BeatV2Manager) Stop() { cm.stopChan <- struct{}{} } // CheckRawConfig is currently not implemented for V1. func (cm *BeatV2Manager) CheckRawConfig(_ *conf.C) error { // This does not do anything on V1 or V2, but here we are return nil } // RegisterAction adds a V2 client action func (cm *BeatV2Manager) RegisterAction(action client.Action) { cm.mx.Lock() defer cm.mx.Unlock() cm.actions = append(cm.actions, action) for _, unit := range cm.units { // actions are only registered on input units (not a requirement by Agent but // don't see a need in beats to support actions on an output at the moment) if clientUnit := unit; clientUnit != nil && clientUnit.Type() == client.UnitTypeInput { clientUnit.RegisterAction(action) } } } // UnregisterAction removes a V2 client action func (cm *BeatV2Manager) UnregisterAction(action client.Action) { cm.mx.Lock() defer cm.mx.Unlock() // remove the registered action i := func() int { for i, a := range cm.actions { if a.Name() == action.Name() { return i } } return -1 }() if i == -1 { // not registered return } cm.actions = append(cm.actions[:i], cm.actions[i+1:]...) 
for _, unit := range cm.units { // actions are only registered on input units (not a requirement by Agent but // don't see a need in beats to support actions on an output at the moment) if clientUnit := unit; clientUnit != nil && clientUnit.Type() == client.UnitTypeInput { clientUnit.UnregisterAction(action) } } } // SetPayload sets the global payload for the V2 client func (cm *BeatV2Manager) SetPayload(payload map[string]interface{}) { cm.mx.Lock() defer cm.mx.Unlock() cm.payload = payload cm.updateStatuses() } // updateStatuses updates the status for all units to match the status of the entire manager. // // This is done because beats at the moment cannot fully manage different status per unit, something // that is new in the V2 control protocol but not supported in beats itself. // // Errors while starting/reloading inputs are already reported by unit, but // the shutdown process is still not being handled by unit. func (cm *BeatV2Manager) updateStatuses() { message := cm.message payload := cm.payload for _, unit := range cm.units { expected := unit.Expected() if expected.State == client.UnitStateStopped { // unit is expected to be stopping (don't adjust the state as the state is now managed by the // `reload` method and will be marked stopped in that code path) continue } err := unit.UpdateState(cm.status, message, payload) if err != nil { cm.logger.Errorf("Failed to update unit %s status: %s", unit.ID(), err) } } } // ================================ // Unit manager // ================================ func (cm *BeatV2Manager) upsertUnit(unit *client.Unit) { cm.mx.Lock() defer cm.mx.Unlock() aUnit, ok := cm.units[unitKey{unit.Type(), unit.ID()}] if ok { aUnit.update(unit) } else { unitLogger := cm.logger.Named(fmt.Sprintf("state-unit-%s", unit.ID())) aUnit = newAgentUnit(unit, unitLogger) cm.units[unitKey{unit.Type(), unit.ID()}] = aUnit } // update specific unit to starting _ = aUnit.UpdateState(status.Starting, "Starting", nil) // register the already 
registered actions (only on input units) for _, action := range cm.actions { aUnit.RegisterAction(action) } } func (cm *BeatV2Manager) updateUnit(unit *client.Unit) { // `unit` is already in `cm.units` no need to add it to the map again // but the lock still needs to be held so reload can be triggered cm.mx.Lock() defer cm.mx.Unlock() // no need to update cm.units because the elastic-agent-client and the beats share // the pointer to each unit, so when the client updates a unit on its side, it // is reflected here. As this deals with modifications, they're already present. // Only the state needs to be updated. aUnit, ok := cm.units[unitKey{unit.Type(), unit.ID()}] if !ok { cm.logger.Infof("BeatV2Manager.updateUnit Unit %s not found", unit.ID()) return } aUnit.update(unit) expected := unit.Expected() if expected.State == client.UnitStateStopped { // expected to be stopped; needs to stop this unit _ = aUnit.UpdateState(status.Stopping, "Stopping", nil) } else { // update specific unit to configuring _ = aUnit.UpdateState(status.Configuring, "Configuring", nil) } } func (cm *BeatV2Manager) softDeleteUnit(unit *client.Unit) { cm.mx.Lock() defer cm.mx.Unlock() key := unitKey{unit.Type(), unit.ID()} if aUnit, ok := cm.units[key]; ok { aUnit.markAsDeleted() } } // ================================ // Private V2 implementation // ================================ func (cm *BeatV2Manager) watchErrChan(ctx context.Context) { for { select { case <-ctx.Done(): return case err := <-cm.client.Errors(): // Don't print the context canceled errors that happen normally during shutdown, restart, etc if !errors.Is(context.Canceled, err) { cm.logger.Errorf("elastic-agent-client error: %s", err) } } } } func (cm *BeatV2Manager) unitListen() { // register signal handler sigc := make(chan os.Signal, 1) signal.Notify(sigc, syscall.SIGINT, syscall.SIGTERM, syscall.SIGHUP) // timer is used to provide debounce on unit changes // this allows multiple changes to come in and only a single reload 
be performed t := time.NewTimer(cm.changeDebounce) t.Stop() // starts stopped, until a change occurs cm.logger.Debug("Listening for agent unit changes") for { select { // The stopChan channel comes from the Manager interface Stop() method case <-cm.stopChan: cm.stopBeat() case sig := <-sigc: // we can't duplicate the same logic used by stopChan here. // A beat will also watch for sigint and shut down, if we call the stopFunc // callback, either the V2 client or the beat will get a panic, // as the stopFunc sent by the beats is usually unsafe. switch sig { case syscall.SIGINT, syscall.SIGTERM: cm.logger.Debug("Received sigterm/sigint, stopping") case syscall.SIGHUP: cm.logger.Debug("Received sighup, stopping") } cm.isRunning = false cm.UpdateStatus(status.Stopping, "Stopping") return case change := <-cm.client.UnitChanges(): cm.logger.Infof( "BeatV2Manager.unitListen UnitChanged.ID(%s), UnitChanged.Type(%s), UnitChanged.Trigger(%d): %s/%s", change.Unit.ID(), change.Type, int64(change.Triggers), change.Type, change.Triggers) switch change.Type { // Within the context of how we send config to beats, I'm not sure if there is a difference between // A unit add and a unit change, since either way we can't do much more than call the reloader case client.UnitChangedAdded: cm.upsertUnit(change.Unit) // reset can be called here because `<-t.C` is handled in the same select t.Reset(cm.changeDebounce) case client.UnitChangedModified: cm.updateUnit(change.Unit) // reset can be called here because `<-t.C` is handled in the same select t.Reset(cm.changeDebounce) case client.UnitChangedRemoved: // necessary to soft-delete here and follow up with the actual deletion of units // in `<-t.C` to avoid deleting a unit that will be re-created before `<-t.C` // expires where the respective runners will not reload; actual deleting here // can cause a runner to lose ref to a unit cm.softDeleteUnit(change.Unit) } case <-t.C: // a copy of the units is used for reload to prevent the holding of 
the `cm.mx`. // it could be possible that sending the configuration to reload could cause the `UpdateStatus` // to be called on the manager causing it to try and grab the `cm.mx` lock, causing a deadlock. cm.mx.Lock() units := make(map[unitKey]*agentUnit, len(cm.units)) for k, u := range cm.units { if u.softDeleted { delete(cm.units, k) continue } units[k] = u } cm.mx.Unlock() if len(cm.units) == 0 && cm.stopOnEmptyUnits { cm.stopBeat() } cm.reload(units) if cm.forceReload { // Restart the debounce timer so we try to reload the inputs. t.Reset(cm.forceReloadDebounce) } } } } func (cm *BeatV2Manager) stopBeat() { if !cm.isRunning { return } cm.logger.Debugf("Stopping beat") cm.UpdateStatus(status.Stopping, "Stopping") cm.isRunning = false cm.stopMut.Lock() defer cm.stopMut.Unlock() if cm.stopFunc != nil { // I'm not 100% sure the once here is needed, // but various beats tend to handle this in a not-quite-safe way cm.beatStop.Do(cm.stopFunc) } cm.client.Stop() cm.UpdateStatus(status.Stopped, "Stopped") if cm.errCanceller != nil { cm.errCanceller() cm.errCanceller = nil } } func (cm *BeatV2Manager) reload(units map[unitKey]*agentUnit) { lowestLevel := client.UnitLogLevelError var outputUnit *agentUnit var inputUnits []*agentUnit var stoppingUnits []*agentUnit healthyInputs := map[string]*agentUnit{} unitErrors := map[string][]error{} // as the very last action, set the state of the failed units defer func() { for _, unit := range units { errs := unitErrors[unit.ID()] if len(errs) != 0 { _ = unit.UpdateState(status.Failed, errors.Join(errs...).Error(), nil) } } }() for _, unit := range units { expected := unit.Expected() if expected.LogLevel > lowestLevel { // log level is still used from an expected stopped unit until // the unit is completely removed (aka. 
fully stopped) lowestLevel = expected.LogLevel } if expected.Features != nil { // unit is expected to update its feature flags featuresCfg, err := features.NewConfigFromProto(expected.Features) if err != nil { unitErrors[unit.ID()] = append(unitErrors[unit.ID()], err) } if err := features.UpdateFromConfig(featuresCfg); err != nil { unitErrors[unit.ID()] = append(unitErrors[unit.ID()], err) } cm.lastBeatFeaturesCfg = featuresCfg } if expected.State == client.UnitStateStopped { // unit is being stopped // // we keep the unit so after reload is performed // these units can be marked as stopped stoppingUnits = append(stoppingUnits, unit) continue } else if expected.State != client.UnitStateHealthy { // only stopped or healthy are known (and expected) state // for a unit cm.logger.Errorf("unit %s has an unknown state %+v", unit.ID(), expected.State) } if unit.Type() == client.UnitTypeOutput { outputUnit = unit } else if unit.Type() == client.UnitTypeInput { inputUnits = append(inputUnits, unit) healthyInputs[unit.ID()] = unit } else { cm.logger.Errorf("unit %s as an unknown type %+v", unit.ID(), unit.Type()) } } // set the new log level (if nothing has changed is a noop) ll, trace := getZapcoreLevel(lowestLevel) logp.SetLevel(ll) lbmanagement.SetUnderAgentTrace(trace) // reload the output configuration restartBeat, err := cm.reloadOutput(outputUnit) // The manager has already signalled the Beat to stop, // there is nothing else to do. Trying to reload inputs // will only lead to invalid state updates and possible // race conditions. if restartBeat { return } if err != nil { // Output creation failed, there is no point in going any further // because there is no output to read events. // // Trying to start inputs will eventually lead them to deadlock // waiting for the output. Log input will deadlock when starting, // effectively blocking this manager. 
cm.logger.Errorw("could not start output", "error", err) msg := fmt.Sprintf("could not start output: %s", err) if err := outputUnit.UpdateState(status.Failed, msg, nil); err != nil { cm.logger.Errorw("setting output state", "error", err) } return } if err := outputUnit.UpdateState(status.Running, "Healthy", nil); err != nil { cm.logger.Errorw("setting output state", "error", err) } // reload APM tracing configuration // all error handling is handled inside of reloadAPM cm.reloadAPM(outputUnit) // compute the input configuration // // in v2 only a single input type will be started per component, so we don't need to // worry about getting multiple re-loaders (we just need the one for the type) if err := cm.reloadInputs(inputUnits); err != nil { // HERE // cm.reloadInputs will use fmt.Errorf and join an error slice // using errors.Join, so we need to unwrap the fmt wrapped error, // then we can iterate over the errors list. err = errors.Unwrap(err) type unwrapList interface { Unwrap() []error } //nolint:errorlint // That's a custom logic based on how reloadInputs builds the error errList, isErrList := err.(unwrapList) if isErrList { for _, err := range errList.Unwrap() { unitErr := cfgfile.UnitError{} if errors.As(err, &unitErr) { unitErrors[unitErr.UnitID] = append(unitErrors[unitErr.UnitID], unitErr.Err) delete(healthyInputs, unitErr.UnitID) } } } } // report the stopping units as stopped for _, unit := range stoppingUnits { _ = unit.UpdateState(status.Stopped, "Stopped", nil) } // now update the statuses of all units that contain only healthy // inputs. If there isn't an error with the inputs, we set the unit as // healthy because there is no way to know more information about its inputs. 
for _, unit := range healthyInputs { expected := unit.Expected() if expected.State == client.UnitStateStopped { // unit is expected to be stopping (don't adjust the state as the state is now managed by the // `reload` method and will be marked stopped in that code path) continue } err := unit.UpdateState(status.Running, "Healthy", nil) if err != nil { cm.logger.Errorf("Failed to update unit %s status: %s", unit.ID(), err) } } } // reloadOutput reload outputs, it returns a bool and an error. // The bool, if set, indicates that the output reload requires an restart, // in that case the error is always `nil`. // // In any other case, the bool is always false and the error will be non nil // if any error has occurred. func (cm *BeatV2Manager) reloadOutput(unit *agentUnit) (bool, error) { // Assuming that the output reloadable isn't a list, see createBeater() in cmd/instance/beat.go output := cm.registry.GetReloadableOutput() if output == nil { return false, fmt.Errorf("failed to find beat reloadable type 'output'") } if unit == nil { // output is being stopped err := output.Reload(nil) if err != nil { return false, fmt.Errorf("failed to reload output: %w", err) } cm.lastOutputCfg = nil cm.lastBeatOutputCfg = nil return false, nil } expected := unit.Expected() if expected.Config == nil { // should not happen; hard stop return false, fmt.Errorf("output unit has no config") } if cm.lastOutputCfg != nil && gproto.Equal(cm.lastOutputCfg, expected.Config) { // configuration for the output did not change; do nothing cm.logger.Debug("Skipped reloading output; configuration didn't change") return false, nil } cm.logger.Debugf("Got output unit config '%s'", expected.Config.GetId()) if cm.stopOnOutputReload && cm.lastOutputCfg != nil { cm.logger.Info("beat is restarting because output changed") _ = unit.UpdateState(status.Stopping, "Restarting", nil) cm.Stop() return true, nil } reloadConfig, err := groupByOutputs(expected.Config) if err != nil { return false, fmt.Errorf("failed 
to generate config for output: %w", err) } // Set those variables regardless of the outcome of output.Reload // this ensures that if we're on a failed output state and a new // output configuration is sent, the Beat will gracefully exit cm.lastOutputCfg = expected.Config cm.lastBeatOutputCfg = reloadConfig err = output.Reload(reloadConfig) if err != nil { return false, fmt.Errorf("failed to reload output: %w", err) } return false, nil } func (cm *BeatV2Manager) reloadInputs(inputUnits []*agentUnit) error { obj := cm.registry.GetInputList() if obj == nil { return fmt.Errorf("failed to find beat reloadable type 'input'") } inputCfgs := make(map[string]*proto.UnitExpectedConfig, len(inputUnits)) inputBeatCfgs := make([]*reload.ConfigWithMeta, 0, len(inputUnits)) agentInfo := cm.client.AgentInfo() for _, unit := range inputUnits { expected := unit.Expected() if expected.Config == nil { // should not happen; hard stop return fmt.Errorf("input unit %s has no config", unit.ID()) } inputCfg, err := generateBeatConfig(expected.Config, agentInfo) if err != nil { return fmt.Errorf("failed to generate configuration for unit %s: %w", unit.ID(), err) } // add diag callbacks for unit // we want to add the diagnostic handler that's specific to the unit, and not the gobal diagnostic handler for idx, in := range inputCfg { in.DiagCallback = diagnosticHandler{client: unit, log: cm.logger.Named("diagnostic-manager")} in.InputUnitID = unit.ID() in.StatusReporter = unit.GetReporterForStreamByIndex(idx) } inputCfgs[unit.ID()] = expected.Config inputBeatCfgs = append(inputBeatCfgs, inputCfg...) } if !didChange(cm.lastInputCfgs, inputCfgs) && !cm.forceReload { cm.logger.Debug("Skipped reloading input units; configuration didn't change") return nil } if cm.forceReload { cm.logger.Info("Reloading Beats inputs because forceReload is true. 
" + "Set log level to debug to get more information about which " + "inputs are causing this.") } if err := obj.Reload(inputBeatCfgs); err != nil { var errs []error // At the moment this logic is tightly bound to the current RunnerList // implementation from libbeat/cfgfile/list.go and Input.loadStates from // filebeat/input/log/input.go. // If they change the way they report errors, this will break. // TODO (Tiago): update all layers to use the most recent features from // the standard library errors package. type unwrapList interface { Unwrap() []error } errList, isErrList := err.(unwrapList) //nolint:errorlint // see the comment above if isErrList { for _, err := range errList.Unwrap() { causeErr := errors.Unwrap(err) // A Log input is only marked as finished when all events it // produced are acked by the acker so when we see this error, // we just retry until the new input can be started. // This is the same logic used by the standalone configuration file // reloader implemented on libbeat/cfgfile/reload.go inputNotFinishedErr := &common.ErrInputNotFinished{} if ok := errors.As(causeErr, &inputNotFinishedErr); ok { cm.logger.Debugf("file '%s' is not finished, will retry starting the input soon", inputNotFinishedErr.File) cm.forceReload = true cm.logger.Debug("ForceReload set to TRUE") continue } // This is an error that cannot be ignored, so we report it errs = append(errs, err) } } if len(errs) != 0 { return fmt.Errorf("failed to reload inputs: %w", errors.Join(errs...)) } } else { // If there was no error reloading input and forceReload was // true, then set it to false. This prevents unnecessary logging // and makes it clear this was the moment when the input reload // finally worked. 
if cm.forceReload { cm.forceReload = false cm.logger.Debug("ForceReload set to FALSE") } } cm.lastInputCfgs = inputCfgs cm.lastBeatInputCfgs = inputBeatCfgs return nil } // reloadAPM reloads APM tracing // // An error is not returned from this function, because in no case do we want APM trace configuration // to cause the beat to fail. The error is logged appropriately in the case of a failure on reload. func (cm *BeatV2Manager) reloadAPM(unit *agentUnit) { // Assuming that the output reloadable isn't a list, see createBeater() in cmd/instance/beat.go apm := cm.registry.GetReloadableAPM() if apm == nil { // no APM reloadable, nothing to do cm.logger.Debug("Unable to reload APM tracing; no APM reloadable registered") return } var apmConfig *proto.APMConfig if unit != nil { expected := unit.Expected() if expected.APMConfig != nil { apmConfig = expected.APMConfig } } if (cm.lastAPMCfg == nil && apmConfig == nil) || (cm.lastAPMCfg != nil && gproto.Equal(cm.lastAPMCfg, apmConfig)) { // configuration for the APM tracing did not change; do nothing cm.logger.Debug("Skipped reloading APM tracing; configuration didn't change") return } if apmConfig == nil { // APM tracing is being stopped cm.logger.Debug("Stopping APM tracing") err := apm.Reload(nil) if err != nil { cm.logger.Errorf("Error stopping APM tracing: %s", err) return } cm.lastAPMCfg = nil cm.lastBeatAPMCfg = nil cm.logger.Debug("Stopped APM tracing") return } uconfig, err := conf.NewConfigFrom(apmConfig) if err != nil { cm.logger.Errorf("Failed to create uconfig from APM configuration: %s", err) return } reloadConfig := &reload.ConfigWithMeta{Config: uconfig} cm.logger.Debug("Reloading APM tracing") err = apm.Reload(reloadConfig) if err != nil { cm.logger.Debugf("Error reloading APM tracing: %s", err) return } cm.lastAPMCfg = apmConfig cm.lastBeatAPMCfg = reloadConfig cm.logger.Debugf("Reloaded APM tracing") } // this function is registered as a debug hook // it prints the last known configuration generated by the 
beat func (cm *BeatV2Manager) handleDebugYaml() []byte { // generate input inputList := []map[string]interface{}{} for _, module := range cm.lastBeatInputCfgs { var inputMap map[string]interface{} err := module.Config.Unpack(&inputMap) if err != nil { cm.logger.Errorf("error unpacking input config for debug callback: %s", err) return nil } inputList = append(inputList, inputMap) } // generate output outputCfg := map[string]interface{}{} if cm.lastBeatOutputCfg != nil { err := cm.lastBeatOutputCfg.Config.Unpack(&outputCfg) if err != nil { cm.logger.Errorf("error unpacking output config for debug callback: %s", err) return nil } } // generate features var featuresCfg map[string]interface{} if cm.lastBeatFeaturesCfg != nil { if err := cm.lastBeatFeaturesCfg.Unpack(&featuresCfg); err != nil { cm.logger.Errorf("error unpacking feature flags config for debug callback: %s", err) return nil } } // generate APM var apmCfg map[string]interface{} if cm.lastBeatAPMCfg != nil { if err := cm.lastBeatAPMCfg.Config.Unpack(&apmCfg); err != nil { cm.logger.Errorf("error unpacking APM tracing config for debug callback: %s", err) return nil } } // combine all of the above in a somewhat coherent way // This isn't perfect, but generating a config that can actually be fed back into the beat // would require beatCfg := struct { Inputs []map[string]interface{} Outputs map[string]interface{} Features map[string]interface{} APM map[string]interface{} }{ Inputs: inputList, Outputs: outputCfg, Features: featuresCfg, APM: apmCfg, } data, err := yaml.Marshal(beatCfg) if err != nil { cm.logger.Errorf("error generating YAML for input debug callback: %w", err) return nil } return data } func getZapcoreLevel(ll client.UnitLogLevel) (zapcore.Level, bool) { switch ll { case client.UnitLogLevelError: return zapcore.ErrorLevel, false case client.UnitLogLevelWarn: return zapcore.WarnLevel, false case client.UnitLogLevelInfo: return zapcore.InfoLevel, false case client.UnitLogLevelDebug: return 
zapcore.DebugLevel, false case client.UnitLogLevelTrace: // beats doesn't support trace // but we do allow the "Publish event:" debug logs // when trace mode is enabled return zapcore.DebugLevel, true } // info level for fallback return zapcore.InfoLevel, false } func didChange(previous map[string]*proto.UnitExpectedConfig, latest map[string]*proto.UnitExpectedConfig) bool { if (previous == nil && latest != nil) || (previous != nil && latest == nil) { return true } if len(previous) != len(latest) { return true } for k, v := range latest { p, ok := previous[k] if !ok { return true } if !gproto.Equal(p, v) { return true } } return false }