func()

in internal/pkg/agent/application/upgrade/upgrade.go [188:369]


func (u *Upgrader) Upgrade(ctx context.Context, version string, sourceURI string, action *fleetapi.ActionUpgrade, det *details.Details, skipVerifyOverride bool, skipDefaultPgp bool, pgpBytes ...string) (_ reexec.ShutdownCallbackFn, err error) {
	u.log.Infow("Upgrading agent", "version", version, "source_uri", sourceURI)

	currentVersion := agentVersion{
		version:  release.Version(),
		snapshot: release.Snapshot(),
		hash:     release.Commit(),
		fips:     release.FIPSDistribution(),
	}

	// Compare versions and exit before downloading anything if the upgrade
	// is for the same release version that is currently running
	if isSameReleaseVersion(u.log, currentVersion, version) {
		u.log.Warnf("Upgrade action skipped because agent is already at version %s", currentVersion)
		return nil, ErrUpgradeSameVersion
	}

	// Inform the Upgrade Marker Watcher that we've started upgrading. Note that this
	// is only possible to do in-memory since, today, the  process that's initiating
	// the upgrade is the same as the Agent process in which the Upgrade Marker Watcher is
	// running. If/when, in the future, the process initiating the upgrade is separated
	// from the Agent process in which the Upgrade Marker Watcher is running, such in-memory
	// communication will need to be replaced with inter-process communication (e.g. via
	// a file, e.g. the Upgrade Marker file or something else).
	u.markerWatcher.SetUpgradeStarted()

	span, ctx := apm.StartSpan(ctx, "upgrade", "app.internal")
	defer span.End()

	err = cleanNonMatchingVersionsFromDownloads(u.log, u.agentInfo.Version())
	if err != nil {
		u.log.Errorw("Unable to clean downloads before update", "error.message", err, "downloads.path", paths.Downloads())
	}

	det.SetState(details.StateDownloading)

	sourceURI = u.sourceURI(sourceURI)

	parsedVersion, err := agtversion.ParseVersion(version)
	if err != nil {
		return nil, fmt.Errorf("error parsing version %q: %w", version, err)
	}

	archivePath, err := u.downloadArtifact(ctx, parsedVersion, sourceURI, det, skipVerifyOverride, skipDefaultPgp, pgpBytes...)
	if err != nil {
		// Run the same pre-upgrade cleanup task to get rid of any newly downloaded files
		// This may have an issue if users are upgrading to the same version number.
		if dErr := cleanNonMatchingVersionsFromDownloads(u.log, u.agentInfo.Version()); dErr != nil {
			u.log.Errorw("Unable to remove file after verification failure", "error.message", dErr)
		}

		return nil, err
	}

	det.SetState(details.StateExtracting)

	metadata, err := u.getPackageMetadata(archivePath)
	if err != nil {
		return nil, fmt.Errorf("reading metadata for elastic agent version %s package %q: %w", version, archivePath, err)
	}

	newVersion := extractAgentVersion(metadata, version)
	if err := checkUpgrade(u.log, currentVersion, newVersion, metadata); err != nil {
		return nil, fmt.Errorf("cannot upgrade the agent: %w", err)
	}

	u.log.Infow("Unpacking agent package", "version", newVersion)

	// Nice to have: add check that no archive files end up in the current versioned home
	// default to no flavor to avoid breaking behavior

	// no default flavor, keep everything in case flavor is not specified
	// in case of error fallback to keep-all
	detectedFlavor, err := install.UsedFlavor(paths.Top(), "")
	if err != nil {
		u.log.Warnf("error encountered when detecting used flavor with top path %q: %w", paths.Top(), err)
	}
	u.log.Debugf("detected used flavor: %q", detectedFlavor)
	unpackRes, err := u.unpack(version, archivePath, paths.Data(), detectedFlavor)
	if err != nil {
		return nil, err
	}

	newHash := unpackRes.Hash
	if newHash == "" {
		return nil, errors.New("unknown hash")
	}

	if unpackRes.VersionedHome == "" {
		return nil, fmt.Errorf("versionedhome is empty: %v", unpackRes)
	}

	newHome := filepath.Join(paths.Top(), unpackRes.VersionedHome)

	if err := copyActionStore(u.log, newHome); err != nil {
		return nil, errors.New(err, "failed to copy action store")
	}

	newRunPath := filepath.Join(newHome, "run")
	oldRunPath := filepath.Join(paths.Run())

	if err := copyRunDirectory(u.log, oldRunPath, newRunPath); err != nil {
		return nil, errors.New(err, "failed to copy run directory")
	}

	det.SetState(details.StateReplacing)

	// create symlink to the <new versioned-home>/elastic-agent
	hashedDir := unpackRes.VersionedHome

	symlinkPath := filepath.Join(paths.Top(), agentName)

	// paths.BinaryPath properly derives the binary directory depending on the platform. The path to the binary for macOS is inside of the app bundle.
	newPath := paths.BinaryPath(filepath.Join(paths.Top(), hashedDir), agentName)

	currentVersionedHome, err := filepath.Rel(paths.Top(), paths.Home())
	if err != nil {
		return nil, fmt.Errorf("calculating home path relative to top, home: %q top: %q : %w", paths.Home(), paths.Top(), err)
	}

	if err := changeSymlink(u.log, paths.Top(), symlinkPath, newPath); err != nil {
		u.log.Errorw("Rolling back: changing symlink failed", "error.message", err)
		rollbackErr := rollbackInstall(ctx, u.log, paths.Top(), hashedDir, currentVersionedHome)
		return nil, goerrors.Join(err, rollbackErr)
	}

	// We rotated the symlink successfully: prepare the current and previous agent installation details for the update marker
	// In update marker the `current` agent install is the one where the symlink is pointing (the new one we didn't start yet)
	// while the `previous` install is the currently executing elastic-agent that is no longer reachable via the symlink.
	// After the restart at the end of the function, everything lines up correctly.
	current := agentInstall{
		parsedVersion: parsedVersion,
		version:       version,
		hash:          unpackRes.Hash,
		versionedHome: unpackRes.VersionedHome,
	}

	previousParsedVersion := currentagtversion.GetParsedAgentPackageVersion()
	previous := agentInstall{
		parsedVersion: previousParsedVersion,
		version:       release.VersionWithSnapshot(),
		hash:          release.Commit(),
		versionedHome: currentVersionedHome,
	}

	if err := markUpgrade(u.log,
		paths.Data(), // data dir to place the marker in
		current,      // new agent version data
		previous,     // old agent version data
		action, det); err != nil {
		u.log.Errorw("Rolling back: marking upgrade failed", "error.message", err)
		rollbackErr := rollbackInstall(ctx, u.log, paths.Top(), hashedDir, currentVersionedHome)
		return nil, goerrors.Join(err, rollbackErr)
	}

	watcherExecutable := selectWatcherExecutable(paths.Top(), previous, current)

	var watcherCmd *exec.Cmd
	if watcherCmd, err = InvokeWatcher(u.log, watcherExecutable); err != nil {
		u.log.Errorw("Rolling back: starting watcher failed", "error.message", err)
		rollbackErr := rollbackInstall(ctx, u.log, paths.Top(), hashedDir, currentVersionedHome)
		return nil, goerrors.Join(err, rollbackErr)
	}

	watcherWaitErr := waitForWatcher(ctx, u.log, markerFilePath(paths.Data()), watcherMaxWaitTime)
	if watcherWaitErr != nil {
		killWatcherErr := watcherCmd.Process.Kill()
		rollbackErr := rollbackInstall(ctx, u.log, paths.Top(), hashedDir, currentVersionedHome)
		return nil, goerrors.Join(watcherWaitErr, killWatcherErr, rollbackErr)
	}

	cb := shutdownCallback(u.log, paths.Home(), release.Version(), version, filepath.Join(paths.Top(), unpackRes.VersionedHome))

	// Clean everything from the downloads dir
	u.log.Infow("Removing downloads directory", "file.path", paths.Downloads())
	err = os.RemoveAll(paths.Downloads())
	if err != nil {
		u.log.Errorw("Unable to clean downloads after update", "error.message", err, "file.path", paths.Downloads())
	}

	return cb, nil
}