// Excerpt from gcs-fetcher/pkg/fetcher/fetcher.go (lines 558-668).

// fetchFromManifest downloads the JSON manifest object from GCS, decodes the
// set of files it describes, fetches every file concurrently via processJobs,
// then removes the staging directory and logs a summary of the run.
//
// It returns a non-nil error if the manifest cannot be fetched or decoded, or
// if any file download fails. A permission error while fetching the manifest
// terminates the process with permissionDeniedExitStatus.
func (gf *Fetcher) fetchFromManifest(ctx context.Context) (err error) {
	started := time.Now()
	gf.log("Fetching manifest %s.", formatGCSName(gf.Bucket, gf.Object, gf.Generation))

	// Download the manifest file from GCS into the staging directory.
	manifestDir := gf.StagingDir
	j := job{
		filename:        gf.Object,
		bucket:          gf.Bucket,
		object:          gf.Object,
		generation:      gf.Generation,
		destDirOverride: manifestDir,
	}
	// Override the retry/backoff to span an up-to-11 second eventual consistency
	// issue on new project creation. We'll only do this for the first file
	// (the manifest), and then drop back to the original retry/backoff.
	// NOTE: the restore must happen immediately (not in a defer) because
	// processJobs below must run with the original settings.
	oretries, obackoff := gf.Retries, gf.Backoff
	gf.Retries, gf.Backoff = 6, 1*time.Second // Yields 1s, 2s, 4s, 8s, 16s
	report := gf.fetchObject(ctx, j)
	gf.Retries, gf.Backoff = oretries, obackoff
	if !report.success {
		// A permission problem will not resolve itself on retry; exit with a
		// distinct status so callers can distinguish it from transient failures.
		if err, ok := report.err.(*permissionError); ok {
			gf.logErr(err.Error())
			os.Exit(permissionDeniedExitStatus)
		}
		return fmt.Errorf("failed to download manifest %s: %v", formatGCSName(gf.Bucket, gf.Object, gf.Generation), report.err)
	}

	// Decode the JSON manifest: a map of destination filename -> manifest item.
	manifestFile := filepath.Join(manifestDir, j.filename)
	r, err := gf.OS.Open(manifestFile)
	if err != nil {
		return fmt.Errorf("opening manifest file %q: %v", manifestFile, err)
	}
	defer func() {
		// Surface a close failure, but never clobber a more meaningful
		// error (e.g. a JSON decode failure) that is already being returned.
		if cerr := r.Close(); cerr != nil && err == nil {
			err = fmt.Errorf("Failed to close file %q: %v", manifestFile, cerr)
		}
	}()
	var files map[string]common.ManifestItem
	if err := json.NewDecoder(r).Decode(&files); err != nil {
		return fmt.Errorf("decoding JSON from manifest file %q: %v", manifestFile, err)
	}

	// Create one download job per manifest entry.
	jobs := make([]job, 0, len(files))
	for filename, info := range files {
		bucket, object, generation, err := common.ParseBucketObject(info.SourceURL)
		if err != nil {
			return fmt.Errorf("parsing bucket/object from %q: %v", info.SourceURL, err)
		}
		jobs = append(jobs, job{
			filename:   filename,
			bucket:     bucket,
			object:     object,
			generation: generation,
			sha1sum:    info.Sha1Sum,
		})
	}

	gf.log("Processing %v files.", len(jobs))
	stats := gf.processJobs(ctx, jobs)

	// Final cleanup of failed downloads. We won't miss any files; these vestiges
	// are from go routines that have timed out and would otherwise check their
	// circuit breaker and die. However, we won't wait for these remaining
	// go routines to finish because our goal is to get done as fast as possible!
	if err := gf.OS.RemoveAll(gf.StagingDir); err != nil {
		gf.log("Failed to remove staging dir %v, continuing: %v", gf.StagingDir, err)
	}

	// Emit final stats.
	mib := float64(stats.size) / 1024 / 1024
	var mibps float64
	if stats.duration > 0 {
		mibps = mib / stats.duration.Seconds()
	}
	// Duration of the final (successful) manifest fetch attempt. Guard the
	// index: an empty attempts slice must not panic the summary.
	var manifestDuration time.Duration
	if n := len(report.attempts); n > 0 {
		manifestDuration = report.attempts[n-1].duration
	}
	status := "SUCCESS"
	if !stats.success {
		status = "FAILURE"
	}
	gf.log("******************************************************")
	gf.log("Status:                      %s", status)
	gf.log("Started:                     %s", started.Format(time.RFC3339))
	gf.log("Completed:                   %s", time.Now().Format(time.RFC3339))
	gf.log("Requested workers: %6d", gf.WorkerCount)
	gf.log("Actual workers:    %6d", stats.workers)
	gf.log("Total files:       %6d", stats.files)
	gf.log("Total retries:     %6d", stats.retries)
	if gf.TimeoutGCS {
		gf.log("GCS timeouts:      %6d", stats.gcsTimeouts)
	}
	gf.log("MiB downloaded:    %9.2f MiB", mib)
	gf.log("MiB/s throughput:  %9.2f MiB/s", mibps)

	gf.log("Time for manifest: %9.2f ms", float64(manifestDuration)/float64(time.Millisecond))
	gf.log("Total time:        %9.2f s", time.Since(started).Seconds())
	gf.log("******************************************************")

	if len(stats.errs) > 0 {
		es := make([]string, 0, len(stats.errs)+1)
		es = append(es, fmt.Sprintf("Errors (%d):", len(stats.errs)))
		for _, e := range stats.errs {
			es = append(es, fmt.Sprintf(" - %s", e))
		}
		// Use a constant format verb: passing the joined message directly as
		// the format string would misinterpret any '%' in an error's text.
		return fmt.Errorf("%s", strings.Join(es, "\n"))
	}
	return nil
}