in internal/pkg/server/fleet.go [442:563]
// runSubsystems wires up the Fleet Server subsystems and starts each of them
// on g.
//
// Unless the server runs in standalone mode it first checks version
// compatibility with Elasticsearch and runs the data-layer migrations. It then
// creates the GC scheduler, the policy and action monitors, the action
// dispatcher, the bulk check-in handler and one API server per bind endpoint,
// handing each one's Run function to g.
//
// All input-level settings are read from cfg.Inputs[0]; an error is returned
// when cfg has no inputs.
//
// The function does not wait for the subsystems: it returns nil once all of
// them have been handed to g. A non-nil error means setup failed part-way;
// subsystems already handed to g at that point are not stopped here.
// NOTE(review): presumably the caller cancels ctx and waits on g - confirm.
func (f *Fleet) runSubsystems(ctx context.Context, cfg *config.Config, g *errgroup.Group, bulker bulk.Bulk, tracer *apm.Tracer) (err error) {
	// Everything below is configured from the first input. Fail with an error
	// before any side effects instead of panicking on an out-of-range index.
	if len(cfg.Inputs) == 0 {
		return fmt.Errorf("failed to run subsystems: no inputs defined in configuration")
	}
	input := &cfg.Inputs[0]
	srv := &input.Server

	esCli := bulker.Client()

	// Version check is not performed in standalone mode because it is expected that
	// standalone Fleet Server may be running with older versions of Elasticsearch.
	if !f.standAlone {
		// Check version compatibility with Elasticsearch.
		// A distinct name is used so the named result err is not shadowed.
		remoteVersion, checkErr := ver.CheckCompatibility(ctx, esCli, f.bi.Version)
		if checkErr != nil {
			// Include the remote version in the message only when one was reported.
			if len(remoteVersion) != 0 {
				return fmt.Errorf("failed version compatibility check with elasticsearch (Agent: %s, Elasticsearch: %s): %w",
					f.bi.Version, remoteVersion, checkErr)
			}
			return fmt.Errorf("failed version compatibility check with elasticsearch: %w", checkErr)
		}

		// Migrations are not executed in standalone mode. When needed, they will be executed
		// by some external process.
		loggedMigration := loggedRunFunc(ctx, "Migrations", func(ctx context.Context) error {
			return dl.Migrate(ctx, bulker)
		})
		// Migrations run synchronously here, before any subsystem is started.
		if err = loggedMigration(); err != nil {
			return fmt.Errorf("failed to run subsystems: %w", err)
		}
	}

	// Run scheduler for periodic GC/cleanup
	gcCfg := srv.GC
	sched, err := scheduler.New(gc.Schedules(bulker, gcCfg.ScheduleInterval, gcCfg.CleanupAfterExpiredInterval))
	if err != nil {
		return fmt.Errorf("failed to create elasticsearch GC: %w", err)
	}
	g.Go(loggedRunFunc(ctx, "Elasticsearch GC", sched.Run))

	// Monitoring es client, longer timeout, no retries
	monCli, err := es.NewClient(ctx, cfg, true, elasticsearchOptions(
		srv.Instrumentation.Enabled, f.bi,
	)...)
	if err != nil {
		return fmt.Errorf("failed to create monitoring elasticsearch client: %w", err)
	}

	// Policy index monitor
	pim, err := monitor.New(dl.FleetPolicies, esCli, monCli,
		monitor.WithFetchSize(input.Monitor.FetchSize),
		monitor.WithPollTimeout(input.Monitor.PollTimeout),
		monitor.WithAPMTracer(tracer),
		monitor.WithDebounceTime(input.Monitor.PolicyDebounceTime),
	)
	if err != nil {
		return fmt.Errorf("failed to create policy index monitor: %w", err)
	}
	g.Go(loggedRunFunc(ctx, "Policy index monitor", pim.Run))

	// Policy monitor
	pm := policy.NewMonitor(bulker, pim, srv.Limits)
	g.Go(loggedRunFunc(ctx, "Policy monitor", pm.Run))

	// Policy self monitor; the implementation depends on standalone mode.
	var sm policy.SelfMonitor
	if f.standAlone {
		sm = policy.NewStandAloneSelfMonitor(bulker, f.reporter)
	} else {
		sm = policy.NewSelfMonitor(cfg.Fleet, bulker, pim, input.Policy.ID, f.reporter)
	}
	g.Go(loggedRunFunc(ctx, "Policy self monitor", sm.Run))

	// Actions monitoring
	am, err := monitor.NewSimple(dl.FleetActions, esCli, monCli,
		monitor.WithExpiration(true),
		monitor.WithFetchSize(input.Monitor.FetchSize),
		monitor.WithPollTimeout(input.Monitor.PollTimeout),
		monitor.WithAPMTracer(tracer),
	)
	if err != nil {
		return fmt.Errorf("failed to create action monitor: %w", err)
	}
	g.Go(loggedRunFunc(ctx, "Action monitor", am.Run))

	// Action dispatcher, fed by the action monitor and limited by the
	// configured action limit interval and burst.
	ad := action.NewDispatcher(am, srv.Limits.ActionLimit.Interval, srv.Limits.ActionLimit.Burst)
	g.Go(loggedRunFunc(ctx, "Action dispatcher", ad.Run))

	// Bulk check-in
	bc := checkin.NewBulk(bulker)
	g.Go(loggedRunFunc(ctx, "Bulk checkin", bc.Run))

	// API handlers; the same instances are shared by every API server below.
	ct, err := api.NewCheckinT(f.verCon, srv, f.cache, bc, pm, am, ad, bulker)
	if err != nil {
		return fmt.Errorf("failed to create checkin handler: %w", err)
	}
	et, err := api.NewEnrollerT(f.verCon, srv, bulker, f.cache)
	if err != nil {
		return fmt.Errorf("failed to create enroller handler: %w", err)
	}
	at := api.NewArtifactT(srv, bulker, f.cache)
	ack := api.NewAckT(srv, bulker, f.cache)
	st := api.NewStatusT(srv, bulker, f.cache, api.WithSelfMonitor(sm), api.WithBuildInfo(f.bi))
	ut := api.NewUploadT(srv, bulker, monCli, f.cache) // uses no-retry client for bufferless chunk upload
	ft := api.NewFileDeliveryT(srv, bulker, monCli, f.cache)
	pt := api.NewPGPRetrieverT(srv, bulker, f.cache)
	auditT := api.NewAuditT(srv, bulker, f.cache)

	// One API server per bind endpoint.
	for _, endpoint := range srv.BindEndpoints() {
		// apiServer is declared inside the loop so each closure below captures
		// its own server.
		apiServer := api.NewServer(endpoint, srv,
			api.WithCheckin(ct),
			api.WithEnroller(et),
			api.WithArtifact(at),
			api.WithAck(ack),
			api.WithStatus(st),
			api.WithUpload(ut),
			api.WithFileDelivery(ft),
			api.WithPGP(pt),
			api.WithAudit(auditT),
			api.WithTracer(tracer),
		)
		g.Go(loggedRunFunc(ctx, "Http server", func(ctx context.Context) error {
			return apiServer.Run(ctx)
		}))
	}

	return nil
}