in gcs-fetcher/pkg/fetcher/fetcher.go [558:668]
// fetchFromManifest downloads the JSON manifest identified by gf.Bucket,
// gf.Object and gf.Generation into gf.StagingDir, decodes it into a map of
// destination filename to common.ManifestItem, and hands one job per entry to
// gf.processJobs. Afterwards it removes the staging directory (best effort),
// logs a summary of the run, and returns an error aggregating every error
// reported by processJobs, or nil if there were none.
//
// If the manifest download fails with a *permissionError, the error is logged
// and the process exits with permissionDeniedExitStatus instead of returning.
//
// The result is named so the deferred Close of the manifest file can surface
// a close failure; it does so only when no earlier error is being returned.
func (gf *Fetcher) fetchFromManifest(ctx context.Context) (err error) {
	started := time.Now()
	gf.log("Fetching manifest %s.", formatGCSName(gf.Bucket, gf.Object, gf.Generation))

	// Download the manifest file from GCS.
	manifestDir := gf.StagingDir
	j := job{
		filename:        gf.Object,
		bucket:          gf.Bucket,
		object:          gf.Object,
		generation:      gf.Generation,
		destDirOverride: manifestDir,
	}

	// Override the retry/backoff to span an up-to-11 second eventual consistency
	// issue on new project creation. We'll only do this for the first file
	// (the manifest), and then drop back to the original retry/backoff.
	oretries, obackoff := gf.Retries, gf.Backoff
	gf.Retries, gf.Backoff = 6, 1*time.Second // Yields 1s, 2s, 4s, 8s, 16s
	report := gf.fetchObject(ctx, j)
	gf.Retries, gf.Backoff = oretries, obackoff

	if !report.success {
		if perr, ok := report.err.(*permissionError); ok {
			gf.logErr(perr.Error())
			os.Exit(permissionDeniedExitStatus)
		}
		return fmt.Errorf("failed to download manifest %s: %v", formatGCSName(gf.Bucket, gf.Object, gf.Generation), report.err)
	}

	// Decode the JSON manifest.
	manifestFile := filepath.Join(manifestDir, j.filename)
	r, err := gf.OS.Open(manifestFile)
	if err != nil {
		return fmt.Errorf("opening manifest file %q: %v", manifestFile, err)
	}
	defer func() {
		// Report a close failure only when nothing else went wrong, so that it
		// never hides the more informative primary error.
		if cerr := r.Close(); cerr != nil && err == nil {
			err = fmt.Errorf("Failed to close file %q: %v", manifestFile, cerr)
		}
	}()

	var files map[string]common.ManifestItem
	if err := json.NewDecoder(r).Decode(&files); err != nil {
		return fmt.Errorf("decoding JSON from manifest file %q: %v", manifestFile, err)
	}

	// Create the jobs.
	jobs := make([]job, 0, len(files))
	for filename, info := range files {
		bucket, object, generation, err := common.ParseBucketObject(info.SourceURL)
		if err != nil {
			return fmt.Errorf("parsing bucket/object from %q: %v", info.SourceURL, err)
		}
		jobs = append(jobs, job{
			filename:   filename,
			bucket:     bucket,
			object:     object,
			generation: generation,
			sha1sum:    info.Sha1Sum,
		})
	}

	gf.log("Processing %v files.", len(jobs))
	stats := gf.processJobs(ctx, jobs)

	// Final cleanup of failed downloads. We won't miss any files; these vestiges
	// are from go routines that have timed out and would otherwise check their
	// circuit breaker and die. However, we won't wait for these remaining
	// go routines to finish because our goal is to get done as fast as possible!
	if err := gf.OS.RemoveAll(gf.StagingDir); err != nil {
		gf.log("Failed to remove staging dir %v, continuing: %v", gf.StagingDir, err)
	}

	// Emit final stats.
	mib := float64(stats.size) / 1024 / 1024
	var mibps float64
	if stats.duration > 0 {
		mibps = mib / stats.duration.Seconds()
	}

	// Duration of the last (successful) manifest attempt, in milliseconds.
	// Guard the index so an empty attempts list cannot panic.
	var manifestMillis float64
	if n := len(report.attempts); n > 0 {
		manifestMillis = float64(report.attempts[n-1].duration) / float64(time.Millisecond)
	}

	status := "SUCCESS"
	if !stats.success {
		status = "FAILURE"
	}

	gf.log("******************************************************")
	gf.log("Status: %s", status)
	gf.log("Started: %s", started.Format(time.RFC3339))
	gf.log("Completed: %s", time.Now().Format(time.RFC3339))
	gf.log("Requested workers: %6d", gf.WorkerCount)
	gf.log("Actual workers: %6d", stats.workers)
	gf.log("Total files: %6d", stats.files)
	gf.log("Total retries: %6d", stats.retries)
	if gf.TimeoutGCS {
		gf.log("GCS timeouts: %6d", stats.gcsTimeouts)
	}
	gf.log("MiB downloaded: %9.2f MiB", mib)
	gf.log("MiB/s throughput: %9.2f MiB/s", mibps)
	gf.log("Time for manifest: %9.2f ms", manifestMillis)
	gf.log("Total time: %9.2f s", time.Since(started).Seconds())
	gf.log("******************************************************")

	if len(stats.errs) > 0 {
		es := make([]string, 0, len(stats.errs)+1)
		es = append(es, fmt.Sprintf("Errors (%d):", len(stats.errs)))
		for _, e := range stats.errs {
			es = append(es, fmt.Sprintf(" - %s", e))
		}
		// Use an explicit verb: the joined text comes from runtime errors and
		// may contain '%', which must not be interpreted as format directives.
		return fmt.Errorf("%s", strings.Join(es, "\n"))
	}
	return nil
}