internal/pkg/agent/application/managed_mode.go

// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License 2.0;
// you may not use this file except in compliance with the Elastic License 2.0.

package application

import (
"context"
"fmt"
"time"
"github.com/elastic/elastic-agent-client/v7/pkg/client"
"github.com/elastic/elastic-agent/internal/pkg/agent/application/actions"
"github.com/elastic/elastic-agent/internal/pkg/agent/application/actions/handlers"
"github.com/elastic/elastic-agent/internal/pkg/agent/application/coordinator"
"github.com/elastic/elastic-agent/internal/pkg/agent/application/dispatcher"
fleetgateway "github.com/elastic/elastic-agent/internal/pkg/agent/application/gateway/fleet"
"github.com/elastic/elastic-agent/internal/pkg/agent/application/info"
"github.com/elastic/elastic-agent/internal/pkg/agent/application/paths"
"github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade/details"
"github.com/elastic/elastic-agent/internal/pkg/agent/configuration"
"github.com/elastic/elastic-agent/internal/pkg/agent/errors"
"github.com/elastic/elastic-agent/internal/pkg/agent/storage"
"github.com/elastic/elastic-agent/internal/pkg/agent/storage/store"
"github.com/elastic/elastic-agent/internal/pkg/fleetapi"
"github.com/elastic/elastic-agent/internal/pkg/fleetapi/acker"
"github.com/elastic/elastic-agent/internal/pkg/fleetapi/acker/fleet"
"github.com/elastic/elastic-agent/internal/pkg/fleetapi/acker/retrier"
"github.com/elastic/elastic-agent/internal/pkg/fleetapi/uploader"
"github.com/elastic/elastic-agent/internal/pkg/queue"
"github.com/elastic/elastic-agent/internal/pkg/remote"
"github.com/elastic/elastic-agent/internal/pkg/runner"
"github.com/elastic/elastic-agent/pkg/component/runtime"
"github.com/elastic/elastic-agent/pkg/core/logger"
)

// dispatchFlushInterval is the maximum time between calls to dispatcher.Dispatch.
const dispatchFlushInterval = time.Minute * 5
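
// managedConfigManager is the configuration manager used when the Elastic Agent
// is managed by (enrolled with) Fleet. It holds the Fleet gateway, the action
// dispatcher and queue, and the ackers used to report action results back to Fleet.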
type managedConfigManager struct {
log *logger.Logger
agentInfo info.Agent
cfg *configuration.Configuration
client *remote.Client
store storage.Store
stateStore *store.StateStore
actionQueue *queue.ActionQueue
dispatcher *dispatcher.ActionDispatcher
runtime *runtime.Manager
coord *coordinator.Coordinator
fleetInitTimeout time.Duration
initialClientSetters []actions.ClientSetter
fleetAcker *fleet.Acker
actionAcker acker.Acker
retrier *retrier.Retrier
ch chan coordinator.ConfigChange
errCh chan error
}
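
// newManagedConfigManager builds a managedConfigManager, creating the persistent
// action queue from the state store and the dispatcher that routes Fleet actions
// to their registered handlers. The coordinator (coord) must be assigned by the
// caller before Run is invoked.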
func newManagedConfigManager(
ctx context.Context,
log *logger.Logger,
agentInfo info.Agent,
cfg *configuration.Configuration,
storeSaver storage.Store,
runtime *runtime.Manager,
fleetInitTimeout time.Duration,
topPath string,
client *remote.Client,
fleetAcker *fleet.Acker,
actionAcker acker.Acker,
retrier *retrier.Retrier,
stateStore *store.StateStore,
clientSetters ...actions.ClientSetter,
) (*managedConfigManager, error) {
actionQueue, err := queue.NewActionQueue(stateStore.Queue(), stateStore)
if err != nil {
return nil, fmt.Errorf("unable to initialize action queue: %w", err)
}
actionDispatcher, err := dispatcher.New(log, topPath, handlers.NewDefault(log), actionQueue)
if err != nil {
return nil, fmt.Errorf("unable to initialize action dispatcher: %w", err)
}
return &managedConfigManager{
log: log,
agentInfo: agentInfo,
cfg: cfg,
client: client,
store: storeSaver,
stateStore: stateStore,
actionQueue: actionQueue,
dispatcher: actionDispatcher,
runtime: runtime,
fleetInitTimeout: fleetInitTimeout,
ch: make(chan coordinator.ConfigChange),
errCh: make(chan error),
initialClientSetters: clientSetters,
fleetAcker: fleetAcker,
actionAcker: actionAcker,
retrier: retrier,
}, nil
}
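
// Run runs the managed config manager until ctx is cancelled. It returns early
// if the agent was previously unenrolled. Otherwise it re-dispatches any action
// persisted in the state store, ensures a locally configured Fleet Server is
// running before starting the Fleet gateway, and runs the dispatcher loop that
// forwards actions received from Fleet.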
func (m *managedConfigManager) Run(ctx context.Context) error {
// Ensure the application set us up correctly (the coordinator must be assigned manually before Run is called).
if m.coord == nil {
return errors.New("coord must be set before calling Run")
}

// The agent was previously unenrolled, so do nothing.
if m.wasUnenrolled() {
m.log.Warnf("Elastic Agent was previously unenrolled. To reactivate please reconfigure or enroll again.")
return nil
}

// Reload the agent ID from disk (works around a Windows 7 file sync issue).
if err := m.agentInfo.ReloadID(ctx); err != nil {
return err
}

// Create a context that is cancelled on unenroll.
gatewayCtx, gatewayCancel := context.WithCancel(ctx)
defer gatewayCancel()

// Initialize the actionDispatcher.
policyChanger := m.initDispatcher(gatewayCancel)

// If the agent just upgraded, ack the upgrade action now that the acker is available.
if err := m.coord.AckUpgrade(ctx, m.actionAcker); err != nil {
m.log.Warnf("Failed to ack upgrade: %v", err)
}

// Run the retrier.
retrierRun := make(chan bool)
retrierCtx, retrierCancel := context.WithCancel(ctx)
defer func() {
retrierCancel()
<-retrierRun
}()
go func() {
m.retrier.Run(retrierCtx)
close(retrierRun)
}()
action := m.stateStore.Action()
stateRestored := false
if action != nil && !m.wasUnenrolled() {
// TODO(ph) We will need an improvement on fleet, if there is an error while dispatching a
// persisted action on disk we should be able to ask Fleet to get the latest configuration.
// But at the moment this is not possible because the policy change was acked.
m.log.Info("restoring current policy from disk")
m.dispatcher.Dispatch(ctx, m.coord.SetUpgradeDetails, m.actionAcker, action)
stateRestored = true
}

// If this Elastic Agent is running a Fleet Server, ensure that the
// Fleet Server is running before the Fleet gateway is started.
if m.cfg.Fleet.Server != nil {
if stateRestored {
err := m.waitForFleetServer(ctx)
if err != nil {
return fmt.Errorf("failed to initialize Fleet Server: %w", err)
}
} else {
err := m.initFleetServer(ctx, m.cfg.Fleet.Server)
if err != nil {
return fmt.Errorf("failed to initialize Fleet Server: %w", err)
}
}
}
gateway, err := fleetgateway.New(
m.log,
m.agentInfo,
m.client,
m.actionAcker,
m.coord.State,
m.stateStore,
)
if err != nil {
return err
}

// Not running a local Fleet Server, so the gateway's and acker's clients can be
// swapped out when a policy change arrives.
if m.cfg.Fleet.Server == nil {
policyChanger.AddSetter(gateway)
policyChanger.AddSetter(m.fleetAcker)
for _, cs := range m.initialClientSetters {
policyChanger.AddSetter(cs)
}
} else {
// Running a locally managed Fleet Server: initialize the client setters with
// the local client.
for _, cs := range m.initialClientSetters {
cs.SetClient(m.client)
}
}

// Proxy errors from the gateway to our own channel.
gatewayErrorsRunner := runner.Start(context.Background(), func(ctx context.Context) error {
for {
select {
case <-ctx.Done():
return nil
case err := <-gateway.Errors():
m.errCh <- err
}
}
})

// Run the gateway.
gatewayRunner := runner.Start(gatewayCtx, func(ctx context.Context) error {
defer gatewayErrorsRunner.Stop()
return gateway.Run(ctx)
})
go runDispatcher(ctx, m.dispatcher, gateway, m.coord.SetUpgradeDetails, m.actionAcker, dispatchFlushInterval)
<-ctx.Done()
return gatewayRunner.Err()
}

// runDispatcher passes actions collected from the gateway to the dispatcher, and
// calls Dispatch with no actions every flushInterval so queued actions still run.
func runDispatcher(ctx context.Context, actionDispatcher dispatcher.Dispatcher, fleetGateway coordinator.FleetGateway, detailsSetter details.Observer, actionAcker acker.Acker, flushInterval time.Duration) {
t := time.NewTimer(flushInterval)
defer t.Stop()
for {
select {
case <-ctx.Done():
return
case <-t.C: // periodically call the dispatcher to handle scheduled actions.
actionDispatcher.Dispatch(ctx, detailsSetter, actionAcker)
t.Reset(flushInterval)
case actions := <-fleetGateway.Actions():
actionDispatcher.Dispatch(ctx, detailsSetter, actionAcker, actions...)
t.Reset(flushInterval)
}
}
}

// ActionErrors returns the error channel for actions.
// It may surface errors from handling fleet-managed actions.
func (m *managedConfigManager) ActionErrors() <-chan error {
return m.dispatcher.Errors()
}
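
// Errors returns the channel on which errors from the Fleet gateway are reported.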
func (m *managedConfigManager) Errors() <-chan error {
return m.errCh
}
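
// Watch returns the channel on which configuration changes (such as policy
// changes received from Fleet) are delivered.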
func (m *managedConfigManager) Watch() <-chan coordinator.ConfigChange {
return m.ch
}
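
// wasUnenrolled reports whether the action persisted in the state store is an
// unenroll action, meaning the agent was unenrolled on a previous run.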
func (m *managedConfigManager) wasUnenrolled() bool {
return m.stateStore.Action() != nil &&
m.stateStore.Action().Type() == fleetapi.ActionTypeUnenroll
}
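
// initFleetServer injects a minimal fleet-server input as a local configuration
// change and waits up to fleetInitTimeout for the Fleet Server component to
// report healthy.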
func (m *managedConfigManager) initFleetServer(ctx context.Context, cfg *configuration.FleetServerConfig) error {
if m.fleetInitTimeout == 0 {
m.fleetInitTimeout = 30 * time.Second
}
ctx, cancel := context.WithTimeout(ctx, m.fleetInitTimeout)
defer cancel()
m.log.Debugf("injecting basic fleet-server for first start, will wait %s", m.fleetInitTimeout)
select {
case <-ctx.Done():
return fmt.Errorf("timeout while waiting for fleet server start: %w", ctx.Err())
case m.ch <- &localConfigChange{injectFleetServerInput}:
}
return m.waitForFleetServer(ctx)
}
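
// waitForFleetServer subscribes to component state updates from the runtime
// manager and blocks until the fleet-server component reports healthy or ctx is
// cancelled.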
func (m *managedConfigManager) waitForFleetServer(ctx context.Context) error {
m.log.Debugf("watching Fleet Server component state")
ctx, cancel := context.WithCancel(ctx)
defer cancel()
sub := m.runtime.SubscribeAll(ctx)
for {
select {
case <-ctx.Done():
return ctx.Err()
case compState := <-sub.Ch():
if compState.Component.InputSpec != nil && compState.Component.InputSpec.InputType == "fleet-server" {
if fleetServerRunning(compState.State) {
m.log.With("state", compState.State).Debugf("Fleet Server is running")
return nil
}
m.log.With("state", compState.State).Debugf("Fleet Server is not running")
}
}
}
}
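
// fleetServerRunning reports whether the fleet-server component is healthy and
// has at least one unit, with every unit healthy.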
func fleetServerRunning(state runtime.ComponentState) bool {
if state.State == client.UnitStateHealthy {
if len(state.Units) == 0 {
return false
}
for _, unit := range state.Units {
if unit.State != client.UnitStateHealthy {
return false
}
}
return true
}
return false
}
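
// initDispatcher registers a handler for each Fleet action type the agent
// understands and returns the policy change handler so additional client
// setters can be attached to it.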
func (m *managedConfigManager) initDispatcher(canceller context.CancelFunc) *handlers.PolicyChangeHandler {
settingsHandler := handlers.NewSettings(
m.log,
m.agentInfo,
m.coord,
)
policyChanger := handlers.NewPolicyChangeHandler(
m.log,
m.agentInfo,
m.cfg,
m.store,
m.ch,
settingsHandler,
m.coord,
)
m.dispatcher.MustRegister(
&fleetapi.ActionPolicyChange{},
policyChanger,
)
m.dispatcher.MustRegister(
&fleetapi.ActionPolicyReassign{},
handlers.NewPolicyReassign(m.log),
)
m.dispatcher.MustRegister(
&fleetapi.ActionUnenroll{},
handlers.NewUnenroll(
m.log,
m.coord,
m.ch,
[]context.CancelFunc{canceller},
m.stateStore,
),
)
m.dispatcher.MustRegister(
&fleetapi.ActionUpgrade{},
handlers.NewUpgrade(m.log, m.coord),
)
m.dispatcher.MustRegister(
&fleetapi.ActionSettings{},
settingsHandler,
)
m.dispatcher.MustRegister(
&fleetapi.ActionCancel{},
handlers.NewCancel(
m.log,
m.actionQueue,
),
)
m.dispatcher.MustRegister(
&fleetapi.ActionDiagnostics{},
handlers.NewDiagnostics(
m.log,
paths.Top(), // TODO: stop using global state
m.coord,
m.cfg.Settings.MonitoringConfig.Diagnostics.Limit,
uploader.New(m.agentInfo.AgentID(), m.client, m.cfg.Settings.MonitoringConfig.Diagnostics.Uploader),
),
)
m.dispatcher.MustRegister(
&fleetapi.ActionApp{},
handlers.NewAppAction(m.log, m.coord, m.agentInfo.AgentID()),
)
m.dispatcher.MustRegister(
&fleetapi.ActionUnknown{},
handlers.NewUnknown(m.log),
)
return policyChanger
}