func()

in internal/pkg/server/fleet.go [88:234]


// Run starts Fleet Server using initCfg and keeps it running, applying every
// configuration received on f.cfgCh, until ctx is cancelled or one of the
// components it started reports an error.
//
// For each configuration (the initial one included) Run loads the server
// limits, reconfigures the cache when the cache settings changed, restarts the
// profiler when the profiler settings changed, restarts the server when the
// server settings changed, and finally publishes the configuration as f.cfg
// while holding f.l.
//
// Progress is reported through f.reporter: Starting on the first pass,
// Configuring on later passes, Failed when a component returns an error and
// Stopping when ctx is cancelled.
//
// Run returns an error when the server limits cannot be loaded, the cache
// cannot be created or reconfigured, or a started component fails. On shutdown
// it waits for the server group via safeWait (bounded by the configured drain
// timeout) and treats a context.Canceled result as a clean exit (nil).
func (f *Fleet) Run(ctx context.Context, initCfg *config.Config) error {
	// ctx is cancelled when a SIGTERM or SIGINT is received or when the agent wrapper calls stop because a unit is being stopped/removed.

	// Replace context with cancellable ctx
	// in order to automatically cancel all the go routines
	// that were started in the scope of this function on function exit
	ctx, cn := context.WithCancel(ctx)
	defer cn()

	log := zerolog.Ctx(ctx)
	err := initCfg.LoadServerLimits(log)
	if err != nil {
		return fmt.Errorf("encountered error while loading server limits: %w", err)
	}
	cacheCfg := config.CopyCache(initCfg)
	log.Info().Interface("cfg", cacheCfg).Msg("Setting cache config options")
	// Named fleetCache so that the cache package is not shadowed for the rest of Run.
	fleetCache, err := cache.New(cacheCfg, cache.WithLog(log))
	if err != nil {
		return err
	}
	f.cache = fleetCache

	var curCfg *config.Config
	newCfg := initCfg

	// stop cancels a component's context (if any) and waits for its group to
	// return, logging (not returning) whatever error the group ends with.
	stop := func(cancel context.CancelFunc, g *errgroup.Group) {
		if cancel != nil {
			cancel()
		}
		if g != nil {
			if err := g.Wait(); err != nil {
				log.Error().Err(err).Msg("error encountered while stopping server")
			}
		}
	}

	// start runs runfn(cfg) in a new errgroup under its own cancellable child of
	// parent. A non-nil error from runfn is also forwarded on ech so that the
	// main loop can react to it. It returns the group and the cancel func that
	// stop needs to shut it down.
	start := func(parent context.Context, runfn runFuncCfg, cfg *config.Config, ech chan<- error) (*errgroup.Group, context.CancelFunc) {
		// Fresh locals: a plain assignment here would overwrite Run's own cn.
		runCtx, cancel := context.WithCancel(parent)
		g, groupCtx := errgroup.WithContext(runCtx)

		g.Go(func() error {
			err := runfn(groupCtx, cfg)
			if err != nil {
				select {
				case ech <- err:
				default:
					// ech is full. Keep waiting for room only while this run is
					// still live: once stop has cancelled it (the main goroutine
					// is then blocked in g.Wait) or Run has returned (its
					// deferred cn cancels every run), nobody may read ech again
					// and an unconditional send would deadlock stop or leak
					// this goroutine. The error is still returned to the group.
					select {
					case ech <- err:
					case <-groupCtx.Done():
					}
				}
			}
			return err
		})
		return g, cancel
	}

	var (
		proCancel, srvCancel context.CancelFunc
		proEg, srvEg         *errgroup.Group
	)

	started := false
	// Presumably sized for the two components started below (profiler and
	// server) — TODO confirm.
	ech := make(chan error, 2)

LOOP:
	for {
		if started {
			f.reporter.UpdateState(client.UnitStateConfiguring, "Re-configuring", nil) //nolint:errcheck // unclear on what should we do if updating the status fails?
		} else {
			started = true
			f.reporter.UpdateState(client.UnitStateStarting, "Starting", nil) //nolint:errcheck // unclear on what should we do if updating the status fails?
		}

		err := newCfg.LoadServerLimits(log)
		if err != nil {
			return fmt.Errorf("encountered error while loading server limits: %w", err)
		}

		// Create or recreate cache
		if configCacheChanged(curCfg, newCfg) {
			log.Info().Msg("reconfigure cache on configuration change")
			cacheCfg := config.CopyCache(newCfg)
			err := f.cache.Reconfigure(cacheCfg)
			log.Info().Err(err).Interface("cfg", cacheCfg).Msg("reconfigure cache complete")
			if err != nil {
				return err
			}
		}

		// Start or restart profiler
		if configChangedProfiler(curCfg, newCfg) {
			if proCancel != nil {
				log.Info().Msg("stopping profiler on configuration change")
				// NOTE(review): unlike the server below, an error returned by the
				// stopped profiler is not drained from ech; if RunProfiler returns
				// a non-nil error on cancellation it would later be treated as a
				// failure by the select at the bottom of this loop — confirm it
				// returns nil when its context is cancelled.
				stop(proCancel, proEg)
			}
			proEg, proCancel = nil, nil
			if newCfg.Inputs[0].Server.Profiler.Enabled {
				log.Info().Msg("starting profiler on configuration change")
				proEg, proCancel = start(ctx, func(ctx context.Context, cfg *config.Config) error {
					return profile.RunProfiler(ctx, cfg.Inputs[0].Server.Profiler.Bind)
				}, newCfg, ech)
			}
		}

		// Start or restart server
		if configChangedServer(*log, curCfg, newCfg) {
			if srvCancel != nil {
				log.Info().Msg("stopping server on configuration change")
				stop(srvCancel, srvEg)
				// The stopped server is expected to have reported its
				// cancellation error on ech; consume it so it is not mistaken
				// for a failure below.
				select {
				case err := <-ech:
					log.Debug().Err(err).Msg("Server stopped intercepted expected context cancel error.")
				case <-time.After(time.Second * 5):
					log.Warn().Msg("Server stopped expected context cancel error missing.")
				}
			}
			log.Info().Msg("starting server on configuration change")
			srvEg, srvCancel = start(ctx, func(ctx context.Context, cfg *config.Config) error {
				return f.runServer(ctx, cfg)
			}, newCfg, ech)
		}

		curCfg = newCfg
		f.l.Lock()
		f.cfg = curCfg
		f.l.Unlock()

		select {
		case newCfg = <-f.cfgCh:
			log.Info().Msg("Server configuration update")
		case err := <-ech:
			f.reporter.UpdateState(client.UnitStateFailed, fmt.Sprintf("Error - %s", err), nil) //nolint:errcheck // unclear on what should we do if updating the status fails?
			log.Error().Err(err).Msg("Fleet Server failed")
			return err
		case <-ctx.Done():
			f.reporter.UpdateState(client.UnitStateStopping, "Stopping", nil) //nolint:errcheck // unclear on what should we do if updating the status fails?
			break LOOP
		}
	}

	// Server is coming down; wait for the server group to exit cleanly.
	// Timeout if something is locked up.
	err = safeWait(log, srvEg, curCfg.Inputs[0].Server.Timeouts.Drain)

	// Eat cancel error to minimize confusion in logs
	if errors.Is(err, context.Canceled) {
		err = nil
	}

	log.Info().Err(err).Msg("Fleet Server exited")
	return err
}