internal/pkg/agent/application/actions/handlers/handler_action_policy_change.go

// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License 2.0;
// you may not use this file except in compliance with the Elastic License 2.0.

package handlers

import (
	"bytes"
	"context"
	goerrors "errors"
	"fmt"
	"io"
	"net/http"
	"sort"
	"time"

	"gopkg.in/yaml.v2"

	"github.com/elastic/elastic-agent-libs/logp"
	"github.com/elastic/elastic-agent-libs/transport/httpcommon"
	"github.com/elastic/elastic-agent-libs/transport/tlscommon"

	"github.com/elastic/elastic-agent/internal/pkg/agent/application/actions"
	"github.com/elastic/elastic-agent/internal/pkg/agent/application/coordinator"
	"github.com/elastic/elastic-agent/internal/pkg/agent/application/info"
	"github.com/elastic/elastic-agent/internal/pkg/agent/configuration"
	"github.com/elastic/elastic-agent/internal/pkg/agent/errors"
	"github.com/elastic/elastic-agent/internal/pkg/agent/storage"
	"github.com/elastic/elastic-agent/internal/pkg/config"
	"github.com/elastic/elastic-agent/internal/pkg/fleetapi"
	"github.com/elastic/elastic-agent/internal/pkg/fleetapi/acker"
	"github.com/elastic/elastic-agent/internal/pkg/fleetapi/client"
	"github.com/elastic/elastic-agent/internal/pkg/remote"
	"github.com/elastic/elastic-agent/pkg/core/logger"
)

const (
	apiStatusTimeout = 15 * time.Second
)

// PolicyChangeHandler is a handler for POLICY_CHANGE action.
type PolicyChangeHandler struct {
	log                  *logger.Logger
	agentInfo            info.Agent
	config               *configuration.Configuration
	store                storage.Store
	ch                   chan coordinator.ConfigChange
	setters              []actions.ClientSetter
	policyLogLevelSetter logLevelSetter
	coordinator          *coordinator.Coordinator

	// Disabled for 8.8.0 release in order to limit the surface
	// https://github.com/elastic/security-team/issues/6501
	//
	// Last known valid signature validation key
	// signatureValidationKey []byte
}

// NewPolicyChangeHandler creates a new PolicyChange handler.
func NewPolicyChangeHandler(
	log *logger.Logger,
	agentInfo info.Agent,
	config *configuration.Configuration,
	store storage.Store,
	ch chan coordinator.ConfigChange,
	policyLogLevelSetter logLevelSetter,
	coordinator *coordinator.Coordinator,
	setters ...actions.ClientSetter,
) *PolicyChangeHandler {
	return &PolicyChangeHandler{
		log:                  log,
		agentInfo:            agentInfo,
		config:               config,
		store:                store,
		ch:                   ch,
		setters:              setters,
		coordinator:          coordinator,
		policyLogLevelSetter: policyLogLevelSetter,
	}
}

// AddSetter adds a setter into a collection of client setters.
func (h *PolicyChangeHandler) AddSetter(cs actions.ClientSetter) {
	if h.setters == nil {
		h.setters = make([]actions.ClientSetter, 0)
	}

	h.setters = append(h.setters, cs)
}

// Handle handles policy change action.
func (h *PolicyChangeHandler) Handle(ctx context.Context, a fleetapi.Action, acker acker.Acker) error {
	h.log.Debugf("handlerPolicyChange: action '%+v' received", a)
	action, ok := a.(*fleetapi.ActionPolicyChange)
	if !ok {
		return fmt.Errorf("invalid type, expected ActionPolicyChange and received %T", a)
	}

	// Disabled for 8.8.0 release in order to limit the surface
	// https://github.com/elastic/security-team/issues/6501
	//
	// Validate policy signature and overlay signed configuration
	// policy, signatureValidationKey, err := protection.ValidatePolicySignature(h.log, action.Policy, h.signatureValidationKey)
	// if err != nil {
	//	return errors.New(err, "could not validate the policy signed configuration", errors.TypeConfig)
	// }
	// h.log.Debugf("handlerPolicyChange: policy validation result: signature validation key length: %v, err: %v", len(signatureValidationKey), err)
	//
	// Cache signature validation key for the next policy handling
	// h.signatureValidationKey = signatureValidationKey

	c, err := config.NewConfigFrom(action.Data.Policy)
	if err != nil {
		return errors.New(err, "could not parse the configuration from the policy", errors.TypeConfig)
	}

	h.log.Debugf("handlerPolicyChange: emit configuration for action %+v", a)
	err = h.handlePolicyChange(ctx, c)
	if err != nil {
		return err
	}

	h.ch <- newPolicyChange(ctx, c, a, acker, false)
	return nil
}

// Watch returns the channel for configuration change notifications.
func (h *PolicyChangeHandler) Watch() <-chan coordinator.ConfigChange {
	return h.ch
}

func (h *PolicyChangeHandler) validateFleetServerHosts(ctx context.Context, cfg *config.Config) (*remote.Config, error) {
	// do not update fleet-server host from policy; no setters provided with local Fleet Server
	if len(h.setters) == 0 {
		return nil, nil
	}

	parsedConfig, err := configuration.NewPartialFromConfigNoDefaults(cfg)
	if err != nil {
		return nil, fmt.Errorf("parsing fleet config: %w", err)
	}

	if parsedConfig.Fleet == nil {
		// there is no client config (weird)
		return nil, nil
	}

	if clientEqual(h.config.Fleet.Client, parsedConfig.Fleet.Client) {
		// already the same hosts
		return nil, nil
	}

	// make a copy of the current client config and apply the changes to this copy
	newFleetClientConfig := h.config.Fleet.Client
	updateFleetConfig(h.log, parsedConfig.Fleet.Client, &newFleetClientConfig)

	// test the new config
	err = testFleetConfig(ctx, h.log, newFleetClientConfig, h.config.Fleet.AccessAPIKey)
	if err != nil {
		return nil, fmt.Errorf("validating fleet client config: %w", err)
	}

	return &newFleetClientConfig, nil
}

func testFleetConfig(ctx context.Context, log *logger.Logger, clientConfig remote.Config, apiKey string) error {
	fleetClient, err := client.NewAuthWithConfig(
		log, apiKey, clientConfig)
	if err != nil {
		return errors.New(
			err, "fail to create API client with updated config",
			errors.TypeConfig,
			errors.M("hosts", append(
				clientConfig.Hosts, clientConfig.Host)))
	}

	ctx, cancel := context.WithTimeout(ctx, apiStatusTimeout)
	defer cancel()

	// TODO: a HEAD should be enough as we need to test only the connectivity part
	resp, err := fleetClient.Send(ctx, http.MethodGet, "/api/status", nil, nil, nil)
	if err != nil {
		return errors.New(
			err, "fail to communicate with Fleet Server API client hosts",
			errors.TypeNetwork, errors.M("hosts", clientConfig.Hosts))
	}

	if resp.StatusCode != http.StatusOK {
		return errors.New(
			err, fmt.Sprintf("fleet server ping returned a bad status code: %d", resp.StatusCode),
			errors.TypeNetwork, errors.M("hosts", clientConfig.Hosts))
	}

	// discard body for proper cancellation and connection reuse
	_, _ = io.Copy(io.Discard, resp.Body)
	resp.Body.Close()

	return nil
}

// updateFleetConfig copies the relevant Fleet client settings from policyConfig
// onto agentConfig. The destination struct is modified in place.
func updateFleetConfig(log *logger.Logger, policyConfig remote.Config, agentConfig *remote.Config) {
	// Hosts is the only connectivity field sent by Fleet; clear everything else aside from Hosts.
	if len(policyConfig.Hosts) > 0 {
		agentConfig.Hosts = make([]string, len(policyConfig.Hosts))
		copy(agentConfig.Hosts, policyConfig.Hosts)

		agentConfig.Host = ""
		agentConfig.Protocol = ""
		agentConfig.Path = ""
	}

	// Empty proxies from fleet are ignored. That way a proxy set by --proxy-url
	// won't be overridden by an absent or empty proxy from fleet-server.
	// However, if there is a proxy sent by fleet-server, it'll take precedence.
	// Therefore, it's not possible to remove a proxy once it's set.
	if policyConfig.Transport.Proxy.URL == nil || policyConfig.Transport.Proxy.URL.String() == "" {
		log.Debugw("proxy from fleet is empty or null, the proxy will not be changed", "current_proxy", agentConfig.Transport.Proxy.URL)
	} else {
		log.Debugw("received proxy from fleet, applying it", "old_proxy", agentConfig.Transport.Proxy.URL, "new_proxy", policyConfig.Transport.Proxy.URL)
		// copy the proxy struct
		agentConfig.Transport.Proxy = policyConfig.Transport.Proxy

		// replace in agentConfig the attributes that are passed by reference within the proxy struct

		// Headers map
		agentConfig.Transport.Proxy.Headers = map[string]string{}
		for k, v := range policyConfig.Transport.Proxy.Headers {
			agentConfig.Transport.Proxy.Headers[k] = v
		}

		// Proxy URL
		urlCopy := *policyConfig.Transport.Proxy.URL
		agentConfig.Transport.Proxy.URL = &urlCopy
	}

	if policyConfig.Transport.TLS != nil {
		tlsCopy := tlscommon.Config{}
		if agentConfig.Transport.TLS != nil {
			// copy the TLS struct
			tlsCopy = *agentConfig.Transport.TLS
		}

		if policyConfig.Transport.TLS.Certificate == emptyCertificateConfig() {
			log.Debug("TLS certificates from fleet are empty or null, the TLS config will not be changed")
		} else {
			tlsCopy.Certificate = policyConfig.Transport.TLS.Certificate
			log.Debug("received TLS certificate/key from fleet, applying it")
		}

		if len(policyConfig.Transport.TLS.CAs) == 0 {
			log.Debug("TLS CAs from fleet are empty or null, the TLS config will not be changed")
		} else {
			tlsCopy.CAs = make([]string, len(policyConfig.Transport.TLS.CAs))
			copy(tlsCopy.CAs, policyConfig.Transport.TLS.CAs)
			log.Debug("received TLS CAs from fleet, applying it")
		}

		agentConfig.Transport.TLS = &tlsCopy
	}
}

func emptyCertificateConfig() tlscommon.CertificateConfig {
	return tlscommon.CertificateConfig{}
}

func (h *PolicyChangeHandler) handlePolicyChange(ctx context.Context, c *config.Config) (err error) {
	var validationErr error

	// validate Fleet connectivity with the new configuration
	var validatedConfig *remote.Config
	validatedConfig, err = h.validateFleetServerHosts(ctx, c)
	if err != nil {
		validationErr = goerrors.Join(validationErr, fmt.Errorf("validating Fleet client config: %w", err))
	}

	// validate agent settings

	// agent logging
	loggingConfig, err := validateLoggingConfig(c)
	if err != nil {
		validationErr = goerrors.Join(validationErr, fmt.Errorf("validating logging config: %w", err))
	}

	if validationErr != nil {
		return validationErr
	}

	// apply logging configuration
	err = h.applyLoggingConfig(ctx, loggingConfig)
	if err != nil {
		return fmt.Errorf("applying logging config: %w", err)
	}

	if validatedConfig != nil {
		// there's a change in the fleet client settings
		backupFleetClientCfg := h.config.Fleet.Client
		// rollback in case of error
		defer func() {
			if err != nil {
				h.config.Fleet.Client = backupFleetClientCfg
			}
		}()

		// modify runtime handler config before saving
		h.config.Fleet.Client = *validatedConfig
	}

	cfg, err := configuration.NewFromConfig(c)
	if err != nil {
		return errors.New(err, "could not parse the configuration from the policy", errors.TypeConfig)
	}

	hasEventLoggingOutputChanged := h.hasEventLoggingOutputChanged(cfg)
	if hasEventLoggingOutputChanged {
		h.config.Settings.EventLoggingConfig = cfg.Settings.EventLoggingConfig
	}

	// persist configuration
	err = saveConfig(h.agentInfo, h.config, h.store)
	if err != nil {
		return fmt.Errorf("saving config: %w", err)
	}

	// apply the new Fleet client configuration to the current clients
	err = h.applyFleetClientConfig(validatedConfig)
	if err != nil {
		return fmt.Errorf("applying FleetClientConfig: %w", err)
	}

	// If the event logging output has changed, we need to re-exec the
	// Elastic-Agent to apply the new logging output.
	// The new logging configuration has already been persisted to disk;
	// the Elastic-Agent will pick it up once it starts.
	if hasEventLoggingOutputChanged {
		h.coordinator.ReExec(nil)
	}

	return nil
}

// hasEventLoggingOutputChanged returns true if the output of the event logger has changed
func (h *PolicyChangeHandler) hasEventLoggingOutputChanged(new *configuration.Configuration) bool {
	switch {
	case h.config.Settings.EventLoggingConfig.ToFiles != new.Settings.EventLoggingConfig.ToFiles:
		return true
	case h.config.Settings.EventLoggingConfig.ToStderr != new.Settings.EventLoggingConfig.ToStderr:
		return true
	default:
		return false
	}
}

func validateLoggingConfig(cfg *config.Config) (*logger.Config, error) {
	parsedConfig, err := configuration.NewPartialFromConfigNoDefaults(cfg)
	if err != nil {
		return nil, fmt.Errorf("parsing fleet config: %w", err)
	}

	if parsedConfig == nil || parsedConfig.Settings == nil || parsedConfig.Settings.LoggingConfig == nil {
		// no logging config, nothing to do
		return nil, nil
	}

	loggingConfig := parsedConfig.Settings.LoggingConfig
	logLevel := loggingConfig.Level
	if logLevel < logp.DebugLevel || logLevel > logp.CriticalLevel {
		return nil, fmt.Errorf("unrecognized log level %d", logLevel)
	}

	return loggingConfig, nil
}

func (h *PolicyChangeHandler) applyFleetClientConfig(validatedConfig *remote.Config) error {
	if validatedConfig == nil || len(h.setters) == 0 {
		// nothing to do for fleet hosts
		return nil
	}

	// the config has already been validated, no need for error handling
	fleetClient, err := client.NewAuthWithConfig(
		h.log, h.config.Fleet.AccessAPIKey, *validatedConfig)
	if err != nil {
		return fmt.Errorf("creating new fleet client with updated config: %w", err)
	}

	for _, setter := range h.setters {
		setter.SetClient(fleetClient)
	}
	return nil
}

func (h *PolicyChangeHandler) applyLoggingConfig(ctx context.Context, loggingConfig *logger.Config) error {
	var policyLogLevel *logger.Level
	if loggingConfig != nil {
		// we have logging config to set
		policyLogLevel = &loggingConfig.Level
	}

	h.log.Infof("Setting fallback log level %v from policy", policyLogLevel)
	return h.policyLogLevelSetter.SetLogLevel(ctx, policyLogLevel)
}

func saveConfig(agentInfo info.Agent, validatedConfig *configuration.Configuration, store storage.Store) error {
	if validatedConfig == nil {
		// nothing to do for fleet hosts
		return nil
	}

	reader, err := fleetToReader(agentInfo.AgentID(), agentInfo.Headers(), validatedConfig)
	if err != nil {
		return errors.New(
			err, "fail to persist new Fleet Server API client hosts",
			errors.TypeUnexpected, errors.M("hosts", validatedConfig.Fleet.Client.Hosts))
	}

	err = store.Save(reader)
	if err != nil {
		return errors.New(
			err, "fail to persist new Fleet Server API client hosts",
			errors.TypeFilesystem, errors.M("hosts", validatedConfig.Fleet.Client.Hosts))
	}
	return nil
}

func clientEqual(k1 remote.Config, k2 remote.Config) bool {
	if k1.Protocol != k2.Protocol {
		return false
	}
	if k1.Path != k2.Path {
		return false
	}
	if k1.Host != k2.Host {
		return false
	}

	sort.Strings(k1.Hosts)
	sort.Strings(k2.Hosts)
	if len(k1.Hosts) != len(k2.Hosts) {
		return false
	}
	for i, v := range k1.Hosts {
		if v != k2.Hosts[i] {
			return false
		}
	}

	headersEqual := func(h1, h2 httpcommon.ProxyHeaders) bool {
		if len(h1) != len(h2) {
			return false
		}

		for k, v := range h1 {
			h2v, found := h2[k]
			if !found || v != h2v {
				return false
			}
		}

		return true
	}

	// different proxy
	if k1.Transport.Proxy.URL != k2.Transport.Proxy.URL ||
		k1.Transport.Proxy.Disable != k2.Transport.Proxy.Disable ||
		!headersEqual(k1.Transport.Proxy.Headers, k2.Transport.Proxy.Headers) {
		return false
	}

	return true
}

func fleetToReader(agentID string, headers map[string]string, cfg *configuration.Configuration) (io.Reader, error) {
	configToStore := map[string]interface{}{
		"fleet": cfg.Fleet,
		"agent": map[string]interface{}{
			// Add event logging configuration here!
			"id":                           agentID,
			"headers":                      headers,
			"logging.level":                cfg.Settings.LoggingConfig.Level,
			"logging.event_data.to_files":  cfg.Settings.EventLoggingConfig.ToFiles,
			"logging.event_data.to_stderr": cfg.Settings.EventLoggingConfig.ToStderr,
			"monitoring.http":              cfg.Settings.MonitoringConfig.HTTP,
			"monitoring.pprof":             cfg.Settings.MonitoringConfig.Pprof,
		},
	}

	data, err := yaml.Marshal(configToStore)
	if err != nil {
		return nil, err
	}
	return bytes.NewReader(data), nil
}

type policyChange struct {
	ctx        context.Context
	cfg        *config.Config
	action     fleetapi.Action
	acker      acker.Acker
	commit     bool
	ackWatcher chan struct{}
}

func newPolicyChange(
	ctx context.Context,
	config *config.Config,
	action fleetapi.Action,
	acker acker.Acker,
	commit bool) *policyChange {
	var ackWatcher chan struct{}
	if commit {
		// we don't need it otherwise
		ackWatcher = make(chan struct{})
	}
	return &policyChange{
		ctx:        ctx,
		cfg:        config,
		action:     action,
		acker:      acker,
		commit:     true,
		ackWatcher: ackWatcher,
	}
}

func (l *policyChange) Config() *config.Config {
	return l.cfg
}

func (l *policyChange) Ack() error {
	if l.action == nil {
		return nil
	}
	err := l.acker.Ack(l.ctx, l.action)
	if err != nil {
		return err
	}
	if l.commit {
		err := l.acker.Commit(l.ctx)
		if l.ackWatcher != nil && err == nil {
			close(l.ackWatcher)
		}
		return err
	}
	return nil
}

// WaitAck waits for policy change to be acked.
// Policy change ack is awaitable only if the commit flag was set.
// The caller is responsible for using a reasonable deadline; otherwise
// this call can block indefinitely.
func (l *policyChange) WaitAck(ctx context.Context) {
	if !l.commit || l.ackWatcher == nil {
		return
	}

	select {
	case <-l.ackWatcher:
	case <-ctx.Done():
	}
}

func (l *policyChange) Fail(_ error) {
	// do nothing
}