func()

in internal/pkg/server/fleet.go [442:563]


// runSubsystems wires up the Fleet Server subsystems and schedules each
// long-running one on the errgroup g. All input-scoped settings are read from
// the first configured input (cfg.Inputs[0]).
//
// When the server is not in standalone mode it first checks version
// compatibility with Elasticsearch and then runs the data-layer migrations
// synchronously; a failure in either step aborts before anything is scheduled.
//
// Subsystems are then scheduled on g in this order: Elasticsearch GC scheduler,
// policy index monitor, policy monitor, policy self monitor, action monitor,
// action dispatcher, bulk checkin, and one API server per bind endpoint.
//
// The function returns as soon as everything has been scheduled; it does not
// wait on g. If a later construction step fails, subsystems already handed to
// g are not stopped here.
// NOTE(review): presumably the caller cancels ctx when this returns an error — confirm.
//
// It returns a non-nil error when no input is configured, when the version
// check or migrations fail, or when any subsystem cannot be constructed. The
// underlying cause is always wrapped with %w.
func (f *Fleet) runSubsystems(ctx context.Context, cfg *config.Config, g *errgroup.Group, bulker bulk.Bulk, tracer *apm.Tracer) (err error) {
	// Everything below indexes the first input; report a missing input as an
	// error instead of panicking with an index-out-of-range.
	if cfg == nil || len(cfg.Inputs) == 0 {
		return fmt.Errorf("failed to run subsystems: no input configuration provided")
	}
	input := &cfg.Inputs[0]
	srvCfg := &input.Server

	esCli := bulker.Client()

	// Version check is not performed in standalone mode because it is expected that
	// standalone Fleet Server may be running with older versions of Elasticsearch.
	if !f.standAlone {
		// Check version compatibility with Elasticsearch.
		// Dedicated error names are used in this block so the named result
		// err is never shadowed.
		remoteVersion, verErr := ver.CheckCompatibility(ctx, esCli, f.bi.Version)
		if verErr != nil {
			// Include the remote version in the message only when the check
			// managed to retrieve it.
			if len(remoteVersion) != 0 {
				return fmt.Errorf("failed version compatibility check with elasticsearch (Agent: %s, Elasticsearch: %s): %w",
					f.bi.Version, remoteVersion, verErr)
			}
			return fmt.Errorf("failed version compatibility check with elasticsearch: %w", verErr)
		}

		// Migrations are not executed in standalone mode. When needed, they will be executed
		// by some external process.
		loggedMigration := loggedRunFunc(ctx, "Migrations", func(ctx context.Context) error {
			return dl.Migrate(ctx, bulker)
		})
		// Run synchronously: nothing is scheduled on g until migrations succeed.
		if migErr := loggedMigration(); migErr != nil {
			return fmt.Errorf("failed to run subsystems: %w", migErr)
		}
	}

	// Run scheduler for periodic GC/cleanup
	gcCfg := srvCfg.GC
	sched, err := scheduler.New(gc.Schedules(bulker, gcCfg.ScheduleInterval, gcCfg.CleanupAfterExpiredInterval))
	if err != nil {
		return fmt.Errorf("failed to create elasticsearch GC: %w", err)
	}
	g.Go(loggedRunFunc(ctx, "Elasticsearch GC", sched.Run))

	// Monitoring es client, longer timeout, no retries
	monCli, err := es.NewClient(ctx, cfg, true, elasticsearchOptions(
		srvCfg.Instrumentation.Enabled, f.bi,
	)...)
	if err != nil {
		return fmt.Errorf("failed to create monitoring elasticsearch client: %w", err)
	}

	// Policy index monitor
	pim, err := monitor.New(dl.FleetPolicies, esCli, monCli,
		monitor.WithFetchSize(input.Monitor.FetchSize),
		monitor.WithPollTimeout(input.Monitor.PollTimeout),
		monitor.WithAPMTracer(tracer),
		monitor.WithDebounceTime(input.Monitor.PolicyDebounceTime),
	)
	if err != nil {
		return fmt.Errorf("failed to create policy index monitor: %w", err)
	}
	g.Go(loggedRunFunc(ctx, "Policy index monitor", pim.Run))

	// Policy monitor
	pm := policy.NewMonitor(bulker, pim, srvCfg.Limits)
	g.Go(loggedRunFunc(ctx, "Policy monitor", pm.Run))

	// Policy self monitor: the standalone variant is built without the policy
	// index monitor or a policy ID.
	var sm policy.SelfMonitor
	if f.standAlone {
		sm = policy.NewStandAloneSelfMonitor(bulker, f.reporter)
	} else {
		sm = policy.NewSelfMonitor(cfg.Fleet, bulker, pim, input.Policy.ID, f.reporter)
	}
	g.Go(loggedRunFunc(ctx, "Policy self monitor", sm.Run))

	// Actions monitoring
	am, err := monitor.NewSimple(dl.FleetActions, esCli, monCli,
		monitor.WithExpiration(true),
		monitor.WithFetchSize(input.Monitor.FetchSize),
		monitor.WithPollTimeout(input.Monitor.PollTimeout),
		monitor.WithAPMTracer(tracer),
	)
	if err != nil {
		return fmt.Errorf("failed to create action monitor: %w", err)
	}
	g.Go(loggedRunFunc(ctx, "Action monitor", am.Run))

	// Action dispatcher, configured from the action limit interval and burst.
	actionLimit := srvCfg.Limits.ActionLimit
	ad := action.NewDispatcher(am, actionLimit.Interval, actionLimit.Burst)
	g.Go(loggedRunFunc(ctx, "Action dispatcher", ad.Run))

	// Bulk checkin
	bc := checkin.NewBulk(bulker)
	g.Go(loggedRunFunc(ctx, "Bulk checkin", bc.Run))

	// API handlers shared by every bound endpoint.
	ct, err := api.NewCheckinT(f.verCon, srvCfg, f.cache, bc, pm, am, ad, bulker)
	if err != nil {
		return fmt.Errorf("failed to create checkin handler: %w", err)
	}
	et, err := api.NewEnrollerT(f.verCon, srvCfg, bulker, f.cache)
	if err != nil {
		return fmt.Errorf("failed to create enroll handler: %w", err)
	}

	at := api.NewArtifactT(srvCfg, bulker, f.cache)
	ack := api.NewAckT(srvCfg, bulker, f.cache)
	st := api.NewStatusT(srvCfg, bulker, f.cache, api.WithSelfMonitor(sm), api.WithBuildInfo(f.bi))
	ut := api.NewUploadT(srvCfg, bulker, monCli, f.cache) // uses no-retry client for bufferless chunk upload
	ft := api.NewFileDeliveryT(srvCfg, bulker, monCli, f.cache)
	pt := api.NewPGPRetrieverT(srvCfg, bulker, f.cache)
	auditT := api.NewAuditT(srvCfg, bulker, f.cache)

	// One API server per bind endpoint, all sharing the handlers above.
	for _, endpoint := range srvCfg.BindEndpoints() {
		// apiServer is declared inside the loop body, so each closure below
		// captures its own server instance.
		apiServer := api.NewServer(endpoint, srvCfg,
			api.WithCheckin(ct),
			api.WithEnroller(et),
			api.WithArtifact(at),
			api.WithAck(ack),
			api.WithStatus(st),
			api.WithUpload(ut),
			api.WithFileDelivery(ft),
			api.WithPGP(pt),
			api.WithAudit(auditT),
			api.WithTracer(tracer),
		)
		g.Go(loggedRunFunc(ctx, "Http server", func(ctx context.Context) error {
			return apiServer.Run(ctx)
		}))
	}

	return nil
}