in proxy/proxyserver/prefetch.go [265:304]
func (ph *PrefetchHandler) downloadBlobs(input *prefetchInput) {
var wg sync.WaitGroup
var mu sync.Mutex
var errList []error
for _, b := range input.blobs {
if ph.shouldSkipPrefetch(b, input.logger) {
continue
}
wg.Add(1)
go func(blob blobInfo) {
defer wg.Done()
blobStart := time.Now()
err := ph.clusterClient.DownloadBlob(context.Background(), input.namespace, blob.digest, io.Discard)
blobDuration := time.Since(blobStart)
ph.metrics.Timer("blob_download_time").Record(blobDuration)
ph.metrics.Counter("bytes_downloaded").Inc(blob.size)
if err != nil {
if serr, ok := err.(httputil.StatusError); ok && serr.Status == http.StatusAccepted {
return
}
mu.Lock()
errList = append(errList, fmt.Errorf("digest %s, namespace %s, error downloading blob: %w", blob.digest, input.namespace, err))
mu.Unlock()
} else {
ph.metrics.Counter("blobs_downloaded").Inc(1)
}
}(b)
}
wg.Wait()
if len(errList) > 0 {
ph.metrics.Counter("failed").Inc(1)
for _, err := range errList {
input.logger.With("error", err).Error("Error downloading blob")
}
}
}