in internal/pkg/agent/application/coordinator/coordinator.go [1140:1254]
func (c *Coordinator) runLoopIteration(ctx context.Context) {
select {
case <-ctx.Done():
return
case runtimeErr := <-c.managerChans.runtimeManagerError:
// runtime manager errors report the result of a policy update.
// Coordinator transitions from starting to healthy when a policy update
// is successful.
c.setRuntimeUpdateError(runtimeErr)
if runtimeErr == nil {
c.setCoordinatorState(agentclient.Healthy, "Running")
}
case configErr := <-c.managerChans.configManagerError:
if c.isManaged {
var wErr *WarningError
if configErr == nil {
c.setFleetState(agentclient.Healthy, "Connected")
} else if errors.As(configErr, &wErr) {
// we received a warning from Fleet; set the state to Degraded and use the warning as the state message
c.setFleetState(agentclient.Degraded, wErr.Error())
} else {
c.setFleetState(agentclient.Failed, configErr.Error())
}
} else {
// when not managed, the error is reported as an overall error for the agent
c.setConfigManagerError(configErr)
}
case actionsErr := <-c.managerChans.actionsError:
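// The config manager reports errors from handling Fleet actions on a
// separate channel; record the latest result (nil means no error).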
c.setConfigManagerActionsError(actionsErr)
case varsErr := <-c.managerChans.varsManagerError:
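// Record the latest error (or nil) reported by the vars manager.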
c.setVarsManagerError(varsErr)
case otelErr := <-c.managerChans.otelManagerError:
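// Record the latest error (or nil) reported by the OTel manager.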
c.setOTelError(otelErr)
case overrideState := <-c.overrideStateChan:
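// Apply a requested override of the Coordinator's reported state.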
c.setOverrideState(overrideState)
case upgradeDetails := <-c.upgradeDetailsChan:
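// Store the latest upgrade details so they are included in the reported state.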
c.setUpgradeDetails(upgradeDetails)
case c.heartbeatChan <- struct{}{}:
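// Completing a send on heartbeatChan signals to the receiver that the
// run loop is still responsive; no other work is needed for this case.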
case <-c.componentPIDTicker.C:
// if we hit the ticker and we've got a new PID,
// reload the component model
if c.componentPidRequiresUpdate.Swap(false) {
err := c.refreshComponentModel(ctx)
if err != nil {
err = fmt.Errorf("error refreshing component model for PID update: %w", err)
c.setConfigManagerError(err)
c.logger.Errorf("%s", err)
}
}
case componentState := <-c.managerChans.runtimeManagerUpdate:
// New component change reported by the runtime manager via
// Coordinator.watchRuntimeComponents(), merge it with the
// Coordinator state.
c.applyComponentState(componentState)
case change := <-c.managerChans.configManagerUpdate:
if err := c.processConfig(ctx, change.Config()); err != nil {
c.logger.Errorf("applying new policy: %s", err.Error())
change.Fail(err)
} else {
if err := change.Ack(); err != nil {
err = fmt.Errorf("failed to ack configuration change: %w", err)
// Workaround: setConfigManagerError is usually used by the config
// manager to report failed ACKs / etc when communicating with Fleet.
// We need to report a failed ACK here, but the policy change has
// already been successfully applied so we don't want to report it as
// a general Coordinator or policy failure.
// This arises uniquely here because this is the only case where an
// action is responsible for reporting the failure of its own ACK
// call. The "correct" fix is to make this Ack() call unfailable
// and handle ACK retries and reporting in the config manager like
// with other action types -- this error would then end up invoking
// setConfigManagerError "organically" via the config manager's
// reporting channel. In the meantime, we do it manually.
c.setConfigManagerError(err)
c.logger.Errorf("%s", err.Error())
}
}
case vars := <-c.managerChans.varsManagerUpdate:
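// New variable values reported by the vars manager; apply them unless
// the Coordinator is already shutting down.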
if ctx.Err() == nil {
c.processVars(ctx, vars)
}
case collector := <-c.managerChans.otelManagerUpdate:
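// New collector status reported by the OTel manager; store it and flag
// the Coordinator state for refresh.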
c.state.Collector = collector
c.stateNeedsRefresh = true
case ll := <-c.logLevelCh:
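// A log level change was requested; apply it unless the Coordinator is
// already shutting down.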
if ctx.Err() == nil {
c.processLogLevel(ctx, ll)
}
case upgradeMarker := <-c.managerChans.upgradeMarkerUpdate:
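// The upgrade marker watcher reported a change; update the upgrade
// details in the Coordinator state.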
if ctx.Err() == nil {
c.setUpgradeDetails(upgradeMarker.Details)
}
}
// At the end of each iteration, if we made any changes to the state,
// collect them and send them to stateBroadcaster.
if c.stateNeedsRefresh {
c.refreshState()
}
}