testing/upgradetest/upgrader.go (521 lines of code) (raw):

// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one // or more contributor license agreements. Licensed under the Elastic License 2.0; // you may not use this file except in compliance with the Elastic License 2.0. package upgradetest import ( "context" "encoding/json" "errors" "fmt" "os" "path/filepath" "runtime" "strings" "time" "github.com/elastic/elastic-agent/testing/installtest" "github.com/otiai10/copy" "github.com/elastic/elastic-agent/internal/pkg/acl" "github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade/details" v1client "github.com/elastic/elastic-agent/pkg/control/v1/client" v2proto "github.com/elastic/elastic-agent/pkg/control/v2/cproto" atesting "github.com/elastic/elastic-agent/pkg/testing" "github.com/elastic/elastic-agent/pkg/version" ) // CustomPGP allows for custom PGP options on upgrade. type CustomPGP struct { PGP string PGPUri string PGPPath string } type upgradeOpts struct { sourceURI *string unprivileged *bool skipVerify bool skipDefaultPgp bool customPgp *CustomPGP customWatcherCfg string installServers bool // Used to disable upgrade details checks for versions that don't support them, like 7.17.x. // See also WithDisableUpgradeWatcherUpgradeDetailsCheck. disableUpgradeWatcherUpgradeDetailsCheck bool // Disable check that enforces different hashed between the to and from version of upgrade disableHashCheck bool preInstallHook func() error postInstallHook func() error preUpgradeHook func() error postUpgradeHook func() error } type UpgradeOpt func(opts *upgradeOpts) // WithSourceURI sets a specific --source-uri for the upgrade // command. This doesn't change the verification of the upgrade // the resulting upgrade must still be the same agent provided // in the endFixture variable. func WithSourceURI(sourceURI string) UpgradeOpt { return func(opts *upgradeOpts) { opts.sourceURI = &sourceURI } } // WithUnprivileged sets the install to be explicitly unprivileged. func WithUnprivileged(unprivileged bool) UpgradeOpt { return func(opts *upgradeOpts) { opts.unprivileged = &unprivileged } } // WithSkipVerify sets the skip verify option for upgrade. func WithSkipVerify(skipVerify bool) UpgradeOpt { return func(opts *upgradeOpts) { opts.skipVerify = skipVerify } } // WithSkipDefaultPgp sets the skip default pgp option for upgrade. func WithSkipDefaultPgp(skipDefaultPgp bool) UpgradeOpt { return func(opts *upgradeOpts) { opts.skipDefaultPgp = skipDefaultPgp } } // WithCustomPGP sets a custom pgp configuration for upgrade. func WithCustomPGP(customPgp CustomPGP) UpgradeOpt { return func(opts *upgradeOpts) { opts.customPgp = &customPgp } } // WithPreInstallHook sets a hook to be called before install. func WithPreInstallHook(hook func() error) UpgradeOpt { return func(opts *upgradeOpts) { opts.preInstallHook = hook } } // WithPostInstallHook sets a hook to be called before install. func WithPostInstallHook(hook func() error) UpgradeOpt { return func(opts *upgradeOpts) { opts.postInstallHook = hook } } // WithPreUpgradeHook sets a hook to be called before install. func WithPreUpgradeHook(hook func() error) UpgradeOpt { return func(opts *upgradeOpts) { opts.preUpgradeHook = hook } } // WithPostUpgradeHook sets a hook to be called before install. func WithPostUpgradeHook(hook func() error) UpgradeOpt { return func(opts *upgradeOpts) { opts.postUpgradeHook = hook } } // WithCustomWatcherConfig sets a custom watcher configuration to use. func WithCustomWatcherConfig(cfg string) UpgradeOpt { return func(opts *upgradeOpts) { opts.customWatcherCfg = cfg } } // WithServers will use start version with servers flavor. func WithServers() UpgradeOpt { return func(opts *upgradeOpts) { opts.installServers = true } } // WithDisableUpgradeWatcherUpgradeDetailsCheck disables any assertions for // upgrade details that are being set by the Upgrade Watcher. This option is // useful in upgrade tests where the end Agent version does not contain changes // in the Upgrade Watcher whose effects are being asserted upon in PerformUpgrade. func WithDisableUpgradeWatcherUpgradeDetailsCheck() UpgradeOpt { return func(opts *upgradeOpts) { opts.disableUpgradeWatcherUpgradeDetailsCheck = true } } // WithDisableHashCheck disables hash check between start and end versions of upgrade func WithDisableHashCheck(disable bool) UpgradeOpt { return func(opts *upgradeOpts) { opts.disableHashCheck = disable } } // PerformUpgrade performs the upgrading of the Elastic Agent. func PerformUpgrade( ctx context.Context, startFixture *atesting.Fixture, endFixture *atesting.Fixture, logger Logger, opts ...UpgradeOpt, ) error { // use the passed in options to perform the upgrade // `skipVerify` is by default enabled, because default is to perform a local // upgrade to a built version of the Elastic Agent. var upgradeOpts upgradeOpts upgradeOpts.skipVerify = true for _, o := range opts { o(&upgradeOpts) } // ensure that both the starting and ending fixtures are prepared err := startFixture.EnsurePrepared(ctx) if err != nil { return fmt.Errorf("failed to prepare the startFixture: %w", err) } err = endFixture.EnsurePrepared(ctx) if err != nil { return fmt.Errorf("failed to prepare the endFixture: %w", err) } // start fixture gets the agent configured to use a faster watcher if upgradeOpts.customWatcherCfg != "" { err = startFixture.Configure(ctx, []byte(upgradeOpts.customWatcherCfg)) } else { err = ConfigureFastWatcher(ctx, startFixture) } if err != nil { return fmt.Errorf("failed configuring the start agent with faster watcher configuration: %w", err) } // get the versions from each fixture (that ensures that it's always the // same version that the fixture is working with) startVersionInfo, err := startFixture.ExecVersion(ctx) if err != nil { return fmt.Errorf("failed to get start agent build version info: %w", err) } startParsedVersion, err := version.ParseVersion(startVersionInfo.Binary.String()) if err != nil { return fmt.Errorf("failed to get parsed start agent build version (%s): %w", startVersionInfo.Binary.String(), err) } startVersion, err := version.ParseVersion(startVersionInfo.Binary.Version) if err != nil { return fmt.Errorf("failed to parse version of starting Agent binary: %w", err) } endVersionInfo, err := endFixture.ExecVersion(ctx) if err != nil { return fmt.Errorf("failed to get end agent build version info: %w", err) } endVersion, err := version.ParseVersion(endVersionInfo.Binary.Version) if err != nil { return fmt.Errorf("failed to parse version of upgraded Agent binary: %w", err) } // in the unprivileged is unset we adjust it to use unprivileged when the version allows it // in the case that its explicitly set then we ensure the version supports it if upgradeOpts.unprivileged == nil { if SupportsUnprivileged(startVersion, endVersion) { unprivileged := true upgradeOpts.unprivileged = &unprivileged logger.Logf("installation of Elastic Agent will use --unprivileged as both start and end version support --unprivileged mode") } else { // must be privileged unprivileged := false upgradeOpts.unprivileged = &unprivileged } } else if *upgradeOpts.unprivileged { if !SupportsUnprivileged(startVersion, endVersion) { return fmt.Errorf("cannot install with forced --unprivileged because either start version %s or end version %s doesn't support --unprivileged mode", startVersion.String(), endVersion.String()) } } if !upgradeOpts.disableHashCheck && startVersionInfo.Binary.Commit == endVersionInfo.Binary.Commit { return fmt.Errorf("target version has the same commit hash %q", endVersionInfo.Binary.Commit) } // For asserting on the effects of any Upgrade Watcher changes made in 8.12.0, we need // the endVersion to be >= 8.12.0. Otherwise, these assertions will fail as those changes // won't be present in the Upgrade Watcher. So we disable these assertions if the endVersion // is < 8.12.0. // // The start version also needs to be >= 8.10.0. Versions before 8.10.0 will launch the watcher // process from the starting version of the agent and not the ending version of the agent. So // even though an 8.12.0 watcher knows to write the upgrade details, prior to 8.10.0 the 8.12.0 // watcher version never executes and the upgrade details are never populated. upgradeOpts.disableUpgradeWatcherUpgradeDetailsCheck = upgradeOpts.disableUpgradeWatcherUpgradeDetailsCheck || endVersion.Less(*version.NewParsedSemVer(8, 12, 0, "", "")) || startParsedVersion.Less(*version.NewParsedSemVer(8, 10, 0, "", "")) if upgradeOpts.preInstallHook != nil { if err := upgradeOpts.preInstallHook(); err != nil { return fmt.Errorf("pre install hook failed: %w", err) } } logger.Logf("Installing version %q", startParsedVersion.VersionWithPrerelease()) // install the start agent var nonInteractiveFlag bool if Version_8_2_0.Less(*startParsedVersion) { nonInteractiveFlag = true } installOpts := atesting.InstallOpts{ NonInteractive: nonInteractiveFlag, Force: true, Privileged: !(*upgradeOpts.unprivileged), InstallServers: upgradeOpts.installServers, } output, err := startFixture.Install(ctx, &installOpts) if err != nil { return fmt.Errorf("failed to install start agent (err: %w) [output: %s]", err, string(output)) } if upgradeOpts.postInstallHook != nil { if err := upgradeOpts.postInstallHook(); err != nil { return fmt.Errorf("post install hook failed: %w", err) } } // wait for the agent to be healthy and correct version err = WaitHealthyAndVersion(ctx, startFixture, startVersionInfo.Binary, 2*time.Minute, 10*time.Second, logger) if err != nil { // context added by WaitHealthyAndVersion return err } // validate installation is correct if InstallChecksAllowed(!installOpts.Privileged, startVersion) { err = installtest.CheckSuccess(ctx, startFixture, installOpts.BasePath, &installtest.CheckOpts{Privileged: installOpts.Privileged}) if err != nil { return fmt.Errorf("pre-upgrade installation checks failed: %w", err) } } if upgradeOpts.preUpgradeHook != nil { if err := upgradeOpts.preUpgradeHook(); err != nil { return fmt.Errorf("pre upgrade hook failed: %w", err) } } logger.Logf("Upgrading from version \"%s-%s\" to version \"%s-%s\"", startParsedVersion, startVersionInfo.Binary.Commit, endVersionInfo.Binary.String(), endVersionInfo.Binary.Commit) upgradeCmdArgs := []string{"upgrade", endVersionInfo.Binary.String()} if upgradeOpts.sourceURI == nil { // no --source-uri set so it comes from the endFixture sourceURI, err := getSourceURI(ctx, endFixture, *upgradeOpts.unprivileged) if err != nil { return fmt.Errorf("failed to get end agent source package path: %w", err) } upgradeCmdArgs = append(upgradeCmdArgs, "--source-uri", sourceURI) } else if *upgradeOpts.sourceURI != "" { // specific --source-uri upgradeCmdArgs = append(upgradeCmdArgs, "--source-uri", *upgradeOpts.sourceURI) } if upgradeOpts.customPgp != nil { if len(upgradeOpts.customPgp.PGP) > 0 { upgradeCmdArgs = append(upgradeCmdArgs, "--pgp", upgradeOpts.customPgp.PGP) } if len(upgradeOpts.customPgp.PGPUri) > 0 { upgradeCmdArgs = append(upgradeCmdArgs, "--pgp-uri", upgradeOpts.customPgp.PGPUri) } if len(upgradeOpts.customPgp.PGPPath) > 0 { upgradeCmdArgs = append(upgradeCmdArgs, "--pgp-path", upgradeOpts.customPgp.PGPPath) } } if upgradeOpts.skipVerify { upgradeCmdArgs = append(upgradeCmdArgs, "--skip-verify") } if upgradeOpts.skipDefaultPgp && !startParsedVersion.Less(*Version_8_10_0_SNAPSHOT) { upgradeCmdArgs = append(upgradeCmdArgs, "--skip-default-pgp") } upgradeOutput, err := startFixture.Exec(ctx, upgradeCmdArgs) if err != nil { // Sometimes the gRPC server shuts down before replying to the command which is expected // we can determine this state by the EOF error coming from the server. // If the server is just unavailable/not running, we should not succeed. // Starting with version 8.13.2, this is handled by the upgrade command itself. outputString := string(upgradeOutput) isConnectionInterrupted := strings.Contains(outputString, "Unavailable") && strings.Contains(outputString, "EOF") if !isConnectionInterrupted { return fmt.Errorf("failed to start agent upgrade to version %q: %w\n%s", endVersionInfo.Binary.Version, err, upgradeOutput) } } // check status if status := getStatus(ctx, startFixture); status != nil { if status.State == 2 && status.UpgradeDetails == nil { logger.Logf("Agent status indicates no upgrade is in progress.") return nil } } // wait for the watcher to show up logger.Logf("waiting for upgrade watcher to start") err = WaitForWatcher(ctx, 5*time.Minute, 10*time.Second) if err != nil { return fmt.Errorf("failed to find watcher: %w", err) } logger.Logf("upgrade watcher started") // Check that, while the Upgrade Watcher is running, the upgrade details in Agent status // show the state as UPG_WATCHING. if !upgradeOpts.disableUpgradeWatcherUpgradeDetailsCheck { logger.Logf("Checking upgrade details state while Upgrade Watcher is running") if err := waitUpgradeDetailsState(ctx, startFixture, details.StateWatching, 2*time.Minute, 10*time.Second, logger); err != nil { // error context added by waitUpgradeDetailsState return err } } if upgradeOpts.postUpgradeHook != nil { if err := upgradeOpts.postUpgradeHook(); err != nil { return fmt.Errorf("post upgrade hook failed: %w", err) } } // wait for the agent to be healthy and correct version err = WaitHealthyAndVersion(ctx, startFixture, endVersionInfo.Binary, 2*time.Minute, 10*time.Second, logger) if err != nil { // agent never got healthy, but we need to ensure the watcher is stopped before continuing // this kills the watcher instantly and waits for it to be gone before continuing watcherErr := WaitForNoWatcher(ctx, 1*time.Minute, time.Second, 100*time.Millisecond) if watcherErr != nil { logger.Logf("failed to kill watcher due to agent not becoming healthy: %s", watcherErr) } // error context added by WaitHealthyAndVersion return err } // it is unstable to continue until the watcher is done // the maximum wait time is 10 minutes (12 minutes for grace) some older versions // do not respect the `ConfigureFastWatcher` so we have to kill the watcher after the // 10 minute window (10 min 15 seconds for grace) has passed. logger.Logf("waiting for upgrade watcher to finish") err = WaitForNoWatcher(ctx, 12*time.Minute, 10*time.Second, 10*time.Minute+15*time.Second) if err != nil { return fmt.Errorf("watcher never stopped running: %w", err) } logger.Logf("upgrade watcher finished") // Check that, upon successful upgrade, the upgrade details have been cleared out // from Agent status. if !upgradeOpts.disableUpgradeWatcherUpgradeDetailsCheck { logger.Logf("Checking upgrade details state after successful upgrade") if err := waitUpgradeDetailsState(ctx, startFixture, "", 2*time.Minute, 10*time.Second, logger); err != nil { // error context added by checkUpgradeDetailsState return err } } // now that the watcher has stopped lets ensure that it's still the expected // version, otherwise it's possible that it was rolled back to the original version err = CheckHealthyAndVersion(ctx, startFixture, endVersionInfo.Binary) if err != nil { // error context added by CheckHealthyAndVersion return err } // validate again that the installation is correct, upgrade should not have changed installation validation if InstallChecksAllowed(!installOpts.Privileged, startVersion, endVersion) { err = installtest.CheckSuccess(ctx, startFixture, installOpts.BasePath, &installtest.CheckOpts{Privileged: installOpts.Privileged}) if err != nil { return fmt.Errorf("post-upgrade installation checks failed: %w", err) } } return nil } var ErrVerMismatch = errors.New("versions don't match") func CheckHealthyAndVersion(ctx context.Context, f *atesting.Fixture, versionInfo atesting.AgentBinaryVersion) error { checkFunc := func() error { status, err := f.ExecStatus(ctx) if err != nil { return err } if status.Info.Version != versionInfo.Version { return fmt.Errorf("%w: got %s, want %s", ErrVerMismatch, status.Info.Version, versionInfo.Version) } if status.Info.Snapshot != versionInfo.Snapshot { return fmt.Errorf("snapshots don't match: got %t, want %t", status.Info.Snapshot, versionInfo.Snapshot) } if status.Info.Commit != versionInfo.Commit { return fmt.Errorf("commits don't match: got %s, want %s", status.Info.Commit, versionInfo.Commit) } if status.State != int(v2proto.State_HEALTHY) { return fmt.Errorf("agent state is not healthy: got %d", status.State) } return nil } parsedVersion, err := version.ParseVersion(versionInfo.Version) if err != nil { return fmt.Errorf("failed to get parsed version (%s): %w", versionInfo.Version, err) } if parsedVersion.Less(*Version_8_6_0) { // we have to handle v1 architecture of the Elastic Agent checkFunc = func() error { stateOut, err := f.Exec(ctx, []string{"status", "--output", "json"}) if err != nil { return err } var state v1client.AgentStatus err = json.Unmarshal(stateOut, &state) if err != nil { return err } versionOut, err := f.ExecVersion(ctx) if err != nil { return err } if versionOut.Binary.Version != versionInfo.Version { return fmt.Errorf("versions don't match: got %s, want %s", versionOut.Binary.Version, versionInfo.Version) } if versionOut.Binary.Snapshot != versionInfo.Snapshot { return fmt.Errorf("snapshots don't match: got %t, want %t", versionOut.Binary.Snapshot, versionInfo.Snapshot) } if versionOut.Binary.Commit != versionInfo.Commit { return fmt.Errorf("commits don't match: got %s, want %s", versionOut.Binary.Commit, versionInfo.Commit) } if state.Status != v1client.Healthy { return fmt.Errorf("agent state is not healthy: got %d", state.Status) } return nil } } return checkFunc() } func WaitHealthyAndVersion(ctx context.Context, f *atesting.Fixture, versionInfo atesting.AgentBinaryVersion, timeout time.Duration, interval time.Duration, logger Logger) error { ctx, cancel := context.WithTimeout(ctx, timeout) defer cancel() // The deadline was set above, we don't need to check for it. deadline, _ := ctx.Deadline() t := time.NewTicker(interval) defer t.Stop() var lastErr error for { select { case <-ctx.Done(): if lastErr != nil { return fmt.Errorf("failed waiting for healthy agent and version (%w): %w", ctx.Err(), lastErr) } return ctx.Err() case <-t.C: err := CheckHealthyAndVersion(ctx, f, versionInfo) // If we're in an upgrade process, the versions might not match // so we wait to see if we get to a stable version if errors.Is(err, ErrVerMismatch) { logger.Logf("version mismatch, ignoring it, time until timeout: %s", time.Until(deadline)) continue } if err == nil { return nil } lastErr = err logger.Logf("waiting for healthy agent and proper version: %s", err) } } } func waitUpgradeDetailsState(ctx context.Context, f *atesting.Fixture, expectedState details.State, timeout time.Duration, interval time.Duration, logger Logger) error { versionStr, err := f.ExecVersion(ctx) if err != nil { return fmt.Errorf("failed to get Agent version: %w", err) } versionParsed, err := version.ParseVersion(versionStr.Binary.Version) if err != nil { return fmt.Errorf("failed to parse version [%s]: %w", versionStr.Binary.Version, err) } // Upgrade details are only available in Agent version >= 8.12.0 versionUpgradeDetailsAvailable := version.NewParsedSemVer(8, 12, 0, "", "") if versionParsed.Less(*versionUpgradeDetailsAvailable) { logger.Logf("upgrade details functionality not implemented in Agent version [%s]. Skipping check for upgrade details state.", versionParsed.String()) return nil } ctx, cancel := context.WithTimeout(ctx, timeout) defer cancel() t := time.NewTicker(interval) defer t.Stop() var lastErr error for { select { case <-ctx.Done(): if lastErr != nil { return fmt.Errorf("failed waiting for status: %w", errors.Join(ctx.Err(), lastErr)) } return ctx.Err() case <-t.C: status, err := f.ExecStatus(ctx) if err != nil && status.IsZero() { lastErr = err continue } if expectedState == "" { if status.UpgradeDetails == nil { // Expected and actual match, so we're good return nil } lastErr = errors.New("upgrade details found in status but they were expected to be absent") continue } if status.UpgradeDetails == nil { lastErr = fmt.Errorf("upgrade details not found in status but expected upgrade details state was [%s]", expectedState) continue } // Neither expected nor actual are nil, so compare the two if status.UpgradeDetails.State == expectedState { return nil } lastErr = fmt.Errorf("upgrade details state in status [%s] is not the same as expected upgrade details state [%s]", status.UpgradeDetails.State, expectedState) continue } } } func getSourceURI(ctx context.Context, f *atesting.Fixture, unprivileged bool) (string, error) { srcPkg, err := f.SrcPackage(ctx) if err != nil { return "", fmt.Errorf("failed to get source package: %w", err) } if unprivileged { // move the file to temp directory baseTmp := "" if runtime.GOOS == "windows" { // `elastic-agent-user` needs to have access to the file, default // will place this in C:\Users\windows\AppData\Local\Temp\ which // `elastic-agent-user` doesn't have access. // create C:\Temp with world read/write to use for temp directory baseTmp, err = windowsBaseTemp() if err != nil { return "", fmt.Errorf("failed to create windows base temp path: %w", err) } } dir, err := os.MkdirTemp(baseTmp, "agent-upgrade-*") if err != nil { return "", fmt.Errorf("failed to create temp directory: %w", err) } err = os.Chmod(dir, 0777) if err != nil { return "", fmt.Errorf("failed to chmod temp directory: %w", err) } for _, suffix := range []string{"", ".sha512"} { source := fmt.Sprintf("%s%s", srcPkg, suffix) dest := fmt.Sprintf("%s%s", filepath.Join(dir, filepath.Base(srcPkg)), suffix) err = copy.Copy(source, dest, copy.Options{ PermissionControl: copy.AddPermission(0777), }) if err != nil { return "", fmt.Errorf("failed to copy %s -> %s: %w", source, dest, err) } } srcPkg = filepath.Join(dir, filepath.Base(srcPkg)) } return "file://" + filepath.Dir(srcPkg), nil } func windowsBaseTemp() (string, error) { baseTmp := "C:\\Temp" _, err := os.Stat(baseTmp) if err != nil { if !errors.Is(err, os.ErrNotExist) { return "", fmt.Errorf("failed to stat %s: %w", baseTmp, err) } err = os.Mkdir(baseTmp, 0777) if err != nil { return "", fmt.Errorf("failed to mkdir %s: %w", baseTmp, err) } } err = acl.Chmod(baseTmp, 0777) if err != nil { return "", fmt.Errorf("failed to chmod %s: %w", baseTmp, err) } return baseTmp, nil } // getStatus will attempt to get the agent status with retries if enounters an error func getStatus(ctx context.Context, fixture *atesting.Fixture) *atesting.AgentStatusOutput { ctx, cancel := context.WithTimeout(ctx, time.Second*30) defer cancel() ticker := time.NewTicker(time.Second * 5) defer ticker.Stop() for { select { case <-ctx.Done(): return nil case <-ticker.C: status, err := fixture.ExecStatus(ctx) if err != nil { continue } return &status } } }