internal/pkg/agent/cmd/container.go (1,004 lines of code) (raw):

// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one // or more contributor license agreements. Licensed under the Elastic License 2.0; // you may not use this file except in compliance with the Elastic License 2.0. package cmd import ( "bytes" "context" "encoding/base64" "encoding/json" "fmt" "io" "io/fs" "net/url" "os" "os/exec" "path/filepath" "regexp" "slices" "strconv" "strings" "sync" "syscall" "time" "github.com/cenkalti/backoff/v4" "github.com/spf13/cobra" "gopkg.in/yaml.v2" "github.com/elastic/elastic-agent-libs/kibana" "github.com/elastic/elastic-agent-libs/logp" "github.com/elastic/elastic-agent-libs/transport/httpcommon" "github.com/elastic/elastic-agent-libs/transport/tlscommon" "github.com/elastic/elastic-agent/internal/pkg/agent/application/paths" "github.com/elastic/elastic-agent/internal/pkg/agent/configuration" "github.com/elastic/elastic-agent/internal/pkg/agent/errors" "github.com/elastic/elastic-agent/internal/pkg/agent/storage" "github.com/elastic/elastic-agent/internal/pkg/cli" "github.com/elastic/elastic-agent/internal/pkg/config" "github.com/elastic/elastic-agent/internal/pkg/crypto" "github.com/elastic/elastic-agent/internal/pkg/fleetapi" fleetclient "github.com/elastic/elastic-agent/internal/pkg/fleetapi/client" "github.com/elastic/elastic-agent/internal/pkg/remote" "github.com/elastic/elastic-agent/pkg/component" "github.com/elastic/elastic-agent/pkg/core/logger" "github.com/elastic/elastic-agent/pkg/core/process" "github.com/elastic/elastic-agent/pkg/utils" "github.com/elastic/elastic-agent/version" ) const ( requestRetrySleepEnv = "KIBANA_REQUEST_RETRY_SLEEP" maxRequestRetriesEnv = "KIBANA_REQUEST_RETRY_COUNT" defaultRequestRetrySleep = "1s" // sleep 1 sec between retries for HTTP requests defaultMaxRequestRetries = "30" // maximum number of retries for HTTP requests agentBaseDirectory = "/usr/share/elastic-agent" // directory that holds all elastic-agent related files defaultStateDirectory = 
agentBaseDirectory + "/state" // directory that will hold the state data logsPathPerms = 0775 ) // Used to strip the appended ({uuid}) from the name of an enrollment token. This makes much easier for // a container to reference a token by name, without having to know what the generated UUID is for that name. var tokenNameStrip = regexp.MustCompile(`\s\([0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}\)$`) func newContainerCommand(_ []string, streams *cli.IOStreams) *cobra.Command { cmd := cobra.Command{ Hidden: true, // not exposed over help; used by container entrypoint only Use: "container", Short: "Bootstrap Elastic Agent to run inside a container", Long: `This command should only be used as an entrypoint for a container. This will prepare the Elastic Agent using environment variables to run inside of the container. The following actions are possible and grouped based on the actions. * Elastic Agent Fleet Enrollment This enrolls the Elastic Agent into a Fleet Server. It is also possible to have this create a new enrollment token for this specific Elastic Agent. FLEET_ENROLL - set to 1 for enrollment into Fleet Server. If not set, Elastic Agent is run in standalone mode. FLEET_URL - URL of the Fleet Server to enroll into FLEET_ENROLLMENT_TOKEN - token to use for enrollment. This is not needed in case FLEET_SERVER_ENABLED and FLEET_ENROLL is set. Then the token is fetched from Kibana. FLEET_ENROLL_TIMEOUT - The timeout duration for the enroll commnd. Defaults to 10m. A negative value disables the timeout. FLEET_CA - path to certificate authority to use with communicate with Fleet Server [$KIBANA_CA] FLEET_INSECURE - communicate with Fleet with either insecure HTTP or unverified HTTPS ELASTIC_AGENT_CERT - path to certificate to use for connecting to fleet-server. ELASTIC_AGENT_CERT_KEY - path to private key use for connecting to fleet-server. 
The following vars are need in the scenario that Elastic Agent should automatically fetch its own token. KIBANA_FLEET_HOST - Kibana host to enable create enrollment token on [$KIBANA_HOST] FLEET_TOKEN_NAME - token name to use for fetching token from Kibana. This requires Kibana configs to be set. FLEET_TOKEN_POLICY_NAME - token policy name to use for fetching token from Kibana. This requires Kibana configs to be set. * Bootstrapping Fleet Server This bootstraps the Fleet Server to be run by this Elastic Agent. At least one Fleet Server is required in a Fleet deployment for other Elastic Agents to bootstrap. In case the Elastic Agent is run without Fleet Server, these variables are not needed. If FLEET_SERVER_ENABLE and FLEET_ENROLL is set but no FLEET_ENROLLMENT_TOKEN, the token is automatically fetched from Kibana. FLEET_SERVER_ENABLE - set to 1 enables bootstrapping of Fleet Server inside Elastic Agent (forces FLEET_ENROLL enabled) FLEET_SERVER_ELASTICSEARCH_HOST - Elasticsearch host for Fleet Server to communicate with [$ELASTICSEARCH_HOST] FLEET_SERVER_ELASTICSEARCH_CA - path to certificate authority to use to communicate with Elasticsearch [$ELASTICSEARCH_CA] FLEET_SERVER_ELASTICSEARCH_CA_TRUSTED_FINGERPRINT - The sha-256 fingerprint value of the certificate authority to trust FLEET_SERVER_ELASTICSEARCH_INSECURE - disables cert validation for communication with Elasticsearch FLEET_SERVER_SERVICE_TOKEN - service token to use for communication with Elasticsearch FLEET_SERVER_SERVICE_TOKEN_PATH - path to service token file to use for communication with Elasticsearch FLEET_SERVER_POLICY_ID - policy ID for Fleet Server to use for itself ("Default Fleet Server policy" used when undefined) FLEET_SERVER_HOST - binding host for Fleet Server HTTP (overrides the policy). By default this is 0.0.0.0. 
FLEET_SERVER_PORT - binding port for Fleet Server HTTP (overrides the policy) FLEET_SERVER_CERT - path to certificate to use for HTTPS endpoint FLEET_SERVER_CERT_KEY - path to private key for certificate to use for HTTPS endpoint FLEET_SERVER_CERT_KEY_PASSPHRASE - path to private key passphrase file for certificate to use for HTTPS endpoint FLEET_SERVER_ES_CERT - path to certificate to use for connecting to Elasticsearch FLEET_SERVER_ES_CERT_KEY - path to private key for certificate to use for connecting to Elasticsearch FLEET_SERVER_CLIENT_AUTH - fleet-server mTLS client authentication for connecting elastic-agents. Must be one of [none, optional, required]. A default of none is used. FLEET_SERVER_INSECURE_HTTP - expose Fleet Server over HTTP (not recommended; insecure) FLEET_SERVER_INIT_TIMEOUT - Sets the initial timeout when starting up the fleet server under agent. Default: 30s. * Preparing Kibana for Fleet This prepares the Fleet plugin that exists inside of Kibana. This must either be enabled here or done externally before Fleet Server will actually successfully start. All the Kibana variables are not needed in case Elastic Agent should not setup Fleet. KIBANA_FLEET_HOST - Kibana host accessible from Fleet Server. [$KIBANA_HOST] KIBANA_FLEET_USERNAME - Kibana username to service token [$KIBANA_USERNAME] KIBANA_FLEET_PASSWORD - Kibana password to service token [$KIBANA_PASSWORD] KIBANA_FLEET_CA - path to certificate authority to use with communicate with Kibana [$KIBANA_CA] KIBANA_REQUEST_RETRY_SLEEP - sleep duration taken when agent performs a request to Kibana [default 1s] KIBANA_REQUEST_RETRY_COUNT - number of retries agent performs when executing a request to Kibana [default 30] The following environment variables are provided as a convenience to prevent a large number of environment variables to be used when the same credentials will be used across all the possible actions above. 
ELASTICSEARCH_HOST - Elasticsearch host [http://elasticsearch:9200] ELASTICSEARCH_USERNAME - Elasticsearch username [elastic] ELASTICSEARCH_PASSWORD - Elasticsearch password [changeme] ELASTICSEARCH_CA - path to certificate authority to use to communicate with Elasticsearch KIBANA_HOST - Kibana host [http://kibana:5601] KIBANA_FLEET_USERNAME - Kibana username to enable Fleet [$ELASTICSEARCH_USERNAME] KIBANA_FLEET_PASSWORD - Kibana password to enable Fleet [$ELASTICSEARCH_PASSWORD] KIBANA_CA - path to certificate authority to use with communicate with Kibana [$ELASTICSEARCH_CA] ELASTIC_AGENT_TAGS - user provided tags for the agent [linux,staging] * Elastic-Agent event logging If EVENTS_TO_STDERR is set to true log entries containing event data or whole raw events will be logged to stderr alongside all other log entries. If unset or set to false, the events will be logged to a separate file that is not collected alongside the monitoring logs, however they will be present in diagnostics. By default when this command starts it will check for an existing fleet.yml. If that file already exists then all the above actions will be skipped, because the Elastic Agent has already been enrolled. To ensure that enrollment occurs on every start of the container set FLEET_FORCE to 1. `, Run: func(c *cobra.Command, args []string) { if err := logContainerCmd(streams); err != nil { logError(streams, err) os.Exit(1) } }, } return &cmd } func logError(streams *cli.IOStreams, err error) { fmt.Fprintf(streams.Err, "Error: %v\n%s\n", err, troubleshootMessage()) } func logInfo(streams *cli.IOStreams, a ...interface{}) { fmt.Fprintln(streams.Out, a...) 
} func logContainerCmd(streams *cli.IOStreams) error { logsPath := envWithDefault("", "LOGS_PATH") if logsPath != "" { // log this entire command to a file as well as to the passed streams if err := os.MkdirAll(logsPath, logsPathPerms); err != nil { return fmt.Errorf("preparing LOGS_PATH(%s) failed: %w", logsPath, err) } logPath := filepath.Join(logsPath, "elastic-agent-startup.log") w, err := os.Create(logPath) if err != nil { return fmt.Errorf("opening startup log(%s) failed: %w", logPath, err) } defer w.Close() streams.Out = io.MultiWriter(streams.Out, w) streams.Err = io.MultiWriter(streams.Out, w) } return containerCmd(streams) } func containerCmd(streams *cli.IOStreams) error { // set paths early so all action below use the defined paths if err := setPaths("", "", "", "", true); err != nil { return err } elasticCloud := envBool("ELASTIC_AGENT_CLOUD") // if not in cloud mode, always run the agent runAgent := !elasticCloud // create access configuration from ENV and config files cfg, err := defaultAccessConfig() if err != nil { return err } for _, f := range []string{"fleet-setup.yml", "credentials.yml"} { c, err := config.LoadFile(filepath.Join(paths.Config(), f)) if err != nil && !os.IsNotExist(err) { return fmt.Errorf("parsing config file(%s): %w", f, err) } if c != nil { err = c.UnpackTo(&cfg) if err != nil { return fmt.Errorf("unpacking config file(%s): %w", f, err) } // if in elastic cloud mode, only run the agent when configured runAgent = true } } // start apm-server legacy process when in cloud mode var wg sync.WaitGroup var apmProc *process.Info apmPath := os.Getenv("APM_SERVER_PATH") if elasticCloud { logInfo(streams, "Starting in elastic cloud mode") if elasticCloud && apmPath != "" { // run legacy APM Server as a daemon; send termination signal // to the main process if the daemon is stopped mainProc, err := os.FindProcess(os.Getpid()) if err != nil { return errors.New(err, "finding current process") } if apmProc, err = runLegacyAPMServer(streams); 
err != nil { return errors.New(err, "starting legacy apm-server") } wg.Add(1) // apm-server legacy process logInfo(streams, "Legacy apm-server daemon started.") go func() { if err := func() error { apmProcState, err := apmProc.Process.Wait() if err != nil { return err } if apmProcState.ExitCode() != 0 { return fmt.Errorf("apm-server process exited with %d", apmProcState.ExitCode()) } return nil }(); err != nil { logError(streams, err) } wg.Done() // sending kill signal to current process (elastic-agent) logInfo(streams, "Initiate shutdown elastic-agent.") mainProc.Signal(syscall.SIGTERM) //nolint:errcheck //not required }() defer func() { if apmProc != nil { apmProc.Stop() //nolint:errcheck //not required logInfo(streams, "Initiate shutdown legacy apm-server.") } }() } } if runAgent { // run the main elastic-agent container command err = runContainerCmd(streams, cfg) } // wait until APM Server shut down wg.Wait() return err } func runContainerCmd(streams *cli.IOStreams, cfg setupConfig) error { var err error initTimeout := envTimeout(fleetInitTimeoutName) if cfg.FleetServer.Enable { err = ensureServiceToken(streams, &cfg) if err != nil { return err } } shouldEnroll, err := shouldFleetEnroll(cfg) if err != nil { return err } if shouldEnroll { var policy *kibanaPolicy token := cfg.Fleet.EnrollmentToken if token == "" && !cfg.FleetServer.Enable { client, err := kibanaClient(cfg.Kibana, cfg.Kibana.Headers) if err != nil { return err } policy, err = kibanaFetchPolicy(cfg, client, streams) if err != nil { return err } token, err = kibanaFetchToken(cfg, client, policy, streams, cfg.Fleet.TokenName) if err != nil { return err } } policyID := cfg.FleetServer.PolicyID if policy != nil { policyID = policy.ID } if policyID != "" { logInfo(streams, "Policy selected for enrollment: ", policyID) } executable, err := os.Executable() if err != nil { return err } cmdArgs, err := buildEnrollArgs(cfg, token, policyID) if err != nil { return err } enroll := exec.Command(executable, 
cmdArgs...) enroll.Stdout = streams.Out enroll.Stderr = streams.Err err = enroll.Start() if err != nil { return errors.New("failed to execute enrollment command", err) } err = enroll.Wait() if err != nil { return errors.New("enrollment failed", err) } } return run(containerCfgOverrides, false, initTimeout, isContainer) } // TokenResp is used to decode a response for generating a service token type TokenResp struct { Name string `json:"name"` Value string `json:"value"` } // ensureServiceToken will ensure that the cfg specified has the service_token attributes filled. // // If no token is specified it will try to use the value from service_token_path // If no filepath is specified it will use the elasticsearch username/password to request a new token from Kibana func ensureServiceToken(streams *cli.IOStreams, cfg *setupConfig) error { // There's already a service token if cfg.Kibana.Fleet.ServiceToken != "" || cfg.FleetServer.Elasticsearch.ServiceToken != "" { return nil } // read from secret file if cfg.FleetServer.Elasticsearch.ServiceTokenPath != "" { p, err := os.ReadFile(cfg.FleetServer.Elasticsearch.ServiceTokenPath) if err != nil { return fmt.Errorf("unable to open service_token_path: %w", err) } cfg.Kibana.Fleet.ServiceToken = string(p) cfg.FleetServer.Elasticsearch.ServiceToken = string(p) return nil } if cfg.Kibana.Fleet.ServiceTokenPath != "" { p, err := os.ReadFile(cfg.Kibana.Fleet.ServiceTokenPath) if err != nil { return fmt.Errorf("unable to open service_token_path: %w", err) } cfg.Kibana.Fleet.ServiceToken = string(p) cfg.FleetServer.Elasticsearch.ServiceToken = string(p) return nil } // request new token if cfg.Kibana.Fleet.Username == "" || cfg.Kibana.Fleet.Password == "" { return fmt.Errorf("username/password must be provided to retrieve service token") } logInfo(streams, "Requesting service_token from Kibana.") // Client is not passed in to this function because this function will use username/password and then // all the following clients will 
use the created service token. client, err := kibanaClient(cfg.Kibana, cfg.Kibana.Headers) if err != nil { return err } code, r, err := client.Request("POST", "/api/fleet/service_tokens", nil, nil, nil) if err != nil { return fmt.Errorf("request to get security token from Kibana failed: %w", err) } if code >= 400 { return fmt.Errorf("request to get security token from Kibana failed with status %d, body: %s", code, string(r)) } t := TokenResp{} err = json.Unmarshal(r, &t) if err != nil { return fmt.Errorf("unable to decode response: %w", err) } logInfo(streams, "Created service_token named:", t.Name) cfg.Kibana.Fleet.ServiceToken = t.Value cfg.FleetServer.Elasticsearch.ServiceToken = t.Value return nil } func buildEnrollArgs(cfg setupConfig, token string, policyID string) ([]string, error) { args := []string{ "enroll", "-f", "-c", paths.ConfigFile(), "--path.home", paths.Top(), // --path.home actually maps to paths.Top() "--path.config", paths.Config(), "--path.logs", paths.Logs(), "--path.socket", paths.ControlSocket(), "--skip-daemon-reload", } if paths.Downloads() != "" { args = append(args, "--path.downloads", paths.Downloads()) } if !paths.IsVersionHome() { args = append(args, "--path.home.unversioned") } if tags := envWithDefault("", "ELASTIC_AGENT_TAGS"); tags != "" { args = append(args, "--tag", tags) } if cfg.FleetServer.Enable { connStr, err := buildFleetServerConnStr(cfg.FleetServer) if err != nil { return nil, err } args = append(args, "--fleet-server-es", connStr) if cfg.FleetServer.Elasticsearch.ServiceTokenPath != "" { args = append(args, "--fleet-server-service-token-path", cfg.FleetServer.Elasticsearch.ServiceTokenPath) } else if cfg.FleetServer.Elasticsearch.ServiceTokenPath == "" && cfg.FleetServer.Elasticsearch.ServiceToken != "" { args = append(args, "--fleet-server-service-token", cfg.FleetServer.Elasticsearch.ServiceToken) } if policyID != "" { args = append(args, "--fleet-server-policy", policyID) } if cfg.FleetServer.Elasticsearch.CA != "" { 
args = append(args, "--fleet-server-es-ca", cfg.FleetServer.Elasticsearch.CA) } if cfg.FleetServer.Elasticsearch.CATrustedFingerprint != "" { args = append(args, "--fleet-server-es-ca-trusted-fingerprint", cfg.FleetServer.Elasticsearch.CATrustedFingerprint) } if cfg.FleetServer.Elasticsearch.Cert != "" { args = append(args, "--fleet-server-es-cert", cfg.FleetServer.Elasticsearch.Cert) } if cfg.FleetServer.Elasticsearch.CertKey != "" { args = append(args, "--fleet-server-es-cert-key", cfg.FleetServer.Elasticsearch.CertKey) } if cfg.FleetServer.Host != "" { args = append(args, "--fleet-server-host", cfg.FleetServer.Host) } if cfg.FleetServer.Port != "" { args = append(args, "--fleet-server-port", cfg.FleetServer.Port) } if cfg.FleetServer.Cert != "" { args = append(args, "--fleet-server-cert", cfg.FleetServer.Cert) } if cfg.FleetServer.CertKey != "" { args = append(args, "--fleet-server-cert-key", cfg.FleetServer.CertKey) } if cfg.FleetServer.PassphrasePath != "" { args = append(args, "--fleet-server-cert-key-passphrase", cfg.FleetServer.PassphrasePath) } if cfg.FleetServer.ClientAuth != "" { args = append(args, "--fleet-server-client-auth", cfg.FleetServer.ClientAuth) } for k, v := range cfg.FleetServer.Headers { args = append(args, "--header", k+"="+v) } if cfg.Fleet.URL != "" { args = append(args, "--url", cfg.Fleet.URL) } if cfg.FleetServer.InsecureHTTP { args = append(args, "--fleet-server-insecure-http") } if cfg.FleetServer.InsecureHTTP || cfg.Fleet.Insecure { args = append(args, "--insecure") } if cfg.FleetServer.Elasticsearch.Insecure { args = append(args, "--fleet-server-es-insecure") } if cfg.FleetServer.Timeout != 0 { args = append(args, "--fleet-server-timeout") args = append(args, cfg.FleetServer.Timeout.String()) } } else { if cfg.Fleet.URL == "" { return nil, errors.New("FLEET_URL is required when FLEET_ENROLL is true without FLEET_SERVER_ENABLE") } args = append(args, "--url", cfg.Fleet.URL) if cfg.Fleet.Insecure { args = append(args, "--insecure") } 
} if cfg.Fleet.CA != "" { args = append(args, "--certificate-authorities", cfg.Fleet.CA) } if token != "" { args = append(args, "--enrollment-token", token) } if cfg.Fleet.ID != "" { args = append(args, "--id", cfg.Fleet.ID) } if cfg.Fleet.ReplaceToken != "" { args = append(args, "--replace-token", cfg.Fleet.ReplaceToken) } if cfg.Fleet.DaemonTimeout != 0 { args = append(args, "--daemon-timeout") args = append(args, cfg.Fleet.DaemonTimeout.String()) } if cfg.Fleet.EnrollTimeout != 0 { args = append(args, "--enroll-timeout") args = append(args, cfg.Fleet.EnrollTimeout.String()) } if cfg.Fleet.Cert != "" { args = append(args, "--elastic-agent-cert", cfg.Fleet.Cert) } if cfg.Fleet.CertKey != "" { args = append(args, "--elastic-agent-cert-key", cfg.Fleet.CertKey) } return args, nil } func buildFleetServerConnStr(cfg fleetServerConfig) (string, error) { u, err := url.Parse(cfg.Elasticsearch.Host) if err != nil { return "", err } path := "" if u.Path != "" { path += "/" + strings.TrimLeft(u.Path, "/") } return fmt.Sprintf("%s://%s%s", u.Scheme, u.Host, path), nil } func kibanaFetchPolicy(cfg setupConfig, client *kibana.Client, streams *cli.IOStreams) (*kibanaPolicy, error) { var policies kibanaPolicies err := performGET(cfg, client, "/api/fleet/agent_policies", &policies, streams.Err, "Kibana fetch policy") if err != nil { return nil, err } return findPolicy(cfg, policies.Items) } func kibanaFetchToken(cfg setupConfig, client *kibana.Client, policy *kibanaPolicy, streams *cli.IOStreams, tokenName string) (string, error) { var keys kibanaAPIKeys err := performGET(cfg, client, "/api/fleet/enrollment_api_keys", &keys, streams.Err, "Kibana fetch token") if err != nil { return "", err } key, err := findKey(keys.Items, policy, tokenName) if err != nil { return "", err } var keyDetail kibanaAPIKeyDetail err = performGET(cfg, client, fmt.Sprintf("/api/fleet/enrollment_api_keys/%s", key.ID), &keyDetail, streams.Err, "Kibana fetch token detail") if err != nil { return "", err } 
return keyDetail.Item.APIKey, nil } func kibanaClient(cfg kibanaConfig, headers map[string]string) (*kibana.Client, error) { var tls *tlscommon.Config if cfg.Fleet.CA != "" { tls = &tlscommon.Config{ CAs: []string{cfg.Fleet.CA}, } } transport := httpcommon.DefaultHTTPTransportSettings() transport.TLS = tls return kibana.NewClientWithConfigDefault(&kibana.ClientConfig{ Host: cfg.Fleet.Host, Username: cfg.Fleet.Username, Password: cfg.Fleet.Password, ServiceToken: cfg.Fleet.ServiceToken, IgnoreVersion: true, Transport: transport, Headers: headers, }, 0, "Elastic-Agent", version.GetDefaultVersion(), version.Commit(), version.BuildTime().String()) } func findPolicy(cfg setupConfig, policies []kibanaPolicy) (*kibanaPolicy, error) { policyID := "" policyName := cfg.Fleet.TokenPolicyName if cfg.FleetServer.Enable { policyID = cfg.FleetServer.PolicyID } for _, policy := range policies { if policyID != "" { if policyID == policy.ID { return &policy, nil } } else if policyName != "" { if policyName == policy.Name { return &policy, nil } } else if cfg.FleetServer.Enable { if policy.IsDefaultFleetServer { return &policy, nil } } else { if policy.IsDefault { return &policy, nil } } } return nil, fmt.Errorf(`unable to find policy named "%s"`, policyName) } func findKey(keys []kibanaAPIKey, policy *kibanaPolicy, tokenName string) (*kibanaAPIKey, error) { for _, key := range keys { name := strings.TrimSpace(tokenNameStrip.ReplaceAllString(key.Name, "")) if name == tokenName && key.PolicyID == policy.ID { return &key, nil } } return nil, fmt.Errorf(`unable to find enrollment token named "%s" in policy "%s"`, tokenName, policy.Name) } func envWithDefault(def string, keys ...string) string { for _, key := range keys { val, ok := os.LookupEnv(key) if ok { return val } } return def } func envBool(keys ...string) bool { for _, key := range keys { val, ok := os.LookupEnv(key) if ok && isTrue(val) { return true } } return false } func envTimeout(keys ...string) time.Duration { for _, key 
:= range keys { val, ok := os.LookupEnv(key) if ok { dur, err := time.ParseDuration(val) if err == nil { return dur } } } return 0 } func envMap(key string) map[string]string { m := make(map[string]string) prefix := key + "=" for _, env := range os.Environ() { if !strings.HasPrefix(env, prefix) { continue } envVal := strings.TrimPrefix(env, prefix) keyValue := strings.SplitN(envVal, "=", 2) if len(keyValue) != 2 { continue } m[keyValue[0]] = keyValue[1] } return m } func isTrue(val string) bool { trueVals := []string{"1", "true", "yes", "y"} val = strings.ToLower(val) for _, v := range trueVals { if val == v { return true } } return false } func performGET(cfg setupConfig, client *kibana.Client, path string, response interface{}, writer io.Writer, msg string) error { var lastErr error for i := 0; i < cfg.Kibana.RetryMaxCount; i++ { code, result, err := client.Request("GET", path, nil, nil, nil) if err != nil || code != 200 { if err != nil { err = fmt.Errorf("http GET request to %s%s fails: %w. Response: %s", client.URL, path, err, truncateString(result)) } else { err = fmt.Errorf("http GET request to %s%s fails. StatusCode: %d Response: %s", client.URL, path, code, truncateString(result)) } fmt.Fprintf(writer, "%s failed: %s\n", msg, err) <-time.After(cfg.Kibana.RetrySleepDuration) continue } if response == nil { return nil } return json.Unmarshal(result, response) } return lastErr } func truncateString(b []byte) string { const maxLength = 250 runes := bytes.Runes(b) if len(runes) > maxLength { runes = append(runes[:maxLength], []rune("... (truncated)")...) } return strings.ReplaceAll(string(runes), "\n", " ") } // runLegacyAPMServer extracts the bundled apm-server from elastic-agent // to path and runs it with args. 
func runLegacyAPMServer(streams *cli.IOStreams) (*process.Info, error) { name := "apm" logInfo(streams, "Preparing apm-server for legacy mode.") platform, err := component.LoadPlatformDetail(isContainer) if err != nil { return nil, fmt.Errorf("failed to gather system information: %w", err) } specs, err := component.LoadRuntimeSpecs(paths.Components(), platform) if err != nil { return nil, fmt.Errorf("failed to detect inputs and outputs: %w", err) } spec, err := specs.GetInput(name) if err != nil { return nil, fmt.Errorf("failed to detect apm-server input: %w", err) } // add APM Server specific configuration var args []string addEnv := func(arg, env string) { if v := os.Getenv(env); v != "" { args = append(args, arg, v) } } addSettingEnv := func(arg, env string) { if v := os.Getenv(env); v != "" { args = append(args, "-E", fmt.Sprintf("%v=%v", arg, v)) } } addEnv("--path.home", "HOME_PATH") addEnv("--path.config", "CONFIG_PATH") addEnv("--path.data", "DATA_PATH") addEnv("--path.logs", "LOGS_PATH") addEnv("--httpprof", "HTTPPROF") addSettingEnv("gc_percent", "APMSERVER_GOGC") logInfo(streams, "Starting legacy apm-server daemon as a subprocess."+spec.BinaryPath) options := []process.StartOption{process.WithArgs(args)} wdir := filepath.Dir(spec.BinaryPath) if wdir != "." { options = append(options, process.WithCmdOptions(func(c *exec.Cmd) error { c.Dir = wdir return nil })) } return process.Start(spec.BinaryPath, options...) 
} func containerCfgOverrides(cfg *configuration.Configuration) { logsPath := envWithDefault("", "LOGS_PATH") if logsPath == "" { // when no LOGS_PATH defined the container should log to stderr cfg.Settings.LoggingConfig.ToStderr = true cfg.Settings.LoggingConfig.ToFiles = false } eventsToStderrEnv := envWithDefault("false", "EVENTS_TO_STDERR") eventsToStderr, err := strconv.ParseBool(eventsToStderrEnv) if err != nil { logp.Warn("cannot parse EVENS_TO_STDERR='%s' as boolean, logging events to file'", eventsToStderrEnv) } if eventsToStderr { cfg.Settings.EventLoggingConfig.ToFiles = false cfg.Settings.EventLoggingConfig.ToStderr = true } configuration.OverrideDefaultContainerGRPCPort(cfg.Settings.GRPC) } func setPaths(statePath, configPath, logsPath, socketPath string, writePaths bool) error { statePath = envWithDefault(statePath, "STATE_PATH") if statePath == "" { statePath = defaultStateDirectory } topPath := filepath.Join(statePath, "data") configPath = envWithDefault(configPath, "CONFIG_PATH") if configPath == "" { configPath = statePath } if _, err := os.Stat(configPath); errors.Is(err, fs.ErrNotExist) { if err := os.MkdirAll(configPath, 0755); err != nil { return fmt.Errorf("cannot create folders for config path '%s': %w", configPath, err) } } if socketPath == "" { socketPath = utils.SocketURLWithFallback(statePath, topPath) } // ensure that the directory and sub-directory data exists if err := os.MkdirAll(topPath, 0755); err != nil { return fmt.Errorf("preparing STATE_PATH(%s) failed: %w", statePath, err) } // ensure that the elastic-agent.yml exists in the state directory or if given in the config directory baseConfig := filepath.Join(configPath, paths.DefaultConfigName) if _, err := os.Stat(baseConfig); os.IsNotExist(err) { if err := copyFile(baseConfig, paths.ConfigFile(), 0); err != nil { return err } } originalInstall := paths.Install() paths.SetTop(topPath) paths.SetConfig(configPath) paths.SetControlSocket(socketPath) // when custom top path is provided 
the home directory is not versioned
	paths.SetVersionHome(false)
	// install path stays on container default mount (otherwise a bind mounted directory could have noexec set)
	paths.SetInstall(originalInstall)

	// set LOGS_PATH is given
	logsPath = envWithDefault(logsPath, "LOGS_PATH")
	if logsPath != "" {
		paths.SetLogs(logsPath)
		// ensure that the logs directory exists
		if err := os.MkdirAll(filepath.Join(logsPath), logsPathPerms); err != nil {
			return fmt.Errorf("preparing LOGS_PATH(%s) failed: %w", logsPath, err)
		}
	}

	// ensure that the internal logger directory exists
	loggerPath := filepath.Join(paths.Home(), logger.DefaultLogDirectory)
	if err := os.MkdirAll(loggerPath, logsPathPerms); err != nil {
		return fmt.Errorf("preparing internal log path(%s) failed: %w", loggerPath, err)
	}

	// persist the paths so other commands in the container will use the correct paths
	if writePaths {
		if err := writeContainerPaths(statePath, configPath, logsPath, socketPath); err != nil {
			return err
		}
	}

	return nil
}

// containerPaths is the content of container-paths.yml: the paths chosen when
// the container was set up, persisted so that later commands run inside the
// same container resolve the same state, config, logs and socket locations.
type containerPaths struct {
	StatePath  string `config:"state_path" yaml:"state_path"`
	ConfigPath string `config:"config_path" yaml:"config_path,omitempty"`
	LogsPath   string `config:"logs_path" yaml:"logs_path,omitempty"`
	SocketPath string `config:"socket_path" yaml:"socket_path,omitempty"`
}

// writeContainerPaths writes the given paths as YAML to
// <statePath>/container-paths.yml, creating or truncating the file.
//
// The YAML is marshalled before the file is touched so a marshalling failure
// never leaves an empty file behind. The file is always closed, and a failed
// close is reported because it can indicate the data was not fully written.
func writeContainerPaths(statePath, configPath, logsPath, socketPath string) error {
	pathFile := filepath.Join(statePath, "container-paths.yml")

	b, err := yaml.Marshal(containerPaths{
		StatePath:  statePath,
		ConfigPath: configPath,
		LogsPath:   logsPath,
		SocketPath: socketPath,
	})
	if err != nil {
		return fmt.Errorf("failed to marshal for %s: %w", pathFile, err)
	}

	fp, err := os.Create(pathFile)
	if err != nil {
		return fmt.Errorf("failed creating %s: %w", pathFile, err)
	}
	if _, err = fp.Write(b); err != nil {
		_ = fp.Close() // the write error is the one worth reporting
		return fmt.Errorf("failed to write %s: %w", pathFile, err)
	}
	if err = fp.Close(); err != nil {
		return fmt.Errorf("failed to close %s: %w", pathFile, err)
	}
	return nil
}

// tryContainerLoadPaths restores the paths persisted by writeContainerPaths.
//
// It reads container-paths.yml from the directory given by STATE_PATH
// (defaultStateDirectory when that resolves to an empty value) and applies the
// stored paths through setPaths without rewriting the file. A missing
// container-paths.yml is not an error; nothing is changed in that case.
func tryContainerLoadPaths() error {
	statePath := envWithDefault("", "STATE_PATH")
	if statePath == "" {
		statePath = defaultStateDirectory
	}
	pathFile := filepath.Join(statePath, "container-paths.yml")
	if _, err := os.Stat(pathFile); os.IsNotExist(err) {
		// no container-paths.yml file exists, so nothing to do
		return nil
	}

	cfg, err := config.LoadFile(pathFile)
	if err != nil {
		return fmt.Errorf("failed to load %s: %w", pathFile, err)
	}

	// named "stored" rather than "paths" so the paths package is not shadowed
	var stored containerPaths
	if err := cfg.UnpackTo(&stored); err != nil {
		return fmt.Errorf("failed to unpack %s: %w", pathFile, err)
	}
	return setPaths(stored.StatePath, stored.ConfigPath, stored.LogsPath, stored.SocketPath, false)
}

// copyFile copies the contents of srcPath to destPath.
//
// When mode is 0 the source file's mode is used. The destination is created if
// missing and truncated if it exists, so a shorter source never leaves stale
// trailing bytes from a previous, longer destination. As with os.OpenFile,
// mode only takes effect when the destination is newly created. An error from
// closing the destination is returned, since it can mean the copy is incomplete.
func copyFile(destPath string, srcPath string, mode os.FileMode) (err error) {
	// if mode is unset; set to the same as the source file
	if mode == 0 {
		// ignoring the error: os.Open below also fails if the file cannot be stat'd
		if info, statErr := os.Stat(srcPath); statErr == nil {
			mode = info.Mode()
		}
	}

	src, err := os.Open(srcPath)
	if err != nil {
		return err
	}
	defer src.Close()

	dest, err := os.OpenFile(destPath, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, mode)
	if err != nil {
		return err
	}
	defer func() {
		if closeErr := dest.Close(); closeErr != nil && err == nil {
			err = closeErr
		}
	}()

	_, err = io.Copy(dest, src)
	return err
}

// kibanaPolicy holds the policy fields this file decodes from Kibana JSON
// (presumably Fleet agent policy API responses).
type kibanaPolicy struct {
	ID                   string `json:"id"`
	Name                 string `json:"name"`
	Status               string `json:"status"`
	IsDefault            bool   `json:"is_default"`
	IsDefaultFleetServer bool   `json:"is_default_fleet_server"`
}

// kibanaPolicies is a JSON list response containing kibanaPolicy items.
type kibanaPolicies struct {
	Items []kibanaPolicy `json:"items"`
}

// kibanaAPIKey holds the API key fields this file decodes from Kibana JSON.
type kibanaAPIKey struct {
	ID       string `json:"id"`
	Name     string `json:"name"`
	Active   bool   `json:"active"`
	PolicyID string `json:"policy_id"`
	APIKey   string `json:"api_key"`
}

// kibanaAPIKeys is a JSON list response containing kibanaAPIKey items.
type kibanaAPIKeys struct {
	Items []kibanaAPIKey `json:"items"`
}

// kibanaAPIKeyDetail is a JSON single-item response wrapping one kibanaAPIKey.
type kibanaAPIKeyDetail struct {
	Item kibanaAPIKey `json:"item"`
}

// firstSetEnvOrDefault returns the value of the first key in keys that is set
// in the environment (an empty value still counts as set), or defVal when none
// of them are set.
func firstSetEnvOrDefault(defVal string, keys ...string) string {
	for _, key := range keys {
		if val, ok := os.LookupEnv(key); ok {
			return val
		}
	}
	return defVal
}

// envDurationWithDefault parses the first set environment variable in keys as
// a time.Duration, falling back to parsing defVal when none are set.
func envDurationWithDefault(defVal string, keys ...string) (time.Duration, error) {
	return time.ParseDuration(firstSetEnvOrDefault(defVal, keys...))
}

// envIntWithDefault parses the first set environment variable in keys as a
// base-10 int, falling back to parsing defVal when none are set.
func envIntWithDefault(defVal string, keys ...string) (int, error) {
	return strconv.Atoi(firstSetEnvOrDefault(defVal, keys...))
}

// isContainer changes the platform details to be a container.
//
// Runtime specifications can provide unique configurations when running in a container, this ensures that
// those configurations are used versus the standard Linux configurations.
func isContainer(detail component.PlatformDetail) component.PlatformDetail {
	detail.OS = component.Container
	return detail
}

// Indirections for the dependencies used by shouldFleetEnroll, kept as
// package-level variables (presumably so tests can substitute them).
var (
	newFleetClient = func(log *logger.Logger, apiKey string, cfg remote.Config) (fleetclient.Sender, error) {
		return fleetclient.NewAuthWithConfig(log, apiKey, cfg)
	}
	newEncryptedDiskStore = storage.NewEncryptedDiskStore
	statAgentConfigFile   = os.Stat
)

// agentInfo is a minimal AgentInfo implementation that only supplies an agent
// ID; ackFleet passes it to fleetapi.NewAckCmd.
type agentInfo struct {
	id string
}

// AgentID returns the stored agent ID.
func (a *agentInfo) AgentID() string {
	return a.id
}

// shouldFleetEnroll reports whether the elastic-agent should enroll into Fleet.
//
// Enrollment is skipped when setupCfg disables it and forced when
// setupCfg.Fleet.Force is set. Otherwise enrollment is required when:
//   - the agent configuration file does not exist;
//   - setupCfg requests a specific agent ID that differs from the stored one;
//   - the setup Fleet URL is not one of the stored Fleet hosts;
//   - a stored enrollment or replace token hash does not match the setup token;
//   - the stored configuration has no agent info to identify the agent;
//   - Fleet rejects the stored API key (e.g. the agent was unenrolled in Kibana).
//
// When enrollment is not required but the stored configuration lacks a hash
// for a token present in setupCfg (e.g. after upgrading from an older agent),
// the hash is generated and the updated configuration is written back.
func shouldFleetEnroll(setupCfg setupConfig) (bool, error) {
	if !setupCfg.Fleet.Enroll {
		// Enrollment is explicitly disabled in the setup configuration.
		return false, nil
	}
	if setupCfg.Fleet.Force {
		// Enrollment is explicitly enforced by the setup configuration.
		return true, nil
	}

	agentCfgFilePath := paths.AgentConfigFile()
	_, err := statAgentConfigFile(agentCfgFilePath)
	if os.IsNotExist(err) {
		// The agent configuration file does not exist, so enrollment is required.
		return true, nil
	}
	// Other stat errors are deliberately not handled here; presumably loading
	// the store below surfaces them.

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	store, err := newEncryptedDiskStore(ctx, agentCfgFilePath)
	if err != nil {
		return false, fmt.Errorf("failed to instantiate encrypted disk store: %w", err)
	}
	reader, err := store.Load()
	if err != nil {
		return false, fmt.Errorf("failed to load from disk store: %w", err)
	}
	cfg, err := config.NewConfigFrom(reader)
	if err != nil {
		return false, fmt.Errorf("failed to read from disk store: %w", err)
	}
	storedConfig, err := configuration.NewFromConfig(cfg)
	if err != nil {
		return false, fmt.Errorf("failed to read from disk store: %w", err)
	}

	// Check if enrolling with a specifically defined Elastic Agent ID.
	// If the IDs don't match then it needs to enroll.
	if setupCfg.Fleet.ID != "" && (storedConfig.Fleet.Info == nil || storedConfig.Fleet.Info.ID != setupCfg.Fleet.ID) {
		return true, nil
	}

	// The Fleet URL in the setup must be one of the stored Fleet hosts,
	// otherwise enrollment is required (this also covers an empty host list).
	if !slices.Contains(storedConfig.Fleet.Client.GetHosts(), setupCfg.Fleet.URL) {
		return true, nil
	}

	// Evaluate the stored token hashes against the setup tokens. When
	// "upgrading" from an older agent version the hashes will not exist in the
	// stored configuration; the helper reports no mismatch in that case.
	mismatch, err := storedFleetTokenHashMismatch("enrollment", storedConfig.Fleet.EnrollmentTokenHash, setupCfg.Fleet.EnrollmentToken)
	if err != nil {
		return false, err
	}
	if mismatch {
		return true, nil
	}
	mismatch, err = storedFleetTokenHashMismatch("replace", storedConfig.Fleet.ReplaceTokenHash, setupCfg.Fleet.ReplaceToken)
	if err != nil {
		return false, err
	}
	if mismatch {
		return true, nil
	}

	// The ACK below needs the stored agent ID. Without agent info the agent
	// cannot identify itself to Fleet (and dereferencing Info would panic), so
	// treat it as requiring enrollment.
	if storedConfig.Fleet.Info == nil {
		return true, nil
	}

	// Validate the stored API token to check if the agent is still authorized with Fleet.
	log, err := logger.New("fleet_client", false)
	if err != nil {
		return false, fmt.Errorf("failed to create logger: %w", err)
	}
	fc, err := newFleetClient(log, storedConfig.Fleet.AccessAPIKey, storedConfig.Fleet.Client)
	if err != nil {
		return false, fmt.Errorf("failed to create fleet client: %w", err)
	}

	// Perform an ACK request with **empty events** to verify the validity of the API token.
	// If the agent has been manually un-enrolled through the Kibana UI, the ACK request will fail due to an invalid API token.
	// In such cases, the agent should automatically re-enroll and "recover" their enrollment status without manual intervention,
	// maintaining seamless operation.
	err = ackFleet(ctx, fc, storedConfig.Fleet.Info.ID)
	switch {
	case errors.Is(err, fleetclient.ErrInvalidAPIKey):
		// The API key is invalid, so enrollment is required.
		return true, nil
	case err != nil:
		return false, fmt.Errorf("failed to validate api token: %w", err)
	}

	// Store hashes for setup tokens that have no stored hash yet
	// (can happen when "upgrading" from an older version of the agent).
	saveConfig := false
	if len(storedConfig.Fleet.EnrollmentTokenHash) == 0 && len(setupCfg.Fleet.EnrollmentToken) > 0 {
		tokenHash, err := hashFleetToken("enrollment", setupCfg.Fleet.EnrollmentToken)
		if err != nil {
			return false, err
		}
		storedConfig.Fleet.EnrollmentTokenHash = tokenHash
		saveConfig = true
	}
	if len(storedConfig.Fleet.ReplaceTokenHash) == 0 && len(setupCfg.Fleet.ReplaceToken) > 0 {
		tokenHash, err := hashFleetToken("replace", setupCfg.Fleet.ReplaceToken)
		if err != nil {
			return false, err
		}
		storedConfig.Fleet.ReplaceTokenHash = tokenHash
		saveConfig = true
	}

	if saveConfig {
		data, err := yaml.Marshal(storedConfig)
		if err != nil {
			return false, fmt.Errorf("could not marshal config: %w", err)
		}
		if err := safelyStoreAgentInfo(store, bytes.NewReader(data)); err != nil {
			return false, fmt.Errorf("failed to store agent config: %w", err)
		}
	}

	return false, nil
}

// storedFleetTokenHashMismatch reports whether token fails to match
// storedHash, a base64-encoded PBKDF2 hash. It reports no mismatch when either
// value is empty. kind ("enrollment" or "replace") only labels returned errors.
func storedFleetTokenHashMismatch(kind, storedHash, token string) (bool, error) {
	if len(storedHash) == 0 || len(token) == 0 {
		return false, nil
	}
	hashBytes, err := base64.StdEncoding.DecodeString(storedHash)
	if err != nil {
		return false, fmt.Errorf("failed to decode %s token hash: %w", kind, err)
	}
	err = crypto.ComparePBKDF2HashAndPassword(hashBytes, []byte(token))
	switch {
	case errors.Is(err, crypto.ErrMismatchedHashAndPassword):
		// The stored token hash does not match the new token.
		return true, nil
	case err != nil:
		return false, fmt.Errorf("failed to compare %s token hash: %w", kind, err)
	}
	return false, nil
}

// hashFleetToken returns the base64-encoded PBKDF2 hash of token, in the
// format checked by storedFleetTokenHashMismatch. kind labels returned errors.
func hashFleetToken(kind, token string) (string, error) {
	hashBytes, err := crypto.GeneratePBKDF2FromPassword([]byte(token))
	if err != nil {
		return "", fmt.Errorf("failed to generate %s token hash: %w", kind, err)
	}
	return base64.StdEncoding.EncodeToString(hashBytes), nil
}
func ackFleet(ctx context.Context, client fleetclient.Sender, agentID string) error { const retryInterval = time.Second const maxRetries = 3 ackRequest := &fleetapi.AckRequest{Events: nil} ackCMD := fleetapi.NewAckCmd(&agentInfo{agentID}, client) retries := 0 return backoff.Retry(func() error { retries++ _, err := ackCMD.Execute(ctx, ackRequest) switch { case err == nil: return nil case errors.Is(err, fleetclient.ErrInvalidAPIKey) || retries == maxRetries: return backoff.Permanent(err) default: return err } }, &backoff.ConstantBackOff{Interval: retryInterval}) }