func prefetchCache()

in tools/prefetch_cache_gcsfuse/prefetch.go [77:144]


// prefetchCache lists the objects in bucketName whose names start with prefix
// and hands each one to downloadFile together with cacheDir. Downloads run
// concurrently on NUM_WORKERS goroutines (at least one), and listing plus
// downloading share a single 10-minute deadline.
//
// Individual download failures are logged and counted but do not fail the
// call: prefetching is best-effort. An error is returned when the storage
// client cannot be created, or when listing the bucket fails or is cut short
// by the deadline, because the cache is then only partially populated.
func prefetchCache(cacheDir, bucketName, prefix string) error {
	start := time.Now()

	ctx := context.Background()
	client, err := storage.NewClient(ctx)
	if err != nil {
		return fmt.Errorf("storage.NewClient: %w", err)
	}
	defer client.Close()

	// TODO: consider a higher timeout, or making it configurable by the user.
	ctx, cancel := context.WithTimeout(ctx, time.Minute*10)
	defer cancel()

	// With no consumers the producer loop below would block on its first
	// send, so always run at least one worker.
	workers := NUM_WORKERS
	if workers < 1 {
		workers = 1
	}

	var (
		wg              sync.WaitGroup
		filesDownloaded int32
		downloadTasks   = make(chan *storage.ObjectAttrs)
	)

	// Consumers: each worker drains downloadTasks until the producer closes it.
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			for attrs := range downloadTasks {
				log.Printf("Worker %d: downloading file %v", i, attrs.Name)
				if err := downloadFile(ctx, client, attrs, cacheDir); err != nil {
					log.Printf("prefetchCache: %v", err)
					continue
				}
				atomic.AddInt32(&filesDownloaded, 1)
			}
		}(i)
	}

	// Producer: list the bucket on this goroutine so that filesAttempted and
	// listErr are never shared with the workers.
	filesAttempted := 0
	var listErr error
	it := client.Bucket(bucketName).Objects(ctx, &storage.Query{
		Prefix: prefix,
	})
produce:
	for {
		attrs, err := it.Next()
		if err == iterator.Done {
			break
		}
		if err != nil {
			listErr = fmt.Errorf("listing objects in bucket %q with prefix %q: %w", bucketName, prefix, err)
			break
		}
		select {
		case downloadTasks <- attrs:
			filesAttempted++
		case <-ctx.Done():
			// Deadline reached: stop handing out objects whose downloads
			// would fail anyway (it.Next may still return already-fetched
			// results after the deadline).
			listErr = fmt.Errorf("listing objects in bucket %q with prefix %q: %w", bucketName, prefix, ctx.Err())
			break produce
		}
	}
	// Closing the channel ends every worker's range loop.
	close(downloadTasks)

	// Wait for all in-flight downloads to finish before reporting.
	wg.Wait()

	log.Printf("Prefetch cache took %s", time.Since(start))
	log.Printf("Number of files downloaded successfully %v", filesDownloaded)
	log.Printf("Number of files attempted to download %v", filesAttempted)

	return listErr
}