internal/launcher/launcher.go (218 lines of code) (raw):

// Licensed to Elasticsearch B.V. under one or more contributor // license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright // ownership. Elasticsearch B.V. licenses this file to you under // the Apache License, Version 2.0 (the "License"); you may // not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. // Config is put into a different package to prevent cyclic imports in case // it is needed in several locations package launcher import ( "errors" "fmt" "sync" "time" "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/management/status" "github.com/elastic/elastic-agent-libs/config" "github.com/elastic/go-ucfg" "github.com/elastic/cloudbeat/internal/infra/clog" ) const ( reconfigureWaitTimeout = 10 * time.Minute // Time to wait for the beater to stop before ignoring it shutdownGracePeriod = 20 * time.Second ) // ErrStopSignal is used to indicate we got a stop signal var ErrStopSignal = errors.New("stop beater") // ErrGracefulExit is used when the launcher stops before shutdownGracePeriod, after waiting for the beater to stop var ErrGracefulExit = beat.GracefulExit // ErrTimeoutExit is used when the launcher stops after shutdownGracePeriod, without waiting for the beater to stop var ErrTimeoutExit = errors.New("exit after timeout") type launcher struct { wg sync.WaitGroup // WaitGroup used to wait for active beaters beater beat.Beater beaterErr chan error reloader Reloader log *clog.Logger latest *config.C beat *beat.Beat creator beat.Creator validator Validator name string } type Reloader interface { Channel() <-chan *config.C Stop() } type Validator interface { Validate(*config.C) error } func New(log *clog.Logger, name string, reloader Reloader, validator Validator, creator beat.Creator, cfg *config.C) beat.Beater { return &launcher{ beaterErr: make(chan error, 1), wg: sync.WaitGroup{}, log: log, name: name, reloader: reloader, validator: validator, creator: creator, latest: cfg, } } func (l *launcher) Run(b *beat.Beat) error { // Configure the beats Manager to start after all the reloadable hooks are initialized // and shutdown when the function returns. l.beat = b if err := b.Manager.Start(); err != nil { return err } // Wait for Fleet-side reconfiguration only if beater is running in Agent-managed mode. if b.Manager.Enabled() { defer b.Manager.Stop() l.log.Infof("Waiting for initial reconfiguration from Fleet server...") update, err := l.reconfigureWait(reconfigureWaitTimeout) if err != nil { l.log.Errorf("Failed while waiting for the initial reconfiguration from Fleet server: %v", err) return err } if err := l.configUpdate(update); err != nil { return fmt.Errorf("failed to update with initial reconfiguration from Fleet server: %w", err) } } err := l.run() return err } func (l *launcher) run() error { err := l.runLoop() switch { case errors.Is(err, ErrGracefulExit): l.log.Info("Launcher stopped successfully") case errors.Is(err, ErrTimeoutExit): l.log.Info("Launcher stopped after timeout") case err == nil: // unexpected default: l.log.Errorf("Launcher stopped by error: %v", err) } l.reloader.Stop() return err } // runLoop is the loop that keeps the launcher alive func (l *launcher) runLoop() error { l.log.Info("Launcher is running") for { // Run a new beater err := l.runBeater() if err != nil { return fmt.Errorf("launcher could not run Beater: %v", err) } // Wait for something to happen: // config update (val, nil) // stop signal (nil, ErrStopSignal) // beater error (nil, err) cfg, err := l.waitForUpdates() if isConfigUpdate(cfg, err) { l.stopBeater() err = l.configUpdate(cfg) if err != nil { return fmt.Errorf("failed to update Beater config: %w", err) } l.log.Infof("Restart %s with the new configuration of %d keys", l.name, len(l.latest.FlattenedKeys())) continue } if isStopSignal(cfg, err) { return l.stopBeaterWithTimeout(shutdownGracePeriod) } if isBeaterError(cfg, err) { return err } } } func (l *launcher) Stop() { l.log.Info("Launcher is about to shut down gracefully") close(l.beaterErr) } // runBeater creates a new beater and starts a goroutine for running it. // It is protected from panics and ship errors back to beaterErr func (l *launcher) runBeater() error { l.log.Infof("Launcher is creating a new %s", l.name) var err error l.beater, err = l.creator(l.beat, l.latest) if err != nil { return fmt.Errorf("could not create beater: %w", err) } l.wg.Add(1) go func() { defer func() { if r := recover(); r != nil { l.beaterErr <- fmt.Errorf("beater panic recovered: %s", r) } }() defer l.wg.Done() l.log.Infof("Launcher is running the new created %s", l.name) err := l.beater.Run(l.beat) if err != nil { l.beaterErr <- fmt.Errorf("beater returned an error: %w", err) } l.log.Infof("%s run has finished", l.name) }() return nil } func (l *launcher) stopBeater() { l.log.Infof("Launcher is shutting %s down gracefully", l.name) l.beater.Stop() // By waiting to the wait group, we make sure that the old beater has really stopped l.wg.Wait() l.log.Infof("Launcher shut %s down gracefully", l.name) } // Returns an error indicating if the beater was stopped gracefully or not func (l *launcher) stopBeaterWithTimeout(duration time.Duration) error { l.log.Infof("Launcher is shutting %s down gracefully", l.name) l.beater.Stop() wgCh := make(chan struct{}) go func() { // By waiting to the wait group, we make sure that the old beater has really stopped l.wg.Wait() close(wgCh) }() select { case <-time.After(duration): l.log.Infof("Grace period for %s ended", l.name) return ErrTimeoutExit case <-wgCh: l.log.Infof("Launcher shut %s down gracefully", l.name) return ErrGracefulExit } } // waitForUpdates is the function that keeps Launcher runLoop busy. // It will finish for one of following reasons: // 1. The Stop function got called (nil, ErrStopSignal) // 2. The beater run has returned (nil, err) // 3. A config update received (val, nil) func (l *launcher) waitForUpdates() (*config.C, error) { select { case err, ok := <-l.beaterErr: if !ok { l.log.Infof("Launcher received a stop signal") return nil, ErrStopSignal } return nil, err case update, ok := <-l.reloader.Channel(): if !ok { return nil, errors.New("reloader channel unexpectedly closed") } l.log.Infof("Launcher will restart %s to apply the configuration update", l.name) return update, nil } } func isConfigUpdate(cfg *config.C, err error) bool { return cfg != nil && err == nil } func isStopSignal(cfg *config.C, err error) bool { return cfg == nil && errors.Is(err, ErrStopSignal) } func isBeaterError(cfg *config.C, err error) bool { return cfg == nil && err != nil } // configUpdate applies incoming reconfiguration from the Fleet server to the beater config func (l *launcher) configUpdate(update *config.C) error { l.log.Infof("Merging config update from fleet with %d keys", len(update.FlattenedKeys())) return l.latest.MergeWithOpts(update, ucfg.ReplaceArrValues) } // reconfigureWait will wait for and consume incoming reconfiguration from the Fleet server, and keep // discarding them until the incoming config contains the necessary information to start beater // properly, thereafter returning the valid config. func (l *launcher) reconfigureWait(timeout time.Duration) (*config.C, error) { start := time.Now() timer := time.After(timeout) for { select { case <-l.beaterErr: return nil, errors.New("error channel closed") case <-timer: return nil, fmt.Errorf("timed out waiting for reconfiguration after %s", time.Since(start)) case update, ok := <-l.reloader.Channel(): if !ok { return nil, errors.New("reconfiguration channel is closed") } if l.validator != nil { err := l.validator.Validate(update) if err != nil { l.log.Errorf("Config update validation failed: %v", err) healthErr := &BeaterUnhealthyError{} if errors.As(err, healthErr) { l.beat.Manager.UpdateStatus(status.Degraded, healthErr.Error()) } continue } } l.log.Infof("Received valid reconfiguration after waiting for %s", time.Since(start)) return update, nil } } }