internal/pkg/agent/application/upgrade/step_download.go (198 lines of code) (raw):

// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one // or more contributor license agreements. Licensed under the Elastic License 2.0; // you may not use this file except in compliance with the Elastic License 2.0. package upgrade import ( "context" "fmt" "net/url" "os" "strings" "time" "github.com/cenkalti/backoff/v4" "go.elastic.co/apm/v2" "github.com/elastic/elastic-agent/internal/pkg/agent/application/paths" "github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade/artifact" "github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade/artifact/download" "github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade/artifact/download/composed" "github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade/artifact/download/fs" "github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade/artifact/download/http" "github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade/artifact/download/localremote" "github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade/artifact/download/snapshot" "github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade/details" "github.com/elastic/elastic-agent/internal/pkg/agent/errors" "github.com/elastic/elastic-agent/internal/pkg/release" "github.com/elastic/elastic-agent/pkg/core/logger" agtversion "github.com/elastic/elastic-agent/pkg/version" ) const ( defaultUpgradeFallbackPGP = "https://artifacts.elastic.co/GPG-KEY-elastic-agent" fleetUpgradeFallbackPGPFormat = "/api/agents/upgrades/%d.%d.%d/pgp-public-key" ) type downloaderFactory func(*agtversion.ParsedSemVer, *logger.Logger, *artifact.Config, *details.Details) (download.Downloader, error) type downloader func(context.Context, downloaderFactory, *agtversion.ParsedSemVer, *artifact.Config, *details.Details) (string, error) func (u *Upgrader) downloadArtifact(ctx context.Context, parsedVersion *agtversion.ParsedSemVer, sourceURI string, upgradeDetails *details.Details, skipVerifyOverride, skipDefaultPgp bool, pgpBytes ...string) (_ string, err error) { span, ctx := apm.StartSpan(ctx, "downloadArtifact", "app.internal") defer func() { apm.CaptureError(ctx, err).Send() span.End() }() pgpBytes = u.appendFallbackPGP(parsedVersion, pgpBytes) // do not update source config settings := *u.settings var downloaderFunc downloader var factory downloaderFactory var verifier download.Verifier if sourceURI != "" { if strings.HasPrefix(sourceURI, "file://") { // update the DropPath so the fs.Downloader can download from this // path instead of looking into the installed downloads directory settings.DropPath = strings.TrimPrefix(sourceURI, "file://") // use specific function that doesn't perform retries on download as its // local and no retry should be performed downloaderFunc = u.downloadOnce // set specific downloader, local file just uses the fs.NewDownloader // no fallback is allowed because it was requested that this specific source be used factory = func(ver *agtversion.ParsedSemVer, l *logger.Logger, config *artifact.Config, d *details.Details) (download.Downloader, error) { return fs.NewDownloader(config), nil } // set specific verifier, local file verifies locally only verifier, err = fs.NewVerifier(u.log, &settings, release.PGP()) if err != nil { return "", errors.New(err, "initiating verifier") } // log that a local upgrade artifact is being used u.log.Infow("Using local upgrade artifact", "version", parsedVersion, "drop_path", settings.DropPath, "target_path", settings.TargetDirectory, "install_path", settings.InstallPath) } else { settings.SourceURI = sourceURI } } if factory == nil { // set the factory to the newDownloader factory factory = newDownloader u.log.Infow("Downloading upgrade artifact", "version", parsedVersion, "source_uri", settings.SourceURI, "drop_path", settings.DropPath, "target_path", settings.TargetDirectory, "install_path", settings.InstallPath) } if downloaderFunc == nil { downloaderFunc = u.downloadWithRetries } if err := os.MkdirAll(paths.Downloads(), 0750); err != nil { return "", errors.New(err, fmt.Sprintf("failed to create download directory at %s", paths.Downloads())) } path, err := downloaderFunc(ctx, factory, parsedVersion, &settings, upgradeDetails) if err != nil { return "", errors.New(err, "failed download of agent binary") } if skipVerifyOverride { return path, nil } if verifier == nil { verifier, err = newVerifier(parsedVersion, u.log, &settings) if err != nil { return "", errors.New(err, "initiating verifier") } } if err := verifier.Verify(ctx, agentArtifact, *parsedVersion, skipDefaultPgp, pgpBytes...); err != nil { return "", errors.New(err, "failed verification of agent binary") } return path, nil } func (u *Upgrader) appendFallbackPGP(targetVersion *agtversion.ParsedSemVer, pgpBytes []string) []string { if pgpBytes == nil { pgpBytes = make([]string, 0, 1) } fallbackPGP := download.PgpSourceURIPrefix + defaultUpgradeFallbackPGP pgpBytes = append(pgpBytes, fallbackPGP) // add a secondary fallback if fleet server is configured u.log.Debugf("Considering fleet server uri for pgp check fallback %q", u.fleetServerURI) if u.fleetServerURI != "" { secondaryPath, err := url.JoinPath( u.fleetServerURI, fmt.Sprintf(fleetUpgradeFallbackPGPFormat, targetVersion.Major(), targetVersion.Minor(), targetVersion.Patch()), ) if err != nil { u.log.Warnf("failed to compose Fleet Server URI: %v", err) } else { secondaryFallback := download.PgpSourceURIPrefix + secondaryPath pgpBytes = append(pgpBytes, secondaryFallback) } } return pgpBytes } func newDownloader(version *agtversion.ParsedSemVer, log *logger.Logger, settings *artifact.Config, upgradeDetails *details.Details) (download.Downloader, error) { if !version.IsSnapshot() { return localremote.NewDownloader(log, settings, upgradeDetails) } // TODO since we know if it's a snapshot or not, shouldn't we add EITHER the snapshot downloader OR the release one ? // try snapshot repo before official snapDownloader, err := snapshot.NewDownloader(log, settings, version, upgradeDetails) if err != nil { return nil, err } httpDownloader, err := http.NewDownloader(log, settings, upgradeDetails) if err != nil { return nil, err } return composed.NewDownloader(fs.NewDownloader(settings), snapDownloader, httpDownloader), nil } func newVerifier(version *agtversion.ParsedSemVer, log *logger.Logger, settings *artifact.Config) (download.Verifier, error) { pgp := release.PGP() if !version.IsSnapshot() { return localremote.NewVerifier(log, settings, pgp) } fsVerifier, err := fs.NewVerifier(log, settings, pgp) if err != nil { return nil, err } snapshotVerifier, err := snapshot.NewVerifier(log, settings, pgp, version) if err != nil { return nil, err } remoteVerifier, err := http.NewVerifier(log, settings, pgp) if err != nil { return nil, err } return composed.NewVerifier(log, fsVerifier, snapshotVerifier, remoteVerifier), nil } func (u *Upgrader) downloadOnce( ctx context.Context, factory downloaderFactory, version *agtversion.ParsedSemVer, settings *artifact.Config, upgradeDetails *details.Details, ) (string, error) { downloader, err := factory(version, u.log, settings, upgradeDetails) if err != nil { return "", fmt.Errorf("unable to create fetcher: %w", err) } // All download artifacts expect a name that includes <major>.<minor.<patch>[-SNAPSHOT] so we have to // make sure not to include build metadata we might have in the parsed version (for snapshots we already // used that to configure the URL we download the files from) path, err := downloader.Download(ctx, agentArtifact, version) if err != nil { return "", fmt.Errorf("unable to download package: %w", err) } // Download successful return path, nil } func (u *Upgrader) downloadWithRetries( ctx context.Context, factory downloaderFactory, version *agtversion.ParsedSemVer, settings *artifact.Config, upgradeDetails *details.Details, ) (string, error) { cancelDeadline := time.Now().Add(settings.Timeout) cancelCtx, cancel := context.WithDeadline(ctx, cancelDeadline) defer cancel() upgradeDetails.SetRetryUntil(&cancelDeadline) expBo := backoff.NewExponentialBackOff() expBo.InitialInterval = settings.RetrySleepInitDuration boCtx := backoff.WithContext(expBo, cancelCtx) var path string var attempt uint opFn := func() error { attempt++ u.log.Infof("download attempt %d", attempt) var err error path, err = u.downloadOnce(cancelCtx, factory, version, settings, upgradeDetails) if err != nil { return err } return nil } opFailureNotificationFn := func(err error, retryAfter time.Duration) { u.log.Warnf("download attempt %d failed: %s; retrying in %s.", attempt, err.Error(), retryAfter) upgradeDetails.SetRetryableError(err) } if err := backoff.RetryNotify(opFn, boCtx, opFailureNotificationFn); err != nil { return "", err } // Clear retry details upon success upgradeDetails.SetRetryableError(nil) upgradeDetails.SetRetryUntil(nil) return path, nil }