in distribution/push_v2.go [242:407]
func (pd *v2PushDescriptor) Upload(ctx context.Context, progressOutput progress.Output) (distribution.Descriptor, error) {
diffID := pd.DiffID()
pd.pushState.Lock()
if descriptor, ok := pd.pushState.remoteLayers[diffID]; ok {
// it is already known that the push is not needed and
// therefore doing a stat is unnecessary
pd.pushState.Unlock()
progress.Update(progressOutput, pd.ID(), "Layer already exists")
return descriptor, nil
}
pd.pushState.Unlock()
// Do we have any metadata associated with this layer's DiffID?
v2Metadata, err := pd.v2MetadataService.GetMetadata(diffID)
if err == nil {
descriptor, exists, err := layerAlreadyExists(ctx, v2Metadata, pd.repoInfo, pd.repo, pd.pushState)
if err != nil {
progress.Update(progressOutput, pd.ID(), "Image push failed")
return distribution.Descriptor{}, retryOnError(err)
}
if exists {
progress.Update(progressOutput, pd.ID(), "Layer already exists")
pd.pushState.Lock()
pd.pushState.remoteLayers[diffID] = descriptor
pd.pushState.Unlock()
return descriptor, nil
}
}
logrus.Debugf("Pushing layer: %s", diffID)
// if digest was empty or not saved, or if blob does not exist on the remote repository,
// then push the blob.
bs := pd.repo.Blobs(ctx)
var layerUpload distribution.BlobWriter
mountAttemptsRemaining := 3
// Attempt to find another repository in the same registry to mount the layer
// from to avoid an unnecessary upload.
// Note: metadata is stored from oldest to newest, so we iterate through this
// slice in reverse to maximize our chances of the blob still existing in the
// remote repository.
for i := len(v2Metadata) - 1; i >= 0 && mountAttemptsRemaining > 0; i-- {
mountFrom := v2Metadata[i]
sourceRepo, err := reference.ParseNamed(mountFrom.SourceRepository)
if err != nil {
continue
}
if pd.repoInfo.Hostname() != sourceRepo.Hostname() {
// don't mount blobs from another registry
continue
}
namedRef, err := reference.WithName(mountFrom.SourceRepository)
if err != nil {
continue
}
// TODO (brianbland): We need to construct a reference where the Name is
// only the full remote name, so clean this up when distribution has a
// richer reference package
remoteRef, err := distreference.WithName(namedRef.RemoteName())
if err != nil {
continue
}
canonicalRef, err := distreference.WithDigest(remoteRef, mountFrom.Digest)
if err != nil {
continue
}
logrus.Debugf("attempting to mount layer %s (%s) from %s", diffID, mountFrom.Digest, sourceRepo.FullName())
layerUpload, err = bs.Create(ctx, client.WithMountFrom(canonicalRef))
switch err := err.(type) {
case distribution.ErrBlobMounted:
progress.Updatef(progressOutput, pd.ID(), "Mounted from %s", err.From.Name())
err.Descriptor.MediaType = schema2.MediaTypeLayer
pd.pushState.Lock()
pd.pushState.confirmedV2 = true
pd.pushState.remoteLayers[diffID] = err.Descriptor
pd.pushState.Unlock()
// Cache mapping from this layer's DiffID to the blobsum
if err := pd.v2MetadataService.Add(diffID, metadata.V2Metadata{Digest: mountFrom.Digest, SourceRepository: pd.repoInfo.FullName()}); err != nil {
return distribution.Descriptor{}, xfer.DoNotRetry{Err: err}
}
return err.Descriptor, nil
case nil:
// blob upload session created successfully, so begin the upload
mountAttemptsRemaining = 0
default:
// unable to mount layer from this repository, so this source mapping is no longer valid
logrus.Debugf("unassociating layer %s (%s) with %s", diffID, mountFrom.Digest, mountFrom.SourceRepository)
pd.v2MetadataService.Remove(mountFrom)
mountAttemptsRemaining--
}
}
if layerUpload == nil {
layerUpload, err = bs.Create(ctx)
if err != nil {
return distribution.Descriptor{}, retryOnError(err)
}
}
defer layerUpload.Close()
arch, err := pd.layer.TarStream()
if err != nil {
return distribution.Descriptor{}, xfer.DoNotRetry{Err: err}
}
// don't care if this fails; best effort
size, _ := pd.layer.DiffSize()
reader := progress.NewProgressReader(ioutils.NewCancelReadCloser(ctx, arch), progressOutput, size, pd.ID(), "Pushing")
compressedReader, compressionDone := compress(reader)
defer func() {
reader.Close()
<-compressionDone
}()
digester := digest.Canonical.New()
tee := io.TeeReader(compressedReader, digester.Hash())
nn, err := layerUpload.ReadFrom(tee)
compressedReader.Close()
if err != nil {
return distribution.Descriptor{}, retryOnError(err)
}
pushDigest := digester.Digest()
if _, err := layerUpload.Commit(ctx, distribution.Descriptor{Digest: pushDigest}); err != nil {
return distribution.Descriptor{}, retryOnError(err)
}
logrus.Debugf("uploaded layer %s (%s), %d bytes", diffID, pushDigest, nn)
progress.Update(progressOutput, pd.ID(), "Pushed")
// Cache mapping from this layer's DiffID to the blobsum
if err := pd.v2MetadataService.Add(diffID, metadata.V2Metadata{Digest: pushDigest, SourceRepository: pd.repoInfo.FullName()}); err != nil {
return distribution.Descriptor{}, xfer.DoNotRetry{Err: err}
}
pd.pushState.Lock()
// If Commit succeeded, that's an indication that the remote registry
// speaks the v2 protocol.
pd.pushState.confirmedV2 = true
descriptor := distribution.Descriptor{
Digest: pushDigest,
MediaType: schema2.MediaTypeLayer,
Size: nn,
}
pd.pushState.remoteLayers[diffID] = descriptor
pd.pushState.Unlock()
return descriptor, nil
}