func PerformUpgrade()

in testing/upgradetest/upgrader.go [160:439]


// PerformUpgrade installs the Elastic Agent from startFixture, upgrades it to
// the version reported by endFixture, and validates the outcome.
//
// The steps run in this order: prepare both fixtures, configure the upgrade
// watcher on the start fixture, install the start agent, run the `upgrade`
// command, wait for the Upgrade Watcher to start, wait for the agent to be
// healthy on the end version, wait for the watcher to exit, and finally
// re-check health, version and installation. The optional pre/post install
// and pre/post upgrade hooks supplied through opts run between those steps.
//
// The upgrade is run with --skip-verify unless an option changes that, and
// --unprivileged mode is selected automatically when no option sets it.
//
// It returns nil on success, and also when the agent status read right after
// the upgrade command shows no upgrade in progress. The first failing step
// aborts the procedure and its error is returned.
func PerformUpgrade(
	ctx context.Context,
	startFixture *atesting.Fixture,
	endFixture *atesting.Fixture,
	logger Logger,
	opts ...UpgradeOpt,
) error {
	// Start from the defaults and let the caller's options override them.
	// Verification is skipped by default because the default scenario is a
	// local upgrade to a built version of the Elastic Agent.
	cfg := upgradeOpts{skipVerify: true}
	for i := range opts {
		opts[i](&cfg)
	}

	// Both fixtures must be prepared before anything is executed from them.
	if err := startFixture.EnsurePrepared(ctx); err != nil {
		return fmt.Errorf("failed to prepare the startFixture: %w", err)
	}
	if err := endFixture.EnsurePrepared(ctx); err != nil {
		return fmt.Errorf("failed to prepare the endFixture: %w", err)
	}

	// The start agent gets a watcher configuration: the caller's custom one
	// when provided, the fast watcher otherwise.
	var watcherCfgErr error
	if cfg.customWatcherCfg == "" {
		watcherCfgErr = ConfigureFastWatcher(ctx, startFixture)
	} else {
		watcherCfgErr = startFixture.Configure(ctx, []byte(cfg.customWatcherCfg))
	}
	if watcherCfgErr != nil {
		return fmt.Errorf("failed configuring the start agent with faster watcher configuration: %w", watcherCfgErr)
	}

	// Ask each fixture for its own version so that the versions used below are
	// always the ones the fixtures are actually working with.
	startInfo, err := startFixture.ExecVersion(ctx)
	if err != nil {
		return fmt.Errorf("failed to get start agent build version info: %w", err)
	}
	startFullVersion := startInfo.Binary.String()
	startParsed, err := version.ParseVersion(startFullVersion)
	if err != nil {
		return fmt.Errorf("failed to get parsed start agent build version (%s): %w", startFullVersion, err)
	}
	startVersion, err := version.ParseVersion(startInfo.Binary.Version)
	if err != nil {
		return fmt.Errorf("failed to parse version of starting Agent binary: %w", err)
	}
	endInfo, err := endFixture.ExecVersion(ctx)
	if err != nil {
		return fmt.Errorf("failed to get end agent build version info: %w", err)
	}
	endVersion, err := version.ParseVersion(endInfo.Binary.Version)
	if err != nil {
		return fmt.Errorf("failed to parse version of upgraded Agent binary: %w", err)
	}

	// If unprivileged mode was not set by an option, use it whenever both
	// versions support it. If it was explicitly requested, make sure both
	// versions really support it.
	switch {
	case cfg.unprivileged == nil:
		useUnprivileged := false // must be privileged unless both versions allow otherwise
		if SupportsUnprivileged(startVersion, endVersion) {
			useUnprivileged = true
			logger.Logf("installation of Elastic Agent will use --unprivileged as both start and end version support --unprivileged mode")
		}
		cfg.unprivileged = &useUnprivileged
	case *cfg.unprivileged && !SupportsUnprivileged(startVersion, endVersion):
		return fmt.Errorf("cannot install with forced --unprivileged because either start version %s or end version %s doesn't support --unprivileged mode", startVersion.String(), endVersion.String())
	}

	// Upgrading to the very same commit is rejected unless the hash check
	// has been disabled.
	if !cfg.disableHashCheck {
		if startInfo.Binary.Commit == endInfo.Binary.Commit {
			return fmt.Errorf("target version has the same commit hash %q", endInfo.Binary.Commit)
		}
	}

	// The upgrade details assertions rely on Upgrade Watcher changes made in
	// 8.12.0, so they only make sense when the end version is >= 8.12.0.
	//
	// The start version must also be >= 8.10.0: before 8.10.0 the watcher is
	// launched from the starting version of the agent rather than the ending
	// one, so the 8.12.0 watcher never executes and the upgrade details are
	// never populated.
	if !cfg.disableUpgradeWatcherUpgradeDetailsCheck {
		minEndVersion := version.NewParsedSemVer(8, 12, 0, "", "")
		minStartVersion := version.NewParsedSemVer(8, 10, 0, "", "")
		cfg.disableUpgradeWatcherUpgradeDetailsCheck = endVersion.Less(*minEndVersion) ||
			startParsed.Less(*minStartVersion)
	}

	if hook := cfg.preInstallHook; hook != nil {
		if err := hook(); err != nil {
			return fmt.Errorf("pre install hook failed: %w", err)
		}
	}

	logger.Logf("Installing version %q", startParsed.VersionWithPrerelease())

	// Install the start agent; the non-interactive flag is only passed to
	// versions newer than 8.2.0.
	installOpts := atesting.InstallOpts{
		NonInteractive: Version_8_2_0.Less(*startParsed),
		Force:          true,
		Privileged:     !*cfg.unprivileged,
		InstallServers: cfg.installServers,
	}
	installOutput, err := startFixture.Install(ctx, &installOpts)
	if err != nil {
		return fmt.Errorf("failed to install start agent (err: %w) [output: %s]", err, string(installOutput))
	}

	if hook := cfg.postInstallHook; hook != nil {
		if err := hook(); err != nil {
			return fmt.Errorf("post install hook failed: %w", err)
		}
	}

	// The installed agent has to be healthy and on the start version before
	// the upgrade begins; WaitHealthyAndVersion adds its own error context.
	if err := WaitHealthyAndVersion(ctx, startFixture, startInfo.Binary, 2*time.Minute, 10*time.Second, logger); err != nil {
		return err
	}

	// Validate the fresh installation when checks are allowed for this
	// mode and version.
	if InstallChecksAllowed(!installOpts.Privileged, startVersion) {
		checkErr := installtest.CheckSuccess(ctx, startFixture, installOpts.BasePath, &installtest.CheckOpts{Privileged: installOpts.Privileged})
		if checkErr != nil {
			return fmt.Errorf("pre-upgrade installation checks failed: %w", checkErr)
		}
	}

	if hook := cfg.preUpgradeHook; hook != nil {
		if err := hook(); err != nil {
			return fmt.Errorf("pre upgrade hook failed: %w", err)
		}
	}

	endFullVersion := endInfo.Binary.String()
	logger.Logf("Upgrading from version \"%s-%s\" to version \"%s-%s\"", startParsed, startInfo.Binary.Commit, endFullVersion, endInfo.Binary.Commit)

	// Assemble the arguments of the upgrade command.
	args := []string{"upgrade", endFullVersion}
	switch {
	case cfg.sourceURI == nil:
		// No --source-uri option was given, so the package comes from the
		// endFixture.
		uri, err := getSourceURI(ctx, endFixture, *cfg.unprivileged)
		if err != nil {
			return fmt.Errorf("failed to get end agent source package path: %w", err)
		}
		args = append(args, "--source-uri", uri)
	case *cfg.sourceURI != "":
		// A specific --source-uri was requested. An explicitly empty one
		// means the flag is left out altogether.
		args = append(args, "--source-uri", *cfg.sourceURI)
	}

	if pgp := cfg.customPgp; pgp != nil {
		if pgp.PGP != "" {
			args = append(args, "--pgp", pgp.PGP)
		}
		if pgp.PGPUri != "" {
			args = append(args, "--pgp-uri", pgp.PGPUri)
		}
		if pgp.PGPPath != "" {
			args = append(args, "--pgp-path", pgp.PGPPath)
		}
	}

	if cfg.skipVerify {
		args = append(args, "--skip-verify")
	}

	// --skip-default-pgp is only passed to start versions that are not older
	// than 8.10.0-SNAPSHOT.
	if cfg.skipDefaultPgp {
		if !startParsed.Less(*Version_8_10_0_SNAPSHOT) {
			args = append(args, "--skip-default-pgp")
		}
	}

	upgradeOutput, err := startFixture.Exec(ctx, args)
	if err != nil {
		// Sometimes the gRPC server shuts down before replying to the command,
		// which is expected and shows up as an EOF error coming from the
		// server. A server that is merely unavailable/not running must still
		// be treated as a failure. Starting with version 8.13.2 the upgrade
		// command handles this itself.
		text := string(upgradeOutput)
		if !strings.Contains(text, "Unavailable") || !strings.Contains(text, "EOF") {
			return fmt.Errorf("failed to start agent upgrade to version %q: %w\n%s", endInfo.Binary.Version, err, upgradeOutput)
		}
	}

	// Stop here when the agent status shows that no upgrade is running.
	// NOTE(review): state value 2 is presumably the "healthy" state — confirm.
	if status := getStatus(ctx, startFixture); status != nil && status.State == 2 && status.UpgradeDetails == nil {
		logger.Logf("Agent status indicates no upgrade is in progress.")
		return nil
	}

	// The Upgrade Watcher has to show up.
	logger.Logf("waiting for upgrade watcher to start")
	if err := WaitForWatcher(ctx, 5*time.Minute, 10*time.Second); err != nil {
		return fmt.Errorf("failed to find watcher: %w", err)
	}
	logger.Logf("upgrade watcher started")

	// While the Upgrade Watcher is running, the upgrade details in the Agent
	// status are expected to show the state as UPG_WATCHING.
	if !cfg.disableUpgradeWatcherUpgradeDetailsCheck {
		logger.Logf("Checking upgrade details state while Upgrade Watcher is running")
		// waitUpgradeDetailsState adds its own error context.
		if err := waitUpgradeDetailsState(ctx, startFixture, details.StateWatching, 2*time.Minute, 10*time.Second, logger); err != nil {
			return err
		}
	}

	if hook := cfg.postUpgradeHook; hook != nil {
		if err := hook(); err != nil {
			return fmt.Errorf("post upgrade hook failed: %w", err)
		}
	}

	// The agent now has to become healthy on the end version.
	if healthErr := WaitHealthyAndVersion(ctx, startFixture, endInfo.Binary, 2*time.Minute, 10*time.Second, logger); healthErr != nil {
		// The agent never got healthy, but the watcher must be stopped before
		// returning: this kills the watcher instantly and waits for it to be
		// gone.
		if watcherErr := WaitForNoWatcher(ctx, 1*time.Minute, time.Second, 100*time.Millisecond); watcherErr != nil {
			logger.Logf("failed to kill watcher due to agent not becoming healthy: %s", watcherErr)
		}

		// WaitHealthyAndVersion adds its own error context.
		return healthErr
	}

	// Continuing before the watcher is done is unstable. The maximum wait
	// time is 10 minutes (12 minutes for grace). Some older versions do not
	// respect the `ConfigureFastWatcher`, so the watcher has to be killed once
	// the 10 minute window (10 min 15 seconds for grace) has passed.
	logger.Logf("waiting for upgrade watcher to finish")
	if err := WaitForNoWatcher(ctx, 12*time.Minute, 10*time.Second, 10*time.Minute+15*time.Second); err != nil {
		return fmt.Errorf("watcher never stopped running: %w", err)
	}
	logger.Logf("upgrade watcher finished")

	// After a successful upgrade the upgrade details must have been cleared
	// out from the Agent status.
	if !cfg.disableUpgradeWatcherUpgradeDetailsCheck {
		logger.Logf("Checking upgrade details state after successful upgrade")
		// waitUpgradeDetailsState adds its own error context.
		if err := waitUpgradeDetailsState(ctx, startFixture, "", 2*time.Minute, 10*time.Second, logger); err != nil {
			return err
		}
	}

	// With the watcher gone, confirm the agent is still on the expected
	// version; otherwise it may have been rolled back to the original one.
	// CheckHealthyAndVersion adds its own error context.
	if err := CheckHealthyAndVersion(ctx, startFixture, endInfo.Binary); err != nil {
		return err
	}

	// The upgrade should not have changed the result of the installation
	// validation, so run it once more.
	if InstallChecksAllowed(!installOpts.Privileged, startVersion, endVersion) {
		checkErr := installtest.CheckSuccess(ctx, startFixture, installOpts.BasePath, &installtest.CheckOpts{Privileged: installOpts.Privileged})
		if checkErr != nil {
			return fmt.Errorf("post-upgrade installation checks failed: %w", checkErr)
		}
	}

	return nil
}