func()

in gcs-fetcher/pkg/fetcher/fetcher.go [459:531]


// processJobs runs every job in jobs through a pool of worker goroutines and
// returns the aggregated statistics for the batch.
//
// The pool size is gf.WorkerCount, capped at len(jobs). Each worker runs
// gf.doWork, which is handed the job queue and the results channel. This
// function then receives exactly len(jobs) reports, so it expects the workers
// to emit one jobReport per queued job.
//
// If any report is unsuccessful the process is terminated with os.Exit:
// permissionDeniedExitStatus when at least one failure carried a
// *permissionError, errorExitStatus otherwise. The function therefore only
// returns when no report was marked as failed. The returned duration is the
// wall-clock time measured from just before the jobs were queued.
func (gf *Fetcher) processJobs(ctx context.Context, jobs []job) stats {
	workerCount := gf.WorkerCount
	if len(jobs) < workerCount {
		workerCount = len(jobs)
	}
	todo := make(chan job, workerCount)
	results := make(chan jobReport, workerCount)
	summary := stats{workers: workerCount, files: len(jobs), success: true}

	// Spin up our workers.
	var wg sync.WaitGroup
	for i := 0; i < workerCount; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			gf.doWork(ctx, todo, results)
		}()
	}

	// Queue the jobs. The sending goroutine owns todo and closes it once
	// everything has been queued, so idle workers can finish.
	started := time.Now()
	go func() {
		defer close(todo)
		for _, j := range jobs {
			todo <- j
		}
	}()

	// Consume the reports.
	failed := false
	hasPermissionError := false
	for n := 0; n < len(jobs); n++ {
		report := <-results
		if !report.success {
			if err, ok := report.err.(*permissionError); ok {
				hasPermissionError = true
				gf.logErr(err.Error())
			}
			failed = true
		}
		summary.size += report.size

		// A report without any recorded attempt has no retry count or final
		// error to contribute; skip it instead of indexing attempts[-1].
		if len(report.attempts) == 0 {
			continue
		}
		lastIndex := len(report.attempts) - 1
		summary.retries += lastIndex // First attempt is not considered a "retry".
		if finalErr := report.attempts[lastIndex].err; finalErr != nil {
			summary.errs = append(summary.errs, finalErr)
		}
		for _, attempt := range report.attempts {
			if attempt.gcsTimeout > noTimeout {
				summary.gcsTimeouts++
			}
		}
	}

	// Wait for the workers to finish before closing results, so no worker
	// can still be running when the channel it writes to is closed.
	wg.Wait()
	close(results)

	if failed {
		summary.success = false
		gf.logErr("Failed to download at least one file. Cannot continue.")
		if hasPermissionError {
			os.Exit(permissionDeniedExitStatus)
		}
		os.Exit(errorExitStatus)
	}

	summary.duration = time.Since(started)
	return summary
}