internal/pkg/agent/cmd/enroll_cmd.go (982 lines of code) (raw):

// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License 2.0;
// you may not use this file except in compliance with the Elastic License 2.0.

package cmd

import (
	"bytes"
	"context"
	"encoding/base64"
	"fmt"
	"io"
	"math/rand/v2"
	"net"
	"os"
	"os/exec"
	"strconv"
	"strings"
	"time"

	"go.elastic.co/apm/v2"
	"gopkg.in/yaml.v2"

	"github.com/elastic/elastic-agent-libs/transport/httpcommon"
	"github.com/elastic/elastic-agent-libs/transport/tlscommon"

	"github.com/elastic/elastic-agent/internal/pkg/agent/application/filelock"
	"github.com/elastic/elastic-agent/internal/pkg/agent/application/info"
	"github.com/elastic/elastic-agent/internal/pkg/agent/application/paths"
	"github.com/elastic/elastic-agent/internal/pkg/agent/application/secret"
	"github.com/elastic/elastic-agent/internal/pkg/agent/configuration"
	"github.com/elastic/elastic-agent/internal/pkg/agent/errors"
	"github.com/elastic/elastic-agent/internal/pkg/agent/perms"
	"github.com/elastic/elastic-agent/internal/pkg/agent/vault"
	"github.com/elastic/elastic-agent/internal/pkg/cli"
	"github.com/elastic/elastic-agent/internal/pkg/config"
	"github.com/elastic/elastic-agent/internal/pkg/core/authority"
	"github.com/elastic/elastic-agent/internal/pkg/core/backoff"
	monitoringConfig "github.com/elastic/elastic-agent/internal/pkg/core/monitoring/config"
	"github.com/elastic/elastic-agent/internal/pkg/crypto"
	"github.com/elastic/elastic-agent/internal/pkg/fleetapi"
	fleetclient "github.com/elastic/elastic-agent/internal/pkg/fleetapi/client"
	"github.com/elastic/elastic-agent/internal/pkg/release"
	"github.com/elastic/elastic-agent/internal/pkg/remote"
	"github.com/elastic/elastic-agent/pkg/control/v2/client"
	"github.com/elastic/elastic-agent/pkg/control/v2/client/wait"
	"github.com/elastic/elastic-agent/pkg/core/logger"
	"github.com/elastic/elastic-agent/pkg/core/process"
	"github.com/elastic/elastic-agent/pkg/utils"
)

const (
	// maxRetriesstoreAgentInfo bounds the retry loop in safelyStoreAgentInfo
	// (the loop runs while i <= this value, i.e. up to this many retries after
	// the first attempt) while the config file lock is held by another process.
	maxRetriesstoreAgentInfo = 5
	// waitingForAgent / waitingForFleetServer are progress messages logged by
	// waitForFleetServer while polling the daemon state.
	waitingForAgent       = "Waiting for Elastic Agent to start"
	waitingForFleetServer = "Waiting for Elastic Agent to start Fleet Server"
	// defaultFleetServerHost is the bind address used in the Fleet Server
	// bootstrap config when no host is given.
	defaultFleetServerHost = "0.0.0.0"
	// defaultFleetServerPort is the public Fleet Server port used when none is given.
	defaultFleetServerPort = 8220
	// defaultFleetServerInternalHost is the host of the internal Fleet Server
	// endpoint, also used as the bind host for insecure local Fleet Servers.
	defaultFleetServerInternalHost = "localhost"
	// defaultFleetServerInternalPort is the internal Fleet Server port used when none is given.
	defaultFleetServerInternalPort = 8221
	// enrollBackoffInit / enrollBackoffMax are the initial and maximum waits of
	// the default (equal-jitter) enrollment retry backoff built in newEnrollCmd.
	enrollBackoffInit = time.Second * 5
	enrollBackoffMax  = time.Minute * 10
)

var (
	enrollDelay   = 1 * time.Second  // max delay to start enrollment (a random delay in [0, enrollDelay) is used)
	daemonTimeout = 30 * time.Second // max amount of time for communication to running Agent daemon
)

// saver persists the content read from the given reader (used to store the
// generated agent/fleet configuration).
type saver interface {
	Save(io.Reader) error
}

// enrollCmd is an enroll subcommand that interacts between the Kibana API and the Agent.
type enrollCmd struct {
	log     *logger.Logger
	options *enrollCmdOption
	// client talks to Fleet; it is created in Execute from remoteConfig.
	client fleetclient.Sender
	// configStore receives the generated configuration (see storeAgentInfo).
	configStore saver
	// remoteConfig is the connection config derived from options in Execute.
	remoteConfig remote.Config
	// agentProc is the agent subprocess spawned by startAgent, if any; it is
	// stopped and cleared by stopAgent.
	agentProc *process.Info
	// configPath is the config file from which persistent settings are read
	// (getPersistentConfig).
	configPath string
	// backoffFactory builds the backoff used between enrollment retries.
	backoffFactory func(done <-chan struct{}) backoff.Backoff

	// For testability
	// daemonReloadFunc triggers a restart of the running daemon; newEnrollCmd
	// sets it to daemonReload.
	daemonReloadFunc func(context.Context) error
}

// enrollCmdFleetServerOption define all the supported enrollment options for bootstrapping with Fleet Server.
type enrollCmdFleetServerOption struct {
	// ConnStr is the Elasticsearch connection string; a non-empty value means
	// a local Fleet Server is bootstrapped by the enroll command.
	ConnStr               string
	ElasticsearchCA       string
	ElasticsearchCASHA256 string
	ElasticsearchInsecure bool
	ElasticsearchCert     string
	ElasticsearchCertKey  string
	ServiceToken          string
	ServiceTokenPath      string
	PolicyID              string
	Host                  string
	Port                  uint16
	// InternalPort of 0 is replaced by defaultFleetServerInternalPort.
	InternalPort          uint16
	Cert                  string
	CertKey               string
	CertKeyPassphrasePath string
	ClientAuth            string
	Insecure              bool
	// SpawnAgent makes the enroll command start `run` as a subprocess when no
	// running daemon is reachable, instead of waiting for one.
	SpawnAgent bool
	Headers    map[string]string
	// Timeout bounds the wait for Fleet Server to start: 0 means 2 minutes,
	// a negative value means no timeout (see waitForFleetServer).
	Timeout time.Duration
}

// enrollCmdOption defines all the supported enrollment options.
type enrollCmdOption struct { URL string `yaml:"url,omitempty"` InternalURL string `yaml:"-"` CAs []string `yaml:"ca,omitempty"` CASha256 []string `yaml:"ca_sha256,omitempty"` Certificate string `yaml:"certificate,omitempty"` Key string `yaml:"key,omitempty"` KeyPassphrasePath string `yaml:"key_passphrase_path,omitempty"` Insecure bool `yaml:"insecure,omitempty"` ID string `yaml:"id,omitempty"` ReplaceToken string `yaml:"replace_token,omitempty"` EnrollAPIKey string `yaml:"enrollment_key,omitempty"` Staging string `yaml:"staging,omitempty"` ProxyURL string `yaml:"proxy_url,omitempty"` ProxyDisabled bool `yaml:"proxy_disabled,omitempty"` ProxyHeaders map[string]string `yaml:"proxy_headers,omitempty"` DaemonTimeout time.Duration `yaml:"daemon_timeout,omitempty"` UserProvidedMetadata map[string]interface{} `yaml:"-"` FixPermissions *utils.FileOwner `yaml:"-"` DelayEnroll bool `yaml:"-"` FleetServer enrollCmdFleetServerOption `yaml:"-"` SkipCreateSecret bool `yaml:"-"` SkipDaemonRestart bool `yaml:"-"` Tags []string `yaml:"omitempty"` } // remoteConfig returns the configuration used to connect the agent to a fleet process. func (e *enrollCmdOption) remoteConfig() (remote.Config, error) { cfg, err := remote.NewConfigFromURL(e.URL) if err != nil { return remote.Config{}, err } if cfg.Protocol == remote.ProtocolHTTP && !e.Insecure { return remote.Config{}, fmt.Errorf("connection to fleet-server is insecure, strongly recommended to use a secure connection (override with --insecure)") } var tlsCfg tlscommon.Config // Add any SSL options from the CLI. 
if len(e.CAs) > 0 || len(e.CASha256) > 0 { tlsCfg.CAs = e.CAs tlsCfg.CASha256 = e.CASha256 } if e.Insecure { tlsCfg.VerificationMode = tlscommon.VerifyNone } if e.Certificate != "" || e.Key != "" { tlsCfg.Certificate = tlscommon.CertificateConfig{ Certificate: e.Certificate, Key: e.Key, PassphrasePath: e.KeyPassphrasePath, } } cfg.Transport.TLS = &tlsCfg proxySettings, err := httpcommon.NewHTTPClientProxySettings(e.ProxyURL, e.ProxyHeaders, e.ProxyDisabled) if err != nil { return remote.Config{}, err } cfg.Transport.Proxy = *proxySettings return cfg, nil } // newEnrollCmd creates a new enrollment with the given store. func newEnrollCmd( log *logger.Logger, options *enrollCmdOption, configPath string, store saver, backoffFactory func(done <-chan struct{}) backoff.Backoff, ) (*enrollCmd, error) { if backoffFactory == nil { backoffFactory = func(done <-chan struct{}) backoff.Backoff { return backoff.NewEqualJitterBackoff(done, enrollBackoffInit, enrollBackoffMax) } } return &enrollCmd{ log: log, options: options, configStore: store, configPath: configPath, daemonReloadFunc: daemonReload, backoffFactory: backoffFactory, }, nil } // Execute enrolls the agent into Fleet. func (c *enrollCmd) Execute(ctx context.Context, streams *cli.IOStreams) error { var err error defer c.stopAgent() // ensure its stopped no matter what span, ctx := apm.StartSpan(ctx, "enroll", "app.internal") defer func() { apm.CaptureError(ctx, err).Send() span.End() }() hasRoot, err := utils.HasRoot() if err != nil { return fmt.Errorf("checking if running with root/Administrator privileges: %w", err) } // Create encryption key from the agent before touching configuration if !c.options.SkipCreateSecret { opts := []vault.OptionFunc{vault.WithUnprivileged(!hasRoot)} if c.options.FixPermissions != nil { opts = append(opts, vault.WithVaultOwnership(*c.options.FixPermissions)) } err = secret.CreateAgentSecret(ctx, opts...) 
if err != nil { return err } } persistentConfig, err := getPersistentConfig(c.configPath) if err != nil { return err } // localFleetServer indicates that we start our internal fleet server. Agent // will communicate to the internal fleet server on localhost only. // Connection setup should disable proxies in that case. localFleetServer := c.options.FleetServer.ConnStr != "" if localFleetServer && !c.options.DelayEnroll { token, err := c.fleetServerBootstrap(ctx, persistentConfig) if err != nil { return err } if c.options.EnrollAPIKey == "" && token != "" { c.options.EnrollAPIKey = token } } c.remoteConfig, err = c.options.remoteConfig() if err != nil { return errors.New( err, "Error", errors.TypeConfig, errors.M(errors.MetaKeyURI, c.options.URL)) } if localFleetServer { // Ensure that the agent does not use a proxy configuration // when connecting to the local fleet server. // Note that when running fleet-server the enroll request will be sent to :8220, // however when the agent is running afterward requests will be sent to :8221 c.remoteConfig.Transport.Proxy.Disable = true } c.client, err = fleetclient.NewWithConfig(c.log, c.remoteConfig) if err != nil { return errors.New( err, "Error", errors.TypeNetwork, errors.M(errors.MetaKeyURI, c.options.URL)) } if c.options.DelayEnroll { if c.options.FleetServer.Host != "" { return errors.New("--delay-enroll cannot be used with --fleet-server-es", errors.TypeConfig) } err = c.writeDelayEnroll(streams) if err != nil { // context for error already provided in writeDelayEnroll return err } if c.options.FixPermissions != nil { err = perms.FixPermissions(paths.Top(), perms.WithOwnership(*c.options.FixPermissions)) if err != nil { return errors.New(err, "failed to fix permissions") } } return nil } err = c.enrollWithBackoff(ctx, persistentConfig) if err != nil { return fmt.Errorf("fail to enroll: %w", err) } if c.options.FixPermissions != nil { err = perms.FixPermissions(paths.Top(), 
perms.WithOwnership(*c.options.FixPermissions)) if err != nil { return errors.New(err, "failed to fix permissions") } } defer func() { if err != nil { fmt.Fprintf(streams.Err, "Something went wrong while enrolling the Elastic Agent: %v\n", err) } else { fmt.Fprintln(streams.Out, "Successfully enrolled the Elastic Agent.") } }() if c.agentProc == nil && !c.options.SkipDaemonRestart { if err = c.daemonReloadWithBackoff(ctx); err != nil { c.log.Errorf("Elastic Agent might not be running; unable to trigger restart: %v", err) return fmt.Errorf("could not reload agent daemon, unable to trigger restart: %w", err) } c.log.Info("Successfully triggered restart on running Elastic Agent.") return nil } c.log.Info("Elastic Agent has been enrolled; start Elastic Agent") return nil } func (c *enrollCmd) writeDelayEnroll(streams *cli.IOStreams) error { enrollPath := paths.AgentEnrollFile() data, err := yaml.Marshal(c.options) if err != nil { return errors.New( err, "failed to marshall enrollment options", errors.TypeConfig, errors.M("path", enrollPath)) } err = os.WriteFile(enrollPath, data, 0600) if err != nil { return errors.New( err, "failed to write enrollment options file", errors.TypeFilesystem, errors.M("path", enrollPath)) } fmt.Fprintf(streams.Out, "Successfully wrote %s for delayed enrollment of the Elastic Agent.\n", enrollPath) return nil } func (c *enrollCmd) fleetServerBootstrap(ctx context.Context, persistentConfig map[string]interface{}) (string, error) { c.log.Debug("verifying communication with running Elastic Agent daemon") agentRunning := true if c.options.FleetServer.InternalPort == 0 { c.options.FleetServer.InternalPort = defaultFleetServerInternalPort } _, err := getDaemonState(ctx) if err != nil { if !c.options.FleetServer.SpawnAgent { // wait longer to try and communicate with the Elastic Agent err = wait.ForAgent(ctx, c.options.DaemonTimeout) if err != nil { return "", errors.New("failed to communicate with elastic-agent daemon; is elastic-agent 
running?") } } else { agentRunning = false } } err = c.prepareFleetTLS() if err != nil { return "", err } agentConfig := c.createAgentConfig("", persistentConfig, c.options.FleetServer.Headers) //nolint:dupl // duplicate because same params are passed fleetConfig, err := createFleetServerBootstrapConfig( c.options.FleetServer.ConnStr, c.options.FleetServer.ServiceToken, c.options.FleetServer.ServiceTokenPath, c.options.FleetServer.PolicyID, c.options.FleetServer.Host, c.options.FleetServer.Port, c.options.FleetServer.InternalPort, c.options.FleetServer.Cert, c.options.FleetServer.CertKey, c.options.FleetServer.CertKeyPassphrasePath, c.options.FleetServer.ElasticsearchCA, c.options.FleetServer.ElasticsearchCASHA256, c.options.CAs, c.options.FleetServer.ClientAuth, c.options.FleetServer.ElasticsearchCert, c.options.FleetServer.ElasticsearchCertKey, c.options.FleetServer.Headers, c.options.ProxyURL, c.options.ProxyDisabled, c.options.ProxyHeaders, c.options.FleetServer.ElasticsearchInsecure, ) if err != nil { return "", err } c.options.FleetServer.InternalPort = fleetConfig.Server.InternalPort configToStore := map[string]interface{}{ "agent": agentConfig, "fleet": fleetConfig, } reader, err := yamlToReader(configToStore) if err != nil { return "", err } if err := safelyStoreAgentInfo(c.configStore, reader); err != nil { return "", err } var agentSubproc <-chan *os.ProcessState if agentRunning { // reload the already running agent err = c.daemonReloadWithBackoff(ctx) if err != nil { return "", errors.New(err, "failed to trigger elastic-agent daemon reload", errors.TypeApplication) } } else { // spawn `run` as a subprocess so enroll can perform the bootstrap process of Fleet Server agentSubproc, err = c.startAgent(ctx) if err != nil { return "", err } } token, err := waitForFleetServer(ctx, agentSubproc, c.log, c.options.FleetServer.Timeout) if err != nil { return "", errors.New(err, "fleet-server failed", errors.TypeApplication) } return token, nil } func (c 
*enrollCmd) prepareFleetTLS() error { host := c.options.FleetServer.Host if host == "" { host = defaultFleetServerInternalHost } port := c.options.FleetServer.Port if port == 0 { port = defaultFleetServerPort } if c.options.FleetServer.Cert != "" && c.options.FleetServer.CertKey == "" { return errors.New("certificate private key is required when certificate provided") } if c.options.FleetServer.CertKey != "" && c.options.FleetServer.Cert == "" { return errors.New("certificate is required when certificate private key is provided") } if c.options.FleetServer.Cert == "" && c.options.FleetServer.CertKey == "" { if c.options.FleetServer.Insecure { // running insecure, force the binding to localhost (unless specified) if c.options.FleetServer.Host == "" { c.options.FleetServer.Host = defaultFleetServerInternalHost } c.options.URL = "http://" + net.JoinHostPort(host, strconv.Itoa(int(port))) c.options.Insecure = true return nil } c.log.Info("Generating self-signed certificate for Fleet Server") hostname, err := os.Hostname() if err != nil { return err } ca, err := authority.NewCA() if err != nil { return err } pair, err := ca.GeneratePairWithName(hostname) if err != nil { return err } c.options.FleetServer.Cert = string(pair.Crt) c.options.FleetServer.CertKey = string(pair.Key) c.options.URL = "https://" + net.JoinHostPort(hostname, strconv.Itoa(int(port))) c.options.CAs = []string{string(ca.Crt())} } // running with custom Cert and CertKey; URL is required to be set if c.options.URL == "" { return errors.New("url is required when a certificate is provided") } if c.options.FleetServer.InternalPort > 0 { if c.options.FleetServer.InternalPort != defaultFleetServerInternalPort { c.log.Warnf("Internal endpoint configured to: %d. 
Changing this value is not supported.", c.options.FleetServer.InternalPort) } c.options.InternalURL = net.JoinHostPort(defaultFleetServerInternalHost, strconv.Itoa(int(c.options.FleetServer.InternalPort))) } return nil } const ( daemonReloadInitBackoff = time.Second daemonReloadMaxBackoff = time.Minute daemonReloadRetries = 5 ) func (c *enrollCmd) daemonReloadWithBackoff(ctx context.Context) error { backExp := backoff.NewExpBackoff(ctx.Done(), daemonReloadInitBackoff, daemonReloadMaxBackoff) var lastErr error for i := 0; i < daemonReloadRetries; i++ { attempt := i c.log.Infof("Restarting agent daemon, attempt %d", attempt) err := c.daemonReloadFunc(ctx) if err == nil { return nil } // If the context was cancelled, return early if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) { return fmt.Errorf("could not reload daemon after %d retries: %w", attempt, err) } lastErr = err c.log.Errorf("Restart attempt %d failed: '%s'. Waiting for %s", attempt, err, backExp.NextWait().String()) // backoff Wait returns false if context.Done() if !backExp.Wait() { return ctx.Err() } } return fmt.Errorf("could not reload agent's daemon, all retries failed. 
Last error: %w", lastErr) } func daemonReload(ctx context.Context) error { daemon := client.New() err := daemon.Connect(ctx) if err != nil { return err } defer daemon.Disconnect() return daemon.Restart(ctx) } func (c *enrollCmd) enrollWithBackoff(ctx context.Context, persistentConfig map[string]interface{}) error { delay(ctx, enrollDelay) c.log.Infof("Starting enrollment to URL: %s", c.client.URI()) err := c.enroll(ctx, persistentConfig) if err == nil { return nil } c.log.Infof("1st enrollment attempt failed, retrying enrolling to URL: %s with exponential backoff (init %s, max %s)", c.client.URI(), enrollBackoffInit, enrollBackoffMax) signal := make(chan struct{}) defer close(signal) backExp := c.backoffFactory(signal) RETRYLOOP: for { switch { case errors.Is(err, fleetapi.ErrTooManyRequests): c.log.Warn("Too many requests on the remote server, will retry in a moment.") case errors.Is(err, fleetapi.ErrConnRefused): c.log.Warn("Remote server is not ready to accept connections(Connection Refused), will retry in a moment.") case errors.Is(err, fleetapi.ErrTemporaryServerError): c.log.Warnf("Remote server failed to handle the request(%s), will retry in a moment.", err.Error()) case errors.Is(err, context.Canceled), errors.Is(err, context.DeadlineExceeded), err == nil: break RETRYLOOP case err != nil: c.log.Warnf("Error detected: %s, will retry in a moment.", err.Error()) } if !backExp.Wait() { break RETRYLOOP } c.log.Infof("Retrying enrollment to URL: %s", c.client.URI()) err = c.enroll(ctx, persistentConfig) } return err } func (c *enrollCmd) enroll(ctx context.Context, persistentConfig map[string]interface{}) error { cmd := fleetapi.NewEnrollCmd(c.client) metadata, err := info.Metadata(ctx, c.log) if err != nil { return errors.New(err, "acquiring metadata failed") } // Automatically add the namespace as a tag when installed into a namepsace. // Ensures the development agent is differentiated from others when on the same host. 
if namespace := paths.InstallNamespace(); namespace != "" { c.options.Tags = append(c.options.Tags, namespace) } r := &fleetapi.EnrollRequest{ EnrollAPIKey: c.options.EnrollAPIKey, Type: fleetapi.PermanentEnroll, ID: c.options.ID, ReplaceToken: c.options.ReplaceToken, Metadata: fleetapi.Metadata{ Local: metadata, UserProvided: c.options.UserProvidedMetadata, Tags: cleanTags(c.options.Tags), }, } resp, err := cmd.Execute(ctx, r) if err != nil { return fmt.Errorf("failed to execute request to fleet-server: %w", err) } fleetConfig, err := createFleetConfigFromEnroll(resp.Item.AccessAPIKey, c.options.EnrollAPIKey, c.options.ReplaceToken, c.remoteConfig) if err != nil { return err } agentConfig := c.createAgentConfig(resp.Item.ID, persistentConfig, c.options.FleetServer.Headers) localFleetServer := c.options.FleetServer.ConnStr != "" if localFleetServer { //nolint:dupl // not duplicates, just similar params are passed serverConfig, err := createFleetServerBootstrapConfig( c.options.FleetServer.ConnStr, c.options.FleetServer.ServiceToken, c.options.FleetServer.ServiceTokenPath, c.options.FleetServer.PolicyID, c.options.FleetServer.Host, c.options.FleetServer.Port, c.options.FleetServer.InternalPort, c.options.FleetServer.Cert, c.options.FleetServer.CertKey, c.options.FleetServer.CertKeyPassphrasePath, c.options.FleetServer.ElasticsearchCA, c.options.FleetServer.ElasticsearchCASHA256, c.options.CAs, c.options.FleetServer.ClientAuth, c.options.FleetServer.ElasticsearchCert, c.options.FleetServer.ElasticsearchCertKey, c.options.FleetServer.Headers, c.options.ProxyURL, c.options.ProxyDisabled, c.options.ProxyHeaders, c.options.FleetServer.ElasticsearchInsecure, ) if err != nil { return fmt.Errorf( "failed creating fleet-server bootstrap config: %w", err) } // no longer need bootstrap at this point serverConfig.Server.Bootstrap = false fleetConfig.Server = serverConfig.Server // use internal URL for future requests if c.options.InternalURL != "" { fleetConfig.Client.Host = 
c.options.InternalURL // fleet-server will bind the internal listenter to localhost:8221 // InternalURL is localhost:8221, however cert uses $HOSTNAME, so we need to disable hostname verification. fleetConfig.Client.Transport.TLS.VerificationMode = tlscommon.VerifyCertificate } } configToStore := map[string]interface{}{ "fleet": fleetConfig, "agent": agentConfig, } reader, err := yamlToReader(configToStore) if err != nil { return fmt.Errorf("yamlToReader failed: %w", err) } if err := safelyStoreAgentInfo(c.configStore, reader); err != nil { return fmt.Errorf("failed to store agent config: %w", err) } // clear action store // fail only if file exists and there was a failure if err := os.Remove(paths.AgentActionStoreFile()); !os.IsNotExist(err) { return err } // clear action store // fail only if file exists and there was a failure if err := os.Remove(paths.AgentStateStoreFile()); !os.IsNotExist(err) { return err } return nil } func (c *enrollCmd) startAgent(ctx context.Context) (<-chan *os.ProcessState, error) { cmd, err := os.Executable() if err != nil { return nil, err } c.log.Info("Spawning Elastic Agent daemon as a subprocess to complete bootstrap process.") args := []string{ "run", "-e", "-c", paths.ConfigFile(), "--path.home", paths.Top(), "--path.config", paths.Config(), "--path.logs", paths.Logs(), "--path.socket", paths.ControlSocket(), } if paths.Downloads() != "" { args = append(args, "--path.downloads", paths.Downloads()) } if !paths.IsVersionHome() { args = append(args, "--path.home.unversioned") } proc, err := process.Start( cmd, process.WithContext(ctx), process.WithArgs(args), process.WithCmdOptions(func(c *exec.Cmd) error { c.Stdout = os.Stdout c.Stderr = os.Stderr return nil })) if err != nil { return nil, err } resChan := make(chan *os.ProcessState) go func() { procState, _ := proc.Process.Wait() resChan <- procState }() c.agentProc = proc return resChan, nil } func (c *enrollCmd) stopAgent() { if c.agentProc != nil { _ = c.agentProc.StopWait() 
c.agentProc = nil } } func yamlToReader(in interface{}) (io.Reader, error) { data, err := yaml.Marshal(in) if err != nil { return nil, errors.New(err, "could not marshal to YAML") } return bytes.NewReader(data), nil } func delay(ctx context.Context, d time.Duration) { t := time.NewTimer(rand.N(d)) defer t.Stop() select { case <-ctx.Done(): case <-t.C: } } func getDaemonState(ctx context.Context) (*client.AgentState, error) { ctx, cancel := context.WithTimeout(ctx, daemonTimeout) defer cancel() daemon := client.New() err := daemon.Connect(ctx) if err != nil { return nil, err } defer daemon.Disconnect() return daemon.State(ctx) } type waitResult struct { enrollmentToken string err error } func waitForFleetServer(ctx context.Context, agentSubproc <-chan *os.ProcessState, log *logger.Logger, timeout time.Duration) (string, error) { if timeout == 0 { timeout = 2 * time.Minute } if timeout > 0 { var cancel context.CancelFunc ctx, cancel = context.WithTimeout(ctx, timeout) defer cancel() } maxBackoff := timeout if maxBackoff <= 0 { // indefinite timeout maxBackoff = 10 * time.Minute } resChan := make(chan waitResult) innerCtx, innerCancel := context.WithCancel(context.Background()) defer innerCancel() go func() { msg := "" msgCount := 0 backExp := expBackoffWithContext(innerCtx, 1*time.Second, maxBackoff) for { // if the timeout is reached, no response was sent on `res`, therefore // send an error if !backExp.Wait() { resChan <- waitResult{err: fmt.Errorf( "timed out waiting for Fleet Server to start after %s", timeout)} } state, err := getDaemonState(innerCtx) if errors.Is(err, context.Canceled) { resChan <- waitResult{err: err} return } if err != nil { log.Debugf("%s: %s", waitingForAgent, err) if msg != waitingForAgent { msg = waitingForAgent msgCount = 0 log.Info(waitingForAgent) } else { msgCount++ if msgCount > 5 { msgCount = 0 log.Infof("%s: %s", waitingForAgent, err) } } continue } unit := getCompUnitFromStatus(state, "fleet-server") if unit == nil { err = 
errors.New("no fleet-server application running") log.Debugf("%s: %s", waitingForFleetServer, err) if msg != waitingForFleetServer { msg = waitingForFleetServer msgCount = 0 log.Info(waitingForFleetServer) } else { msgCount++ if msgCount > 5 { msgCount = 0 log.Infof("%s: %s", waitingForFleetServer, err) } } continue } log.Debugf("%s: %s - %s", waitingForFleetServer, unit.State, unit.Message) if unit.State == client.Degraded || unit.State == client.Healthy { // app has started and is running if unit.Message != "" { log.Infof("Fleet Server - %s", unit.Message) } // extract the enrollment token from the status payload token := "" if unit.Payload != nil { if enrollToken, ok := unit.Payload["enrollment_token"]; ok { if tokenStr, ok := enrollToken.(string); ok { token = tokenStr } } } resChan <- waitResult{enrollmentToken: token} break } if unit.Message != "" { appMsg := fmt.Sprintf("Fleet Server - %s", unit.Message) if msg != appMsg { msg = appMsg msgCount = 0 log.Info(appMsg) } else { msgCount++ if msgCount > 5 { msgCount = 0 log.Info(appMsg) } } } } }() var res waitResult if agentSubproc == nil { select { case <-ctx.Done(): innerCancel() res = <-resChan case res = <-resChan: } } else { select { case ps := <-agentSubproc: res = waitResult{err: fmt.Errorf("spawned Elastic Agent exited unexpectedly: %s", ps)} case <-ctx.Done(): innerCancel() res = <-resChan case res = <-resChan: } } if res.err != nil { return "", res.err } return res.enrollmentToken, nil } func getCompUnitFromStatus(state *client.AgentState, name string) *client.ComponentUnitState { for _, comp := range state.Components { if comp.Name == name { for _, unit := range comp.Units { if unit.UnitType == client.UnitTypeInput { return &unit } } } } return nil } func safelyStoreAgentInfo(s saver, reader io.Reader) error { var err error signal := make(chan struct{}) backExp := backoff.NewExpBackoff(signal, 100*time.Millisecond, 3*time.Second) for i := 0; i <= maxRetriesstoreAgentInfo; i++ { backExp.Wait() err = 
storeAgentInfo(s, reader) if !errors.Is(err, filelock.ErrAppAlreadyRunning) { break } } close(signal) return err } func storeAgentInfo(s saver, reader io.Reader) error { fileLock := paths.AgentConfigFileLock() if err := fileLock.TryLock(); err != nil { return err } defer func() { _ = fileLock.Unlock() }() if err := s.Save(reader); err != nil { return errors.New(err, "could not save enrollment information", errors.TypeFilesystem) } return nil } func createFleetServerBootstrapConfig( connStr, serviceToken, serviceTokenPath, policyID, host string, port uint16, internalPort uint16, cert, key, passphrasePath, esCA, esCASHA256 string, cas []string, clientAuth string, esClientCert, esClientCertKey string, headers map[string]string, proxyURL string, proxyDisabled bool, proxyHeaders map[string]string, insecure bool, ) (*configuration.FleetAgentConfig, error) { localFleetServer := connStr != "" es, err := configuration.ElasticsearchFromConnStr(connStr, serviceToken, serviceTokenPath, insecure) if err != nil { return nil, err } if esCA != "" { if es.TLS == nil { es.TLS = &tlscommon.Config{ CAs: []string{esCA}, } } else { es.TLS.CAs = []string{esCA} } } if esCASHA256 != "" { if es.TLS == nil { es.TLS = &tlscommon.Config{ CATrustedFingerprint: esCASHA256, } } else { es.TLS.CATrustedFingerprint = esCASHA256 } } if esClientCert != "" || esClientCertKey != "" { if es.TLS == nil { es.TLS = &tlscommon.Config{} } es.TLS.Certificate = tlscommon.CertificateConfig{ Certificate: esClientCert, Key: esClientCertKey, } } if host == "" { host = defaultFleetServerHost } if port == 0 { port = defaultFleetServerPort } if internalPort == 0 { internalPort = defaultFleetServerInternalPort } if len(headers) > 0 { if es.Headers == nil { es.Headers = make(map[string]string) } // overwrites previously set headers for k, v := range headers { es.Headers[k] = v } } es.ProxyURL = proxyURL es.ProxyDisable = proxyDisabled es.ProxyHeaders = proxyHeaders cfg := configuration.DefaultFleetAgentConfig() 
cfg.Enabled = true cfg.Server = &configuration.FleetServerConfig{ Bootstrap: true, Output: configuration.FleetServerOutputConfig{ Elasticsearch: es, }, Host: host, Port: port, } if policyID != "" { cfg.Server.Policy = &configuration.FleetServerPolicyConfig{ID: policyID} } if cert != "" || key != "" { cfg.Server.TLS = &tlscommon.ServerConfig{ Certificate: tlscommon.CertificateConfig{ Certificate: cert, Key: key, PassphrasePath: passphrasePath, }, } if insecure { cfg.Server.TLS.VerificationMode = tlscommon.VerifyNone } cfg.Server.TLS.CAs = cas var cAuth tlscommon.TLSClientAuth cfg.Server.TLS.ClientAuth = &cAuth if err := cfg.Server.TLS.ClientAuth.Unpack(clientAuth); err != nil { return nil, errors.New(err, "failed to unpack --fleet-server-client-auth", errors.TypeConfig) } } if localFleetServer { cfg.Client.Transport.Proxy.Disable = true cfg.Server.InternalPort = internalPort } if err := cfg.Valid(); err != nil { return nil, errors.New(err, "invalid enrollment options", errors.TypeConfig) } return cfg, nil } func fleetHashToken(token string) (string, error) { enrollmentHashBytes, err := crypto.GeneratePBKDF2FromPassword([]byte(token)) if err != nil { return "", err } return base64.StdEncoding.EncodeToString(enrollmentHashBytes), nil } func createFleetConfigFromEnroll(accessAPIKey string, enrollmentToken string, replaceToken string, cli remote.Config) (*configuration.FleetAgentConfig, error) { var err error cfg := configuration.DefaultFleetAgentConfig() cfg.Enabled = true cfg.AccessAPIKey = accessAPIKey cfg.Client = cli cfg.EnrollmentTokenHash, err = fleetHashToken(enrollmentToken) if err != nil { return nil, errors.New(err, "failed to generate enrollment hash", errors.TypeConfig) } cfg.ReplaceTokenHash, err = fleetHashToken(replaceToken) if err != nil { return nil, errors.New(err, "failed to generate replace token hash", errors.TypeConfig) } if err := cfg.Valid(); err != nil { return nil, errors.New(err, "invalid enrollment options", errors.TypeConfig) } return cfg, 
nil } func (c *enrollCmd) createAgentConfig(agentID string, pc map[string]interface{}, headers map[string]string) map[string]interface{} { agentConfig := map[string]interface{}{ "id": agentID, } if len(headers) > 0 { agentConfig["headers"] = headers } if c.options.Staging != "" { staging := fmt.Sprintf("https://staging.elastic.co/%s-%s/downloads/", release.Version(), c.options.Staging[:8]) agentConfig["download"] = map[string]interface{}{ "sourceURI": staging, } } for k, v := range pc { agentConfig[k] = v } return agentConfig } func getPersistentConfig(pathConfigFile string) (map[string]interface{}, error) { persistentMap := make(map[string]interface{}) rawConfig, err := config.LoadFile(pathConfigFile) if os.IsNotExist(err) { return persistentMap, nil } if err != nil { return nil, errors.New(err, fmt.Sprintf("could not read configuration file %s", pathConfigFile), errors.TypeFilesystem, errors.M(errors.MetaKeyPath, pathConfigFile)) } pc := &struct { Headers map[string]string `json:"agent.headers,omitempty" yaml:"agent.headers,omitempty" config:"agent.headers,omitempty"` LogLevel string `json:"agent.logging.level,omitempty" yaml:"agent.logging.level,omitempty" config:"agent.logging.level,omitempty"` MonitoringHTTP *monitoringConfig.MonitoringHTTPConfig `json:"agent.monitoring.http,omitempty" yaml:"agent.monitoring.http,omitempty" config:"agent.monitoring.http,omitempty"` }{ MonitoringHTTP: monitoringConfig.DefaultConfig().HTTP, } if err := rawConfig.UnpackTo(&pc); err != nil { return nil, err } if pc.LogLevel != "" { persistentMap["logging.level"] = pc.LogLevel } if pc.MonitoringHTTP != nil { persistentMap["monitoring.http"] = pc.MonitoringHTTP } return persistentMap, nil } func expBackoffWithContext(ctx context.Context, init, max time.Duration) backoff.Backoff { signal := make(chan struct{}) bo := backoff.NewExpBackoff(signal, init, max) go func() { <-ctx.Done() close(signal) }() return bo } func cleanTags(tags []string) []string { var r []string // Create a map to 
store unique elements seen := make(map[string]bool) for _, str := range tags { tag := strings.TrimSpace(str) if tag != "" { if _, ok := seen[tag]; !ok { seen[tag] = true r = append(r, tag) } } } return r }