func()

in internal/pkg/agent/application/coordinator/coordinator.go [1140:1254]


// runLoopIteration handles a single event of the Coordinator's main loop.
//
// It blocks in one select until exactly one case is ready: context
// cancellation, an error or update reported on one of the manager channels,
// an override-state / upgrade-details / log-level message, a heartbeat send,
// or a tick of the component PID ticker. After the chosen case has been
// handled, the state is refreshed if the handler left stateNeedsRefresh set.
//
// If ctx is cancelled the function returns straight away and the trailing
// state refresh is skipped.
func (c *Coordinator) runLoopIteration(ctx context.Context) {
	select {
	case <-ctx.Done():
		return

	case updateErr := <-c.managerChans.runtimeManagerError:
		// This channel carries the outcome of a policy update. The outcome is
		// always recorded; a nil outcome (successful update) additionally
		// moves the Coordinator from starting to healthy.
		c.setRuntimeUpdateError(updateErr)
		if updateErr == nil {
			c.setCoordinatorState(agentclient.Healthy, "Running")
		}

	case cfgErr := <-c.managerChans.configManagerError:
		// Cases are evaluated top to bottom, so errors.As is only reached for
		// a managed agent with a non-nil error.
		var warning *WarningError
		switch {
		case !c.isManaged:
			// Unmanaged: the error is recorded as an overall agent error
			// rather than as Fleet state.
			c.setConfigManagerError(cfgErr)
		case cfgErr == nil:
			c.setFleetState(agentclient.Healthy, "Connected")
		case errors.As(cfgErr, &warning):
			// A warning from Fleet degrades the Fleet state and becomes its
			// state message.
			c.setFleetState(agentclient.Degraded, warning.Error())
		default:
			c.setFleetState(agentclient.Failed, cfgErr.Error())
		}

	case actErr := <-c.managerChans.actionsError:
		c.setConfigManagerActionsError(actErr)

	case vErr := <-c.managerChans.varsManagerError:
		c.setVarsManagerError(vErr)

	case collectorErr := <-c.managerChans.otelManagerError:
		c.setOTelError(collectorErr)

	case override := <-c.overrideStateChan:
		c.setOverrideState(override)

	case details := <-c.upgradeDetailsChan:
		c.setUpgradeDetails(details)

	case c.heartbeatChan <- struct{}{}:
		// The send is the whole event; there is nothing further to do.

	case <-c.componentPIDTicker.C:
		// Swap(false) both reads and clears the "PID changed" flag, so the
		// component model is reloaded at most once per flagged change.
		if c.componentPidRequiresUpdate.Swap(false) {
			if refreshErr := c.refreshComponentModel(ctx); refreshErr != nil {
				wrapped := fmt.Errorf("error refreshing component model for PID update: %w", refreshErr)
				c.setConfigManagerError(wrapped)
				c.logger.Errorf("%s", wrapped)
			}
		}

	case compState := <-c.managerChans.runtimeManagerUpdate:
		// A component change reported by the runtime manager (via
		// Coordinator.watchRuntimeComponents()) is merged into the
		// Coordinator state.
		c.applyComponentState(compState)

	case cfgChange := <-c.managerChans.configManagerUpdate:
		if applyErr := c.processConfig(ctx, cfgChange.Config()); applyErr != nil {
			c.logger.Errorf("applying new policy: %s", applyErr.Error())
			cfgChange.Fail(applyErr)
		} else if ackErr := cfgChange.Ack(); ackErr != nil {
			wrapped := fmt.Errorf("failed to ack configuration change: %w", ackErr)
			// Workaround: the policy change itself was applied successfully,
			// so a failed ACK must not be surfaced as a general Coordinator or
			// policy failure. setConfigManagerError is normally driven by the
			// config manager when it reports failed ACKs and similar problems
			// talking to Fleet, but this is the one place where an action has
			// to report the failure of its own ACK call. The proper fix is to
			// make Ack() unfailable and let the config manager own ACK retries
			// and reporting, as it does for other action types; the error
			// would then reach setConfigManagerError through the config
			// manager's reporting channel. Until then it is set by hand here.
			c.setConfigManagerError(wrapped)
			c.logger.Errorf("%s", wrapped.Error())
		}

	case newVars := <-c.managerChans.varsManagerUpdate:
		// Skip processing if the context was cancelled in the meantime.
		if ctx.Err() == nil {
			c.processVars(ctx, newVars)
		}

	case otelStatus := <-c.managerChans.otelManagerUpdate:
		c.state.Collector = otelStatus
		c.stateNeedsRefresh = true

	case level := <-c.logLevelCh:
		// Skip processing if the context was cancelled in the meantime.
		if ctx.Err() == nil {
			c.processLogLevel(ctx, level)
		}

	case marker := <-c.managerChans.upgradeMarkerUpdate:
		// Skip processing if the context was cancelled in the meantime.
		if ctx.Err() == nil {
			c.setUpgradeDetails(marker.Details)
		}
	}

	// Whatever case ran, publish the accumulated state changes (if any) to
	// stateBroadcaster before the iteration ends.
	if c.stateNeedsRefresh {
		c.refreshState()
	}
}