func()

in gcs-fetcher/pkg/fetcher/fetcher.go [234:317]


// fetchObject downloads the GCS object described by j and moves it into
// place, retrying on failure.
//
// Up to gf.Retries+1 attempts are made. Before the first retry the worker
// waits gf.Backoff, and the wait doubles before each subsequent retry. Each
// attempt downloads into a uniquely named temp file under gf.StagingDir,
// renames that file to its final path (under j.destDirOverride when set,
// otherwise gf.DestDir), and sets the final file's mode to 0555. Every failed
// step is passed to gf.recordFailure and the whole attempt is retried; a fully
// successful attempt is passed to gf.recordSuccess and ends the loop.
//
// If ctx is done while waiting out a backoff, no further attempts are made and
// the report is returned as left by the failed attempts recorded so far.
//
// The returned report always has its started and completed times set.
func (gf *Fetcher) fetchObject(ctx context.Context, j job) *jobReport {
	report := &jobReport{job: j, started: time.Now()}
	defer func() {
		report.completed = time.Now()
	}()

	var tmpfile string
	var backoff time.Duration

	// Within a manifest, multiple files may have the same SHA. This can lead
	// to a race condition within the goworkers that are downloading the files
	// concurrently. To mitigate this issue, we add some randomness to the name
	// of the temp file being pulled.
	fuzz := rand.Intn(999999)

	for retrynum := 0; retrynum <= gf.Retries; retrynum++ {
		// Apply appropriate retry backoff.
		if retrynum > 0 {
			if retrynum == 1 {
				backoff = gf.Backoff
			} else {
				backoff *= 2
			}

			// Wait out the backoff, but give up as soon as the context is
			// done: sleeping through the remaining (exponentially growing)
			// backoffs after cancellation only delays the caller. We only get
			// here after a failed attempt, so that failure has already been
			// handed to recordFailure.
			timer := time.NewTimer(backoff)
			select {
			case <-ctx.Done():
				timer.Stop()
				return report
			case <-timer.C:
			}
		}

		started := time.Now()

		// Download to a temp location under the staging dir, named
		// [Bucket]-[Object]-[fuzz]-[retry]. The retry number is part of the
		// name so each attempt writes to a fresh file.
		// If fetchObjectOnceWithTimeout() times out, this file will be orphaned and we can
		// clean it up later.
		tmpfile = filepath.Join(gf.StagingDir, fmt.Sprintf("%s-%s-%d-%d", j.bucket, j.object, fuzz, retrynum))
		if err := gf.ensureFolders(tmpfile); err != nil {
			e := fmt.Errorf("creating folders for temp file %q: %v", tmpfile, err)
			gf.recordFailure(j, started, noTimeout, e, report)
			continue
		}

		allowedGCSTimeout := gf.timeout(j.filename, retrynum)
		size, err := gf.fetchObjectOnceWithTimeout(ctx, j, allowedGCSTimeout, tmpfile)
		if err != nil {
			// Allow permissionError to bubble up unwrapped so its concrete
			// type is preserved in the report; any other error gets context
			// about what was being fetched.
			e := err
			if _, ok := err.(*permissionError); !ok {
				e = fmt.Errorf("fetching %q with timeout %v to temp file %q: %v", formatGCSName(j.bucket, j.object, j.generation), allowedGCSTimeout, tmpfile, err)
			}
			gf.recordFailure(j, started, allowedGCSTimeout, e, report)
			continue
		}

		// Rename the temp file to the final filename
		dest := gf.DestDir
		if j.destDirOverride != "" {
			dest = j.destDirOverride
		}
		finalname := filepath.Join(dest, j.filename)
		if err := gf.ensureFolders(finalname); err != nil {
			e := fmt.Errorf("creating folders for final file %q: %v", finalname, err)
			gf.recordFailure(j, started, noTimeout, e, report)
			continue
		}
		if err := gf.OS.Rename(tmpfile, finalname); err != nil {
			e := fmt.Errorf("renaming %q to %q: %v", tmpfile, finalname, err)
			gf.recordFailure(j, started, noTimeout, e, report)
			continue
		}

		// TODO(jasonco): make the posix attributes match the source
		// This will only work if the original upload sends the posix
		// attributes to GCS. For now, every file is made readable and
		// executable (but not writable) by everyone: mode 0555.
		mode := os.FileMode(0555)
		if err := gf.OS.Chmod(finalname, mode); err != nil {
			e := fmt.Errorf("chmod %q to %v: %v", finalname, mode, err)
			gf.recordFailure(j, started, noTimeout, e, report)
			continue
		}

		gf.recordSuccess(j, started, size, finalname, report)
		break // Success! No more retries needed.
	}

	return report
}