x-pack/libbeat/management/unit.go (308 lines of code) (raw):

// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one // or more contributor license agreements. Licensed under the Elastic License; // you may not use this file except in compliance with the Elastic License. package management import ( "fmt" "sync" "github.com/elastic/beats/v7/libbeat/management/status" "github.com/elastic/elastic-agent-client/v7/pkg/client" "github.com/elastic/elastic-agent-libs/logp" ) // unitState is the current state of a unit type unitState struct { state status.Status msg string } type clientUnit interface { ID() string Type() client.UnitType Expected() client.Expected UpdateState(state client.UnitState, message string, payload map[string]interface{}) error RegisterAction(action client.Action) UnregisterAction(action client.Action) RegisterDiagnosticHook(name string, description string, filename string, contentType string, hook client.DiagnosticHook) } // agentUnit implements status.StatusReporter and holds an unitState // for the input as well as a unitState for each stream of // the input in when this a client.UnitTypeInput. type agentUnit struct { softDeleted bool mtx sync.Mutex logger *logp.Logger clientUnit clientUnit inputLevelState unitState streamIDs []string streamStates map[string]unitState } // getUnitState converts status.Status to client.UnitState func getUnitState(s status.Status) client.UnitState { switch s { case status.Unknown: // must be started if its unknown return client.UnitStateStarting case status.Starting: return client.UnitStateStarting case status.Configuring: return client.UnitStateConfiguring case status.Running: return client.UnitStateHealthy case status.Degraded: return client.UnitStateDegraded case status.Failed: return client.UnitStateFailed case status.Stopping: return client.UnitStateStopping case status.Stopped: return client.UnitStateStopped default: // as this is an unknown state, return failed to get some attention return client.UnitStateFailed } } // getUnitState converts status.Status to client.UnitState func getStatus(s client.UnitState) status.Status { switch s { case client.UnitStateStarting: return status.Starting case client.UnitStateConfiguring: return status.Configuring case client.UnitStateHealthy: return status.Running case client.UnitStateDegraded: return status.Degraded case client.UnitStateFailed: return status.Failed case client.UnitStateStopping: return status.Stopping case client.UnitStateStopped: return status.Stopped default: return status.Unknown } } func getStreamStates(expected client.Expected) (map[string]unitState, []string) { expectedCfg := expected.Config if expectedCfg == nil { return nil, nil } streamStates := make(map[string]unitState, len(expectedCfg.Streams)) streamIDs := make([]string, len(expectedCfg.Streams)) for idx, stream := range expectedCfg.Streams { streamState := unitState{ state: status.Unknown, msg: "", } if id := stream.GetId(); id != "" { streamIDs[idx] = id streamStates[id] = streamState continue } if cfgName := expectedCfg.GetName(); cfgName != "" { id := fmt.Sprintf("%s.[%d]", cfgName, idx) streamIDs[idx] = id streamStates[id] = streamState continue } id := fmt.Sprintf("%s.[%d]", expectedCfg.GetId(), idx) streamIDs[idx] = id streamStates[id] = streamState } return streamStates, streamIDs } // newAgentUnit creates a new agentUnit. In case the supplied client.Unit is of type // client.UnitTypeInput it initializes the streamStates with a unitState.Unknown func newAgentUnit(cu clientUnit, log *logp.Logger) *agentUnit { var ( streamStates map[string]unitState streamIDs []string ) if cu.Type() == client.UnitTypeInput { streamStates, streamIDs = getStreamStates(cu.Expected()) } return &agentUnit{ clientUnit: cu, logger: log, streamIDs: streamIDs, streamStates: streamStates, } } // RegisterAction registers action handler for this unit. func (u *agentUnit) RegisterAction(action client.Action) { u.mtx.Lock() defer u.mtx.Unlock() if u.clientUnit == nil { return } u.clientUnit.RegisterAction(action) } // UnregisterAction unregisters action handler with the client. func (u *agentUnit) UnregisterAction(action client.Action) { u.mtx.Lock() defer u.mtx.Unlock() if u.clientUnit == nil { return } u.clientUnit.UnregisterAction(action) } func (u *agentUnit) RegisterDiagnosticHook(name string, description string, filename string, contentType string, hook client.DiagnosticHook) { u.mtx.Lock() defer u.mtx.Unlock() if u.clientUnit == nil { return } u.clientUnit.RegisterDiagnosticHook(name, description, filename, contentType, hook) } func (u *agentUnit) Expected() client.Expected { u.mtx.Lock() defer u.mtx.Unlock() if u.clientUnit == nil { return client.Expected{} } return u.clientUnit.Expected() } func (u *agentUnit) ID() string { u.mtx.Lock() defer u.mtx.Unlock() if u.clientUnit == nil { return "" } return u.clientUnit.ID() } // calcState calculates the current state of the unit. func (u *agentUnit) calcState() (status.Status, string) { // for type output return the unit state directly as it has no streams if u.clientUnit.Type() == client.UnitTypeOutput { return u.inputLevelState.state, u.inputLevelState.msg } // if inputLevelState state is not running return the inputLevelState state if u.inputLevelState.state != status.Running { return u.inputLevelState.state, u.inputLevelState.msg } // inputLevelState state is marked as running, check the stream states reportedStatus := status.Running reportedMsg := "Healthy" for _, streamState := range u.streamStates { switch streamState.state { case status.Degraded: if reportedStatus != status.Degraded { reportedStatus = status.Degraded reportedMsg = streamState.msg } case status.Failed: // return the first failed stream return streamState.state, streamState.msg } } return reportedStatus, reportedMsg } // Type of the unit. func (u *agentUnit) Type() client.UnitType { u.mtx.Lock() defer u.mtx.Unlock() if u.clientUnit == nil { return client.UnitTypeInput } return u.clientUnit.Type() } // UpdateState updates the state for the unit. func (u *agentUnit) UpdateState(state status.Status, msg string, payload map[string]interface{}) error { u.mtx.Lock() defer u.mtx.Unlock() if u.clientUnit == nil { return nil } if u.inputLevelState.state == state && u.inputLevelState.msg == msg { return nil } u.inputLevelState = unitState{ state: state, msg: msg, } state, msg = u.calcState() if u.clientUnit.Type() == client.UnitTypeOutput || len(u.streamStates) == 0 { return u.clientUnit.UpdateState(getUnitState(state), msg, payload) } streamsPayload := make(map[string]interface{}, len(u.streamStates)) for streamID, streamState := range u.streamStates { streamsPayload[streamID] = map[string]interface{}{ "status": getUnitState(streamState.state).String(), "error": streamState.msg, } } if payload == nil { payload = make(map[string]interface{}) } payload["streams"] = streamsPayload return u.clientUnit.UpdateState(getUnitState(state), msg, payload) } // updateStateForStream updates the state for a specific stream in the agent unit. func (u *agentUnit) updateStateForStream(streamID string, state status.Status, msg string) { u.mtx.Lock() defer u.mtx.Unlock() if u.clientUnit == nil || u.streamStates == nil { return } if _, ok := u.streamStates[streamID]; !ok { return } if u.streamStates[streamID].state == state { return } u.streamStates[streamID] = unitState{ state: state, msg: msg, } state, msg = u.calcState() streamsPayload := make(map[string]interface{}, len(u.streamStates)) for id, streamState := range u.streamStates { streamsPayload[id] = map[string]interface{}{ "status": getUnitState(streamState.state).String(), "error": streamState.msg, } } payload := map[string]interface{}{ "streams": streamsPayload, } if err := u.clientUnit.UpdateState(getUnitState(state), msg, payload); err != nil { u.logger.Warnf("failed to update state for input %s: %v", u.ID(), err) } } func (u *agentUnit) update(cu *client.Unit) { u.mtx.Lock() defer u.mtx.Unlock() u.softDeleted = false u.clientUnit = cu inputStatus := getStatus(cu.Expected().State) if u.inputLevelState.state != inputStatus { u.inputLevelState = unitState{ state: inputStatus, } } newStreamStates, newStreamIDs := getStreamStates(cu.Expected()) for key, state := range newStreamStates { if _, exists := u.streamStates[key]; exists { continue } u.streamStates[key] = state } for key := range u.streamStates { if _, exists := newStreamStates[key]; !exists { delete(u.streamStates, key) } } switch { case len(newStreamIDs) != len(u.streamIDs): u.streamIDs = newStreamIDs default: for idx, streamID := range u.streamIDs { if newStreamIDs[idx] != streamID { u.streamIDs = newStreamIDs break } } } } func (u *agentUnit) markAsDeleted() { u.mtx.Lock() defer u.mtx.Unlock() u.softDeleted = true } // GetReporterForStreamByIndex returns a status reporter for the stream at the given index. // Note if the index is out of range it returns nil. It is up to the caller to check the return value. func (u *agentUnit) GetReporterForStreamByIndex(idx int) status.StatusReporter { u.mtx.Lock() defer u.mtx.Unlock() if idx >= len(u.streamIDs) { return nil } return &streamStatusReporter{ id: u.streamIDs[idx], unit: u, } } // streamStatusReporter implements status.StatusReporter type streamStatusReporter struct { id string unit *agentUnit } // UpdateStatus updates the status of the stream unit. func (s *streamStatusReporter) UpdateStatus(state status.Status, msg string) { s.unit.updateStateForStream(s.id, state, msg) } // ID of the stream unit. func (s *streamStatusReporter) ID() string { return s.id }