in lib/dockerregistry/transfer/rw_transferer.go [129:174]
func (t *ReadWriteTransferer) downloadFromOrigin(ctx context.Context, namespace string, d core.Digest) (store.FileReader, error) {
ctx, span := t.tracer.Start(ctx, "registry.download_from_origin",
trace.WithAttributes(
attribute.String("namespace", namespace),
attribute.String("blob.digest", d.Hex()),
),
)
defer span.End()
tmp := fmt.Sprintf("%s.%s", d.Hex(), uuid.Generate().String())
if err := t.cas.CreateUploadFile(tmp, 0); err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, "failed to create upload file")
return nil, fmt.Errorf("create upload file: %s", err)
}
w, err := t.cas.GetUploadFileReadWriter(tmp)
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, "failed to get upload writer")
return nil, fmt.Errorf("get upload writer: %s", err)
}
defer closers.Close(w)
if err := t.originCluster.DownloadBlob(ctx, namespace, d, w); err != nil {
if err == blobclient.ErrBlobNotFound {
span.SetStatus(codes.Error, "blob not found")
return nil, ErrBlobNotFound
}
span.RecordError(err)
span.SetStatus(codes.Error, "origin download failed")
return nil, fmt.Errorf("origin: %s", err)
}
if err := t.cas.MoveUploadFileToCache(tmp, d.Hex()); err != nil && !os.IsExist(err) {
span.RecordError(err)
span.SetStatus(codes.Error, "failed to move to cache")
return nil, fmt.Errorf("move upload file to cache: %s", err)
}
blob, err := t.cas.GetCacheFileReader(d.Hex())
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, "failed to read cached blob")
return nil, fmt.Errorf("get cache file: %s", err)
}
span.SetStatus(codes.Ok, "download completed")
return blob, nil
}