in tools/prefetch_cache_gcsfuse/prefetch.go [77:144]
// prefetchCache lists the objects in bucketName whose names start with prefix
// and downloads each of them into cacheDir. A single producer goroutine walks
// the bucket listing and hands objects to NUM_WORKERS download workers.
//
// A failure to download an individual object is logged and counted but does
// not fail the call. An error is returned if the storage client cannot be
// created, or if listing the bucket fails or is cut short by the timeout; in
// that case objects already handed to workers are still processed before
// returning. Once the client exists, a summary (elapsed time, files
// downloaded, files attempted) is always logged.
func prefetchCache(cacheDir, bucketName, prefix string) error {
	start := time.Now()

	ctx := context.Background()
	client, err := storage.NewClient(ctx)
	if err != nil {
		return fmt.Errorf("storage.NewClient: %w", err)
	}
	defer client.Close()

	// TODO: consider a higher timeout, or letting the user configure it.
	ctx, cancel := context.WithTimeout(ctx, 10*time.Minute)
	defer cancel()

	var (
		wg              sync.WaitGroup
		downloadTasks   = make(chan *storage.ObjectAttrs)
		filesAttempted  int   // written only by the producer goroutine
		filesDownloaded int32 // updated atomically by the workers
		listErr         error // written only by the producer goroutine
	)

	it := client.Bucket(bucketName).Objects(ctx, &storage.Query{
		Prefix: prefix,
	})

	// Producer: walk the listing and hand each object to a worker. It is part
	// of wg so that filesAttempted and listErr are safely visible after
	// wg.Wait(). It closes downloadTasks (before wg.Done, by defer order) so
	// the workers' range loops terminate.
	wg.Add(1)
	go func() {
		defer wg.Done()
		defer close(downloadTasks)
		for {
			attrs, iterErr := it.Next()
			if iterErr == iterator.Done {
				return
			}
			if iterErr != nil {
				listErr = fmt.Errorf("Bucket(%q).Objects: %w", bucketName, iterErr)
				return
			}
			select {
			case downloadTasks <- attrs:
				filesAttempted++
			case <-ctx.Done():
				// Never block on a send once the deadline has passed.
				listErr = fmt.Errorf("Bucket(%q).Objects: %w", bucketName, ctx.Err())
				return
			}
		}
	}()

	// Consumers: each worker downloads objects until downloadTasks is closed.
	for i := 0; i < NUM_WORKERS; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			for attrs := range downloadTasks {
				log.Printf("Worker %d: downloading file %v", i, attrs.Name)
				if err := downloadFile(ctx, client, attrs, cacheDir); err != nil {
					// Best effort: one failed object must not abort the prefetch.
					log.Printf("prefetchCache: %v", err)
					continue
				}
				atomic.AddInt32(&filesDownloaded, 1)
			}
		}(i)
	}

	// Wait for the producer and every worker to finish.
	wg.Wait()

	log.Printf("Prefetch cache took %s", time.Since(start))
	log.Printf("Number of files downloaded successfully %v", atomic.LoadInt32(&filesDownloaded))
	log.Printf("Number of files attempted to download %v", filesAttempted)
	return listErr
}