in internal/pkg/agent/application/upgrade/upgrade.go [188:369]
func (u *Upgrader) Upgrade(ctx context.Context, version string, sourceURI string, action *fleetapi.ActionUpgrade, det *details.Details, skipVerifyOverride bool, skipDefaultPgp bool, pgpBytes ...string) (_ reexec.ShutdownCallbackFn, err error) {
u.log.Infow("Upgrading agent", "version", version, "source_uri", sourceURI)
currentVersion := agentVersion{
version: release.Version(),
snapshot: release.Snapshot(),
hash: release.Commit(),
fips: release.FIPSDistribution(),
}
// Compare versions and exit before downloading anything if the upgrade
// is for the same release version that is currently running
if isSameReleaseVersion(u.log, currentVersion, version) {
u.log.Warnf("Upgrade action skipped because agent is already at version %s", currentVersion)
return nil, ErrUpgradeSameVersion
}
// Inform the Upgrade Marker Watcher that we've started upgrading. Note that this
// is only possible to do in-memory since, today, the process that's initiating
// the upgrade is the same as the Agent process in which the Upgrade Marker Watcher is
// running. If/when, in the future, the process initiating the upgrade is separated
// from the Agent process in which the Upgrade Marker Watcher is running, such in-memory
// communication will need to be replaced with inter-process communication (e.g. via
// a file, e.g. the Upgrade Marker file or something else).
u.markerWatcher.SetUpgradeStarted()
span, ctx := apm.StartSpan(ctx, "upgrade", "app.internal")
defer span.End()
err = cleanNonMatchingVersionsFromDownloads(u.log, u.agentInfo.Version())
if err != nil {
u.log.Errorw("Unable to clean downloads before update", "error.message", err, "downloads.path", paths.Downloads())
}
det.SetState(details.StateDownloading)
sourceURI = u.sourceURI(sourceURI)
parsedVersion, err := agtversion.ParseVersion(version)
if err != nil {
return nil, fmt.Errorf("error parsing version %q: %w", version, err)
}
archivePath, err := u.downloadArtifact(ctx, parsedVersion, sourceURI, det, skipVerifyOverride, skipDefaultPgp, pgpBytes...)
if err != nil {
// Run the same pre-upgrade cleanup task to get rid of any newly downloaded files
// This may have an issue if users are upgrading to the same version number.
if dErr := cleanNonMatchingVersionsFromDownloads(u.log, u.agentInfo.Version()); dErr != nil {
u.log.Errorw("Unable to remove file after verification failure", "error.message", dErr)
}
return nil, err
}
det.SetState(details.StateExtracting)
metadata, err := u.getPackageMetadata(archivePath)
if err != nil {
return nil, fmt.Errorf("reading metadata for elastic agent version %s package %q: %w", version, archivePath, err)
}
newVersion := extractAgentVersion(metadata, version)
if err := checkUpgrade(u.log, currentVersion, newVersion, metadata); err != nil {
return nil, fmt.Errorf("cannot upgrade the agent: %w", err)
}
u.log.Infow("Unpacking agent package", "version", newVersion)
// Nice to have: add check that no archive files end up in the current versioned home
// default to no flavor to avoid breaking behavior
// no default flavor, keep everything in case flavor is not specified
// in case of error fallback to keep-all
detectedFlavor, err := install.UsedFlavor(paths.Top(), "")
if err != nil {
u.log.Warnf("error encountered when detecting used flavor with top path %q: %w", paths.Top(), err)
}
u.log.Debugf("detected used flavor: %q", detectedFlavor)
unpackRes, err := u.unpack(version, archivePath, paths.Data(), detectedFlavor)
if err != nil {
return nil, err
}
newHash := unpackRes.Hash
if newHash == "" {
return nil, errors.New("unknown hash")
}
if unpackRes.VersionedHome == "" {
return nil, fmt.Errorf("versionedhome is empty: %v", unpackRes)
}
newHome := filepath.Join(paths.Top(), unpackRes.VersionedHome)
if err := copyActionStore(u.log, newHome); err != nil {
return nil, errors.New(err, "failed to copy action store")
}
newRunPath := filepath.Join(newHome, "run")
oldRunPath := filepath.Join(paths.Run())
if err := copyRunDirectory(u.log, oldRunPath, newRunPath); err != nil {
return nil, errors.New(err, "failed to copy run directory")
}
det.SetState(details.StateReplacing)
// create symlink to the <new versioned-home>/elastic-agent
hashedDir := unpackRes.VersionedHome
symlinkPath := filepath.Join(paths.Top(), agentName)
// paths.BinaryPath properly derives the binary directory depending on the platform. The path to the binary for macOS is inside of the app bundle.
newPath := paths.BinaryPath(filepath.Join(paths.Top(), hashedDir), agentName)
currentVersionedHome, err := filepath.Rel(paths.Top(), paths.Home())
if err != nil {
return nil, fmt.Errorf("calculating home path relative to top, home: %q top: %q : %w", paths.Home(), paths.Top(), err)
}
if err := changeSymlink(u.log, paths.Top(), symlinkPath, newPath); err != nil {
u.log.Errorw("Rolling back: changing symlink failed", "error.message", err)
rollbackErr := rollbackInstall(ctx, u.log, paths.Top(), hashedDir, currentVersionedHome)
return nil, goerrors.Join(err, rollbackErr)
}
// We rotated the symlink successfully: prepare the current and previous agent installation details for the update marker
// In update marker the `current` agent install is the one where the symlink is pointing (the new one we didn't start yet)
// while the `previous` install is the currently executing elastic-agent that is no longer reachable via the symlink.
// After the restart at the end of the function, everything lines up correctly.
current := agentInstall{
parsedVersion: parsedVersion,
version: version,
hash: unpackRes.Hash,
versionedHome: unpackRes.VersionedHome,
}
previousParsedVersion := currentagtversion.GetParsedAgentPackageVersion()
previous := agentInstall{
parsedVersion: previousParsedVersion,
version: release.VersionWithSnapshot(),
hash: release.Commit(),
versionedHome: currentVersionedHome,
}
if err := markUpgrade(u.log,
paths.Data(), // data dir to place the marker in
current, // new agent version data
previous, // old agent version data
action, det); err != nil {
u.log.Errorw("Rolling back: marking upgrade failed", "error.message", err)
rollbackErr := rollbackInstall(ctx, u.log, paths.Top(), hashedDir, currentVersionedHome)
return nil, goerrors.Join(err, rollbackErr)
}
watcherExecutable := selectWatcherExecutable(paths.Top(), previous, current)
var watcherCmd *exec.Cmd
if watcherCmd, err = InvokeWatcher(u.log, watcherExecutable); err != nil {
u.log.Errorw("Rolling back: starting watcher failed", "error.message", err)
rollbackErr := rollbackInstall(ctx, u.log, paths.Top(), hashedDir, currentVersionedHome)
return nil, goerrors.Join(err, rollbackErr)
}
watcherWaitErr := waitForWatcher(ctx, u.log, markerFilePath(paths.Data()), watcherMaxWaitTime)
if watcherWaitErr != nil {
killWatcherErr := watcherCmd.Process.Kill()
rollbackErr := rollbackInstall(ctx, u.log, paths.Top(), hashedDir, currentVersionedHome)
return nil, goerrors.Join(watcherWaitErr, killWatcherErr, rollbackErr)
}
cb := shutdownCallback(u.log, paths.Home(), release.Version(), version, filepath.Join(paths.Top(), unpackRes.VersionedHome))
// Clean everything from the downloads dir
u.log.Infow("Removing downloads directory", "file.path", paths.Downloads())
err = os.RemoveAll(paths.Downloads())
if err != nil {
u.log.Errorw("Unable to clean downloads after update", "error.message", err, "file.path", paths.Downloads())
}
return cb, nil
}