// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package server

import (
	"context"
	"errors"
	"fmt"
	"os"
	"reflect"
	"runtime/debug"
	"sync"
	"time"

	"go.elastic.co/apm/v2"
	apmtransport "go.elastic.co/apm/v2/transport"

	"github.com/elastic/elastic-agent-client/v7/pkg/client"
	"github.com/elastic/fleet-server/v7/internal/pkg/action"
	"github.com/elastic/fleet-server/v7/internal/pkg/api"
	"github.com/elastic/fleet-server/v7/internal/pkg/build"
	"github.com/elastic/fleet-server/v7/internal/pkg/bulk"
	"github.com/elastic/fleet-server/v7/internal/pkg/cache"
	"github.com/elastic/fleet-server/v7/internal/pkg/checkin"
	"github.com/elastic/fleet-server/v7/internal/pkg/config"
	"github.com/elastic/fleet-server/v7/internal/pkg/dl"
	"github.com/elastic/fleet-server/v7/internal/pkg/es"
	"github.com/elastic/fleet-server/v7/internal/pkg/gc"
	"github.com/elastic/fleet-server/v7/internal/pkg/monitor"
	"github.com/elastic/fleet-server/v7/internal/pkg/policy"
	"github.com/elastic/fleet-server/v7/internal/pkg/profile"
	"github.com/elastic/fleet-server/v7/internal/pkg/scheduler"
	"github.com/elastic/fleet-server/v7/internal/pkg/state"
	"github.com/elastic/fleet-server/v7/internal/pkg/ver"

	"github.com/hashicorp/go-version"
	"github.com/rs/zerolog"
	"golang.org/x/sync/errgroup"
)

const kUAFleetServer = "Fleet-Server"

// Fleet is an instance of the fleet-server.
type Fleet struct {
	standAlone bool
	bi         build.Info
	verCon     version.Constraints

	cfgCh    chan *config.Config
	cache    cache.Cache
	reporter state.Reporter

	// Used for diagnostics reporting.
	l   sync.RWMutex
	cfg *config.Config
}

// NewFleet creates the actual fleet server service.
func NewFleet(bi build.Info, reporter state.Reporter, standAlone bool) (*Fleet, error) {
	verCon, err := api.BuildVersionConstraint(bi.Version)
	if err != nil {
		return nil, err
	}

	return &Fleet{
		standAlone: standAlone,
		bi:         bi,
		verCon:     verCon,
		cfgCh:      make(chan *config.Config, 1),
		reporter:   reporter,
	}, nil
}

type runFunc func(context.Context) error
type runFuncCfg func(context.Context, *config.Config) error

// GetConfig returns the currently applied configuration; it is used for
// diagnostics reporting.
func (f *Fleet) GetConfig() *config.Config {
	f.l.RLock()
	defer f.l.RUnlock()
	return f.cfg
}
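
// Lifecycle sketch (illustrative only; ctx, bi, reporter, and the config
// values are assumed to come from the embedding agent or standalone
// bootstrap code): a caller constructs the service, drives Run in a
// goroutine, and pushes configuration updates through Reload.
//
//	f, err := NewFleet(bi, reporter, false)
//	if err != nil {
//		return err
//	}
//	go func() { _ = f.Run(ctx, initialCfg) }()
//	_ = f.Reload(ctx, updatedCfg) // queues updatedCfg on f.cfgCh for the Run loop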

// Run runs the fleet server.
func (f *Fleet) Run(ctx context.Context, initCfg *config.Config) error {
	// ctx is cancelled when a SIGTERM or SIGINT is received, or when the agent
	// wrapper calls stop because a unit is being stopped/removed.

	// Replace the context with a cancellable ctx so that all the goroutines
	// started in the scope of this function are automatically cancelled on
	// function exit.
	ctx, cn := context.WithCancel(ctx)
	defer cn()
	log := zerolog.Ctx(ctx)

	err := initCfg.LoadServerLimits(log)
	if err != nil {
		return fmt.Errorf("encountered error while loading server limits: %w", err)
	}
	cacheCfg := config.CopyCache(initCfg)
	log.Info().Interface("cfg", cacheCfg).Msg("Setting cache config options")
	cache, err := cache.New(cacheCfg, cache.WithLog(log))
	if err != nil {
		return err
	}
	f.cache = cache

	var curCfg *config.Config
	newCfg := initCfg

	stop := func(cn context.CancelFunc, g *errgroup.Group) {
		if cn != nil {
			cn()
		}
		if g != nil {
			err := g.Wait()
			if err != nil {
				log.Error().Err(err).Msg("error encountered while stopping server")
			}
		}
	}

	start := func(ctx context.Context, runfn runFuncCfg, cfg *config.Config, ech chan<- error) (*errgroup.Group, context.CancelFunc) {
		ctx, cn := context.WithCancel(ctx)
		g, ctx := errgroup.WithContext(ctx)

		g.Go(func() error {
			err := runfn(ctx, cfg)
			if err != nil {
				ech <- err
			}
			return err
		})
		return g, cn
	}

	var (
		proCancel, srvCancel context.CancelFunc
		proEg, srvEg         *errgroup.Group
	)

	started := false
	ech := make(chan error, 2)

LOOP:
	for {
		if started {
			f.reporter.UpdateState(client.UnitStateConfiguring, "Re-configuring", nil) //nolint:errcheck // unclear what we should do if updating the status fails
		} else {
			started = true
			f.reporter.UpdateState(client.UnitStateStarting, "Starting", nil) //nolint:errcheck // unclear what we should do if updating the status fails
		}

		err := newCfg.LoadServerLimits(log)
		if err != nil {
			return fmt.Errorf("encountered error while loading server limits: %w", err)
		}

		// Create or recreate the cache.
		if configCacheChanged(curCfg, newCfg) {
			log.Info().Msg("reconfigure cache on configuration change")
			cacheCfg := config.CopyCache(newCfg)
			err := f.cache.Reconfigure(cacheCfg)
			log.Info().Err(err).Interface("cfg", cacheCfg).Msg("reconfigure cache complete")
			if err != nil {
				return err
			}
		}

		// Start or restart the profiler.
		if configChangedProfiler(curCfg, newCfg) {
			if proCancel != nil {
				log.Info().Msg("stopping profiler on configuration change")
				stop(proCancel, proEg)
			}
			proEg, proCancel = nil, nil
			if newCfg.Inputs[0].Server.Profiler.Enabled {
				log.Info().Msg("starting profiler on configuration change")
				proEg, proCancel = start(ctx, func(ctx context.Context, cfg *config.Config) error {
					return profile.RunProfiler(ctx, cfg.Inputs[0].Server.Profiler.Bind)
				}, newCfg, ech)
			}
		}

		// Start or restart the server.
		if configChangedServer(*log, curCfg, newCfg) {
			if srvCancel != nil {
				log.Info().Msg("stopping server on configuration change")
				stop(srvCancel, srvEg)
				select {
				case err := <-ech:
					log.Debug().Err(err).Msg("Server stopped; intercepted expected context cancel error.")
				case <-time.After(time.Second * 5):
					log.Warn().Msg("Server stopped; expected context cancel error missing.")
				}
			}
			log.Info().Msg("starting server on configuration change")
			srvEg, srvCancel = start(ctx, func(ctx context.Context, cfg *config.Config) error {
				return f.runServer(ctx, cfg)
			}, newCfg, ech)
		}

		curCfg = newCfg
		f.l.Lock()
		f.cfg = curCfg
		f.l.Unlock()

		select {
		case newCfg = <-f.cfgCh:
			log.Info().Msg("Server configuration update")
		case err := <-ech:
			f.reporter.UpdateState(client.UnitStateFailed, fmt.Sprintf("Error - %s", err), nil) //nolint:errcheck // unclear what we should do if updating the status fails
			log.Error().Err(err).Msg("Fleet Server failed")
			return err
		case <-ctx.Done():
			f.reporter.UpdateState(client.UnitStateStopping, "Stopping", nil) //nolint:errcheck // unclear what we should do if updating the status fails
			break LOOP
		}
	}

	// Server is coming down; wait for the server group to exit cleanly.
	// Time out if something is locked up.
	err = safeWait(log, srvEg, curCfg.Inputs[0].Server.Timeouts.Drain)

	// Eat the cancel error to minimize confusion in the logs.
	if errors.Is(err, context.Canceled) {
		err = nil
	}

	log.Info().Err(err).Msg("Fleet Server exited")
	return err
}
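
// The three configChanged* helpers below are the predicates Run uses to decide
// what to restart on a configuration reload: the profiler and server are
// restarted only when their relevant sub-config differs, while the cache is
// reconfigured in place. A minimal sketch of the decision flow, assuming two
// config values old and new:
//
//	if configCacheChanged(old, new) { /* reconfigure cache */ }
//	if configChangedProfiler(old, new) { /* stop, then maybe start, profiler */ }
//	if configChangedServer(log, old, new) { /* stop and start server */ }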

func configChangedProfiler(curCfg, newCfg *config.Config) bool {
	changed := true
	switch {
	case curCfg == nil:
	case curCfg.Inputs[0].Server.Profiler.Enabled != newCfg.Inputs[0].Server.Profiler.Enabled:
	case curCfg.Inputs[0].Server.Profiler.Bind != newCfg.Inputs[0].Server.Profiler.Bind:
	default:
		changed = false
	}
	return changed
}

func configCacheChanged(curCfg, newCfg *config.Config) bool {
	if curCfg == nil {
		return false
	}
	return curCfg.Inputs[0].Cache != newCfg.Inputs[0].Cache
}

func configChangedServer(zlog zerolog.Logger, curCfg, newCfg *config.Config) bool {
	changed := true
	switch {
	case curCfg == nil:
		zlog.Info().Msg("initial server configuration")
	case !reflect.DeepEqual(curCfg.Fleet.CopyNoLogging(), newCfg.Fleet.CopyNoLogging()):
		zlog.Info().Msg("fleet configuration has changed")
	case !reflect.DeepEqual(curCfg.Output, newCfg.Output):
		zlog.Info().Msg("output configuration has changed")
	case !reflect.DeepEqual(curCfg.Inputs[0].Server, newCfg.Inputs[0].Server):
		zlog.Info().Msg("server configuration has changed")
	default:
		changed = false
	}
	return changed
}
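
// safeWait, below, bounds how long shutdown may block on the server errgroup.
// A sketch of the call Run makes, with a hypothetical 10s drain timeout
// standing in for cfg.Inputs[0].Server.Timeouts.Drain:
//
//	err := safeWait(log, srvEg, 10*time.Second)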

func safeWait(log *zerolog.Logger, g *errgroup.Group, to time.Duration) error {
	var err error
	waitCh := make(chan error)
	go func() {
		waitCh <- g.Wait()
	}()

	select {
	case err = <-waitCh:
	case <-time.After(to):
		log.Warn().Msg("deadlock: goroutine locked up on errgroup.Wait()")
		err = errors.New("group wait timeout")
	}

	return err
}

// loggedRunFunc wraps a runFunc so that its start and exit are logged; a
// context.Canceled error is treated as a clean exit.
func loggedRunFunc(ctx context.Context, tag string, runfn runFunc) func() error {
	log := zerolog.Ctx(ctx)
	return func() error {
		log.Debug().Msg(tag + " started")

		err := runfn(ctx)

		lvl := zerolog.DebugLevel
		switch {
		case err == nil:
		case errors.Is(err, context.Canceled):
			err = nil
		default:
			lvl = zerolog.ErrorLevel
		}

		log.WithLevel(lvl).Err(err).Msg(tag + " exited")
		return err
	}
}

// initRuntime applies the configured Go runtime tuning (GC percent and soft
// memory limit) before the server subsystems start.
func initRuntime(log *zerolog.Logger, cfg *config.Config) {
	gcPercent := cfg.Inputs[0].Server.Runtime.GCPercent
	if gcPercent != 0 {
		old := debug.SetGCPercent(gcPercent)
		log.Info().
			Int("old", old).
			Int("new", gcPercent).
			Msg("SetGCPercent")
	}
	memoryLimit := cfg.Inputs[0].Server.Runtime.MemoryLimit
	if memoryLimit != 0 {
		old := debug.SetMemoryLimit(memoryLimit)
		log.Info().
			Int64("old", old).
			Int64("new", memoryLimit).
			Msg("SetMemoryLimit")
	}
}

// initBulker creates the Elasticsearch client and wraps it in the bulker
// subsystem that batches writes to the data store.
func (f *Fleet) initBulker(ctx context.Context, tracer *apm.Tracer, cfg *config.Config) (*bulk.Bulker, error) {
	es, err := es.NewClient(ctx, cfg, false, elasticsearchOptions(
		cfg.Inputs[0].Server.Instrumentation.Enabled, f.bi,
	)...)
	if err != nil {
		return nil, err
	}

	bulkOpts := bulk.BulkOptsFromCfg(cfg)
	bulkOpts = append(bulkOpts, bulk.WithBi(f.bi))
	blk := bulk.NewBulker(es, tracer, bulkOpts...)
	return blk, nil
}
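
// runServer, below, gives the bulker an orphaned context so that the other
// subsystems can still write to Elasticsearch while they tear down. The
// pattern, reduced to a sketch with hypothetical runBulker/runSubsystem
// functions:
//
//	bulkCtx, bulkCancel := context.WithCancel(context.Background())
//	defer bulkCancel()    // bulker stops only when runServer returns
//	go runBulker(bulkCtx) // outlives cancellation of ctx
//	g, ctx := errgroup.WithContext(ctx)
//	g.Go(func() error { return runSubsystem(ctx) }) // torn down first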

func (f *Fleet) runServer(ctx context.Context, cfg *config.Config) (err error) {
	initRuntime(zerolog.Ctx(ctx), cfg)

	// Create the APM tracer.
	tracer, err := f.initTracer(ctx, cfg.Inputs[0].Server.Instrumentation)
	if err != nil {
		return err
	}

	// The metricsServer is only enabled if http.enabled is set in the config.
	metricsServer, err := api.InitMetrics(ctx, cfg, f.bi, tracer)
	switch {
	case err != nil:
		return err
	case metricsServer != nil:
		defer func() {
			_ = metricsServer.Stop()
		}()
	}

	// The bulker is started in its own context and managed in the scope of this
	// function. This is done so that when `ctx` is cancelled, the bulker will
	// remain executing until this function exits, which allows the child
	// subsystems to continue to write to the data store while tearing down.
	bulkCtx, bulkCancel := context.WithCancel(context.Background())
	defer bulkCancel()

	// Create the bulker subsystem.
	bulker, err := f.initBulker(bulkCtx, tracer, cfg)
	if err != nil {
		return err
	}

	// Execute the bulker engine in a goroutine with its orphaned context.
	// Create an error channel for the case where the bulker exits
	// unexpectedly (i.e. not cancelled via bulkCancel).
	errCh := make(chan error)
	go func() {
		runFunc := loggedRunFunc(bulkCtx, "Bulker", bulker.Run)
		// Emit the error from bulker.Run to the local error channel.
		// The error group will be listening for it. (see comments below)
		errCh <- runFunc()
	}()

	// Wrap the context with an error group context to manage the lifecycle
	// of the subsystems. An error from any subsystem, or cancellation of the
	// parent context, will cancel the group.
	// See https://pkg.go.dev/golang.org/x/sync/errgroup#Group.Go
	g, ctx := errgroup.WithContext(ctx)

	// Stub a function for inclusion in the errgroup that exits when
	// the bulker exits. If the bulker exits before the error group,
	// this will tear down the error group and g.Wait() will return.
	// Otherwise it is a noop.
	//nolint:nakedret // small function is easy to track
	g.Go(func() (err error) {
		select {
		case err = <-errCh:
		case <-ctx.Done():
			err = ctx.Err()
		}
		return
	})

	if tracer != nil {
		go func() {
			<-ctx.Done()
			zerolog.Ctx(ctx).Info().Msg("flushing instrumentation tracer...")
			tracer.Flush(nil)
			tracer.Close()
		}()
	}

	if err = f.runSubsystems(ctx, cfg, g, bulker, tracer); err != nil {
		return err
	}

	return g.Wait()
}

// runSubsystems starts all other subsystems for fleet-server.
// We assume bulker.Run is called in another goroutine; its ctx is not the same
// ctx passed into runSubsystems and used with the passed errgroup. However, if
// the bulker returns an error, the passed errgroup is cancelled.
// runSubsystems will also do an ES version check and run migrations if started
// in agent-mode.
// The started subsystems are:
// - Elasticsearch GC - cleanup expired fleet actions
// - Policy Index Monitor - track new documents in the .fleet-policies index
// - Policy Monitor - parse .fleet-policies documents into usable policies
// - Policy Self Monitor - report fleet-server health status based on the .fleet-policies index
// - Action Monitor - track new documents in the .fleet-actions index
// - Action Dispatcher - send actions from the .fleet-actions index to agents that check in
// - Bulk Checkin handler - batch agent checkin messages to the _bulk endpoint, minimizing changed attributes
// - HTTP APIs - start the HTTP server on 8220 (default) for external agents, and on 8221 (default) for managing the agent in agent-mode or local communications
func (f *Fleet) runSubsystems(ctx context.Context, cfg *config.Config, g *errgroup.Group, bulker bulk.Bulk, tracer *apm.Tracer) (err error) {
	esCli := bulker.Client()

	// The version check is not performed in standalone mode because it is
	// expected that a standalone Fleet Server may be running with older
	// versions of Elasticsearch.
	if !f.standAlone {
		// Check version compatibility with Elasticsearch.
		remoteVersion, err := ver.CheckCompatibility(ctx, esCli, f.bi.Version)
		if err != nil {
			if len(remoteVersion) != 0 {
				return fmt.Errorf("failed version compatibility check with elasticsearch (Agent: %s, Elasticsearch: %s): %w",
					f.bi.Version, remoteVersion, err)
			}
			return fmt.Errorf("failed version compatibility check with elasticsearch: %w", err)
		}

		// Migrations are not executed in standalone mode. When needed, they
		// will be executed by some external process.
		loggedMigration := loggedRunFunc(ctx, "Migrations", func(ctx context.Context) error {
			return dl.Migrate(ctx, bulker)
		})
		if err = loggedMigration(); err != nil {
			return fmt.Errorf("failed to run subsystems: %w", err)
		}
	}

	// Run the scheduler for periodic GC/cleanup.
	gcCfg := cfg.Inputs[0].Server.GC
	sched, err := scheduler.New(gc.Schedules(bulker, gcCfg.ScheduleInterval, gcCfg.CleanupAfterExpiredInterval))
	if err != nil {
		return fmt.Errorf("failed to create elasticsearch GC: %w", err)
	}
	g.Go(loggedRunFunc(ctx, "Elasticsearch GC", sched.Run))

	// Monitoring es client: longer timeout, no retries.
	monCli, err := es.NewClient(ctx, cfg, true, elasticsearchOptions(
		cfg.Inputs[0].Server.Instrumentation.Enabled, f.bi,
	)...)
	if err != nil {
		return err
	}

	// Policy index monitor.
	pim, err := monitor.New(dl.FleetPolicies, esCli, monCli,
		monitor.WithFetchSize(cfg.Inputs[0].Monitor.FetchSize),
		monitor.WithPollTimeout(cfg.Inputs[0].Monitor.PollTimeout),
		monitor.WithAPMTracer(tracer),
		monitor.WithDebounceTime(cfg.Inputs[0].Monitor.PolicyDebounceTime),
	)
	if err != nil {
		return err
	}
	g.Go(loggedRunFunc(ctx, "Policy index monitor", pim.Run))

	// Policy monitor.
	pm := policy.NewMonitor(bulker, pim, cfg.Inputs[0].Server.Limits)
	g.Go(loggedRunFunc(ctx, "Policy monitor", pm.Run))

	// Policy self monitor.
	var sm policy.SelfMonitor
	if f.standAlone {
		sm = policy.NewStandAloneSelfMonitor(bulker, f.reporter)
	} else {
		sm = policy.NewSelfMonitor(cfg.Fleet, bulker, pim, cfg.Inputs[0].Policy.ID, f.reporter)
	}
	g.Go(loggedRunFunc(ctx, "Policy self monitor", sm.Run))

	// Actions monitoring.
	am, err := monitor.NewSimple(dl.FleetActions, esCli, monCli,
		monitor.WithExpiration(true),
		monitor.WithFetchSize(cfg.Inputs[0].Monitor.FetchSize),
		monitor.WithPollTimeout(cfg.Inputs[0].Monitor.PollTimeout),
		monitor.WithAPMTracer(tracer),
	)
	if err != nil {
		return err
	}
	g.Go(loggedRunFunc(ctx, "Action monitor", am.Run))

	ad := action.NewDispatcher(am, cfg.Inputs[0].Server.Limits.ActionLimit.Interval, cfg.Inputs[0].Server.Limits.ActionLimit.Burst)
	g.Go(loggedRunFunc(ctx, "Action dispatcher", ad.Run))

	bc := checkin.NewBulk(bulker)
	g.Go(loggedRunFunc(ctx, "Bulk checkin", bc.Run))

	ct, err := api.NewCheckinT(f.verCon, &cfg.Inputs[0].Server, f.cache, bc, pm, am, ad, bulker)
	if err != nil {
		return err
	}
	et, err := api.NewEnrollerT(f.verCon, &cfg.Inputs[0].Server, bulker, f.cache)
	if err != nil {
		return err
	}

	at := api.NewArtifactT(&cfg.Inputs[0].Server, bulker, f.cache)
	ack := api.NewAckT(&cfg.Inputs[0].Server, bulker, f.cache)
	st := api.NewStatusT(&cfg.Inputs[0].Server, bulker, f.cache, api.WithSelfMonitor(sm), api.WithBuildInfo(f.bi))
	ut := api.NewUploadT(&cfg.Inputs[0].Server, bulker, monCli, f.cache) // uses the no-retry client for bufferless chunk upload
	ft := api.NewFileDeliveryT(&cfg.Inputs[0].Server, bulker, monCli, f.cache)
	pt := api.NewPGPRetrieverT(&cfg.Inputs[0].Server, bulker, f.cache)
	auditT := api.NewAuditT(&cfg.Inputs[0].Server, bulker, f.cache)

	for _, endpoint := range (&cfg.Inputs[0].Server).BindEndpoints() {
		apiServer := api.NewServer(endpoint, &cfg.Inputs[0].Server,
			api.WithCheckin(ct),
			api.WithEnroller(et),
			api.WithArtifact(at),
			api.WithAck(ack),
			api.WithStatus(st),
			api.WithUpload(ut),
			api.WithFileDelivery(ft),
			api.WithPGP(pt),
			api.WithAudit(auditT),
			api.WithTracer(tracer),
		)
		g.Go(loggedRunFunc(ctx, "Http server", func(ctx context.Context) error {
			return apiServer.Run(ctx)
		}))
	}

	return nil
}

// Reload reloads the fleet server with the latest configuration.
func (f *Fleet) Reload(ctx context.Context, cfg *config.Config) error {
	select {
	case f.cfgCh <- cfg:
	case <-ctx.Done():
	}
	return nil
}
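
// Instrumentation below can be switched on either through the config's
// instrumentation block or, as the first check in initTracer shows, by
// exporting ELASTIC_APM_ACTIVE=true in the fleet-server environment. A sketch
// of the environment-driven path (values and invocation illustrative):
//
//	ELASTIC_APM_ACTIVE=true ELASTIC_APM_TRANSACTION_SAMPLE_RATE=0.1 ./fleet-server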

const envAPMActive = "ELASTIC_APM_ACTIVE"

func (f *Fleet) initTracer(ctx context.Context, cfg config.Instrumentation) (*apm.Tracer, error) {
	if !cfg.Enabled && os.Getenv(envAPMActive) != "true" {
		return nil, nil
	}

	zerolog.Ctx(ctx).Info().Msg("fleet-server instrumentation is enabled")

	// Use env vars to configure additional APM settings.
	const (
		envGlobalLabels          = "ELASTIC_APM_GLOBAL_LABELS"
		envTransactionSampleRate = "ELASTIC_APM_TRANSACTION_SAMPLE_RATE"
	)
	if cfg.GlobalLabels != "" {
		os.Setenv(envGlobalLabels, cfg.GlobalLabels)
		defer os.Unsetenv(envGlobalLabels)
	}
	if cfg.TransactionSampleRate != "" {
		os.Setenv(envTransactionSampleRate, cfg.TransactionSampleRate)
		defer os.Unsetenv(envTransactionSampleRate)
	}

	options, err := cfg.APMHTTPTransportOptions()
	if err != nil {
		return nil, err
	}

	transport, err := apmtransport.NewHTTPTransport(options)
	if err != nil {
		return nil, err
	}

	return apm.NewTracerOptions(apm.TracerOptions{
		ServiceName:        "fleet-server",
		ServiceVersion:     f.bi.Version,
		ServiceEnvironment: cfg.Environment,
		Transport:          transport,
	})
}

func elasticsearchOptions(instrumented bool, bi build.Info) []es.ConfigOption {
	options := []es.ConfigOption{es.WithUserAgent(kUAFleetServer, bi)}
	if instrumented {
		options = append(options, es.InstrumentRoundTripper())
	}
	return options
}
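
// Usage sketch for elasticsearchOptions (illustrative; it mirrors the calls in
// initBulker and runSubsystems above, with placeholder ctx/cfg/bi values):
//
//	cli, err := es.NewClient(ctx, cfg, false, elasticsearchOptions(
//		cfg.Inputs[0].Server.Instrumentation.Enabled, bi,
//	)...)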