in gcs-fetcher/pkg/fetcher/fetcher.go [459:531]
// processJobs runs every job in jobs through a pool of worker goroutines and
// returns the aggregated statistics for the batch.
//
// The pool holds at most gf.WorkerCount workers and never more than
// len(jobs). A non-positive gf.WorkerCount is treated as 1 so that a
// non-empty batch always has a worker to drain it. Each worker runs
// gf.doWork with the shared todo and results channels; the consume loop
// below relies on exactly one report arriving on results per queued job.
//
// If any report is unsuccessful the process is terminated with os.Exit:
// permissionDeniedExitStatus when at least one failed report carried a
// *permissionError, errorExitStatus otherwise. The function therefore only
// returns when every job succeeded.
//
// In the returned stats, duration is the wall-clock time measured from just
// before the jobs are queued until all workers have exited.
func (gf *Fetcher) processJobs(ctx context.Context, jobs []job) stats {
	workerCount := gf.WorkerCount
	if workerCount < 1 {
		// Zero workers would leave nobody to drain todo (deadlock), and a
		// negative count makes the channel allocations below panic.
		workerCount = 1
	}
	if len(jobs) < workerCount {
		workerCount = len(jobs)
	}
	todo := make(chan job, workerCount)
	results := make(chan jobReport, workerCount)
	stats := stats{workers: workerCount, files: len(jobs), success: true}

	// Spin up our workers.
	var wg sync.WaitGroup
	for i := 0; i < workerCount; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			gf.doWork(ctx, todo, results)
		}()
	}

	// Queue the jobs from a separate goroutine so this goroutine stays free
	// to consume reports while the queue is still being filled.
	started := time.Now()
	var qwg sync.WaitGroup
	qwg.Add(1)
	go func() {
		defer qwg.Done()
		for _, j := range jobs {
			todo <- j
		}
	}()

	// Consume the reports: exactly one is expected per job.
	failed := false
	hasPermissionError := false
	for n := 0; n < len(jobs); n++ {
		report := <-results
		if !report.success {
			failed = true
			if err, ok := report.err.(*permissionError); ok {
				hasPermissionError = true
				gf.logErr(err.Error())
			}
		}
		stats.size += report.size
		for _, attempt := range report.attempts {
			if attempt.gcsTimeout > noTimeout {
				stats.gcsTimeouts++
			}
		}

		lastIndex := len(report.attempts) - 1
		if lastIndex < 0 {
			// No attempt was recorded for this job, so there is nothing to
			// count as a retry and no final error to collect.
			continue
		}
		stats.retries += lastIndex // First attempt is not considered a "retry".
		if finalAttempt := report.attempts[lastIndex]; finalAttempt.err != nil {
			stats.errs = append(stats.errs, finalAttempt.err)
		}
	}

	// Shut down in dependency order: once the queue goroutine has finished,
	// closing todo lets the workers return; results is closed only after
	// every worker (the only senders on it) has exited.
	qwg.Wait()
	close(todo)
	wg.Wait()
	close(results)

	if failed {
		stats.success = false
		gf.logErr("Failed to download at least one file. Cannot continue.")
		if hasPermissionError {
			os.Exit(permissionDeniedExitStatus)
		}
		os.Exit(errorExitStatus)
	}

	stats.duration = time.Since(started)
	return stats
}