in pkg/client/client_v2.go [653:733]
func (c *clientV2) syncUnits(expected *proto.CheckinExpected) {
c.unitsMu.Lock()
defer c.unitsMu.Unlock()
if c.removeUnexpectedUnits(expected) > 0 {
// Most state changes are reported by the unit itself via
// Unit.UpdateState, which schedules a checkin update by calling
// unitsStateChanged(), but removals are handled immediately
// here so we also need to trigger the update ourselves.
c.unitsStateChanged()
}
var apmConfig *proto.APMConfig
func() {
c.componentMu.RLock()
defer c.componentMu.RUnlock()
if c.componentConfig != nil {
apmConfig = c.componentConfig.ApmConfig
}
}()
for _, agentUnit := range expected.Units {
unit := c.findUnit(agentUnit.Id, UnitType(agentUnit.Type))
if unit == nil {
// new unit
unit = newUnit(
agentUnit.Id,
UnitType(agentUnit.Type),
UnitState(agentUnit.State),
UnitLogLevel(agentUnit.LogLevel),
agentUnit.Config,
agentUnit.ConfigStateIdx,
expected.Features,
apmConfig,
c)
c.units = append(c.units, unit)
changed := UnitChanged{
Type: UnitChangedAdded,
Unit: unit,
}
if expected.Features != nil {
changed.Triggers = TriggeredFeatureChange
}
if apmConfig != nil {
changed.Triggers |= TriggeredAPMChange
}
c.changesCh <- changed
} else {
// existing unit
triggers := unit.updateState(
UnitState(agentUnit.State),
UnitLogLevel(agentUnit.LogLevel),
expected.FeaturesIdx,
expected.Features,
agentUnit.Config,
agentUnit.ConfigStateIdx,
apmConfig,
)
changed := UnitChanged{
Triggers: triggers,
Type: UnitChangedModified,
Unit: unit,
}
if changed.Triggers > TriggeredNothing { // a.k.a something changed
c.changesCh <- changed
}
}
}
// Now that we've propagated feature flags' information to units, record
// the featuresIdx on the client so we can send it up as part of the observed
// state in the next checkin.
// (It's safe to write to featuresIdx here since we hold c.unitsMu.)
c.featuresIdx = expected.FeaturesIdx
}