pkg/client/unit.go (289 lines of code) (raw):

// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one // or more contributor license agreements. Licensed under the Elastic License; // you may not use this file except in compliance with the Elastic License. package client import ( "sync" gproto "google.golang.org/protobuf/proto" "google.golang.org/protobuf/types/known/structpb" "github.com/elastic/elastic-agent-client/v7/pkg/proto" ) // UnitType is the type of the unit, either input or output type UnitType proto.UnitType const ( // UnitTypeInput is an input unit. UnitTypeInput = UnitType(proto.UnitType_INPUT) // UnitTypeOutput is an output unit. UnitTypeOutput = UnitType(proto.UnitType_OUTPUT) ) // String returns string representation for the unit type. func (t UnitType) String() string { switch t { case UnitTypeInput: return "input" case UnitTypeOutput: return "output" } return "unknown" } // UnitLogLevel is the log level the unit should run at. type UnitLogLevel proto.UnitLogLevel const ( // UnitLogLevelError is when the unit should log at error level. UnitLogLevelError = UnitLogLevel(proto.UnitLogLevel_ERROR) // UnitLogLevelWarn is when the unit should log at warn level. UnitLogLevelWarn = UnitLogLevel(proto.UnitLogLevel_WARN) // UnitLogLevelInfo is when the unit should log at info level. UnitLogLevelInfo = UnitLogLevel(proto.UnitLogLevel_INFO) // UnitLogLevelDebug is when the unit should log at debug level. UnitLogLevelDebug = UnitLogLevel(proto.UnitLogLevel_DEBUG) // UnitLogLevelTrace is when the unit should log at trace level. UnitLogLevelTrace = UnitLogLevel(proto.UnitLogLevel_TRACE) ) // String returns string representation for the unit log level. func (l UnitLogLevel) String() string { switch l { case UnitLogLevelError: return "error" case UnitLogLevelWarn: return "warn" case UnitLogLevelInfo: return "info" case UnitLogLevelDebug: return "debug" case UnitLogLevelTrace: return "trace" } return "unknown" } // UnitState is the state for the unit, used both for expected and observed state. type UnitState proto.State const ( // UnitStateStarting is when a unit is starting. UnitStateStarting = UnitState(proto.State_STARTING) // UnitStateConfiguring is when a unit is currently configuring. UnitStateConfiguring = UnitState(proto.State_CONFIGURING) // UnitStateHealthy is when the unit is working exactly as it should. UnitStateHealthy = UnitState(proto.State_HEALTHY) // UnitStateDegraded is when the unit is working but not exactly as its expected. UnitStateDegraded = UnitState(proto.State_DEGRADED) // UnitStateFailed is when the unit is completely broken and failing to work. UnitStateFailed = UnitState(proto.State_FAILED) // UnitStateStopping is when the unit is stopping. UnitStateStopping = UnitState(proto.State_STOPPING) // UnitStateStopped is when the unit is stopped. UnitStateStopped = UnitState(proto.State_STOPPED) ) // String returns string representation for the unit state. func (l UnitState) String() string { switch l { case UnitStateStarting: return "STARTING" case UnitStateConfiguring: return "CONFIGURING" case UnitStateHealthy: return "HEALTHY" case UnitStateDegraded: return "DEGRADED" case UnitStateFailed: return "FAILED" case UnitStateStopping: return "STOPPING" case UnitStateStopped: return "STOPPED" } return "UNKNOWN" } // Unit represents a distinct item that needs to be operating with-in this process. // // This is normally N number of inputs and 1 output (possible for multiple in the future). type Unit struct { id string unitType UnitType expectedStateMu sync.RWMutex expectedState UnitState logLevel UnitLogLevel config *proto.UnitExpectedConfig configIdx uint64 // do I need a mutex? features *proto.Features featuresIdx uint64 // do I really need it? apm *proto.APMConfig stateMu sync.RWMutex state UnitState stateMsg string statePayload *structpb.Struct amx sync.RWMutex actions map[string]Action client *clientV2 dmx sync.RWMutex diagHooks map[string]diagHook } // ID of the unit. func (u *Unit) ID() string { return u.id } // Type of the unit. func (u *Unit) Type() UnitType { return u.unitType } // Expected contains the expected state, log level, features and config for a unit. type Expected struct { Config *proto.UnitExpectedConfig Features *proto.Features LogLevel UnitLogLevel State UnitState APMConfig *proto.APMConfig } // Expected returns the expected state, log level, features, apm and config for the unit. func (u *Unit) Expected() Expected { u.expectedStateMu.RLock() defer u.expectedStateMu.RUnlock() return Expected{ Config: u.config, Features: u.features, LogLevel: u.logLevel, State: u.expectedState, APMConfig: u.apm, } } // State returns the currently reported state for the unit. func (u *Unit) State() (UnitState, string, map[string]interface{}) { u.stateMu.RLock() defer u.stateMu.RUnlock() return u.state, u.stateMsg, u.statePayload.AsMap() } // UpdateState updates the state for the unit. func (u *Unit) UpdateState(state UnitState, message string, payload map[string]interface{}) error { var statePayload *structpb.Struct var err error if payload != nil { statePayload, err = structpb.NewStruct(payload) if err != nil { return err } } u.stateMu.Lock() defer u.stateMu.Unlock() changed := false if u.state != state { u.state = state changed = true } if u.stateMsg != message { u.stateMsg = message changed = true } if (u.statePayload == nil && statePayload != nil) || (u.statePayload != nil && statePayload == nil) || !gproto.Equal(u.statePayload, statePayload) { u.statePayload = statePayload changed = true } if changed { u.client.unitsStateChanged() } return nil } // RegisterAction registers action handler for this unit. func (u *Unit) RegisterAction(action Action) { u.amx.Lock() defer u.amx.Unlock() u.actions[action.Name()] = action } // UnregisterAction unregisters action handler with the client func (u *Unit) UnregisterAction(action Action) { u.amx.Lock() defer u.amx.Unlock() delete(u.actions, action.Name()) } // GetAction finds an action by its name. func (u *Unit) GetAction(name string) (Action, bool) { u.amx.RLock() defer u.amx.RUnlock() act, ok := u.actions[name] return act, ok } // Store returns the store client. func (u *Unit) Store() StoreClient { return &storeClient{ client: u.client, unitID: u.id, unitType: u.unitType, } } // Artifacts returns the artifacts client. func (u *Unit) Artifacts() ArtifactsClient { return u.client.Artifacts() } // Logger returns the log client. func (u *Unit) Logger() LogClient { return &logClient{ client: u.client, unitID: u.id, unitType: u.unitType, } } // RegisterDiagnosticHook registers a diagnostic hook function that will get called when diagnostics is called for // as this unit. Registering the hook at the unit level means it will only be called when diagnostics is requested // for this specific unit. func (u *Unit) RegisterDiagnosticHook(name string, description string, filename string, contentType string, hook DiagnosticHook) { u.dmx.Lock() defer u.dmx.Unlock() u.diagHooks[name] = diagHook{ description: description, filename: filename, contentType: contentType, hook: hook, } } // RegisterOptionalDiagnosticHook is the same as RegisterDiagnosticHook, but it takes a paramTag value that corrisponds // the a parameter in the diagnostic request action. The diagnostic will only run if the action contains that tag in the Params field. func (u *Unit) RegisterOptionalDiagnosticHook(paramTag string, name string, description string, filename string, contentType string, hook DiagnosticHook) { u.dmx.Lock() defer u.dmx.Unlock() u.diagHooks[name] = diagHook{ description: description, filename: filename, contentType: contentType, hook: hook, optionalWithParamTag: paramTag, } } // updateState updates the configuration for this unit, triggering the delegate // function if set. func (u *Unit) updateState( exp UnitState, logLevel UnitLogLevel, expFeaturesIdx uint64, expFeatures *proto.Features, cfg *proto.UnitExpectedConfig, cfgIdx uint64, expAPM *proto.APMConfig) Trigger { var triggers Trigger u.expectedStateMu.Lock() defer u.expectedStateMu.Unlock() if u.expectedState != exp { u.expectedState = exp triggers |= TriggeredStateChange } if u.logLevel != logLevel { u.logLevel = logLevel triggers |= TriggeredLogLevelChange } if u.featuresIdx != expFeaturesIdx { u.featuresIdx = expFeaturesIdx if expFeatures != nil && !gproto.Equal(u.features, expFeatures) { u.features = expFeatures triggers |= TriggeredFeatureChange } } if u.configIdx != cfgIdx { u.configIdx = cfgIdx u.config = cfg triggers |= TriggeredConfigChange } if !gproto.Equal(u.apm, expAPM) { u.apm = expAPM triggers |= TriggeredAPMChange } return triggers } // toObserved returns the observed unit protocol to send over the stream. func (u *Unit) toObserved() *proto.UnitObserved { u.expectedStateMu.RLock() cfgIdx := u.configIdx u.expectedStateMu.RUnlock() u.stateMu.RLock() defer u.stateMu.RUnlock() return &proto.UnitObserved{ Id: u.id, Type: proto.UnitType(u.unitType), ConfigStateIdx: cfgIdx, State: proto.State(u.state), Message: u.stateMsg, Payload: u.statePayload, } } // newUnit creates a new unit that needs to be created in this process. func newUnit( id string, unitType UnitType, exp UnitState, logLevel UnitLogLevel, cfg *proto.UnitExpectedConfig, cfgIdx uint64, features *proto.Features, apmConfig *proto.APMConfig, client *clientV2, ) *Unit { unit := Unit{ id: id, unitType: unitType, config: cfg, configIdx: cfgIdx, expectedState: exp, logLevel: logLevel, features: features, state: UnitStateStarting, stateMsg: "Starting", client: client, actions: make(map[string]Action), diagHooks: make(map[string]diagHook), apm: apmConfig, } return &unit }