internal/pkg/agent/cmd/watch.go (189 lines of code) (raw):

// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one // or more contributor license agreements. Licensed under the Elastic License 2.0; // you may not use this file except in compliance with the Elastic License 2.0. package cmd import ( "context" "fmt" "os" "os/signal" "runtime" "syscall" "time" "github.com/spf13/cobra" "github.com/elastic/elastic-agent-libs/logp" "github.com/elastic/elastic-agent-libs/logp/configure" "github.com/elastic/elastic-agent/pkg/control/v2/client" "github.com/elastic/elastic-agent/internal/pkg/agent/application/filelock" "github.com/elastic/elastic-agent/internal/pkg/agent/application/paths" "github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade" "github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade/details" "github.com/elastic/elastic-agent/internal/pkg/agent/configuration" "github.com/elastic/elastic-agent/internal/pkg/agent/errors" "github.com/elastic/elastic-agent/internal/pkg/cli" "github.com/elastic/elastic-agent/internal/pkg/config" "github.com/elastic/elastic-agent/internal/pkg/release" "github.com/elastic/elastic-agent/pkg/core/logger" "github.com/elastic/elastic-agent/version" ) const ( watcherName = "elastic-agent-watcher" watcherLockFile = "watcher.lock" ) func newWatchCommandWithArgs(_ []string, streams *cli.IOStreams) *cobra.Command { cmd := &cobra.Command{ Use: "watch", Short: "Watch the Elastic Agent for failures and initiate rollback", Long: `This command watches Elastic Agent for failures and initiates rollback if necessary.`, Run: func(_ *cobra.Command, _ []string) { cfg := getConfig(streams) log, err := configuredLogger(cfg, watcherName) if err != nil { fmt.Fprintf(streams.Err, "Error configuring logger: %v\n%s\n", err, troubleshootMessage()) os.Exit(3) } // Make sure to flush any buffered logs before we're done. defer log.Sync() //nolint:errcheck // flushing buffered logs is best effort. if err := watchCmd(log, cfg); err != nil { log.Errorw("Watch command failed", "error.message", err) fmt.Fprintf(streams.Err, "Watch command failed: %v\n%s\n", err, troubleshootMessage()) os.Exit(4) } }, } return cmd } func watchCmd(log *logp.Logger, cfg *configuration.Configuration) error { log.Infow("Upgrade Watcher started", "process.pid", os.Getpid(), "agent.version", version.GetAgentPackageVersion()) marker, err := upgrade.LoadMarker(paths.Data()) if err != nil { log.Error("failed to load marker", err) return err } if marker == nil { // no marker found we're not in upgrade process log.Infof("update marker not present at '%s'", paths.Data()) return nil } log.Infof("Loaded update marker %+v", marker) locker := filelock.NewAppLocker(paths.Top(), watcherLockFile) if err := locker.TryLock(); err != nil { if errors.Is(err, filelock.ErrAppAlreadyRunning) { log.Info("exiting, lock already exists") return nil } log.Error("failed to acquire lock", err) return err } defer func() { _ = locker.Unlock() }() isWithinGrace, tilGrace := gracePeriod(marker, cfg.Settings.Upgrade.Watcher.GracePeriod) if !isWithinGrace { log.Infof("not within grace [updatedOn %v] %v", marker.UpdatedOn.String(), time.Since(marker.UpdatedOn).String()) // if it is started outside of upgrade loop // if we're not within grace and marker is still there it might mean // that cleanup was not performed ok, cleanup everything except current version // hash is the same as hash of agent which initiated watcher. if err := upgrade.Cleanup(log, paths.Top(), paths.VersionedHome(paths.Top()), release.ShortCommit(), true, false); err != nil { log.Error("clean up of prior watcher run failed", err) } // exit nicely return nil } // About to start watching the upgrade. Initialize upgrade details and save them in the // upgrade marker. upgradeDetails := initUpgradeDetails(marker, upgrade.SaveMarker, log) errorCheckInterval := cfg.Settings.Upgrade.Watcher.ErrorCheck.Interval ctx := context.Background() if err := watch(ctx, tilGrace, errorCheckInterval, log); err != nil { log.Error("Error detected, proceeding to rollback: %v", err) upgradeDetails.SetState(details.StateRollback) err = upgrade.Rollback(ctx, log, client.New(), paths.Top(), marker.PrevVersionedHome, marker.PrevHash) if err != nil { log.Error("rollback failed", err) upgradeDetails.Fail(err) } return err } // watch succeeded - upgrade was successful! upgradeDetails.SetState(details.StateCompleted) // cleanup older versions, // in windows it might leave self untouched, this will get cleaned up // later at the start, because for windows we leave marker untouched. // // Why is this being skipped on Windows? The comment above is not clear. // issue: https://github.com/elastic/elastic-agent/issues/3027 removeMarker := !isWindows() err = upgrade.Cleanup(log, paths.Top(), marker.VersionedHome, marker.Hash, removeMarker, false) if err != nil { log.Error("cleanup after successful watch failed", err) } return err } func isWindows() bool { return runtime.GOOS == "windows" } func watch(ctx context.Context, tilGrace time.Duration, errorCheckInterval time.Duration, log *logger.Logger) error { errChan := make(chan error) ctx, cancel := context.WithCancel(ctx) //cleanup defer func() { cancel() close(errChan) }() agentWatcher := upgrade.NewAgentWatcher(errChan, log, errorCheckInterval) go agentWatcher.Run(ctx) signals := make(chan os.Signal, 1) signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT, syscall.SIGHUP) t := time.NewTimer(tilGrace) defer t.Stop() WATCHLOOP: for { select { case <-signals: // ignore continue case <-ctx.Done(): break WATCHLOOP // grace period passed, agent is considered stable case <-t.C: log.Info("Grace period passed, not watching") break WATCHLOOP // Agent in degraded state. case err := <-errChan: log.Errorf("Agent Error detected: %s", err.Error()) return err } } return nil } // gracePeriod returns true if it is within grace period and time until grace period ends. // otherwise it returns false and 0 func gracePeriod(marker *upgrade.UpdateMarker, gracePeriodDuration time.Duration) (bool, time.Duration) { sinceUpdate := time.Since(marker.UpdatedOn) if 0 < sinceUpdate && sinceUpdate < gracePeriodDuration { return true, gracePeriodDuration - sinceUpdate } return false, gracePeriodDuration } func configuredLogger(cfg *configuration.Configuration, name string) (*logger.Logger, error) { cfg.Settings.LoggingConfig.Beat = name cfg.Settings.LoggingConfig.Level = logp.DebugLevel internal, err := logger.MakeInternalFileOutput(cfg.Settings.LoggingConfig) if err != nil { return nil, err } libC, err := logger.ToCommonConfig(cfg.Settings.LoggingConfig) if err != nil { return nil, err } if err := configure.LoggingWithOutputs("", libC, internal); err != nil { return nil, fmt.Errorf("error initializing logging: %w", err) } return logp.NewLogger(""), nil } func getConfig(streams *cli.IOStreams) *configuration.Configuration { defaultCfg := configuration.DefaultConfiguration() pathConfigFile := paths.ConfigFile() rawConfig, err := config.LoadFile(pathConfigFile) if err != nil { fmt.Fprintf(streams.Err, "could not read configuration file %s", pathConfigFile) return defaultCfg } cfg, err := configuration.NewFromConfig(rawConfig) if err != nil { fmt.Fprintf(streams.Err, "could not parse configuration file %s", pathConfigFile) return defaultCfg } return cfg } func initUpgradeDetails(marker *upgrade.UpdateMarker, saveMarker func(*upgrade.UpdateMarker, bool) error, log *logp.Logger) *details.Details { upgradeDetails := details.NewDetails(version.GetAgentPackageVersion(), details.StateWatching, marker.GetActionID()) upgradeDetails.RegisterObserver(func(details *details.Details) { marker.Details = details if err := saveMarker(marker, true); err != nil { if details != nil { log.Errorf("unable to save upgrade marker after setting upgrade details (state = %s): %s", details.State, err.Error()) } else { log.Errorf("unable to save upgrade marker after clearing upgrade details: %s", err.Error()) } } }) return upgradeDetails }