func()

in proxy/proxyserver/prefetch.go [265:304]


// downloadBlobs downloads every blob in input.blobs concurrently, one
// goroutine per blob, and blocks until all of them have finished.
//
// Blobs for which shouldSkipPrefetch returns true are not downloaded. The
// downloaded content itself is written to io.Discard. Failures are not
// returned to the caller: they are collected, counted once through the
// "failed" counter (regardless of how many blobs failed), and logged
// individually through input.logger.
func (ph *PrefetchHandler) downloadBlobs(input *prefetchInput) {
	var wg sync.WaitGroup
	var mu sync.Mutex // guards errList, which is appended to from the goroutines below
	var errList []error

	for _, b := range input.blobs {
		if ph.shouldSkipPrefetch(b, input.logger) {
			continue
		}

		wg.Add(1)
		// The blob is passed as an argument so each goroutine works on its own copy.
		go func(blob blobInfo) {
			defer wg.Done()

			blobStart := time.Now()
			err := ph.clusterClient.DownloadBlob(context.Background(), input.namespace, blob.digest, io.Discard)
			// Latency is recorded for every attempt, whether it succeeded or not.
			ph.metrics.Timer("blob_download_time").Record(time.Since(blobStart))

			if err != nil {
				// A 202 Accepted status is neither a failure nor a completed
				// download, so it is not reported and not counted.
				// NOTE(review): this type assertion does not see through wrapped
				// errors; confirm DownloadBlob returns httputil.StatusError unwrapped.
				if serr, ok := err.(httputil.StatusError); ok && serr.Status == http.StatusAccepted {
					return
				}
				mu.Lock()
				errList = append(errList, fmt.Errorf("digest %s, namespace %s, error downloading blob: %w", blob.digest, input.namespace, err))
				mu.Unlock()
				return
			}

			// Count bytes and blobs only once the download has actually
			// succeeded, so failed or still-pending downloads do not inflate
			// the "bytes_downloaded" metric.
			ph.metrics.Counter("bytes_downloaded").Inc(blob.size)
			ph.metrics.Counter("blobs_downloaded").Inc(1)
		}(b)
	}

	wg.Wait()

	// All goroutines have finished, so errList can be read without the mutex.
	if len(errList) > 0 {
		ph.metrics.Counter("failed").Inc(1)
		for _, err := range errList {
			input.logger.With("error", err).Error("Error downloading blob")
		}
	}
}