pkg/component/runtime/state.go (433 lines of code) (raw):

// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one // or more contributor license agreements. Licensed under the Elastic License 2.0; // you may not use this file except in compliance with the Elastic License 2.0. package runtime import ( "errors" "fmt" "reflect" gproto "google.golang.org/protobuf/proto" "github.com/elastic/elastic-agent-client/v7/pkg/client" "github.com/elastic/elastic-agent-client/v7/pkg/proto" "github.com/elastic/elastic-agent/pkg/component" ) const ( startingMsg = "Starting" stoppedMsg = "Stopped" unknownMsg = "Failed: reported unit is unknown" missingMsg = "Failed: not reported in check-in" ) // ComponentUnitState is the state for a unit running in a component. type ComponentUnitState struct { State client.UnitState `yaml:"state"` Message string `yaml:"message"` Payload map[string]interface{} `yaml:"payload,omitempty"` // internal unitState client.UnitState unitMessage string unitPayload map[string]interface{} configStateIdx uint64 err error } // ComponentUnitKey is a composite key to identify a unit by its type and ID. type ComponentUnitKey struct { UnitType client.UnitType UnitID string } // MarshalYAML implements the Marshaller interface for the componentUnitKey func (key ComponentUnitKey) MarshalYAML() (interface{}, error) { return fmt.Sprintf("%s-%s", key.UnitType.String(), key.UnitID), nil } // ComponentVersionInfo provides version information reported by the component. type ComponentVersionInfo struct { // Name of the binary. Name string `yaml:"name"` // Additional metadata about the binary. Meta map[string]string `yaml:"meta,omitempty"` // BuildHash is the VCS commit hash the program was built from. BuildHash string `yaml:"build_hash"` } // ComponentState is the overall state of the component. type ComponentState struct { State client.UnitState `yaml:"state"` Message string `yaml:"message"` Units map[ComponentUnitKey]ComponentUnitState `yaml:"units"` // We don't serialize the Features field as YAML so it doesn't show up // in the diagnostics-generated state.yaml file, keeping it concise. Features *proto.Features `yaml:"-"` FeaturesIdx uint64 `yaml:"features_idx"` Component *proto.Component `yaml:"component,omitempty"` ComponentIdx uint64 `yaml:"component_idx"` VersionInfo ComponentVersionInfo `yaml:"version_info"` // The PID of the process, as obtained from the *from the Protobuf API* // As of now, this is only used by Endpoint, as agent doesn't know the PID // of the endpoint service. If you need the PID for beats, use the coordinator/communicator Pid uint64 // internal expectedUnits map[ComponentUnitKey]expectedUnitState expectedFeatures *proto.Features expectedFeaturesIdx uint64 expectedComponent *proto.Component expectedComponentIdx uint64 } // expectedUnitState is the expected state of a unit. type expectedUnitState struct { state client.UnitState configStateIdx uint64 config *proto.UnitExpectedConfig err error logLevel client.UnitLogLevel } func newComponentState(comp *component.Component) (s ComponentState) { s.State = client.UnitStateStarting s.Message = startingMsg s.Units = make(map[ComponentUnitKey]ComponentUnitState) s.expectedUnits = make(map[ComponentUnitKey]expectedUnitState) s.expectedFeaturesIdx = 1 s.expectedComponentIdx = 1 // Merge initial component state. s.syncExpected(comp) s.syncUnits(comp) return s } // Copy returns a copy of the structure. func (s *ComponentState) Copy() (c ComponentState) { c = *s c.Units = make(map[ComponentUnitKey]ComponentUnitState) for k, v := range s.Units { c.Units[k] = v } c.expectedUnits = make(map[ComponentUnitKey]expectedUnitState) for k, v := range s.expectedUnits { c.expectedUnits[k] = v } c.Features = s.Features c.FeaturesIdx = s.FeaturesIdx c.expectedFeatures = s.expectedFeatures c.expectedFeaturesIdx = s.expectedFeaturesIdx c.Component = s.Component c.ComponentIdx = s.ComponentIdx c.expectedComponent = s.expectedComponent c.expectedComponentIdx = s.expectedComponentIdx return c } func (s *ComponentState) syncExpected(comp *component.Component) bool { changed := false touched := make(map[ComponentUnitKey]bool) for _, unit := range comp.Units { key := ComponentUnitKey{ UnitType: unit.Type, UnitID: unit.ID, } touched[key] = true expected, ok := s.expectedUnits[key] if ok { if expected.logLevel != unit.LogLevel { expected.logLevel = unit.LogLevel changed = true } if !gproto.Equal(expected.config, unit.Config) { expected.config = unit.Config expected.configStateIdx++ changed = true } } else { expected.state = client.UnitStateHealthy expected.logLevel = unit.LogLevel expected.config = unit.Config expected.configStateIdx = 1 changed = true } if !errors.Is(expected.err, unit.Err) { expected.err = unit.Err if expected.err != nil { expected.state = client.UnitStateFailed } changed = true } s.expectedUnits[key] = expected } for key, unit := range s.expectedUnits { _, ok := touched[key] if !ok { if unit.state != client.UnitStateStopped { unit.state = client.UnitStateStopped changed = true // unit is a copy and must be set back into the map s.expectedUnits[key] = unit } } } if !gproto.Equal(s.expectedFeatures, comp.Features) { changed = true s.expectedFeaturesIdx++ s.expectedFeatures = comp.Features } if !gproto.Equal(s.expectedComponent, comp.Component) { changed = true s.expectedComponentIdx++ s.expectedComponent = comp.Component } return changed } func (s *ComponentState) syncUnits(comp *component.Component) bool { changed := false touched := make(map[ComponentUnitKey]bool) for _, unit := range comp.Units { key := ComponentUnitKey{ UnitType: unit.Type, UnitID: unit.ID, } touched[key] = true existing, ok := s.Units[key] if !ok { existing.State = client.UnitStateStarting existing.Message = startingMsg existing.Payload = nil existing.configStateIdx = 0 existing.unitState = client.UnitStateStarting existing.unitMessage = startingMsg existing.unitPayload = nil changed = true } existing.err = unit.Err if existing.err != nil { errMsg := existing.err.Error() if existing.State != client.UnitStateFailed || existing.Message != errMsg || diffPayload(existing.Payload, nil) { existing.State = client.UnitStateFailed existing.Message = existing.err.Error() existing.Payload = nil changed = true } } s.Units[key] = existing } for key, unit := range s.Units { _, ok := touched[key] if !ok { if unit.State != client.UnitStateStopped { unit.State = client.UnitStateStopped unit.Message = stoppedMsg unit.Payload = nil unit.unitState = client.UnitStateStopped unit.unitMessage = stoppedMsg unit.unitPayload = nil changed = true // unit is a copy and must be set back into the map s.Units[key] = unit } } } if !gproto.Equal(s.Features, comp.Features) { s.Features = comp.Features changed = true } if !gproto.Equal(s.Component, comp.Component) { s.Component = comp.Component changed = true } return changed } func (s *ComponentState) syncCheckin(checkin *proto.CheckinObserved) bool { changed := false if s.Pid != checkin.Pid { changed = true s.Pid = checkin.Pid } touched := make(map[ComponentUnitKey]bool) for _, unit := range checkin.Units { key := ComponentUnitKey{ UnitType: client.UnitType(unit.Type), UnitID: unit.Id, } var payload map[string]interface{} if unit.Payload != nil { payload = unit.Payload.AsMap() } touched[key] = true _, inExpected := s.expectedUnits[key] existing := s.Units[key] existing.unitState = client.UnitState(unit.State) existing.unitMessage = unit.Message existing.unitPayload = payload existing.configStateIdx = unit.ConfigStateIdx if existing.err != nil && existing.unitState != client.UnitStateStopped { errMsg := existing.err.Error() if existing.State != client.UnitStateFailed || existing.Message != errMsg || diffPayload(existing.Payload, nil) { changed = true existing.State = client.UnitStateFailed existing.Message = errMsg existing.Payload = nil } } else if !inExpected && existing.unitState != client.UnitStateStopped { if existing.State != client.UnitStateFailed || existing.Message != unknownMsg || diffPayload(existing.Payload, nil) { changed = true existing.State = client.UnitStateFailed existing.Message = unknownMsg existing.Payload = nil } } else { if existing.unitState != existing.State || existing.unitMessage != existing.Message || diffPayload(existing.unitPayload, existing.Payload) { changed = true existing.State = existing.unitState existing.Message = existing.unitMessage existing.Payload = existing.unitPayload } } s.Units[key] = existing } for key, unit := range s.Units { // Look for units that weren't in the checkin. if _, ok := touched[key]; ok { continue } unit.unitState = client.UnitStateStarting unit.unitMessage = "" unit.unitPayload = nil unit.configStateIdx = 0 if unit.err != nil { errMsg := unit.err.Error() if unit.State != client.UnitStateFailed || unit.Message != errMsg || diffPayload(unit.Payload, nil) { changed = true unit.State = client.UnitStateFailed unit.Message = errMsg unit.Payload = nil } } else if unit.State != client.UnitStateStarting && unit.State != client.UnitStateStopped { if unit.State != client.UnitStateFailed || unit.Message != missingMsg || diffPayload(unit.Payload, nil) { changed = true unit.State = client.UnitStateFailed unit.Message = missingMsg unit.Payload = nil } } s.Units[key] = unit } if checkin.VersionInfo != nil { if checkin.VersionInfo.Name != "" && s.VersionInfo.Name != checkin.VersionInfo.Name { s.VersionInfo.Name = checkin.VersionInfo.Name changed = true } if checkin.VersionInfo.BuildHash != "" && s.VersionInfo.BuildHash != checkin.VersionInfo.BuildHash { s.VersionInfo.BuildHash = checkin.VersionInfo.BuildHash changed = true } if checkin.VersionInfo.Meta != nil && diffMeta(s.VersionInfo.Meta, checkin.VersionInfo.Meta) { s.VersionInfo.Meta = checkin.VersionInfo.Meta changed = true } } if s.FeaturesIdx != checkin.FeaturesIdx { s.FeaturesIdx = checkin.FeaturesIdx changed = true } if s.ComponentIdx != checkin.ComponentIdx { s.ComponentIdx = checkin.ComponentIdx changed = true } return changed } func (s *ComponentState) unsettled() bool { if len(s.expectedUnits) != len(s.Units) { // mismatch on unit count return true } for ek, e := range s.expectedUnits { o, ok := s.Units[ek] if !ok { // unit missing return true } if o.configStateIdx != e.configStateIdx || e.state != o.State { // config or state mismatch return true } } return s.FeaturesIdx != s.expectedFeaturesIdx || s.ComponentIdx != s.expectedComponentIdx } func (s *ComponentState) toCheckinExpected() *proto.CheckinExpected { units := make([]*proto.UnitExpected, 0, len(s.expectedUnits)) for k, u := range s.expectedUnits { e := &proto.UnitExpected{ Id: k.UnitID, Type: proto.UnitType(k.UnitType), State: proto.State(u.state), LogLevel: proto.UnitLogLevel(u.logLevel), ConfigStateIdx: u.configStateIdx, Config: nil, } o, ok := s.Units[k] if !ok || o.configStateIdx != u.configStateIdx { e.Config = u.config } if u.err != nil { if !ok || o.unitState == client.UnitStateStopped || o.configStateIdx == 0 { // unit not existing, already stopped or never sent continue } // unit in error needs to be stopped (no config change) e.State = proto.State_STOPPED e.ConfigStateIdx = o.configStateIdx e.Config = nil } units = append(units, e) } return &proto.CheckinExpected{ Units: units, Features: s.expectedFeatures, FeaturesIdx: s.expectedFeaturesIdx, Component: s.expectedComponent, ComponentIdx: s.expectedComponentIdx, } } func (s *ComponentState) cleanupStopped() bool { cleaned := false for ek, e := range s.expectedUnits { if e.state == client.UnitStateStopped { // should be stopped; check if observed is also reporting stopped o, ok := s.Units[ek] if ok && o.unitState == client.UnitStateStopped { // its also stopped; so it can now be removed from both delete(s.expectedUnits, ek) delete(s.Units, ek) cleaned = true } } } for k, u := range s.Units { _, ok := s.expectedUnits[k] if !ok && u.State == client.UnitStateStopped { // stopped unit that is not expected (remove it) delete(s.Units, k) cleaned = true } } return cleaned } // forceState force updates the state for the entire component, forcing that // state on all units. It returns true if either the component state or any of // the units state changed, false otherwise. func (s *ComponentState) forceState(state client.UnitState, msg string) bool { changed := false if s.State != state || s.Message != msg { s.State = state s.Message = msg changed = true } for k, unit := range s.Units { unitState := state unitMsg := msg if unit.err != nil && state != client.UnitStateStopped { // must stay as failed as then unit config is in error unitState = client.UnitStateFailed unitMsg = unit.err.Error() } if unit.State != unitState || unit.Message != unitMsg || diffPayload(unit.Payload, nil) { unit.State = unitState unit.Message = unitMsg unit.Payload = nil changed = true } // unit is a copy and must be set back into the map s.Units[k] = unit } return changed } // forceExpectedState force updates the expected state for the entire component, forcing that state on all expected units. func (s *ComponentState) forceExpectedState(state client.UnitState) { for k, unit := range s.expectedUnits { if unit.state != state { unit.state = state } // unit is a copy and must be set back into the map s.expectedUnits[k] = unit } } // compState updates just the component state not all the units. func (s *ComponentState) compState(state client.UnitState, msg string) bool { if s.State != state || s.Message != msg { s.State = state s.Message = msg return true } return false } func diffPayload(existing map[string]interface{}, new map[string]interface{}) bool { if existing == nil && new != nil { return true } if existing != nil && new == nil { return true } return !reflect.DeepEqual(existing, new) } func diffMeta(existing map[string]string, new map[string]string) bool { if existing == nil && new != nil { return true } if existing != nil && new == nil { return true } return !reflect.DeepEqual(existing, new) }