registry/handlers/blobupload.go:
package handlers
import (
"context"
"errors"
"fmt"
"net/http"
"net/url"
"github.com/docker/distribution"
"github.com/docker/distribution/log"
"github.com/docker/distribution/reference"
"github.com/docker/distribution/registry/api/errcode"
v2 "github.com/docker/distribution/registry/api/v2"
"github.com/docker/distribution/registry/datastore"
"github.com/docker/distribution/registry/datastore/models"
"github.com/docker/distribution/registry/storage"
"github.com/gorilla/handlers"
"github.com/opencontainers/go-digest"
)
// blobUploadDispatcher constructs and returns the blob upload handler for the
// given request context.
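// A typical push flow through these handlers (sketch):
//
//	POST  /v2/<name>/blobs/uploads/                      -> 202 Accepted (Location, Docker-Upload-UUID)
//	PATCH /v2/<name>/blobs/uploads/<uuid>                -> 202 Accepted (Range reflects received bytes)
//	PUT   /v2/<name>/blobs/uploads/<uuid>?digest=<dgst>  -> 201 Created (Docker-Content-Digest)
//
// GET and HEAD report the status of an upload, DELETE cancels it, and a POST
// with ?mount=<digest>&from=<repo> attempts a cross-repository mount instead.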
func blobUploadDispatcher(ctx *Context, _ *http.Request) http.Handler {
buh := &blobUploadHandler{
Context: ctx,
UUID: getUploadUUID(ctx),
}
handler := handlers.MethodHandler{
http.MethodGet: http.HandlerFunc(buh.HandleGetUploadStatus),
http.MethodHead: http.HandlerFunc(buh.HandleGetUploadStatus),
}
if !ctx.readOnly {
handler[http.MethodPost] = http.HandlerFunc(buh.StartBlobUpload)
handler[http.MethodPatch] = http.HandlerFunc(buh.PatchBlobData)
handler[http.MethodPut] = http.HandlerFunc(buh.PutBlobUploadComplete)
handler[http.MethodDelete] = http.HandlerFunc(buh.CancelBlobUpload)
}
return buh.validateUpload(handler)
}
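// validateUpload resumes the upload session identified by buh.UUID (when set)
// before dispatching to handler, serving an error handler instead if
// resumption fails. Every request is additionally wrapped with
// checkOngoingRename.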
func (buh *blobUploadHandler) validateUpload(handler http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if buh.UUID != "" {
h := buh.ResumeBlobUpload(buh.Context, r)
if h == nil {
h = closeResources(handler, buh.Upload)
}
checkOngoingRename(h, buh.Context).ServeHTTP(w, r)
return
}
checkOngoingRename(handler, buh.Context).ServeHTTP(w, r)
})
}
// blobUploadHandler handles the http blob upload process.
type blobUploadHandler struct {
*Context
	// UUID identifies the upload instance for the current request. Blob
	// writers are keyed by this UUID.
UUID string
Upload distribution.BlobWriter
State blobUploadState
}
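// dbMountBlob links a blob that already exists under the source repository
// into the destination repository on the metadata database. The blob must be
// accessible from the source repository; linking is a no-op if the blob is
// already linked to the destination.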
func dbMountBlob(ctx context.Context, rStore datastore.RepositoryStore, fromRepoPath, toRepoPath string, d digest.Digest) error {
l := log.GetLogger(log.WithContext(ctx)).WithFields(log.Fields{
"source": fromRepoPath,
"destination": toRepoPath,
"digest": d,
})
l.Debug("cross repository blob mounting")
// find source blob from source repository
b, err := dbFindRepositoryBlob(ctx, rStore, distribution.Descriptor{Digest: d}, fromRepoPath)
if err != nil {
return err
}
destRepo, err := rStore.CreateOrFindByPath(ctx, toRepoPath)
if err != nil {
return err
}
// link blob (does nothing if already linked)
return rStore.LinkBlob(ctx, destRepo, b.Digest)
}
// StartBlobUpload begins the blob upload process and allocates a server-side
// blob writer session, optionally mounting the blob from a separate repository.
func (buh *blobUploadHandler) StartBlobUpload(w http.ResponseWriter, r *http.Request) {
var options []distribution.BlobCreateOption
var rStore datastore.RepositoryStore
if buh.useDatabase {
var opts []datastore.RepositoryStoreOption
if buh.GetRepoCache() != nil {
opts = append(opts, datastore.WithRepositoryCache(buh.GetRepoCache()))
}
rStore = datastore.NewRepositoryStore(buh.db.Primary(), opts...)
}
fromRepo := r.FormValue("from")
mountDigest := r.FormValue("mount")
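	// If both a mount digest and a source repository were supplied, try to
	// turn this upload into a cross-repository mount. Errors are deliberately
	// swallowed here so that a failed mount falls back to a regular upload.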
if mountDigest != "" && fromRepo != "" {
opt, err := buh.createBlobMountOption(fromRepo, mountDigest, rStore)
if opt != nil && err == nil {
options = append(options, opt)
}
}
blobs := buh.Repository.Blobs(buh)
upload, err := blobs.Create(buh, options...)
// nolint: revive // max-control-nesting
if err != nil {
var ebm distribution.ErrBlobMounted
switch {
case errors.As(err, &ebm):
if buh.useDatabase {
errIn := dbMountBlob(buh.Context, rStore, ebm.From.Name(), buh.Repository.Named().Name(), ebm.Descriptor.Digest)
if errIn != nil {
e := fmt.Errorf("failed to mount blob in database: %w", errIn)
buh.Errors = append(buh.Errors, errcode.FromUnknownError(e))
return
}
}
errIn := buh.writeBlobCreatedHeaders(w, ebm.Descriptor)
if errIn != nil {
buh.Errors = append(buh.Errors, errcode.ErrorCodeUnknown.WithDetail(errIn))
}
case errors.Is(err, distribution.ErrUnsupported):
buh.Errors = append(buh.Errors, errcode.ErrorCodeUnsupported)
default:
buh.Errors = append(buh.Errors, errcode.ErrorCodeUnknown.WithDetail(err))
}
return
}
buh.Upload = upload
if err := buh.blobUploadResponse(w); err != nil {
buh.Errors = append(buh.Errors, errcode.ErrorCodeUnknown.WithDetail(err))
return
}
w.Header().Set("Docker-Upload-UUID", buh.Upload.ID())
w.WriteHeader(http.StatusAccepted)
}
// HandleGetUploadStatus returns the status of a given upload, identified by id.
func (buh *blobUploadHandler) HandleGetUploadStatus(w http.ResponseWriter, _ *http.Request) {
if buh.Upload == nil {
buh.Errors = append(buh.Errors, v2.ErrorCodeBlobUploadUnknown)
return
}
if err := buh.blobUploadResponse(w); err != nil {
buh.Errors = append(buh.Errors, errcode.ErrorCodeUnknown.WithDetail(err))
return
}
w.Header().Set("Docker-Upload-UUID", buh.UUID)
w.WriteHeader(http.StatusNoContent)
}
// PatchBlobData writes data to an upload.
func (buh *blobUploadHandler) PatchBlobData(w http.ResponseWriter, r *http.Request) {
if buh.Upload == nil {
buh.Errors = append(buh.Errors, v2.ErrorCodeBlobUploadUnknown)
return
}
ct := r.Header.Get("Content-Type")
if ct != "" && ct != "application/octet-stream" {
		buh.Errors = append(buh.Errors, errcode.ErrorCodeUnknown.WithDetail(errors.New("bad Content-Type")))
return
}
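	// When present, Content-Range is expected in the registry API's
	// "<start>-<end>" form with inclusive byte offsets (e.g. "100-199").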
chunkRange := r.Header.Get("Content-Range")
if chunkRange != "" {
startRange, _, err := parseContentRange(chunkRange)
if err != nil {
buh.Errors = append(buh.Errors, errcode.ErrorCodeUnknown.WithDetail(err.Error()))
return
}
// chunks MUST be uploaded in order, with the first byte of a chunk
		// being the last chunk's end offset + 1 (which is equivalent to `buh.State.Offset`)
if startRange != buh.State.Offset {
buh.Errors = append(buh.Errors, v2.ErrorCodeInvalidContentRange)
return
}
}
if err := copyFullPayload(buh, w, r, buh.Upload, -1, "blob PATCH"); err != nil {
buh.Errors = append(buh.Errors, errcode.FromUnknownError(err))
return
}
if err := buh.blobUploadResponse(w); err != nil {
buh.Errors = append(buh.Errors, errcode.ErrorCodeUnknown.WithDetail(err))
return
}
w.WriteHeader(http.StatusAccepted)
}
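// dbPutBlobUploadComplete records a completed blob upload on the metadata
// database: within a single transaction it creates (or finds) the blob and the
// destination repository and links the blob to that repository.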
func dbPutBlobUploadComplete(ctx context.Context, db *datastore.DB, repoPath string, desc distribution.Descriptor, repoCache datastore.RepositoryCache) error {
tx, err := db.BeginTx(ctx, nil)
if err != nil {
return fmt.Errorf("beginning database transaction: %w", err)
}
defer tx.Rollback()
// create or find blob
bs := datastore.NewBlobStore(tx)
b := &models.Blob{
MediaType: desc.MediaType,
Digest: desc.Digest,
Size: desc.Size,
}
if err := bs.CreateOrFind(ctx, b); err != nil {
return err
}
// create or find repository
var opts []datastore.RepositoryStoreOption
if repoCache != nil {
opts = append(opts, datastore.WithRepositoryCache(repoCache))
}
rStore := datastore.NewRepositoryStore(tx, opts...)
r, err := rStore.CreateOrFindByPath(ctx, repoPath)
if err != nil {
return err
}
// link blob to repository
if err := rStore.LinkBlob(ctx, r, b.Digest); err != nil {
return err
}
if err := tx.Commit(); err != nil {
return fmt.Errorf("committing database transaction: %w", err)
}
return nil
}
// PutBlobUploadComplete takes the final request of a blob upload. The
// request may include all the blob data or no blob data. Any data
// provided is received and verified. If successful, the blob is linked
// into the blob store and 201 Created is returned with the canonical
// url of the blob.
func (buh *blobUploadHandler) PutBlobUploadComplete(w http.ResponseWriter, r *http.Request) {
if buh.Upload == nil {
buh.Errors = append(buh.Errors, v2.ErrorCodeBlobUploadUnknown)
return
}
dgstStr := r.FormValue("digest")
if dgstStr == "" {
// no digest? return error, but allow retry.
buh.Errors = append(buh.Errors, v2.ErrorCodeDigestInvalid.WithDetail("digest missing"))
return
}
dgst, err := digest.Parse(dgstStr)
if err != nil {
		// invalid digest? return error, but allow retry.
buh.Errors = append(buh.Errors, v2.ErrorCodeDigestInvalid.WithDetail("digest parsing failed"))
return
}
if err := copyFullPayload(buh, w, r, buh.Upload, -1, "blob PUT"); err != nil {
buh.Errors = append(buh.Errors, errcode.FromUnknownError(err))
return
}
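	// Commit verifies the uploaded content against the client-supplied digest
	// before the blob becomes available in the repository's blob store.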
desc, err := buh.Upload.Commit(buh, distribution.Descriptor{
Digest: dgst,
})
l := log.GetLogger(log.WithContext(buh))
if err != nil {
switch err := err.(type) {
case distribution.ErrBlobInvalidDigest:
buh.Errors = append(buh.Errors, v2.ErrorCodeDigestInvalid.WithDetail(err))
case errcode.Error:
buh.Errors = append(buh.Errors, err)
default:
switch err {
case distribution.ErrAccessDenied:
buh.Errors = append(buh.Errors, errcode.ErrorCodeDenied)
case distribution.ErrUnsupported:
buh.Errors = append(buh.Errors, errcode.ErrorCodeUnsupported)
case distribution.ErrBlobInvalidLength, distribution.ErrBlobDigestUnsupported:
buh.Errors = append(buh.Errors, v2.ErrorCodeBlobUploadInvalid.WithDetail(err))
default:
l.WithError(err).Error("unknown error completing upload")
buh.Errors = append(buh.Errors, errcode.ErrorCodeUnknown.WithDetail(err))
}
}
// Clean up the backend blob data if there was an error.
if err := buh.Upload.Cancel(buh); err != nil {
// If the cleanup fails, all we can do is observe and report.
l.WithError(err).Error("error canceling upload after error")
}
return
}
if buh.useDatabase {
if err := dbPutBlobUploadComplete(buh.Context, buh.db.Primary(), buh.Repository.Named().Name(), desc, buh.GetRepoCache()); err != nil {
e := fmt.Errorf("failed to create blob in database: %w", err)
buh.Errors = append(buh.Errors, errcode.FromUnknownError(e))
return
}
}
if err := buh.writeBlobCreatedHeaders(w, desc); err != nil {
buh.Errors = append(buh.Errors, errcode.ErrorCodeUnknown.WithDetail(err))
return
}
l.WithFields(log.Fields{
"media_type": desc.MediaType,
"size_bytes": desc.Size,
"digest": desc.Digest,
}).Info("blob uploaded")
}
// CancelBlobUpload cancels an in-progress upload of a blob.
func (buh *blobUploadHandler) CancelBlobUpload(w http.ResponseWriter, _ *http.Request) {
if buh.Upload == nil {
buh.Errors = append(buh.Errors, v2.ErrorCodeBlobUploadUnknown)
return
}
w.Header().Set("Docker-Upload-UUID", buh.UUID)
if err := buh.Upload.Cancel(buh); err != nil {
log.GetLogger(log.WithContext(buh)).WithError(err).Error("error encountered canceling upload")
buh.Errors = append(buh.Errors, errcode.ErrorCodeUnknown.WithDetail(err))
}
w.WriteHeader(http.StatusNoContent)
}
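// ResumeBlobUpload restores the upload session identified by buh.UUID from the
// HMAC-signed `_state` query parameter and reopens the corresponding blob
// writer. It returns nil on success, or a handler that reports the failure to
// the client.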
func (buh *blobUploadHandler) ResumeBlobUpload(ctx *Context, r *http.Request) http.Handler {
state, err := hmacKey(ctx.Config.HTTP.Secret).unpackUploadState(r.FormValue("_state"))
if err != nil {
return http.HandlerFunc(func(_ http.ResponseWriter, _ *http.Request) {
log.GetLogger(log.WithContext(ctx)).WithError(err).Info("error resolving upload")
buh.Errors = append(buh.Errors, v2.ErrorCodeBlobUploadInvalid.WithDetail(err))
})
}
buh.State = state
if state.Name != ctx.Repository.Named().Name() {
return http.HandlerFunc(func(_ http.ResponseWriter, _ *http.Request) {
log.GetLogger(log.WithContext(ctx)).WithFields(log.Fields{
"state_name": state.Name,
"repository": buh.Repository.Named().Name(),
}).Info("mismatched repository name in upload state")
			buh.Errors = append(buh.Errors, v2.ErrorCodeBlobUploadInvalid.WithDetail("mismatched repository name in upload state"))
})
}
if state.UUID != buh.UUID {
return http.HandlerFunc(func(_ http.ResponseWriter, _ *http.Request) {
log.GetLogger(log.WithContext(ctx)).WithFields(log.Fields{
"state_uuid": state.UUID,
"upload_uuid": buh.UUID,
}).Info("mismatched uuid in upload state")
			buh.Errors = append(buh.Errors, v2.ErrorCodeBlobUploadInvalid.WithDetail("mismatched uuid in upload state"))
})
}
blobs := ctx.Repository.Blobs(buh)
upload, err := blobs.Resume(buh, buh.UUID)
if err != nil {
log.GetLogger(log.WithContext(ctx)).WithError(err).Error("error resolving upload")
if err == distribution.ErrBlobUploadUnknown {
return http.HandlerFunc(func(_ http.ResponseWriter, _ *http.Request) {
buh.Errors = append(buh.Errors, v2.ErrorCodeBlobUploadUnknown.WithDetail(err))
})
}
return http.HandlerFunc(func(_ http.ResponseWriter, _ *http.Request) {
buh.Errors = append(buh.Errors, errcode.ErrorCodeUnknown.WithDetail(err))
})
}
buh.Upload = upload
	// The offset specified in the request's `_state` query parameter is not useful when only querying
	// for the status of the current blob upload and hence does not need to be validated against.
if r.Method != http.MethodGet && r.Method != http.MethodHead {
if size := upload.Size(); size != buh.State.Offset {
defer upload.Close()
log.GetLogger(log.WithContext(ctx)).WithFields(log.Fields{
"upload_size": size,
"state_offset": buh.State.Offset,
}).Error("upload resumed at wrong offset")
return http.HandlerFunc(func(_ http.ResponseWriter, _ *http.Request) {
				buh.Errors = append(buh.Errors, v2.ErrorCodeResumableBlobUploadInvalid.WithDetail("upload resumed at wrong offset"))
})
}
}
return nil
}
// blobUploadResponse writes the standard headers shared by blob upload and
// chunk responses. The response status is left to the caller.
func (buh *blobUploadHandler) blobUploadResponse(w http.ResponseWriter) error {
buh.State.Name = buh.Repository.Named().Name()
buh.State.UUID = buh.Upload.ID()
_ = buh.Upload.Close()
buh.State.Offset = buh.Upload.Size()
buh.State.StartedAt = buh.Upload.StartedAt()
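	// The upload state is packed into an HMAC-signed token and round-tripped
	// through the `_state` query parameter of the upload URL so that it cannot
	// be tampered with by the client.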
token, err := hmacKey(buh.Config.HTTP.Secret).packUploadState(buh.State)
if err != nil {
log.GetLogger(log.WithContext(buh)).WithError(err).Info("error building upload state token")
return err
}
uploadURL, err := buh.urlBuilder.BuildBlobUploadChunkURL(
buh.Repository.Named(), buh.Upload.ID(),
url.Values{
"_state": []string{token},
})
if err != nil {
log.GetLogger(log.WithContext(buh)).WithError(err).Info("error building upload url")
return err
}
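	// The Range header reports inclusive byte offsets: an upload of N bytes is
	// advertised as "0-<N-1>", and an empty upload as "0-0".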
endRange := buh.Upload.Size()
if endRange > 0 {
endRange--
}
w.Header().Set("Docker-Upload-UUID", buh.UUID)
w.Header().Set("Location", uploadURL)
w.Header().Set("Content-Length", "0")
w.Header().Set("Range", fmt.Sprintf("0-%d", endRange))
return nil
}
// createBlobMountOption builds the BlobCreateOption used to mount a blob from
// another repository by its digest. When the metadata database is enabled, the
// blob is first looked up in the source repository so that its descriptor can
// be passed along with the mount request.
func (buh *blobUploadHandler) createBlobMountOption(fromRepo, mountDigest string, rStore datastore.RepositoryStore) (distribution.BlobCreateOption, error) {
dgst, err := digest.Parse(mountDigest)
if err != nil {
return nil, err
}
ref, err := reference.WithName(fromRepo)
if err != nil {
return nil, err
}
canonical, err := reference.WithDigest(ref, dgst)
if err != nil {
return nil, err
}
if !buh.useDatabase {
return storage.WithMountFrom(canonical), nil
}
// Check for blob access on the database and pass that information via the
// BlobCreateOption.
b, err := dbFindRepositoryBlob(buh, rStore, distribution.Descriptor{Digest: dgst}, ref.Name())
if err != nil {
return nil, err
}
return storage.WithMountFromStat(canonical, &distribution.Descriptor{Digest: b.Digest, Size: b.Size, MediaType: b.MediaType}), nil
}
// writeBlobCreatedHeaders writes the standard headers describing a newly
// created blob. A 201 Created is written as well as the canonical URL and
// blob digest.
func (buh *blobUploadHandler) writeBlobCreatedHeaders(w http.ResponseWriter, desc distribution.Descriptor) error {
ref, err := reference.WithDigest(buh.Repository.Named(), desc.Digest)
if err != nil {
return err
}
blobURL, err := buh.urlBuilder.BuildBlobURL(ref)
if err != nil {
return err
}
w.Header().Set("Location", blobURL)
w.Header().Set("Content-Length", "0")
w.Header().Set("Docker-Content-Digest", desc.Digest.String())
w.WriteHeader(http.StatusCreated)
return nil
}