in testing/upgradetest/upgrader.go [160:439]
func PerformUpgrade(
ctx context.Context,
startFixture *atesting.Fixture,
endFixture *atesting.Fixture,
logger Logger,
opts ...UpgradeOpt,
) error {
// use the passed in options to perform the upgrade
// `skipVerify` is by default enabled, because default is to perform a local
// upgrade to a built version of the Elastic Agent.
var upgradeOpts upgradeOpts
upgradeOpts.skipVerify = true
for _, o := range opts {
o(&upgradeOpts)
}
// ensure that both the starting and ending fixtures are prepared
err := startFixture.EnsurePrepared(ctx)
if err != nil {
return fmt.Errorf("failed to prepare the startFixture: %w", err)
}
err = endFixture.EnsurePrepared(ctx)
if err != nil {
return fmt.Errorf("failed to prepare the endFixture: %w", err)
}
// start fixture gets the agent configured to use a faster watcher
if upgradeOpts.customWatcherCfg != "" {
err = startFixture.Configure(ctx, []byte(upgradeOpts.customWatcherCfg))
} else {
err = ConfigureFastWatcher(ctx, startFixture)
}
if err != nil {
return fmt.Errorf("failed configuring the start agent with faster watcher configuration: %w", err)
}
// get the versions from each fixture (that ensures that it's always the
// same version that the fixture is working with)
startVersionInfo, err := startFixture.ExecVersion(ctx)
if err != nil {
return fmt.Errorf("failed to get start agent build version info: %w", err)
}
startParsedVersion, err := version.ParseVersion(startVersionInfo.Binary.String())
if err != nil {
return fmt.Errorf("failed to get parsed start agent build version (%s): %w", startVersionInfo.Binary.String(), err)
}
startVersion, err := version.ParseVersion(startVersionInfo.Binary.Version)
if err != nil {
return fmt.Errorf("failed to parse version of starting Agent binary: %w", err)
}
endVersionInfo, err := endFixture.ExecVersion(ctx)
if err != nil {
return fmt.Errorf("failed to get end agent build version info: %w", err)
}
endVersion, err := version.ParseVersion(endVersionInfo.Binary.Version)
if err != nil {
return fmt.Errorf("failed to parse version of upgraded Agent binary: %w", err)
}
// in the unprivileged is unset we adjust it to use unprivileged when the version allows it
// in the case that its explicitly set then we ensure the version supports it
if upgradeOpts.unprivileged == nil {
if SupportsUnprivileged(startVersion, endVersion) {
unprivileged := true
upgradeOpts.unprivileged = &unprivileged
logger.Logf("installation of Elastic Agent will use --unprivileged as both start and end version support --unprivileged mode")
} else {
// must be privileged
unprivileged := false
upgradeOpts.unprivileged = &unprivileged
}
} else if *upgradeOpts.unprivileged {
if !SupportsUnprivileged(startVersion, endVersion) {
return fmt.Errorf("cannot install with forced --unprivileged because either start version %s or end version %s doesn't support --unprivileged mode", startVersion.String(), endVersion.String())
}
}
if !upgradeOpts.disableHashCheck && startVersionInfo.Binary.Commit == endVersionInfo.Binary.Commit {
return fmt.Errorf("target version has the same commit hash %q", endVersionInfo.Binary.Commit)
}
// For asserting on the effects of any Upgrade Watcher changes made in 8.12.0, we need
// the endVersion to be >= 8.12.0. Otherwise, these assertions will fail as those changes
// won't be present in the Upgrade Watcher. So we disable these assertions if the endVersion
// is < 8.12.0.
//
// The start version also needs to be >= 8.10.0. Versions before 8.10.0 will launch the watcher
// process from the starting version of the agent and not the ending version of the agent. So
// even though an 8.12.0 watcher knows to write the upgrade details, prior to 8.10.0 the 8.12.0
// watcher version never executes and the upgrade details are never populated.
upgradeOpts.disableUpgradeWatcherUpgradeDetailsCheck = upgradeOpts.disableUpgradeWatcherUpgradeDetailsCheck ||
endVersion.Less(*version.NewParsedSemVer(8, 12, 0, "", "")) ||
startParsedVersion.Less(*version.NewParsedSemVer(8, 10, 0, "", ""))
if upgradeOpts.preInstallHook != nil {
if err := upgradeOpts.preInstallHook(); err != nil {
return fmt.Errorf("pre install hook failed: %w", err)
}
}
logger.Logf("Installing version %q", startParsedVersion.VersionWithPrerelease())
// install the start agent
var nonInteractiveFlag bool
if Version_8_2_0.Less(*startParsedVersion) {
nonInteractiveFlag = true
}
installOpts := atesting.InstallOpts{
NonInteractive: nonInteractiveFlag,
Force: true,
Privileged: !(*upgradeOpts.unprivileged),
InstallServers: upgradeOpts.installServers,
}
output, err := startFixture.Install(ctx, &installOpts)
if err != nil {
return fmt.Errorf("failed to install start agent (err: %w) [output: %s]", err, string(output))
}
if upgradeOpts.postInstallHook != nil {
if err := upgradeOpts.postInstallHook(); err != nil {
return fmt.Errorf("post install hook failed: %w", err)
}
}
// wait for the agent to be healthy and correct version
err = WaitHealthyAndVersion(ctx, startFixture, startVersionInfo.Binary, 2*time.Minute, 10*time.Second, logger)
if err != nil {
// context added by WaitHealthyAndVersion
return err
}
// validate installation is correct
if InstallChecksAllowed(!installOpts.Privileged, startVersion) {
err = installtest.CheckSuccess(ctx, startFixture, installOpts.BasePath, &installtest.CheckOpts{Privileged: installOpts.Privileged})
if err != nil {
return fmt.Errorf("pre-upgrade installation checks failed: %w", err)
}
}
if upgradeOpts.preUpgradeHook != nil {
if err := upgradeOpts.preUpgradeHook(); err != nil {
return fmt.Errorf("pre upgrade hook failed: %w", err)
}
}
logger.Logf("Upgrading from version \"%s-%s\" to version \"%s-%s\"", startParsedVersion, startVersionInfo.Binary.Commit, endVersionInfo.Binary.String(), endVersionInfo.Binary.Commit)
upgradeCmdArgs := []string{"upgrade", endVersionInfo.Binary.String()}
if upgradeOpts.sourceURI == nil {
// no --source-uri set so it comes from the endFixture
sourceURI, err := getSourceURI(ctx, endFixture, *upgradeOpts.unprivileged)
if err != nil {
return fmt.Errorf("failed to get end agent source package path: %w", err)
}
upgradeCmdArgs = append(upgradeCmdArgs, "--source-uri", sourceURI)
} else if *upgradeOpts.sourceURI != "" {
// specific --source-uri
upgradeCmdArgs = append(upgradeCmdArgs, "--source-uri", *upgradeOpts.sourceURI)
}
if upgradeOpts.customPgp != nil {
if len(upgradeOpts.customPgp.PGP) > 0 {
upgradeCmdArgs = append(upgradeCmdArgs, "--pgp", upgradeOpts.customPgp.PGP)
}
if len(upgradeOpts.customPgp.PGPUri) > 0 {
upgradeCmdArgs = append(upgradeCmdArgs, "--pgp-uri", upgradeOpts.customPgp.PGPUri)
}
if len(upgradeOpts.customPgp.PGPPath) > 0 {
upgradeCmdArgs = append(upgradeCmdArgs, "--pgp-path", upgradeOpts.customPgp.PGPPath)
}
}
if upgradeOpts.skipVerify {
upgradeCmdArgs = append(upgradeCmdArgs, "--skip-verify")
}
if upgradeOpts.skipDefaultPgp && !startParsedVersion.Less(*Version_8_10_0_SNAPSHOT) {
upgradeCmdArgs = append(upgradeCmdArgs, "--skip-default-pgp")
}
upgradeOutput, err := startFixture.Exec(ctx, upgradeCmdArgs)
if err != nil {
// Sometimes the gRPC server shuts down before replying to the command which is expected
// we can determine this state by the EOF error coming from the server.
// If the server is just unavailable/not running, we should not succeed.
// Starting with version 8.13.2, this is handled by the upgrade command itself.
outputString := string(upgradeOutput)
isConnectionInterrupted := strings.Contains(outputString, "Unavailable") && strings.Contains(outputString, "EOF")
if !isConnectionInterrupted {
return fmt.Errorf("failed to start agent upgrade to version %q: %w\n%s", endVersionInfo.Binary.Version, err, upgradeOutput)
}
}
// check status
if status := getStatus(ctx, startFixture); status != nil {
if status.State == 2 && status.UpgradeDetails == nil {
logger.Logf("Agent status indicates no upgrade is in progress.")
return nil
}
}
// wait for the watcher to show up
logger.Logf("waiting for upgrade watcher to start")
err = WaitForWatcher(ctx, 5*time.Minute, 10*time.Second)
if err != nil {
return fmt.Errorf("failed to find watcher: %w", err)
}
logger.Logf("upgrade watcher started")
// Check that, while the Upgrade Watcher is running, the upgrade details in Agent status
// show the state as UPG_WATCHING.
if !upgradeOpts.disableUpgradeWatcherUpgradeDetailsCheck {
logger.Logf("Checking upgrade details state while Upgrade Watcher is running")
if err := waitUpgradeDetailsState(ctx, startFixture, details.StateWatching, 2*time.Minute, 10*time.Second, logger); err != nil {
// error context added by waitUpgradeDetailsState
return err
}
}
if upgradeOpts.postUpgradeHook != nil {
if err := upgradeOpts.postUpgradeHook(); err != nil {
return fmt.Errorf("post upgrade hook failed: %w", err)
}
}
// wait for the agent to be healthy and correct version
err = WaitHealthyAndVersion(ctx, startFixture, endVersionInfo.Binary, 2*time.Minute, 10*time.Second, logger)
if err != nil {
// agent never got healthy, but we need to ensure the watcher is stopped before continuing
// this kills the watcher instantly and waits for it to be gone before continuing
watcherErr := WaitForNoWatcher(ctx, 1*time.Minute, time.Second, 100*time.Millisecond)
if watcherErr != nil {
logger.Logf("failed to kill watcher due to agent not becoming healthy: %s", watcherErr)
}
// error context added by WaitHealthyAndVersion
return err
}
// it is unstable to continue until the watcher is done
// the maximum wait time is 10 minutes (12 minutes for grace) some older versions
// do not respect the `ConfigureFastWatcher` so we have to kill the watcher after the
// 10 minute window (10 min 15 seconds for grace) has passed.
logger.Logf("waiting for upgrade watcher to finish")
err = WaitForNoWatcher(ctx, 12*time.Minute, 10*time.Second, 10*time.Minute+15*time.Second)
if err != nil {
return fmt.Errorf("watcher never stopped running: %w", err)
}
logger.Logf("upgrade watcher finished")
// Check that, upon successful upgrade, the upgrade details have been cleared out
// from Agent status.
if !upgradeOpts.disableUpgradeWatcherUpgradeDetailsCheck {
logger.Logf("Checking upgrade details state after successful upgrade")
if err := waitUpgradeDetailsState(ctx, startFixture, "", 2*time.Minute, 10*time.Second, logger); err != nil {
// error context added by checkUpgradeDetailsState
return err
}
}
// now that the watcher has stopped lets ensure that it's still the expected
// version, otherwise it's possible that it was rolled back to the original version
err = CheckHealthyAndVersion(ctx, startFixture, endVersionInfo.Binary)
if err != nil {
// error context added by CheckHealthyAndVersion
return err
}
// validate again that the installation is correct, upgrade should not have changed installation validation
if InstallChecksAllowed(!installOpts.Privileged, startVersion, endVersion) {
err = installtest.CheckSuccess(ctx, startFixture, installOpts.BasePath, &installtest.CheckOpts{Privileged: installOpts.Privileged})
if err != nil {
return fmt.Errorf("post-upgrade installation checks failed: %w", err)
}
}
return nil
}