internal/pkg/agent/application/application.go (245 lines of code) (raw):
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License 2.0;
// you may not use this file except in compliance with the Elastic License 2.0.
package application
import (
"context"
"fmt"
"time"
"go.elastic.co/apm/v2"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent/internal/pkg/agent/application/coordinator"
"github.com/elastic/elastic-agent/internal/pkg/agent/application/info"
"github.com/elastic/elastic-agent/internal/pkg/agent/application/monitoring"
"github.com/elastic/elastic-agent/internal/pkg/agent/application/paths"
"github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade"
"github.com/elastic/elastic-agent/internal/pkg/agent/configuration"
"github.com/elastic/elastic-agent/internal/pkg/agent/errors"
"github.com/elastic/elastic-agent/internal/pkg/agent/storage"
stateStore "github.com/elastic/elastic-agent/internal/pkg/agent/storage/store"
"github.com/elastic/elastic-agent/internal/pkg/capabilities"
"github.com/elastic/elastic-agent/internal/pkg/composable"
"github.com/elastic/elastic-agent/internal/pkg/composable/providers/kubernetes"
"github.com/elastic/elastic-agent/internal/pkg/config"
"github.com/elastic/elastic-agent/internal/pkg/fleetapi/acker"
"github.com/elastic/elastic-agent/internal/pkg/fleetapi/acker/fleet"
"github.com/elastic/elastic-agent/internal/pkg/fleetapi/acker/lazy"
"github.com/elastic/elastic-agent/internal/pkg/fleetapi/acker/retrier"
fleetclient "github.com/elastic/elastic-agent/internal/pkg/fleetapi/client"
otelmanager "github.com/elastic/elastic-agent/internal/pkg/otel/manager"
"github.com/elastic/elastic-agent/internal/pkg/release"
"github.com/elastic/elastic-agent/pkg/component"
"github.com/elastic/elastic-agent/pkg/component/runtime"
"github.com/elastic/elastic-agent/pkg/core/logger"
"github.com/elastic/elastic-agent/pkg/features"
"github.com/elastic/elastic-agent/pkg/limits"
"github.com/elastic/elastic-agent/version"
)
// CfgOverrider allows for application driven overrides of configuration read from disk.
type CfgOverrider func(cfg *configuration.Configuration)
// New creates a new Agent and bootstrap the required subsystem.
func New(
ctx context.Context,
log *logger.Logger,
baseLogger *logger.Logger,
logLevel logp.Level,
agentInfo info.Agent,
reexec coordinator.ReExecManager,
tracer *apm.Tracer,
testingMode bool,
fleetInitTimeout time.Duration,
disableMonitoring bool,
override CfgOverrider,
modifiers ...component.PlatformModifier,
) (*coordinator.Coordinator, coordinator.ConfigManager, composable.Controller, error) {
err := version.InitVersionError()
if err != nil {
// non-fatal error, log a warning and move on
log.With("error.message", err).Warnf("Error initializing version information: falling back to %s", release.Version())
}
platform, err := component.LoadPlatformDetail(modifiers...)
if err != nil {
return nil, nil, nil, fmt.Errorf("failed to gather system information: %w", err)
}
log.Info("Gathered system information")
specs, err := component.LoadRuntimeSpecs(paths.Components(), platform)
if err != nil {
return nil, nil, nil, fmt.Errorf("failed to detect inputs and outputs: %w", err)
}
log.With("inputs", specs.Inputs()).Info("Detected available inputs and outputs")
caps, err := capabilities.LoadFile(paths.AgentCapabilitiesPath(), log)
if err != nil {
return nil, nil, nil, fmt.Errorf("failed to determine capabilities: %w", err)
}
log.Info("Determined allowed capabilities")
pathConfigFile := paths.ConfigFile()
var rawConfig *config.Config
if testingMode {
// testing mode doesn't read any configuration from the disk
rawConfig, err = config.NewConfigFrom("")
if err != nil {
return nil, nil, nil, fmt.Errorf("failed to load configuration: %w", err)
}
// monitoring is always disabled in testing mode
disableMonitoring = true
} else {
log.Infof("Loading baseline config from %v", pathConfigFile)
rawConfig, err = config.LoadFile(pathConfigFile)
if err != nil {
return nil, nil, nil, fmt.Errorf("failed to load configuration: %w", err)
}
}
if err := info.InjectAgentConfig(rawConfig); err != nil {
return nil, nil, nil, fmt.Errorf("failed to load configuration: %w", err)
}
cfg, err := configuration.NewFromConfig(rawConfig)
if err != nil {
return nil, nil, nil, fmt.Errorf("failed to load configuration: %w", err)
}
if override != nil {
override(cfg)
}
// monitoring is not supported in bootstrap mode https://github.com/elastic/elastic-agent/issues/1761
isMonitoringSupported := !disableMonitoring && cfg.Settings.V1MonitoringEnabled
upgrader, err := upgrade.NewUpgrader(log, cfg.Settings.DownloadConfig, agentInfo)
if err != nil {
return nil, nil, nil, fmt.Errorf("failed to create upgrader: %w", err)
}
monitor := monitoring.New(isMonitoringSupported, cfg.Settings.DownloadConfig.OS(), cfg.Settings.MonitoringConfig, agentInfo)
runtime, err := runtime.NewManager(
log,
baseLogger,
agentInfo,
tracer,
monitor,
cfg.Settings.GRPC,
)
if err != nil {
return nil, nil, nil, fmt.Errorf("failed to initialize runtime manager: %w", err)
}
var configMgr coordinator.ConfigManager
var managed *managedConfigManager
var compModifiers = []coordinator.ComponentsModifier{InjectAPMConfig}
var composableManaged bool
var isManaged bool
var actionAcker acker.Acker
if testingMode {
log.Info("Elastic Agent has been started in testing mode and is managed through the control protocol")
// testing mode uses a config manager that takes configuration from over the control protocol
configMgr = newTestingModeConfigManager(log)
} else if configuration.IsStandalone(cfg.Fleet) {
log.Info("Parsed configuration and determined agent is managed locally")
loader := config.NewLoader(log, paths.ExternalInputs())
rawCfgMap, err := rawConfig.ToMapStr()
if err != nil {
return nil, nil, nil, fmt.Errorf("failed to transform agent configuration into a map: %w", err)
}
discover := config.Discoverer(pathConfigFile, cfg.Settings.Path, paths.ExternalInputs(),
kubernetes.GetHintsInputConfigPath(log, rawCfgMap))
if !cfg.Settings.Reload.Enabled {
log.Debug("Reloading of configuration is off")
configMgr = newOnce(log, discover, loader)
} else {
log.Debugf("Reloading of configuration is on, frequency is set to %s", cfg.Settings.Reload.Period)
configMgr = newPeriodic(log, cfg.Settings.Reload.Period, discover, loader)
}
} else {
isManaged = true
var store storage.Store
store, cfg, err = mergeFleetConfig(ctx, rawConfig)
if err != nil {
return nil, nil, nil, err
}
if configuration.IsFleetServerBootstrap(cfg.Fleet) {
log.Info("Parsed configuration and determined agent is in Fleet Server bootstrap mode")
compModifiers = append(compModifiers, FleetServerComponentModifier(cfg.Fleet.Server))
configMgr = coordinator.NewConfigPatchManager(newFleetServerBootstrapManager(log), PatchAPMConfig(log, rawConfig))
} else {
log.Info("Parsed configuration and determined agent is managed by Fleet")
composableManaged = true
compModifiers = append(compModifiers,
FleetServerComponentModifier(cfg.Fleet.Server),
InjectFleetConfigComponentModifier(cfg.Fleet, agentInfo),
EndpointSignedComponentModifier(),
EndpointTLSComponentModifier(log),
InjectProxyEndpointModifier(),
)
client, err := fleetclient.NewAuthWithConfig(log, cfg.Fleet.AccessAPIKey, cfg.Fleet.Client)
if err != nil {
return nil, nil, nil, errors.New(err,
"fail to create API client",
errors.TypeNetwork,
errors.M(errors.MetaKeyURI, cfg.Fleet.Client.Host))
}
stateStorage, err := stateStore.NewStateStoreWithMigration(ctx, log, paths.AgentActionStoreFile(), paths.AgentStateStoreFile())
if err != nil {
return nil, nil, nil, errors.New(err, fmt.Sprintf("fail to read state store '%s'", paths.AgentStateStoreFile()))
}
fleetAcker, err := fleet.NewAcker(log, agentInfo, client)
if err != nil {
return nil, nil, nil, fmt.Errorf("failed to create acker: %w", err)
}
retrier := retrier.New(fleetAcker, log)
batchedAcker := lazy.NewAcker(fleetAcker, log, lazy.WithRetrier(retrier))
actionAcker = stateStore.NewStateStoreActionAcker(batchedAcker, stateStorage)
// TODO: stop using global state
managed, err = newManagedConfigManager(ctx, log, agentInfo, cfg, store, runtime, fleetInitTimeout, paths.Top(), client, fleetAcker, actionAcker, retrier, stateStorage, upgrader)
if err != nil {
return nil, nil, nil, err
}
configMgr = coordinator.NewConfigPatchManager(managed, PatchAPMConfig(log, rawConfig))
}
}
varsManager, err := composable.New(log, rawConfig, composableManaged)
if err != nil {
return nil, nil, nil, errors.New(err, "failed to initialize composable controller")
}
otelManager := otelmanager.NewOTelManager(log.Named("otel_manager"))
coord := coordinator.New(log, cfg, logLevel, agentInfo, specs, reexec, upgrader, runtime, configMgr, varsManager, caps, monitor, isManaged, otelManager, actionAcker, compModifiers...)
if managed != nil {
// the coordinator requires the config manager as well as in managed-mode the config manager requires the
// coordinator, so it must be set here once the coordinator is created
managed.coord = coord
}
// every time we change the limits we'll see the log message
limits.AddLimitsOnChangeCallback(func(new, old limits.LimitsConfig) {
log.Debugf("agent limits have changed: %+v -> %+v", old, new)
}, "application.go")
// applying the initial limits for the agent process
if err := limits.Apply(rawConfig); err != nil {
return nil, nil, nil, fmt.Errorf("could not parse and apply limits config: %w", err)
}
// It is important that feature flags from configuration are applied as late as possible. This will ensure that
// any feature flag change callbacks are registered before they get called by `features.Apply`.
if err := features.Apply(rawConfig); err != nil {
return nil, nil, nil, fmt.Errorf("could not parse and apply feature flags config: %w", err)
}
return coord, configMgr, varsManager, nil
}
func mergeFleetConfig(ctx context.Context, rawConfig *config.Config) (storage.Store, *configuration.Configuration, error) {
path := paths.AgentConfigFile()
store, err := storage.NewEncryptedDiskStore(ctx, path)
if err != nil {
return nil, nil, fmt.Errorf("error instantiating encrypted disk store: %w", err)
}
reader, err := store.Load()
if err != nil {
return store, nil, errors.New(err, "could not initialize config store",
errors.TypeFilesystem,
errors.M(errors.MetaKeyPath, path))
}
config, err := config.NewConfigFrom(reader)
if err != nil {
return store, nil, errors.New(err,
fmt.Sprintf("fail to read configuration %s for the elastic-agent", path),
errors.TypeFilesystem,
errors.M(errors.MetaKeyPath, path))
}
// merge local configuration and configuration persisted from fleet.
err = rawConfig.Merge(config)
if err != nil {
return store, nil, errors.New(err,
fmt.Sprintf("fail to merge configuration with %s for the elastic-agent", path),
errors.TypeConfig,
errors.M(errors.MetaKeyPath, path))
}
cfg, err := configuration.NewFromConfig(rawConfig)
if err != nil {
return store, nil, errors.New(err,
fmt.Sprintf("fail to unpack configuration from %s", path),
errors.TypeFilesystem,
errors.M(errors.MetaKeyPath, path))
}
// Fix up fleet.agent.id otherwise the fleet.agent.id is empty string
if cfg.Settings != nil && cfg.Fleet != nil && cfg.Fleet.Info != nil && cfg.Fleet.Info.ID == "" {
cfg.Fleet.Info.ID = cfg.Settings.ID
}
if err := cfg.Fleet.Valid(); err != nil {
return store, nil, errors.New(err,
"fleet configuration is invalid",
errors.TypeFilesystem,
errors.M(errors.MetaKeyPath, path))
}
return store, cfg, nil
}