in internal/pkg/server/fleet.go [88:234]
func (f *Fleet) Run(ctx context.Context, initCfg *config.Config) error {
// ctx is cancelled when a SIGTERM or SIGINT is received or when the agent wrappers calls stop because a unit is being stopped/removed.
// Replace context with cancellable ctx
// in order to automatically cancel all the go routines
// that were started in the scope of this function on function exit
ctx, cn := context.WithCancel(ctx)
defer cn()
log := zerolog.Ctx(ctx)
err := initCfg.LoadServerLimits(log)
if err != nil {
return fmt.Errorf("encountered error while loading server limits: %w", err)
}
cacheCfg := config.CopyCache(initCfg)
log.Info().Interface("cfg", cacheCfg).Msg("Setting cache config options")
cache, err := cache.New(cacheCfg, cache.WithLog(log))
if err != nil {
return err
}
f.cache = cache
var curCfg *config.Config
newCfg := initCfg
stop := func(cn context.CancelFunc, g *errgroup.Group) {
if cn != nil {
cn()
}
if g != nil {
err := g.Wait()
if err != nil {
log.Error().Err(err).Msg("error encountered while stopping server")
}
}
}
start := func(ctx context.Context, runfn runFuncCfg, cfg *config.Config, ech chan<- error) (*errgroup.Group, context.CancelFunc) {
ctx, cn = context.WithCancel(ctx)
g, ctx := errgroup.WithContext(ctx)
g.Go(func() error {
err := runfn(ctx, cfg)
if err != nil {
ech <- err
}
return err
})
return g, cn
}
var (
proCancel, srvCancel context.CancelFunc
proEg, srvEg *errgroup.Group
)
started := false
ech := make(chan error, 2)
LOOP:
for {
if started {
f.reporter.UpdateState(client.UnitStateConfiguring, "Re-configuring", nil) //nolint:errcheck // unclear on what should we do if updating the status fails?
} else {
started = true
f.reporter.UpdateState(client.UnitStateStarting, "Starting", nil) //nolint:errcheck // unclear on what should we do if updating the status fails?
}
err := newCfg.LoadServerLimits(log)
if err != nil {
return fmt.Errorf("encountered error while loading server limits: %w", err)
}
// Create or recreate cache
if configCacheChanged(curCfg, newCfg) {
log.Info().Msg("reconfigure cache on configuration change")
cacheCfg := config.CopyCache(newCfg)
err := f.cache.Reconfigure(cacheCfg)
log.Info().Err(err).Interface("cfg", cacheCfg).Msg("reconfigure cache complete")
if err != nil {
return err
}
}
// Start or restart profiler
if configChangedProfiler(curCfg, newCfg) {
if proCancel != nil {
log.Info().Msg("stopping profiler on configuration change")
stop(proCancel, proEg)
}
proEg, proCancel = nil, nil
if newCfg.Inputs[0].Server.Profiler.Enabled {
log.Info().Msg("starting profiler on configuration change")
proEg, proCancel = start(ctx, func(ctx context.Context, cfg *config.Config) error {
return profile.RunProfiler(ctx, cfg.Inputs[0].Server.Profiler.Bind)
}, newCfg, ech)
}
}
// Start or restart server
if configChangedServer(*log, curCfg, newCfg) {
if srvCancel != nil {
log.Info().Msg("stopping server on configuration change")
stop(srvCancel, srvEg)
select {
case err := <-ech:
log.Debug().Err(err).Msg("Server stopped intercepted expected context cancel error.")
case <-time.After(time.Second * 5):
log.Warn().Msg("Server stopped expected context cancel error missing.")
}
}
log.Info().Msg("starting server on configuration change")
srvEg, srvCancel = start(ctx, func(ctx context.Context, cfg *config.Config) error {
return f.runServer(ctx, cfg)
}, newCfg, ech)
}
curCfg = newCfg
f.l.Lock()
f.cfg = curCfg
f.l.Unlock()
select {
case newCfg = <-f.cfgCh:
log.Info().Msg("Server configuration update")
case err := <-ech:
f.reporter.UpdateState(client.UnitStateFailed, fmt.Sprintf("Error - %s", err), nil) //nolint:errcheck // unclear on what should we do if updating the status fails?
log.Error().Err(err).Msg("Fleet Server failed")
return err
case <-ctx.Done():
f.reporter.UpdateState(client.UnitStateStopping, "Stopping", nil) //nolint:errcheck // unclear on what should we do if updating the status fails?
break LOOP
}
}
// Server is coming down; wait for the server group to exit cleanly.
// Timeout if something is locked up.
err = safeWait(log, srvEg, curCfg.Inputs[0].Server.Timeouts.Drain)
// Eat cancel error to minimize confusion in logs
if errors.Is(err, context.Canceled) {
err = nil
}
log.Info().Err(err).Msg("Fleet Server exited")
return err
}