in gcs-fetcher/pkg/fetcher/fetcher.go [234:317]
// fetchObject downloads the GCS object described by j to its final location
// on disk and returns a report describing the work done.
//
// Up to gf.Retries+1 attempts are made. Each attempt downloads to a uniquely
// named temp file under gf.StagingDir, renames it to [dest]/[j.filename]
// (dest is j.destDirOverride when set, otherwise gf.DestDir) and chmods the
// result to 0555. A failure in any step is passed to gf.recordFailure and the
// whole attempt is retried; the first fully successful attempt is passed to
// gf.recordSuccess and ends the loop.
//
// Retries are separated by an exponential backoff that starts at gf.Backoff
// and doubles on each subsequent retry. The backoff wait is abandoned as soon
// as ctx is done; that interruption is recorded as a failure and no further
// attempts are made.
//
// report.completed is stamped when the function returns, whatever the
// outcome.
func (gf *Fetcher) fetchObject(ctx context.Context, j job) *jobReport {
	report := &jobReport{job: j, started: time.Now()}
	defer func() {
		report.completed = time.Now()
	}()

	var backoff time.Duration

	// Within a manifest, multiple files may have the same SHA. This can lead
	// to a race condition within the goworkers that are downloading the files
	// concurrently. To mitigate this issue, we add some randomness to the name
	// of the temp file being pulled.
	fuzz := rand.Intn(999999)

	for retrynum := 0; retrynum <= gf.Retries; retrynum++ {
		// Apply appropriate retry backoff.
		if retrynum > 0 {
			if retrynum == 1 {
				backoff = gf.Backoff
			} else {
				backoff *= 2
			}

			// Wait out the backoff, but give up immediately if the context is
			// cancelled or its deadline passes: sleeping through the remaining
			// (exponentially growing) backoffs would only delay shutdown.
			waitStarted := time.Now()
			timer := time.NewTimer(backoff)
			select {
			case <-timer.C:
			case <-ctx.Done():
				timer.Stop()
				e := fmt.Errorf("waiting %v before retry %d of %q: %v", backoff, retrynum, formatGCSName(j.bucket, j.object, j.generation), ctx.Err())
				gf.recordFailure(j, waitStarted, noTimeout, e, report)
				return report
			}
		}

		started := time.Now()

		// Download to temp location [DestDir]/[StagingDir]/[Bucket]-[Object]-[fuzz]-[retry]
		// If fetchObjectOnceWithTimeout() times out, this file will be orphaned and we can
		// clean it up later.
		tmpfile := filepath.Join(gf.StagingDir, fmt.Sprintf("%s-%s-%d-%d", j.bucket, j.object, fuzz, retrynum))
		if err := gf.ensureFolders(tmpfile); err != nil {
			e := fmt.Errorf("creating folders for temp file %q: %v", tmpfile, err)
			gf.recordFailure(j, started, noTimeout, e, report)
			continue
		}

		allowedGCSTimeout := gf.timeout(j.filename, retrynum)
		size, err := gf.fetchObjectOnceWithTimeout(ctx, j, allowedGCSTimeout, tmpfile)
		if err != nil {
			// Allow permissionError to bubble up: it is recorded as-is so its
			// concrete type stays visible; every other error gets context.
			e := err
			if _, ok := err.(*permissionError); !ok {
				e = fmt.Errorf("fetching %q with timeout %v to temp file %q: %v", formatGCSName(j.bucket, j.object, j.generation), allowedGCSTimeout, tmpfile, err)
			}
			gf.recordFailure(j, started, allowedGCSTimeout, e, report)
			continue
		}

		// Rename the temp file to the final filename.
		dest := gf.DestDir
		if j.destDirOverride != "" {
			dest = j.destDirOverride
		}
		finalname := filepath.Join(dest, j.filename)
		if err := gf.ensureFolders(finalname); err != nil {
			e := fmt.Errorf("creating folders for final file %q: %v", finalname, err)
			gf.recordFailure(j, started, noTimeout, e, report)
			continue
		}
		if err := gf.OS.Rename(tmpfile, finalname); err != nil {
			e := fmt.Errorf("renaming %q to %q: %v", tmpfile, finalname, err)
			gf.recordFailure(j, started, noTimeout, e, report)
			continue
		}

		// TODO(jasonco): make the posix attributes match the source
		// This will only work if the original upload sends the posix
		// attributes to GCS. For now, we'll just make the file readable and
		// executable (but not writable) by everyone.
		mode := os.FileMode(0555)
		if err := gf.OS.Chmod(finalname, mode); err != nil {
			e := fmt.Errorf("chmod %q to %v: %v", finalname, mode, err)
			gf.recordFailure(j, started, noTimeout, e, report)
			continue
		}

		gf.recordSuccess(j, started, size, finalname, report)
		break // Success! No more retries needed.
	}
	return report
}