internal/pkg/policy/self.go

// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package policy

import (
"context"
"errors"
"fmt"
"sync"
"time"
"github.com/elastic/elastic-agent-client/v7/pkg/client"
"go.elastic.co/apm/v2"
"github.com/rs/zerolog"
"github.com/elastic/fleet-server/v7/internal/pkg/bulk"
"github.com/elastic/fleet-server/v7/internal/pkg/config"
"github.com/elastic/fleet-server/v7/internal/pkg/dl"
"github.com/elastic/fleet-server/v7/internal/pkg/es"
"github.com/elastic/fleet-server/v7/internal/pkg/logger"
"github.com/elastic/fleet-server/v7/internal/pkg/model"
"github.com/elastic/fleet-server/v7/internal/pkg/monitor"
"github.com/elastic/fleet-server/v7/internal/pkg/state"
)

// DefaultCheckTime is the default interval for self to check for its policy.
const DefaultCheckTime = 5 * time.Second

// DefaultCheckTimeout is the default timeout when checking for policies.
const DefaultCheckTimeout = 30 * time.Second
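
// A minimal sketch of shortening the check interval in a package-internal
// test (fleetCfg, bulker, mon, and reporter are hypothetical; the type
// assertion to the unexported *selfMonitorT only works inside this package):
//
//	sm := NewSelfMonitor(fleetCfg, bulker, mon, "", reporter).(*selfMonitorT)
//	sm.checkTime = 100 * time.Millisecond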

// enrollmentTokenFetcher fetches the enrollment API keys associated with a policy ID.
type enrollmentTokenFetcher func(ctx context.Context, bulker bulk.Bulk, policyID string) ([]model.EnrollmentAPIKey, error)

// SelfMonitor monitors the policy that this Fleet Server is attached to.
type SelfMonitor interface {
	// Run runs the monitor.
	Run(ctx context.Context) error

	// State gets the current state of the monitor.
	State() client.UnitState
}

type selfMonitorT struct {
log zerolog.Logger
mut sync.Mutex
fleet config.Fleet
bulker bulk.Bulk
monitor monitor.Monitor
policyID string
state client.UnitState
reporter state.Reporter
policy *model.Policy
policyF policyFetcher
policiesIndex string
enrollmentTokenF enrollmentTokenFetcher
checkTime time.Duration
startCh chan struct{}
}

// NewSelfMonitor creates the self policy monitor.
//
// Ensures that the policy this Fleet Server is attached to exists and that it
// has a Fleet Server input defined.
func NewSelfMonitor(fleet config.Fleet, bulker bulk.Bulk, monitor monitor.Monitor, policyID string, reporter state.Reporter) SelfMonitor {
return &selfMonitorT{
fleet: fleet,
bulker: bulker,
monitor: monitor,
policyID: policyID,
state: client.UnitStateStarting,
reporter: reporter,
policyF: dl.QueryLatestPolicies,
policiesIndex: dl.FleetPolicies,
enrollmentTokenF: findEnrollmentAPIKeys,
checkTime: DefaultCheckTime,
startCh: make(chan struct{}),
}
}
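
// A hedged sketch of starting the monitor alongside other server components;
// fleetCfg, bulker, pim, policyID, reporter, ctx, and g (an errgroup.Group)
// are all assumed to exist in the caller:
//
//	sm := NewSelfMonitor(fleetCfg, bulker, pim, policyID, reporter)
//	g.Go(func() error { return sm.Run(ctx) })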

// Run runs the monitor.
func (m *selfMonitorT) Run(ctx context.Context) error {
m.log = zerolog.Ctx(ctx).With().Str("ctx", "policy self monitor").Logger()
s := m.monitor.Subscribe()
defer m.monitor.Unsubscribe(s)
_, err := m.process(ctx)
if err != nil {
return err
}
	cT := time.NewTimer(m.checkTime)
	defer cT.Stop()

	// Unblock waitStart callers; the initial policy check has completed.
	close(m.startCh)

LOOP:
for {
select {
case <-ctx.Done():
break LOOP
case <-cT.C:
state, err := m.process(ctx)
if err != nil {
return err
}
cT.Reset(m.checkTime)
m.log.Trace().Msg(state.String())
case hits := <-s.Output():
policies := make([]model.Policy, len(hits))
for i, hit := range hits {
err := hit.Unmarshal(&policies[i])
if err != nil {
return err
}
}
state, err := m.processPolicies(ctx, policies)
if err != nil {
return err
}
m.log.Trace().Msg(state.String())
}
}
return nil
}

// State returns the current state of the monitor.
func (m *selfMonitorT) State() client.UnitState {
m.mut.Lock()
defer m.mut.Unlock()
return m.state
}

// waitStart blocks until Run has completed its initial policy check.
func (m *selfMonitorT) waitStart(ctx context.Context) error { //nolint:unused // not sure if this is used in tests
select {
case <-ctx.Done():
return ctx.Err()
case <-m.startCh:
}
return nil
}
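
// A hedged sketch of a package-internal test blocking on the first check
// (m, ctx, and t are hypothetical):
//
//	go func() { _ = m.Run(ctx) }()
//	if err := m.waitStart(ctx); err != nil {
//		t.Fatal(err)
//	}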

// process queries the latest policies and recomputes the monitor state.
func (m *selfMonitorT) process(ctx context.Context) (client.UnitState, error) {
if m.bulker.HasTracer() {
trans := m.bulker.StartTransaction("Check self monitor", "bulker")
ctx = apm.ContextWithTransaction(ctx, trans)
defer trans.End()
}
	policies, err := m.policyF(ctx, m.bulker, dl.WithIndexName(m.policiesIndex))
	if err != nil {
		if !errors.Is(err, es.ErrIndexNotFound) {
			// Report failure but swallow the error so the monitor keeps
			// running and retries on the next check.
			return client.UnitStateFailed, nil
		}
		// The policies index does not exist yet; continue as if no policies were found.
		m.log.Debug().Str("index", m.policiesIndex).Msg(es.ErrIndexNotFound.Error())
	}
if len(policies) == 0 {
return m.updateState(ctx)
}
return m.processPolicies(ctx, policies)
}

// processPolicies picks the policy this Fleet Server runs on out of the latest
// revision of each policy and updates the monitor state accordingly.
func (m *selfMonitorT) processPolicies(ctx context.Context, policies []model.Policy) (client.UnitState, error) {
if len(policies) == 0 {
// nothing to do
return client.UnitStateStarting, nil
}
latest := m.groupByLatest(policies)
for i := range latest {
policy := latest[i]
if m.policyID != "" && policy.PolicyID == m.policyID {
m.policy = &policy
break
} else if m.policyID == "" && policy.DefaultFleetServer {
m.policy = &policy
break
}
}
return m.updateState(ctx)
}

// groupByLatest returns the latest revision of each policy, keyed by policy ID.
func (m *selfMonitorT) groupByLatest(policies []model.Policy) map[string]model.Policy {
	return groupByLatest(policies)
}

// updateState recomputes the unit state from the monitored policy and reports
// it through the reporter.
func (m *selfMonitorT) updateState(ctx context.Context) (client.UnitState, error) {
m.mut.Lock()
defer m.mut.Unlock()
if m.policy == nil {
// no policy found
m.state = client.UnitStateStarting
if m.policyID == "" {
m.reporter.UpdateState(client.UnitStateStarting, "Waiting on default policy with Fleet Server integration", nil) //nolint:errcheck // not clear what to do in failure cases
} else {
m.reporter.UpdateState(client.UnitStateStarting, fmt.Sprintf("Waiting on policy with Fleet Server integration: %s", m.policyID), nil) //nolint:errcheck // not clear what to do in failure cases
}
return client.UnitStateStarting, nil
}
if !HasFleetServerInput(m.policy.Data.Inputs) {
// no fleet-server input
m.state = client.UnitStateStarting
if m.policyID == "" {
m.reporter.UpdateState(client.UnitStateStarting, "Waiting on fleet-server input to be added to default policy", nil) //nolint:errcheck // not clear what to do in failure cases
} else {
m.reporter.UpdateState(client.UnitStateStarting, fmt.Sprintf("Waiting on fleet-server input to be added to policy: %s", m.policyID), nil) //nolint:errcheck // not clear what to do in failure cases
}
return client.UnitStateStarting, nil
}
reportOutputHealth(ctx, m.bulker, m.log)
state := client.UnitStateHealthy
extendMsg := ""
var payload map[string]interface{}
if m.fleet.Agent.ID == "" {
state = client.UnitStateDegraded
extendMsg = "; missing config fleet.agent.id (expected during bootstrap process)"
// Elastic Agent has not been enrolled; Fleet Server passes back the enrollment token so the Elastic Agent
// can perform enrollment.
tokens, err := m.enrollmentTokenF(ctx, m.bulker, m.policy.PolicyID)
if err != nil {
return client.UnitStateFailed, err
}
if len(tokens) == 0 {
// no tokens created for the policy, still starting
if m.policyID == "" {
m.reporter.UpdateState(client.UnitStateStarting, "Waiting on active enrollment keys to be created in default policy with Fleet Server integration", nil) //nolint:errcheck // not clear what to do in failure cases
} else {
m.reporter.UpdateState(client.UnitStateStarting, fmt.Sprintf("Waiting on active enrollment keys to be created in policy with Fleet Server integration: %s", m.policyID), nil) //nolint:errcheck // not clear what to do in failure cases
}
return client.UnitStateStarting, nil
}
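		// Pass the enrollment token back so the enrolling Elastic Agent can use it;
		// illustrative payload shape (hypothetical value): {"enrollment_token": "<api key>"}.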
payload = map[string]interface{}{
"enrollment_token": tokens[0].APIKey,
}
}
m.state = state
if m.policyID == "" {
m.reporter.UpdateState(state, fmt.Sprintf("Running on default policy with Fleet Server integration%s", extendMsg), payload) //nolint:errcheck // not clear what to do in failure cases
} else {
m.reporter.UpdateState(state, fmt.Sprintf("Running on policy with Fleet Server integration: %s%s", m.policyID, extendMsg), payload) //nolint:errcheck // not clear what to do in failure cases
}
return state, nil
}

// isOutputCfgOutdated reports whether the bulker's configuration for
// outputName no longer matches the output stored in the policy; errors
// fetching the policy are treated as outdated.
func isOutputCfgOutdated(ctx context.Context, bulker bulk.Bulk, zlog zerolog.Logger, outputName string) bool {
policy, err := dl.QueryOutputFromPolicy(ctx, bulker, outputName)
if err != nil || policy == nil {
return true
}
hasChanged := bulker.RemoteOutputConfigChanged(zlog, outputName, policy.Data.Outputs[outputName])
return hasChanged
}

// reportOutputHealth pings each remote output and persists a health document
// describing the result.
func reportOutputHealth(ctx context.Context, bulker bulk.Bulk, zlog zerolog.Logger) {
	bulkerMap := bulker.GetBulkerMap()
for outputName, outputBulker := range bulkerMap {
if isOutputCfgOutdated(ctx, bulker, zlog, outputName) {
continue
}
doc := model.OutputHealth{
Output: outputName,
State: client.UnitStateHealthy.String(),
Message: "",
}
		res, err := outputBulker.Client().Ping(outputBulker.Client().Ping.WithContext(ctx))
		if err != nil {
			doc.State = client.UnitStateDegraded.String()
			doc.Message = fmt.Sprintf("remote ES is not reachable due to error: %s", err.Error())
			zlog.Error().Err(err).Str(logger.PolicyOutputName, outputName).Msg(doc.Message)
		} else {
			if res.StatusCode != 200 {
				doc.State = client.UnitStateDegraded.String()
				doc.Message = fmt.Sprintf("remote ES is not reachable due to unexpected status code %d", res.StatusCode)
				// err is nil on this path; log only the status-derived message.
				zlog.Error().Str(logger.PolicyOutputName, outputName).Msg(doc.Message)
			}
			// Close the response body to release the underlying connection.
			res.Body.Close()
		}
if err := dl.CreateOutputHealth(ctx, bulker, doc); err != nil {
zlog.Error().Err(err).Str(logger.PolicyOutputName, outputName).Msg("error writing output health")
}
}
}

// HasFleetServerInput reports whether the policy inputs include a fleet-server input.
func HasFleetServerInput(inputs []map[string]interface{}) bool {
for _, input := range inputs {
attr, ok := input["type"].(string)
if !ok {
return false
}
if attr == "fleet-server" {
return true
}
}
return false
}
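
// A minimal sketch of an input list that HasFleetServerInput matches
// (hypothetical values):
//
//	inputs := []map[string]interface{}{
//		{"type": "fleet-server"},
//	}
//	_ = HasFleetServerInput(inputs) // returns true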

// findEnrollmentAPIKeys queries the enrollment API keys associated with the given policy ID.
func findEnrollmentAPIKeys(ctx context.Context, bulker bulk.Bulk, policyID string) ([]model.EnrollmentAPIKey, error) {
return dl.FindEnrollmentAPIKeys(ctx, bulker, dl.QueryEnrollmentAPIKeyByPolicyID, dl.FieldPolicyID, policyID)
}