internal/pkg/agent/application/coordinator/coordinator.go (1,318 lines of code) (raw):

// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one // or more contributor license agreements. Licensed under the Elastic License 2.0; // you may not use this file except in compliance with the Elastic License 2.0. package coordinator import ( "context" "errors" "fmt" "reflect" "strings" "sync/atomic" "time" "github.com/elastic/elastic-agent/internal/pkg/otel/configtranslate" "go.opentelemetry.io/collector/component/componentstatus" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/status" "go.elastic.co/apm/v2" "go.opentelemetry.io/collector/confmap" "gopkg.in/yaml.v2" "github.com/elastic/elastic-agent-client/v7/pkg/client" "github.com/elastic/elastic-agent-libs/logp" "github.com/elastic/elastic-agent/internal/pkg/agent/application/info" "github.com/elastic/elastic-agent/internal/pkg/agent/application/reexec" "github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade" "github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade/details" "github.com/elastic/elastic-agent/internal/pkg/agent/configuration" "github.com/elastic/elastic-agent/internal/pkg/agent/transpiler" "github.com/elastic/elastic-agent/internal/pkg/capabilities" "github.com/elastic/elastic-agent/internal/pkg/config" "github.com/elastic/elastic-agent/internal/pkg/diagnostics" "github.com/elastic/elastic-agent/internal/pkg/fleetapi" "github.com/elastic/elastic-agent/internal/pkg/fleetapi/acker" "github.com/elastic/elastic-agent/pkg/component" "github.com/elastic/elastic-agent/pkg/component/runtime" agentclient "github.com/elastic/elastic-agent/pkg/control/v2/client" "github.com/elastic/elastic-agent/pkg/control/v2/cproto" "github.com/elastic/elastic-agent/pkg/core/logger" "github.com/elastic/elastic-agent/pkg/features" "github.com/elastic/elastic-agent/pkg/limits" "github.com/elastic/elastic-agent/pkg/utils/broadcaster" ) // ErrNotUpgradable error is returned when upgrade cannot be performed. var ErrNotUpgradable = errors.New( "cannot be upgraded; must be installed with install sub-command and " + "running under control of the systems supervisor") // ErrUpgradeInProgress error is returned if two or more upgrades are // attempted at the same time. var ErrUpgradeInProgress = errors.New("upgrade already in progress") // ReExecManager provides an interface to perform re-execution of the entire agent. type ReExecManager interface { ReExec(callback reexec.ShutdownCallbackFn, argOverrides ...string) } // UpgradeManager provides an interface to perform the upgrade action for the agent. type UpgradeManager interface { // Upgradeable returns true if can be upgraded. Upgradeable() bool // Reload reloads the configuration for the upgrade manager. Reload(rawConfig *config.Config) error // Upgrade upgrades running agent. Upgrade(ctx context.Context, version string, sourceURI string, action *fleetapi.ActionUpgrade, details *details.Details, skipVerifyOverride bool, skipDefaultPgp bool, pgpBytes ...string) (_ reexec.ShutdownCallbackFn, err error) // Ack is used on startup to check if the agent has upgraded and needs to send an ack for the action Ack(ctx context.Context, acker acker.Acker) error // AckAction is used to ack not persisted action. AckAction(ctx context.Context, acker acker.Acker, action fleetapi.Action) error // MarkerWatcher returns a watcher for the upgrade marker. MarkerWatcher() upgrade.MarkerWatcher } // MonitorManager provides an interface to perform the monitoring action for the agent. type MonitorManager interface { // Enabled when configured to collect metrics/logs. Enabled() bool // Reload reloads the configuration for the upgrade manager. Reload(rawConfig *config.Config) error // MonitoringConfig injects monitoring configuration into resolved ast tree. // args: // - the existing config policy // - a list of the expected running components // - a map of component IDs to the PIDs of the running components. MonitoringConfig(map[string]interface{}, []component.Component, map[string]uint64) (map[string]interface{}, error) // ComponentMonitoringConfig returns monitoring configuration for the component application, if applicable. ComponentMonitoringConfig(unitID, binary string) map[string]any } // Runner provides interface to run a manager and receive running errors. type Runner interface { // Run runs the manager. Run(context.Context) error // Errors returns the channel to listen to errors on. // // A manager should send a nil error to clear its previous error when it should no longer report as an error. Errors() <-chan error } // RuntimeManager provides an interface to run and update the runtime. type RuntimeManager interface { Runner // Update updates the current components model. Update(model component.Model) // PerformAction executes an action on a unit. PerformAction(ctx context.Context, comp component.Component, unit component.Unit, name string, params map[string]interface{}) (map[string]interface{}, error) // SubscribeAll provides an interface to watch for changes in all components. SubscribeAll(context.Context) *runtime.SubscriptionAll // PerformDiagnostics executes the diagnostic action for the provided units. If no units are provided then // it performs diagnostics for all current units. PerformDiagnostics(context.Context, ...runtime.ComponentUnitDiagnosticRequest) []runtime.ComponentUnitDiagnostic // PerformComponentDiagnostics executes the diagnostic action for the provided components. If no components are provided, // then it performs the diagnostics for all current units. PerformComponentDiagnostics(ctx context.Context, additionalMetrics []cproto.AdditionalDiagnosticRequest, req ...component.Component) ([]runtime.ComponentDiagnostic, error) } // OTelManager provides an interface to run and update the runtime. type OTelManager interface { Runner // Update updates the current configuration for OTel. Update(cfg *confmap.Conf) // Watch returns the chanel to watch for configuration changes. Watch() <-chan *status.AggregateStatus } // ConfigChange provides an interface for receiving a new configuration. // // Ack must be called if the configuration change was accepted and Fail should be called if it fails to be accepted. type ConfigChange interface { // Config returns the configuration for this change. Config() *config.Config // Ack marks the configuration change as accepted. Ack() error // Fail marks the configuration change as failed. Fail(err error) } // ErrorReporter provides an interface for any manager that is handled by the coordinator to report errors. type ErrorReporter interface{} // ConfigManager provides an interface to run and watch for configuration changes. type ConfigManager interface { Runner // ActionErrors returns the error channel for actions. // May return errors for fleet managed agents. // Will always be empty for standalone agents. ActionErrors() <-chan error // Watch returns the chanel to watch for configuration changes. Watch() <-chan ConfigChange } // VarsManager provides an interface to run and watch for variable changes. type VarsManager interface { Runner // DefaultProvider returns the default provider that the variable manager is configured to use. DefaultProvider() string // Observe instructs the variables to observe. Observe(context.Context, []string) ([]*transpiler.Vars, error) // Watch returns the chanel to watch for variable changes. Watch() <-chan []*transpiler.Vars } // ComponentsModifier is a function that takes the computed components model and modifies it before // passing it into the components runtime manager. type ComponentsModifier func(comps []component.Component, cfg map[string]interface{}) ([]component.Component, error) // managerShutdownTimeout is how long the coordinator will wait during shutdown // to receive termination states from its managers. const managerShutdownTimeout = time.Second * 5 type configReloader interface { Reload(*config.Config) error } // Coordinator manages the entire state of the Elastic Agent. // // All configuration changes, update variables, and upgrade actions are managed and controlled by the coordinator. type Coordinator struct { logger *logger.Logger agentInfo info.Agent isManaged bool cfg *configuration.Configuration specs component.RuntimeSpecs fleetAcker acker.Acker reexecMgr ReExecManager upgradeMgr UpgradeManager monitorMgr MonitorManager monitoringServerReloader configReloader runtimeMgr RuntimeManager configMgr ConfigManager varsMgr VarsManager otelMgr OTelManager otelCfg *confmap.Conf // the final config sent to the manager, contains both config from hybrid mode and from components finalOtelCfg *confmap.Conf caps capabilities.Capabilities modifiers []ComponentsModifier // The current state of the Coordinator. This value and its subfields are // safe to read directly from within the main Coordinator goroutine. // Changes are also safe but must set the stateNeedsRefresh flag to ensure // an update is broadcast at the end of the current iteration (so it is // recommended to make changes via helper funtions like setCoordinatorState, // setFleetState, etc). Changes that need to broadcast immediately without // waiting for the end of the iteration can call stateRefresh() directly, // but this should be rare. // // state should never be directly read or written outside the Coordinator // goroutine. Callers who need to access or modify the state should use the // public accessors like State(), SetLogLevel(), etc. state State stateBroadcaster *broadcaster.Broadcaster[State] // If you get a race detector error while accessing this field, it probably // means you're calling private Coordinator methods from outside the // Coordinator goroutine. stateNeedsRefresh bool // overrideState is used during the update process to report the overall // upgrade progress instead of the Coordinator's baseline internal state. overrideState *coordinatorOverrideState // overrideStateChan forwards override states from the publicly accessible // SetOverrideState helper to the Coordinator goroutine. overrideStateChan chan *coordinatorOverrideState // upgradeDetailsChan forwards upgrade details from the publicly accessible // SetUpgradeDetails helper to the Coordinator goroutine. upgradeDetailsChan chan *details.Details // loglevelCh forwards log level changes from the public API (SetLogLevel) // to the run loop in Coordinator's main goroutine. logLevelCh chan logp.Level // managerChans collects the channels used to receive updates from the // various managers. Coordinator reads from all of them during the run loop. // Tests can safely override these before calling Coordinator.Run, or in // between calls to Coordinator.runLoopIteration when testing synchronously. // Tests can send to these channels to simulate manager updates. managerChans managerChans // Top-level errors reported by managers / actions. These will be folded // into the reported state before broadcasting -- State() will report // agentclient.Failed if one of these is set, even if the underlying // coordinator state is agentclient.Healthy. // Errors from the runtime manager report policy update failures and are // stored in runtimeUpdateErr below. configMgrErr error actionsErr error varsMgrErr error // Errors resulting from different possible failure modes when setting a // new policy. Right now there are three different stages where a policy // update can fail: // - in generateAST, converting the policy to an AST // - in process, converting the AST and vars into a full component model // - while applying the final component model in the runtime manager // (reported asynchronously via the runtime manager error channel) // // The plan is to improve our preprocessing so we can always detect // failures immediately https://github.com/elastic/elastic-agent/issues/2887. // For now, we track three distinct errors for those three failure types, // and merge them into a readable error in generateReportableState. configErr error componentGenErr error runtimeUpdateErr error otelErr error // The raw policy before spec lookup or variable substitution ast *transpiler.AST // The current variables vars []*transpiler.Vars // The policy after spec and variable substitution derivedConfig map[string]interface{} // The final component model generated from ast and vars (this is the same // value that is sent to the runtime manager). componentModel []component.Component // Disabled for 8.8.0 release in order to limit the surface // https://github.com/elastic/security-team/issues/6501 // mx sync.RWMutex // protection protection.Config // a sync channel that can be called by other components to check if the main coordinator // loop in runLoopIteration() is active and listening. // Should only be interacted with via CoordinatorActive() or runLoopIteration() heartbeatChan chan struct{} // if a component (mostly endpoint) has a new PID, we need to update // the monitoring components so they have a PID to monitor // however, if endpoint is in some kind of restart loop, // we could DOS the config system. Instead, // run a ticker that checks to see if we have a new PID. componentPIDTicker *time.Ticker componentPidRequiresUpdate *atomic.Bool } // The channels Coordinator reads to receive updates from the various managers. type managerChans struct { // runtimeManagerUpdate is not read-only because it is owned internally // and written to by watchRuntimeComponents in a helper goroutine after // receiving updates from the raw runtime manager channel. runtimeManagerUpdate chan runtime.ComponentComponentState runtimeManagerError <-chan error configManagerUpdate <-chan ConfigChange configManagerError <-chan error actionsError <-chan error varsManagerUpdate <-chan []*transpiler.Vars varsManagerError <-chan error otelManagerUpdate <-chan *status.AggregateStatus otelManagerError <-chan error upgradeMarkerUpdate <-chan upgrade.UpdateMarker } // diffCheck is a container used by checkAndLogUpdate() type diffCheck struct { inNew bool inLast bool updated bool } // UpdateStats reports the diff of a component update. // This is primarily used as a log message, and exported in case it's needed elsewhere. type UpdateStats struct { Components UpdateComponentChange `json:"components"` Outputs UpdateComponentChange `json:"outputs"` } // UpdateComponentChange reports stats for changes to a particular part of a config. type UpdateComponentChange struct { Added []string `json:"added,omitempty"` Removed []string `json:"removed,omitempty"` Updated []string `json:"updated,omitempty"` Count int `json:"count,omitempty"` } // New creates a new coordinator. func New(logger *logger.Logger, cfg *configuration.Configuration, logLevel logp.Level, agentInfo info.Agent, specs component.RuntimeSpecs, reexecMgr ReExecManager, upgradeMgr UpgradeManager, runtimeMgr RuntimeManager, configMgr ConfigManager, varsMgr VarsManager, caps capabilities.Capabilities, monitorMgr MonitorManager, isManaged bool, otelMgr OTelManager, fleetAcker acker.Acker, modifiers ...ComponentsModifier) *Coordinator { var fleetState cproto.State var fleetMessage string if !isManaged { // default enum value is STARTING which is confusing for standalone fleetState = agentclient.Stopped fleetMessage = "Not enrolled into Fleet" } state := State{ State: agentclient.Starting, Message: "Starting", FleetState: fleetState, FleetMessage: fleetMessage, LogLevel: logLevel, } c := &Coordinator{ logger: logger, cfg: cfg, agentInfo: agentInfo, isManaged: isManaged, specs: specs, reexecMgr: reexecMgr, upgradeMgr: upgradeMgr, monitorMgr: monitorMgr, runtimeMgr: runtimeMgr, configMgr: configMgr, varsMgr: varsMgr, otelMgr: otelMgr, caps: caps, modifiers: modifiers, state: state, // Note: the uses of a buffered input channel in our broadcaster (the // third parameter to broadcaster.New) means that it is possible for // immediately adjacent writes/reads not to match, e.g.: // // stateBroadcaster.Set(newState) // reportedState := stateBroadcaster.Get() // may not match newState // // We accept this intentionally to make sure Coordinator itself blocks // as rarely as possible. Within Coordinator's goroutine, we can always // get the latest synchronized value by reading the state struct directly, // so this only affects external callers, and we accept that some of those // might be behind by a scheduler interrupt or so. // // If this ever changes and we decide we need absolute causal // synchronization in the subscriber API, just set the input buffer to 0. stateBroadcaster: broadcaster.New(state, 64, 32), logLevelCh: make(chan logp.Level), overrideStateChan: make(chan *coordinatorOverrideState), upgradeDetailsChan: make(chan *details.Details), heartbeatChan: make(chan struct{}), componentPIDTicker: time.NewTicker(time.Second * 30), componentPidRequiresUpdate: &atomic.Bool{}, fleetAcker: fleetAcker, } // Setup communication channels for any non-nil components. This pattern // lets us transparently accept nil managers / simulated events during // unit testing. if runtimeMgr != nil { // The runtime manager's update channel is a special case: unlike the // other channels, we create it directly instead of reading it from the // manager. Once Coordinator.runner starts, it calls watchRuntimeComponents // in a helper goroutine, which subscribes directly to the runtime manager. // It then scans and logs any changes before forwarding the update // unmodified to this channel to merge with Coordinator.state. This is just // to keep the work of scanning and logging the component changes off the // main Coordinator goroutine. // Tests want to simulate a component state update can send directly to // this channel, as long as they aren't specifically testing the logging // behavior in watchRuntimeComponents. c.managerChans.runtimeManagerUpdate = make(chan runtime.ComponentComponentState) c.managerChans.runtimeManagerError = runtimeMgr.Errors() } if configMgr != nil { c.managerChans.configManagerUpdate = configMgr.Watch() c.managerChans.configManagerError = configMgr.Errors() c.managerChans.actionsError = configMgr.ActionErrors() } if varsMgr != nil { c.managerChans.varsManagerUpdate = varsMgr.Watch() c.managerChans.varsManagerError = varsMgr.Errors() } if otelMgr != nil { c.managerChans.otelManagerUpdate = otelMgr.Watch() c.managerChans.otelManagerError = otelMgr.Errors() } if upgradeMgr != nil && upgradeMgr.MarkerWatcher() != nil { c.managerChans.upgradeMarkerUpdate = upgradeMgr.MarkerWatcher().Watch() } return c } // State returns the current state for the coordinator. // Called by external goroutines. func (c *Coordinator) State() State { return c.stateBroadcaster.Get() } // IsActive is a blocking method that waits for a channel response // from the coordinator loop. This can be used to as a basic health check, // as we'll timeout and return false if the coordinator run loop doesn't // respond to our channel. func (c *Coordinator) IsActive(timeout time.Duration) bool { ctx, cancel := context.WithTimeout(context.Background(), timeout) defer cancel() select { case <-c.heartbeatChan: return true case <-ctx.Done(): return false } } func (c *Coordinator) RegisterMonitoringServer(s configReloader) { c.monitoringServerReloader = s } // StateSubscribe returns a channel that reports changes in Coordinator state. // // bufferLen specifies how many state changes should be queued in addition to // the most recent one. If bufferLen is 0, reads on the channel always return // the current state. Otherwise, multiple changes that occur between reads // will accumulate up to bufferLen. If the most recent state has already been // read, reads on the channel will block until the next state change. // // The returned channel always returns at least one value, and will keep // returning changes until its context is cancelled or the Coordinator shuts // down. After Coordinator shutdown, the channel will continue returning // pending changes until the subscriber reads the final one, when the channel // will be closed. On context cancel, the channel is closed immediately. // // This is safe to call from external goroutines, and subscriber behavior can // never block Coordinator -- see the broadcaster package for detailed // performance guarantees. func (c *Coordinator) StateSubscribe(ctx context.Context, bufferLen int) chan State { return c.stateBroadcaster.Subscribe(ctx, bufferLen) } // Disabled for 8.8.0 release in order to limit the surface // https://github.com/elastic/security-team/issues/6501 // // Protection returns the current agent protection configuration // // This is needed to be able to access the protection configuration for actions validation // func (c *Coordinator) Protection() protection.Config { // c.mx.RLock() // defer c.mx.RUnlock() // return c.protection // } // // setProtection sets protection configuration // func (c *Coordinator) setProtection(protectionConfig protection.Config) { // c.mx.Lock() // c.protection = protectionConfig // c.mx.Unlock() // } // ReExec performs the re-execution. // Called from external goroutines. func (c *Coordinator) ReExec(callback reexec.ShutdownCallbackFn, argOverrides ...string) { // override the overall state to stopping until the re-execution is complete c.SetOverrideState(agentclient.Stopping, "Re-executing") c.reexecMgr.ReExec(callback, argOverrides...) } // Upgrade runs the upgrade process. // Called from external goroutines. func (c *Coordinator) Upgrade(ctx context.Context, version string, sourceURI string, action *fleetapi.ActionUpgrade, skipVerifyOverride bool, skipDefaultPgp bool, pgpBytes ...string) error { // early check outside of upgrader before overriding the state if !c.upgradeMgr.Upgradeable() { return ErrNotUpgradable } // early check capabilities to ensure this upgrade actions is allowed if c.caps != nil { if !c.caps.AllowUpgrade(version, sourceURI) { return ErrNotUpgradable } } // A previous upgrade may be cancelled and needs some time to // run the callback to clear the state var err error for i := 0; i < 5; i++ { s := c.State() if s.State != agentclient.Upgrading { err = nil break } err = ErrUpgradeInProgress time.Sleep(1 * time.Second) } if err != nil { return err } // override the overall state to upgrading until the re-execution is complete c.SetOverrideState(agentclient.Upgrading, fmt.Sprintf("Upgrading to version %s", version)) // initialize upgrade details actionID := "" if action != nil { actionID = action.ActionID } det := details.NewDetails(version, details.StateRequested, actionID) det.RegisterObserver(c.SetUpgradeDetails) cb, err := c.upgradeMgr.Upgrade(ctx, version, sourceURI, action, det, skipVerifyOverride, skipDefaultPgp, pgpBytes...) if err != nil { c.ClearOverrideState() if errors.Is(err, upgrade.ErrUpgradeSameVersion) { // Set upgrade state to completed so update no longer shows in-progress. det.SetState(details.StateCompleted) return c.upgradeMgr.AckAction(ctx, c.fleetAcker, action) } det.Fail(err) return err } if cb != nil { det.SetState(details.StateRestarting) c.ReExec(cb) } return nil } func (c *Coordinator) logUpgradeDetails(details *details.Details) { c.logger.Infow("updated upgrade details", "upgrade_details", details) } // AckUpgrade is the method used on startup to ack a previously successful upgrade action. // Called from external goroutines. func (c *Coordinator) AckUpgrade(ctx context.Context, acker acker.Acker) error { return c.upgradeMgr.Ack(ctx, acker) } // PerformAction executes an action on a unit. // Called from external goroutines. func (c *Coordinator) PerformAction(ctx context.Context, comp component.Component, unit component.Unit, name string, params map[string]interface{}) (map[string]interface{}, error) { return c.runtimeMgr.PerformAction(ctx, comp, unit, name, params) } // PerformDiagnostics executes the diagnostic action for the provided units. If no units are provided then // it performs diagnostics for all current units. // Called from external goroutines. func (c *Coordinator) PerformDiagnostics(ctx context.Context, req ...runtime.ComponentUnitDiagnosticRequest) []runtime.ComponentUnitDiagnostic { return c.runtimeMgr.PerformDiagnostics(ctx, req...) } // PerformComponentDiagnostics executes the diagnostic action for the provided components. func (c *Coordinator) PerformComponentDiagnostics(ctx context.Context, additionalMetrics []cproto.AdditionalDiagnosticRequest, req ...component.Component) ([]runtime.ComponentDiagnostic, error) { return c.runtimeMgr.PerformComponentDiagnostics(ctx, additionalMetrics, req...) } // SetLogLevel changes the entire log level for the running Elastic Agent. // Called from external goroutines. func (c *Coordinator) SetLogLevel(ctx context.Context, lvl *logp.Level) error { if lvl == nil { return fmt.Errorf("logp.Level passed to Coordinator.SetLogLevel() must be not nil") } select { case <-ctx.Done(): return ctx.Err() case c.logLevelCh <- *lvl: // set global once the level change has been taken by the channel logger.SetLevel(*lvl) return nil } } // watchRuntimeComponents listens for state updates from the runtime // manager, logs them, and forwards them to CoordinatorState. // Runs in its own goroutine created in Coordinator.Run. func (c *Coordinator) watchRuntimeComponents(ctx context.Context) { state := make(map[string]runtime.ComponentState) var subChan <-chan runtime.ComponentComponentState // A real Coordinator will always have a runtime manager, but unit tests // may not initialize all managers -- in that case we leave subChan nil, // and just idle until Coordinator shuts down. if c.runtimeMgr != nil { subChan = c.runtimeMgr.SubscribeAll(ctx).Ch() } for { select { case <-ctx.Done(): return case s := <-subChan: oldState, ok := state[s.Component.ID] if !ok { componentLog := coordinatorComponentLog{ ID: s.Component.ID, State: s.State.State.String(), } logBasedOnState(c.logger, s.State.State, fmt.Sprintf("Spawned new component %s: %s", s.Component.ID, s.State.Message), "component", componentLog) for ui, us := range s.State.Units { unitLog := coordinatorUnitLog{ ID: ui.UnitID, Type: ui.UnitType.String(), State: us.State.String(), } logBasedOnState(c.logger, us.State, fmt.Sprintf("Spawned new unit %s: %s", ui.UnitID, us.Message), "component", componentLog, "unit", unitLog) } } else { componentLog := coordinatorComponentLog{ ID: s.Component.ID, State: s.State.State.String(), } if oldState.State != s.State.State { cl := coordinatorComponentLog{ ID: s.Component.ID, State: s.State.State.String(), OldState: oldState.State.String(), } logBasedOnState(c.logger, s.State.State, fmt.Sprintf("Component state changed %s (%s->%s): %s", s.Component.ID, oldState.State.String(), s.State.State.String(), s.State.Message), "component", cl) } for ui, us := range s.State.Units { oldUS, ok := oldState.Units[ui] if !ok { unitLog := coordinatorUnitLog{ ID: ui.UnitID, Type: ui.UnitType.String(), State: us.State.String(), } logBasedOnState(c.logger, us.State, fmt.Sprintf("Spawned new unit %s: %s", ui.UnitID, us.Message), "component", componentLog, "unit", unitLog) } else if oldUS.State != us.State { unitLog := coordinatorUnitLog{ ID: ui.UnitID, Type: ui.UnitType.String(), State: us.State.String(), OldState: oldUS.State.String(), } logBasedOnState(c.logger, us.State, fmt.Sprintf("Unit state changed %s (%s->%s): %s", ui.UnitID, oldUS.State.String(), us.State.String(), us.Message), "component", componentLog, "unit", unitLog) } } } state[s.Component.ID] = s.State if s.State.State == client.UnitStateStopped { delete(state, s.Component.ID) } // Forward the final changes back to Coordinator, unless our context // has ended. select { case c.managerChans.runtimeManagerUpdate <- s: case <-ctx.Done(): return } } } } // Run runs the Coordinator. Must be called on the Coordinator's main goroutine. // // The RuntimeManager, ConfigManager and VarsManager that is passed into NewCoordinator are also ran and lifecycle controlled by the Run. // // If any of the three managers fail, the Coordinator will shut down and // Run will return an error. func (c *Coordinator) Run(ctx context.Context) error { // log all changes in the state of the runtime and update the coordinator state watchCtx, watchCanceller := context.WithCancel(ctx) defer watchCanceller() go c.watchRuntimeComponents(watchCtx) // Close the state broadcaster on finish, but leave it running in the // background until all subscribers have read the final values or their // context ends, so test listeners and such can collect Coordinator's // shutdown state. defer close(c.stateBroadcaster.InputChan) if c.varsMgr != nil { c.setCoordinatorState(agentclient.Starting, "Waiting for initial configuration and composable variables") } else { // vars not initialized, go directly to running c.setCoordinatorState(agentclient.Healthy, "Running") } // The usual state refresh happens in the main run loop in Coordinator.runner, // so before/after the runner call we need to trigger state change broadcasts // manually with refreshState. c.refreshState() err := c.runner(ctx) if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { c.setCoordinatorState(agentclient.Stopped, "Requested to be stopped") c.setFleetState(agentclient.Stopped, "Requested to be stopped") } else { var message string if err != nil { message = fmt.Sprintf("Fatal coordinator error: %v", err.Error()) } else { // runner should always return a non-nil error, but if it doesn't, // report it. message = "Coordinator terminated with unknown error (runner returned nil)" } c.setCoordinatorState(agentclient.Failed, message) c.setFleetState(agentclient.Stopped, message) } // Broadcast the final state in case anyone is still listening c.refreshState() return err } // DiagnosticHooks returns diagnostic hooks that can be connected to the control server to provide diagnostic // information about the state of the Elastic Agent. // Called by external goroutines. func (c *Coordinator) DiagnosticHooks() diagnostics.Hooks { hooks := diagnostics.Hooks{ { Name: "agent-info", Filename: "agent-info.yaml", Description: "current state of the agent information of the running Elastic Agent", ContentType: "application/yaml", Hook: func(_ context.Context) []byte { meta, err := c.agentInfo.ECSMetadata(c.logger) if err != nil { c.logger.Errorw("Error getting ECS metadata", "error.message", err) } output := struct { Headers map[string]string `yaml:"headers"` LogLevel string `yaml:"log_level"` RawLogLevel string `yaml:"log_level_raw"` Metadata *info.ECSMeta `yaml:"metadata"` }{ Headers: c.agentInfo.Headers(), LogLevel: c.agentInfo.LogLevel(), RawLogLevel: c.agentInfo.RawLogLevel(), Metadata: meta, } o, err := yaml.Marshal(output) if err != nil { return []byte(fmt.Sprintf("error: %q", err)) } return o }, }, { Name: "local-config", Filename: "local-config.yaml", Description: "current local configuration of the running Elastic Agent", ContentType: "application/yaml", Hook: func(_ context.Context) []byte { if c.cfg == nil { return []byte("error: failed no local configuration") } o, err := yaml.Marshal(c.cfg) if err != nil { return []byte(fmt.Sprintf("error: %q", err)) } return o }, }, { Name: "pre-config", Filename: "pre-config.yaml", Description: "current pre-configuration of the running Elastic Agent before variable substitution", ContentType: "application/yaml", Hook: func(_ context.Context) []byte { if c.ast == nil { return []byte("error: failed no configuration by the coordinator") } cfg, err := c.ast.Map() if err != nil { return []byte(fmt.Sprintf("error: %q", err)) } o, err := yaml.Marshal(cfg) if err != nil { return []byte(fmt.Sprintf("error: %q", err)) } return o }, }, { Name: "variables", Filename: "variables.yaml", Description: "current variable contexts of the running Elastic Agent", ContentType: "application/yaml", Hook: func(_ context.Context) []byte { if c.vars == nil { return []byte("error: failed no variables by the coordinator") } vars := make([]map[string]interface{}, 0, len(c.vars)) for _, v := range c.vars { m, err := v.Map() if err != nil { return []byte(fmt.Sprintf("error: %q", err)) } vars = append(vars, m) } o, err := yaml.Marshal(struct { Variables []map[string]interface{} `yaml:"variables"` }{ Variables: vars, }) if err != nil { return []byte(fmt.Sprintf("error: %q", err)) } return o }, }, { Name: "computed-config", Filename: "computed-config.yaml", Description: "current computed configuration of the running Elastic Agent after variable substitution", ContentType: "application/yaml", Hook: func(_ context.Context) []byte { cfg := c.derivedConfig o, err := yaml.Marshal(cfg) if err != nil { return []byte(fmt.Sprintf("error: %q", err)) } return o }, }, { Name: "components-expected", Filename: "components-expected.yaml", Description: "current expected components model of the running Elastic Agent", ContentType: "application/yaml", Hook: func(_ context.Context) []byte { comps := c.componentModel o, err := yaml.Marshal(struct { Components []component.Component `yaml:"components"` }{ Components: comps, }) if err != nil { return []byte(fmt.Sprintf("error: %q", err)) } return o }, }, { Name: "components-actual", Filename: "components-actual.yaml", Description: "actual components model of the running Elastic Agent", ContentType: "application/yaml", Hook: func(_ context.Context) []byte { components := c.State().Components componentConfigs := make([]component.Component, len(components)) for i := 0; i < len(components); i++ { componentConfigs[i] = components[i].Component } o, err := yaml.Marshal(struct { Components []component.Component `yaml:"components"` }{ Components: componentConfigs, }) if err != nil { return []byte(fmt.Sprintf("error: %q", err)) } return o }, }, { Name: "state", Filename: "state.yaml", Description: "current state of running components by the Elastic Agent", ContentType: "application/yaml", Hook: func(_ context.Context) []byte { type StateComponentOutput struct { ID string `yaml:"id"` State runtime.ComponentState `yaml:"state"` } type StateCollectorStatus struct { Status componentstatus.Status `yaml:"status"` Err string `yaml:"error,omitempty"` Timestamp string `yaml:"timestamp"` Components map[string]*StateCollectorStatus `yaml:"components,omitempty"` } type StateHookOutput struct { State agentclient.State `yaml:"state"` Message string `yaml:"message"` FleetState agentclient.State `yaml:"fleet_state"` FleetMessage string `yaml:"fleet_message"` LogLevel logp.Level `yaml:"log_level"` Components []StateComponentOutput `yaml:"components"` Collector *StateCollectorStatus `yaml:"collector,omitempty"` UpgradeDetails *details.Details `yaml:"upgrade_details,omitempty"` } var toCollectorStatus func(status *status.AggregateStatus) *StateCollectorStatus toCollectorStatus = func(status *status.AggregateStatus) *StateCollectorStatus { s := &StateCollectorStatus{ Status: status.Status(), Timestamp: status.Timestamp().Format(time.RFC3339Nano), } statusErr := status.Err() if statusErr != nil { s.Err = statusErr.Error() } if len(status.ComponentStatusMap) > 0 { s.Components = make(map[string]*StateCollectorStatus, len(status.ComponentStatusMap)) for k, v := range status.ComponentStatusMap { s.Components[k] = toCollectorStatus(v) } } return s } s := c.State() n := len(s.Components) compStates := make([]StateComponentOutput, n) for i := 0; i < n; i++ { compStates[i] = StateComponentOutput{ ID: s.Components[i].Component.ID, State: s.Components[i].State, } } var collectorStatus *StateCollectorStatus if s.Collector != nil { collectorStatus = toCollectorStatus(s.Collector) } output := StateHookOutput{ State: s.State, Message: s.Message, FleetState: s.FleetState, FleetMessage: s.FleetMessage, LogLevel: s.LogLevel, Components: compStates, Collector: collectorStatus, UpgradeDetails: s.UpgradeDetails, } o, err := yaml.Marshal(output) if err != nil { return []byte(fmt.Sprintf("error: %q", err)) } return o }, }, { Name: "otel", Filename: "otel.yaml", Description: "current otel configuration used by the Elastic Agent", ContentType: "application/yaml", Hook: func(_ context.Context) []byte { if c.otelCfg == nil { return []byte("no active OTel configuration") } o, err := yaml.Marshal(c.otelCfg.ToStringMap()) if err != nil { return []byte(fmt.Sprintf("error: failed to convert to yaml: %v", err)) } return o }, }, diagnostics.Hook{ Name: "otel-final", Filename: "otel-final.yaml", Description: "Final otel configuration used by the Elastic Agent. Includes hybrid mode config and component config.", ContentType: "application/yaml", Hook: func(_ context.Context) []byte { if c.finalOtelCfg == nil { return []byte("no active OTel configuration") } o, err := yaml.Marshal(c.finalOtelCfg.ToStringMap()) if err != nil { return []byte(fmt.Sprintf("error: failed to convert to yaml: %v", err)) } return o }, }, } return hooks } // runner performs the actual work of running all the managers. // Called on the main Coordinator goroutine, from Coordinator.Run. // // if one of the managers terminates the others are also stopped and then the whole runner returns func (c *Coordinator) runner(ctx context.Context) error { ctx, cancel := context.WithCancel(ctx) defer cancel() defer c.componentPIDTicker.Stop() // We run nil checks before starting the various managers so that unit tests // only have to initialize / mock the specific components they're testing. // If a manager is nil, we prebuffer its return channel with nil also so // handleCoordinatorDone doesn't block waiting for its result on shutdown. // In a live agent, the manager fields are never nil. runtimeErrCh := make(chan error, 1) if c.runtimeMgr != nil { go func() { err := c.runtimeMgr.Run(ctx) cancel() runtimeErrCh <- err }() } else { runtimeErrCh <- nil } configErrCh := make(chan error, 1) if c.configMgr != nil { go func() { err := c.configMgr.Run(ctx) cancel() configErrCh <- err }() } else { configErrCh <- nil } varsErrCh := make(chan error, 1) if c.varsMgr != nil { go func() { err := c.varsMgr.Run(ctx) cancel() varsErrCh <- err }() } else { varsErrCh <- nil } otelErrCh := make(chan error, 1) if c.otelMgr != nil { go func() { err := c.otelMgr.Run(ctx) cancel() otelErrCh <- err }() } else { otelErrCh <- nil } upgradeMarkerWatcherErrCh := make(chan error, 1) if c.upgradeMgr != nil && c.upgradeMgr.MarkerWatcher() != nil { err := c.upgradeMgr.MarkerWatcher().Run(ctx) if err != nil { upgradeMarkerWatcherErrCh <- err } else { upgradeMarkerWatcherErrCh <- nil } } else { upgradeMarkerWatcherErrCh <- nil } // Keep looping until the context ends. for ctx.Err() == nil { c.runLoopIteration(ctx) } // If we got fatal errors from any of the managers, return them. // Otherwise, just return the context's closing error. err := collectManagerErrors(managerShutdownTimeout, varsErrCh, runtimeErrCh, configErrCh, otelErrCh, upgradeMarkerWatcherErrCh) if err != nil { c.logger.Debugf("Manager errors on Coordinator shutdown: %v", err.Error()) return err } return ctx.Err() } // runLoopIteration runs one iteration of the Coorinator's internal run // loop in a standalone helper function to enable testing. func (c *Coordinator) runLoopIteration(ctx context.Context) { select { case <-ctx.Done(): return case runtimeErr := <-c.managerChans.runtimeManagerError: // runtime manager errors report the result of a policy update. // Coordinator transitions from starting to healthy when a policy update // is successful. c.setRuntimeUpdateError(runtimeErr) if runtimeErr == nil { c.setCoordinatorState(agentclient.Healthy, "Running") } case configErr := <-c.managerChans.configManagerError: if c.isManaged { var wErr *WarningError if configErr == nil { c.setFleetState(agentclient.Healthy, "Connected") } else if errors.As(configErr, &wErr) { // we received a warning from Fleet, set state to degraded and the warning as state string c.setFleetState(agentclient.Degraded, wErr.Error()) } else { c.setFleetState(agentclient.Failed, configErr.Error()) } } else { // not managed gets sets as an overall error for the agent c.setConfigManagerError(configErr) } case actionsErr := <-c.managerChans.actionsError: c.setConfigManagerActionsError(actionsErr) case varsErr := <-c.managerChans.varsManagerError: c.setVarsManagerError(varsErr) case otelErr := <-c.managerChans.otelManagerError: c.setOTelError(otelErr) case overrideState := <-c.overrideStateChan: c.setOverrideState(overrideState) case upgradeDetails := <-c.upgradeDetailsChan: c.setUpgradeDetails(upgradeDetails) case c.heartbeatChan <- struct{}{}: case <-c.componentPIDTicker.C: // if we hit the ticker and we've got a new PID, // reload the component model if c.componentPidRequiresUpdate.Swap(false) { err := c.refreshComponentModel(ctx) if err != nil { err = fmt.Errorf("error refreshing component model for PID update: %w", err) c.setConfigManagerError(err) c.logger.Errorf("%s", err) } } case componentState := <-c.managerChans.runtimeManagerUpdate: // New component change reported by the runtime manager via // Coordinator.watchRuntimeComponents(), merge it with the // Coordinator state. c.applyComponentState(componentState) case change := <-c.managerChans.configManagerUpdate: if err := c.processConfig(ctx, change.Config()); err != nil { c.logger.Errorf("applying new policy: %s", err.Error()) change.Fail(err) } else { if err := change.Ack(); err != nil { err = fmt.Errorf("failed to ack configuration change: %w", err) // Workaround: setConfigManagerError is usually used by the config // manager to report failed ACKs / etc when communicating with Fleet. // We need to report a failed ACK here, but the policy change has // already been successfully applied so we don't want to report it as // a general Coordinator or policy failure. // This arises uniquely here because this is the only case where an // action is responsible for reporting the failure of its own ACK // call. The "correct" fix is to make this Ack() call unfailable // and handle ACK retries and reporting in the config manager like // with other action types -- this error would then end up invoking // setConfigManagerError "organically" via the config manager's // reporting channel. In the meantime, we do it manually. c.setConfigManagerError(err) c.logger.Errorf("%s", err.Error()) } } case vars := <-c.managerChans.varsManagerUpdate: if ctx.Err() == nil { c.processVars(ctx, vars) } case collector := <-c.managerChans.otelManagerUpdate: c.state.Collector = collector c.stateNeedsRefresh = true case ll := <-c.logLevelCh: if ctx.Err() == nil { c.processLogLevel(ctx, ll) } case upgradeMarker := <-c.managerChans.upgradeMarkerUpdate: if ctx.Err() == nil { c.setUpgradeDetails(upgradeMarker.Details) } } // At the end of each iteration, if we made any changes to the state, // collect them and send them to stateBroadcaster. if c.stateNeedsRefresh { c.refreshState() } } // Always called on the main Coordinator goroutine. func (c *Coordinator) processConfig(ctx context.Context, cfg *config.Config) (err error) { if c.otelMgr != nil { c.otelCfg = cfg.OTel } return c.processConfigAgent(ctx, cfg) } // Always called on the main Coordinator goroutine. func (c *Coordinator) processConfigAgent(ctx context.Context, cfg *config.Config) (err error) { span, ctx := apm.StartSpan(ctx, "config", "app.internal") defer func() { apm.CaptureError(ctx, err).Send() span.End() }() err = c.generateAST(cfg) c.setConfigError(err) if err != nil { return err } // pass the observed vars from the AST to the varsMgr err = c.observeASTVars(ctx) if err != nil { // only possible error here is the context being cancelled return err } // Disabled for 8.8.0 release in order to limit the surface // https://github.com/elastic/security-team/issues/6501 // c.setProtection(protectionConfig) if c.vars != nil { return c.refreshComponentModel(ctx) } return nil } // Generate the AST for a new incoming configuration and, if successful, // assign it to the Coordinator's ast field. func (c *Coordinator) generateAST(cfg *config.Config) (err error) { defer func() { // Update configErr, which stores the results of the most recent policy // update and is merged into the Coordinator state in // generateReportableState. c.setConfigError(err) }() if err = info.InjectAgentConfig(cfg); err != nil { return err } // perform and verify ast translation m, err := cfg.ToMapStr() if err != nil { return fmt.Errorf("could not create the map from the configuration: %w", err) } // Disabled for 8.8.0 release in order to limit the surface // https://github.com/elastic/security-team/issues/6501 // protectionConfig, err := protection.GetAgentProtectionConfig(m) // if err != nil && !errors.Is(err, protection.ErrNotFound) { // return fmt.Errorf("could not read the agent protection configuration: %w", err) // } rawAst, err := transpiler.NewAST(m) if err != nil { return fmt.Errorf("could not create the AST from the configuration: %w", err) } // applying updated agent process limits if err := limits.Apply(cfg); err != nil { return fmt.Errorf("could not update limits config: %w", err) } if err := features.Apply(cfg); err != nil { return fmt.Errorf("could not update feature flags config: %w", err) } // Check the upgrade and monitoring managers before updating them. Real // Coordinators always have them, but not all tests do, and in that case // we should skip the Reload call rather than segfault. if c.upgradeMgr != nil { if err := c.upgradeMgr.Reload(cfg); err != nil { return fmt.Errorf("failed to reload upgrade manager configuration: %w", err) } } if c.monitorMgr != nil { if err := c.monitorMgr.Reload(cfg); err != nil { return fmt.Errorf("failed to reload monitor manager configuration: %w", err) } } if c.monitoringServerReloader != nil { if err := c.monitoringServerReloader.Reload(cfg); err != nil { return fmt.Errorf("failed to reload monitor manager configuration: %w", err) } } c.ast = rawAst return nil } // observeASTVars identifies the variables that are referenced in the computed AST and passed to // the varsMgr so it knows what providers are being referenced. If a providers is not being // referenced then the provider does not need to be running. func (c *Coordinator) observeASTVars(ctx context.Context) error { if c.varsMgr == nil { // No varsMgr (only happens in testing) return nil } var vars []string if c.ast != nil { inputs, ok := transpiler.Lookup(c.ast, "inputs") if ok { vars = inputs.Vars(vars, c.varsMgr.DefaultProvider()) } outputs, ok := transpiler.Lookup(c.ast, "outputs") if ok { vars = outputs.Vars(vars, c.varsMgr.DefaultProvider()) } } updated, err := c.varsMgr.Observe(ctx, vars) if err != nil { // context cancel return err } if updated != nil { // provided an updated set of vars (observed changed) c.vars = updated } return nil } // processVars updates the transpiler vars in the Coordinator. // Called on the main Coordinator goroutine. func (c *Coordinator) processVars(ctx context.Context, vars []*transpiler.Vars) { c.vars = vars err := c.refreshComponentModel(ctx) if err != nil { c.logger.Errorf("updating Coordinator variables: %s", err.Error()) } } // Called on the main Coordinator goroutine. func (c *Coordinator) processLogLevel(ctx context.Context, ll logp.Level) { c.setLogLevel(ll) err := c.refreshComponentModel(ctx) if err != nil { c.logger.Errorf("updating log level: %s", err.Error()) } } // Regenerate the component model based on the current vars and AST, then // forward the result to the runtime manager. // Always called on the main Coordinator goroutine. func (c *Coordinator) refreshComponentModel(ctx context.Context) (err error) { if c.ast == nil || c.vars == nil { // Nothing to process yet return nil } span, ctx := apm.StartSpan(ctx, "refreshComponentModel", "app.internal") defer func() { apm.CaptureError(ctx, err).Send() span.End() }() // regenerate the component model err = c.generateComponentModel() if err != nil { return fmt.Errorf("generating component model: %w", err) } signed, err := component.SignedFromPolicy(c.derivedConfig) if err != nil { if !errors.Is(err, component.ErrNotFound) { c.logger.Errorf("Failed to parse \"signed\" properties: %v", err) return err } // Some "signed" properties are not found, continue. c.logger.Debugf("Continue with missing \"signed\" properties: %v", err) } model := &component.Model{ Components: c.componentModel, Signed: signed, } c.logger.Info("Updating running component model") c.logger.With("components", model.Components).Debug("Updating running component model") return c.updateManagersWithConfig(model) } // updateManagersWithConfig updates runtime managers with the component model and config. // Components may be sent to different runtimes depending on various criteria. func (c *Coordinator) updateManagersWithConfig(model *component.Model) error { runtimeModel, otelModel := c.splitModelBetweenManagers(model) c.logger.With("components", runtimeModel.Components).Debug("Updating runtime manager model") c.runtimeMgr.Update(*runtimeModel) return c.updateOtelManagerConfig(otelModel) } // updateOtelManagerConfig updates the otel collector configuration for the otel manager. It assembles this configuration // from the component model passed in and from the hybrid-mode otel config set on the Coordinator. func (c *Coordinator) updateOtelManagerConfig(model *component.Model) error { finalOtelCfg := confmap.New() var componentOtelCfg *confmap.Conf if len(model.Components) > 0 { var err error c.logger.With("components", model.Components).Debug("Updating otel manager model") componentOtelCfg, err = configtranslate.GetOtelConfig(model, c.agentInfo, c.monitorMgr.ComponentMonitoringConfig) if err != nil { c.logger.Errorf("failed to generate otel config: %v", err) } componentIDs := make([]string, 0, len(model.Components)) for _, comp := range model.Components { componentIDs = append(componentIDs, comp.ID) } c.logger.With("component_ids", componentIDs).Warn("The Otel runtime manager is HIGHLY EXPERIMENTAL and only intended for testing. Use at your own risk.") } if componentOtelCfg != nil { err := finalOtelCfg.Merge(componentOtelCfg) if err != nil { c.logger.Error("failed to merge otel config: %v", err) } } if c.otelCfg != nil { err := finalOtelCfg.Merge(c.otelCfg) if err != nil { c.logger.Error("failed to merge otel config: %v", err) } } if len(finalOtelCfg.AllKeys()) == 0 { // if the config is empty, we want to send nil to the manager, so it knows to stop the collector finalOtelCfg = nil } c.otelMgr.Update(finalOtelCfg) c.finalOtelCfg = finalOtelCfg return nil } // splitModelBetweenManager splits the model components between the runtime manager and the otel manager. func (c *Coordinator) splitModelBetweenManagers(model *component.Model) (runtimeModel *component.Model, otelModel *component.Model) { var otelComponents, runtimeComponents []component.Component for _, comp := range model.Components { switch comp.RuntimeManager { case component.OtelRuntimeManager: otelComponents = append(otelComponents, comp) case component.ProcessRuntimeManager: runtimeComponents = append(runtimeComponents, comp) default: // this should be impossible if we parse the configuration correctly c.logger.Errorf("unknown runtime manager for component: %s, ignoring", comp.RuntimeManager) } } otelModel = &component.Model{ Components: otelComponents, // the signed portion of the policy is only used by Defend, so otel doesn't need it for anything } runtimeModel = &component.Model{ Components: runtimeComponents, Signed: model.Signed, } return } // generateComponentModel regenerates the configuration tree and // components from the current AST and vars and returns the result. // Called from both the main Coordinator goroutine and from external // goroutines via diagnostics hooks. func (c *Coordinator) generateComponentModel() (err error) { defer func() { // Update componentGenErr with the results. c.setComponentGenError(err) }() ast := c.ast.ShallowClone() // perform variable substitution for inputs inputs, ok := transpiler.Lookup(ast, "inputs") if ok { renderedInputs, err := transpiler.RenderInputs(inputs, c.vars) if err != nil { return fmt.Errorf("rendering inputs failed: %w", err) } err = transpiler.Insert(ast, renderedInputs, "inputs") if err != nil { return fmt.Errorf("inserting rendered inputs failed: %w", err) } } // perform variable substitution for outputs // outputs only support the context variables (dynamic provides are not provide to the outputs) outputs, ok := transpiler.Lookup(ast, "outputs") if ok { renderedOutputs, err := transpiler.RenderOutputs(outputs, c.vars) if err != nil { return fmt.Errorf("rendering outputs failed: %w", err) } err = transpiler.Insert(ast, renderedOutputs, "outputs") if err != nil { return fmt.Errorf("inserting rendered outputs failed: %w", err) } } cfg, err := ast.Map() if err != nil { return fmt.Errorf("failed to convert ast to map[string]interface{}: %w", err) } var configInjector component.GenerateMonitoringCfgFn if c.monitorMgr != nil && c.monitorMgr.Enabled() { configInjector = c.monitorMgr.MonitoringConfig } existingCompState := make(map[string]uint64, len(c.state.Components)) for _, comp := range c.state.Components { existingCompState[comp.Component.ID] = comp.State.Pid } comps, err := c.specs.ToComponents( cfg, configInjector, c.state.LogLevel, c.agentInfo, existingCompState, ) if err != nil { return fmt.Errorf("failed to render components: %w", err) } // Filter any disallowed inputs/outputs from the components comps = c.filterByCapabilities(comps) for _, modifier := range c.modifiers { comps, err = modifier(comps, cfg) if err != nil { return fmt.Errorf("failed to modify components: %w", err) } } // If we made it this far, update our internal derived values and // return with no error c.derivedConfig = cfg lastComponentModel := c.componentModel c.componentModel = comps c.checkAndLogUpdate(lastComponentModel) return nil } // compares the last component model with an updated model, // logging any differences. func (c *Coordinator) checkAndLogUpdate(lastComponentModel []component.Component) { if lastComponentModel == nil { c.logger.Debugf("Received initial component update; total of %d components", len(c.componentModel)) return } type compCheck struct { inCurrent bool inLast bool diffUnits map[string]diffCheck } lastCompMap := convertComponentListToMap(lastComponentModel) currentCompMap := convertComponentListToMap(c.componentModel) compDiffMap := map[string]compCheck{} outDiffMap := map[string]diffCheck{} // kinda-callbacks for dealing with output logic foundInLast := func(outName string) { if outDiff, ok := outDiffMap[outName]; ok { outDiff.inLast = true outDiffMap[outName] = outDiff } else { outDiffMap[outName] = diffCheck{inLast: true} } } foundInUpdated := func(outName string) { if outDiff, ok := outDiffMap[outName]; ok { outDiff.inNew = true outDiffMap[outName] = outDiff } else { outDiffMap[outName] = diffCheck{inNew: true} } } // diff the maps // find added & updated components for id, comp := range currentCompMap { // check output foundInUpdated(comp.OutputType) // compare with last state diff := compCheck{inCurrent: true} if lastComp, ok := lastCompMap[id]; ok { diff.inLast = true // if the unit is in both the past and previous, check for updated units diff.diffUnits = diffUnitList(lastComp.Units, comp.Units) foundInLast(lastComp.OutputType) // a bit of optimization: after we're done, we'll need to iterate over lastCompMap to fetch removed units, // so delete items we don't need to iterate over delete(lastCompMap, id) } compDiffMap[id] = diff } // find removed components // if something is still in this map, that means it's only in this map for id, comp := range lastCompMap { compDiffMap[id] = compCheck{inLast: true} foundInLast(comp.OutputType) } addedList := []string{} removedList := []string{} formattedUpdated := []string{} // reduced to list of added/removed outputs addedOutputs := []string{} removedOutputs := []string{} // take our diff map and format everything for output for id, diff := range compDiffMap { if diff.inLast && !diff.inCurrent { removedList = append(removedList, id) } if !diff.inLast && diff.inCurrent { addedList = append(addedList, id) } // format a user-readable list of diffs if diff.inLast && diff.inCurrent { units := []string{} for unitId, state := range diff.diffUnits { action := "" if state.inLast && !state.inNew { action = "removed" } if state.inNew && !state.inLast { action = "added" } if state.updated { action = "updated" } if action != "" { units = append(units, fmt.Sprintf("(%s: %s)", unitId, action)) } } if len(units) > 0 { formatted := fmt.Sprintf("%s: %v", id, units) formattedUpdated = append(formattedUpdated, formatted) } } } // format outputs for output, comp := range outDiffMap { if comp.inLast && !comp.inNew { removedOutputs = append(removedOutputs, output) } if !comp.inLast && comp.inNew { addedOutputs = append(addedOutputs, output) } } logStruct := UpdateStats{ Components: UpdateComponentChange{ Added: addedList, Removed: removedList, Count: len(c.componentModel), Updated: formattedUpdated, }, Outputs: UpdateComponentChange{ Added: addedOutputs, Removed: removedOutputs, }, } c.logger.Infow("component model updated", "changes", logStruct) } // Filter any inputs and outputs in the generated component model // based on whether they're excluded by the capabilities config func (c *Coordinator) filterByCapabilities(comps []component.Component) []component.Component { if c.caps == nil { // No active filters, return unchanged return comps } result := []component.Component{} for _, component := range comps { if component.InputSpec != nil && !c.caps.AllowInput(component.InputType) { c.logger.Infof("Component '%v' with input type '%v' filtered by capabilities.yml", component.ID, component.InputType) continue } if !c.caps.AllowOutput(component.OutputType) { c.logger.Infof("Component '%v' with output type '%v' filtered by capabilities.yml", component.ID, component.OutputType) continue } result = append(result, component) } return result } // helpers for checkAndLogUpdate func convertUnitListToMap(unitList []component.Unit) map[string]component.Unit { unitMap := map[string]component.Unit{} for _, c := range unitList { unitMap[c.ID] = c } return unitMap } func convertComponentListToMap(compList []component.Component) map[string]component.Component { compMap := map[string]component.Component{} for _, c := range compList { compMap[c.ID] = c } return compMap } func diffUnitList(old, new []component.Unit) map[string]diffCheck { oldMap := convertUnitListToMap(old) newMap := convertUnitListToMap(new) diffMap := map[string]diffCheck{} // find new and updated units for id, newUnits := range newMap { diff := diffCheck{inNew: true} if oldUnit, ok := oldMap[id]; ok { diff.inLast = true if newUnits.Config != nil && oldUnit.Config != nil && newUnits.Config.GetSource() != nil && oldUnit.Config.GetSource() != nil { diff.updated = !reflect.DeepEqual(newUnits.Config.GetSource().AsMap(), oldUnit.Config.GetSource().AsMap()) } delete(oldMap, id) } diffMap[id] = diff } // find removed units for id := range oldMap { diffMap[id] = diffCheck{inLast: true} } return diffMap } // collectManagerErrors listens on the shutdown channels for the // runtime, config, and vars managers as well as the upgrade marker // watcher and waits for up to the specified timeout for them to // report their final status. // It returns any resulting errors as a multierror, or nil if no errors // were reported. // Called on the main Coordinator goroutine. func collectManagerErrors(timeout time.Duration, varsErrCh, runtimeErrCh, configErrCh, otelErrCh, upgradeMarkerWatcherErrCh chan error) error { var runtimeErr, configErr, varsErr, otelErr, upgradeMarkerWatcherErr error var returnedRuntime, returnedConfig, returnedVars, returnedOtel, returnedUpgradeMarkerWatcher bool // in case other components are locked up, let us time out timeoutWait := time.NewTimer(timeout) defer timeoutWait.Stop() /* Wait for all managers to gently shut down. All managers send an error status on their termination channel after their Run method returns. Logic: If all three manager channels return a value, or close, we're done. If any errors are non-nil (and not just context.Canceled), collect and return them with multierror. Otherwise, return nil. */ // combinedErr will store any reported errors as well as timeout errors // for unresponsive managers. var errs []error waitLoop: for !returnedRuntime || !returnedConfig || !returnedVars || !returnedOtel || !returnedUpgradeMarkerWatcher { select { case runtimeErr = <-runtimeErrCh: returnedRuntime = true case configErr = <-configErrCh: returnedConfig = true case varsErr = <-varsErrCh: returnedVars = true case otelErr = <-otelErrCh: returnedOtel = true case upgradeMarkerWatcherErr = <-upgradeMarkerWatcherErrCh: returnedUpgradeMarkerWatcher = true case <-timeoutWait.C: var timeouts []string if !returnedRuntime { timeouts = []string{"no response from runtime manager"} } if !returnedConfig { timeouts = append(timeouts, "no response from config manager") } if !returnedVars { timeouts = append(timeouts, "no response from vars manager") } if !returnedOtel { timeouts = append(timeouts, "no response from otel manager") } if !returnedUpgradeMarkerWatcher { timeouts = append(timeouts, "no response from upgrade marker watcher") } timeoutStr := strings.Join(timeouts, ", ") errs = append(errs, fmt.Errorf("timeout while waiting for managers to shut down: %v", timeoutStr)) break waitLoop } } if runtimeErr != nil && !errors.Is(runtimeErr, context.Canceled) { errs = append(errs, fmt.Errorf("runtime manager: %w", runtimeErr)) } if configErr != nil && !errors.Is(configErr, context.Canceled) { errs = append(errs, fmt.Errorf("config manager: %w", configErr)) } if varsErr != nil && !errors.Is(varsErr, context.Canceled) { errs = append(errs, fmt.Errorf("vars manager: %w", varsErr)) } if otelErr != nil && !errors.Is(otelErr, context.Canceled) { errs = append(errs, fmt.Errorf("otel manager: %w", otelErr)) } if upgradeMarkerWatcherErr != nil && !errors.Is(upgradeMarkerWatcherErr, context.Canceled) { errs = append(errs, fmt.Errorf("upgrade marker watcher: %w", upgradeMarkerWatcherErr)) } return errors.Join(errs...) } type coordinatorComponentLog struct { ID string `json:"id"` State string `json:"state"` OldState string `json:"old_state,omitempty"` } type coordinatorUnitLog struct { ID string `json:"id"` Type string `json:"type"` State string `json:"state"` OldState string `json:"old_state,omitempty"` } func logBasedOnState(l *logger.Logger, state client.UnitState, msg string, args ...interface{}) { // Skipping one more stack frame in order to have correct file line set in the logger output while using this wrapper function l = logger.AddCallerSkip(l, 1) switch state { case client.UnitStateStarting: l.With(args...).Info(msg) case client.UnitStateConfiguring: l.With(args...).Info(msg) case client.UnitStateDegraded: l.With(args...).Warn(msg) case client.UnitStateHealthy: l.With(args...).Info(msg) case client.UnitStateFailed: l.With(args...).Error(msg) case client.UnitStateStopping: l.With(args...).Info(msg) case client.UnitStateStopped: l.With(args...).Info(msg) default: l.With(args...).Info(msg) } }