lib/dockerregistry/uploads.go (209 lines of code) (raw):
// Copyright (c) 2016-2019 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package dockerregistry
import (
"bytes"
"errors"
"fmt"
"io"
stdpath "path"
"time"
"github.com/uber/kraken/lib/dockerregistry/transfer"
"github.com/uber/kraken/lib/store"
"github.com/uber/kraken/lib/store/metadata"
storagedriver "github.com/docker/distribution/registry/storage/driver"
)
// uploads is the set of upload operations addressed by registry path and path
// subtype. This file provides two implementations: casUploads, which is backed
// by a store.CAStore, and disabledUploads, which fails every call.
type uploads interface {
// reader opens the upload identified by path for reading, starting at offset.
reader(path string, subtype PathSubType, offset int64) (io.ReadCloser, error)
// getContent returns the serialized upload metadata selected by subtype.
getContent(path string, subtype PathSubType) ([]byte, error)
// putContent stores upload metadata selected by subtype.
putContent(path string, subtype PathSubType, content []byte) error
// putBlobContent stores content as the blob whose digest is encoded in path.
putBlobContent(path string, content []byte) error
// writer opens the upload identified by path for reading and writing.
writer(path string, subtype PathSubType) (store.FileReadWriter, error)
// stat returns file info for the upload identified by path.
stat(path string) (storagedriver.FileInfo, error)
// list returns the paths of the metadata entries of the given subtype that
// belong to the upload identified by path.
list(path string, subtype PathSubType) ([]string, error)
// move turns the upload at uploadPath into the blob at blobPath.
move(uploadPath, blobPath string) error
}
// casUploads implements uploads on top of a CAStore and hands finished blobs
// to an ImageTransferer.
type casUploads struct {
// cas holds the upload files, their metadata, and the cache files.
cas *store.CAStore
// transferer receives blobs through Upload in putBlobContent and move.
transferer transfer.ImageTransferer
}
// newCASUploads builds a casUploads that operates on cas and pushes blobs
// through transferer.
func newCASUploads(cas *store.CAStore, transferer transfer.ImageTransferer) *casUploads {
	return &casUploads{
		cas:        cas,
		transferer: transferer,
	}
}
// getContent returns the serialized metadata of the upload whose UUID is
// parsed from path.
//
// For _startedat the started-at metadata is loaded and serialized. For
// _hashstates the hash state for the algorithm and offset parsed from path is
// loaded and serialized. Any other subtype yields an InvalidRequestError for
// path. The UUID is parsed before the subtype is inspected, so a malformed
// path is reported first.
func (u *casUploads) getContent(path string, subtype PathSubType) ([]byte, error) {
	id, err := GetUploadUUID(path)
	if err != nil {
		return nil, err
	}

	switch subtype {
	case _startedat:
		startedAt := new(startedAtMetadata)
		if err := u.cas.GetUploadFileMetadata(id, startedAt); err != nil {
			return nil, err
		}
		return startedAt.Serialize()

	case _hashstates:
		algo, offset, err := GetUploadAlgoAndOffset(path)
		if err != nil {
			return nil, err
		}
		state := newHashStateMetadata(algo, offset)
		if err := u.cas.GetUploadFileMetadata(id, state); err != nil {
			return nil, err
		}
		return state.Serialize()

	default:
		return nil, InvalidRequestError{path}
	}
}
// reader opens the upload file whose UUID is parsed from path and positions it
// at offset, measured from the start of the file.
//
// Only the _data subtype is supported; any other subtype yields an
// InvalidRequestError for path. On success the caller owns the returned reader
// and must close it. If positioning fails, the reader is closed here before
// the error is returned so that it does not leak.
func (u *casUploads) reader(path string, subtype PathSubType, offset int64) (io.ReadCloser, error) {
	if subtype != _data {
		return nil, InvalidRequestError{path}
	}

	uuid, err := GetUploadUUID(path)
	if err != nil {
		return nil, fmt.Errorf("get upload uuid: %w", err)
	}
	r, err := u.cas.GetUploadFileReader(uuid)
	if err != nil {
		return nil, fmt.Errorf("get reader: %w", err)
	}
	if _, err := r.Seek(offset, io.SeekStart); err != nil {
		// The reader never reaches the caller on this path, so release it
		// here. The seek error is the one worth reporting; a close error
		// would only obscure it.
		_ = r.Close()
		return nil, fmt.Errorf("seek: %w", err)
	}
	return r, nil
}
// putContent stores metadata for the upload whose UUID is parsed from path.
//
// For _startedat it creates the upload file (CreateUploadFile is called with a
// second argument of 0) and records the current time as the started-at
// metadata; content is not read in this branch. For _hashstates it
// deserializes content into the hash state for the algorithm and offset parsed
// from path and stores it. Any other subtype yields an InvalidRequestError for
// path.
//
// Errors that are given context are wrapped with %w so the underlying cause
// stays reachable through errors.Is and errors.As.
func (u *casUploads) putContent(path string, subtype PathSubType, content []byte) error {
	uuid, err := GetUploadUUID(path)
	if err != nil {
		return err
	}
	switch subtype {
	case _startedat:
		if err := u.cas.CreateUploadFile(uuid, 0); err != nil {
			return fmt.Errorf("create upload file: %w", err)
		}
		s := newStartedAtMetadata(time.Now())
		if err := u.cas.SetUploadFileMetadata(uuid, s); err != nil {
			return fmt.Errorf("set started at: %w", err)
		}
		return nil
	case _hashstates:
		algo, offset, err := GetUploadAlgoAndOffset(path)
		if err != nil {
			return err
		}
		hs := newHashStateMetadata(algo, offset)
		if err := hs.Deserialize(content); err != nil {
			return fmt.Errorf("deserialize hash state: %w", err)
		}
		return u.cas.SetUploadFileMetadata(uuid, hs)
	}
	return InvalidRequestError{path}
}
// putBlobContent stores content as a cache file named by the hex form of the
// digest parsed from path, then passes the same bytes to the transferer's
// Upload.
//
// The first Upload argument is the literal placeholder "TODO" (presumably a
// namespace; confirm against transfer.ImageTransferer). Every error is wrapped
// with %w so the underlying cause stays reachable through errors.Is and
// errors.As.
func (u *casUploads) putBlobContent(path string, content []byte) error {
	d, err := GetBlobDigest(path)
	if err != nil {
		return fmt.Errorf("get digest: %w", err)
	}
	if err := u.cas.CreateCacheFile(d.Hex(), bytes.NewReader(content)); err != nil {
		return fmt.Errorf("create cache file: %w", err)
	}
	if err := u.transferer.Upload("TODO", d, store.NewBufferFileReader(content)); err != nil {
		return fmt.Errorf("upload: %w", err)
	}
	return nil
}
// writer opens the upload file whose UUID is parsed from path for reading and
// writing. Only the _data subtype is supported; any other subtype yields an
// InvalidRequestError for path. The UUID is parsed before the subtype is
// inspected, so a malformed path is reported first.
func (u *casUploads) writer(path string, subtype PathSubType) (store.FileReadWriter, error) {
	id, err := GetUploadUUID(path)
	if err != nil {
		return nil, err
	}
	if subtype != _data {
		return nil, InvalidRequestError{path}
	}
	return u.cas.GetUploadFileReadWriter(id)
}
// stat reports size, modification time, and directory flag of the upload file
// whose UUID is parsed from path.
func (u *casUploads) stat(path string) (storagedriver.FileInfo, error) {
	id, err := GetUploadUUID(path)
	if err != nil {
		return nil, err
	}
	fi, err := u.cas.GetUploadFileStat(id)
	if err != nil {
		return nil, err
	}

	// The Path field deliberately carries the bare upload UUID rather than the
	// root-relative path docker registry expects, because the kraken storage
	// driver is itself a consumer of this info.
	fields := storagedriver.FileInfoFields{
		Path:    id,
		Size:    fi.Size(),
		ModTime: fi.ModTime(),
		IsDir:   fi.IsDir(),
	}
	return storagedriver.FileInfoInternal{FileInfoFields: fields}, nil
}
// list returns the registry paths of the hash state metadata entries stored
// for the upload whose UUID is parsed from path.
//
// Only the _hashstates subtype is supported; any other subtype yields an
// InvalidRequestError for path. Each returned path has the form
// localstore/_uploads/<uuid>/<hash state docker path>. Metadata entries that
// are not hash states are skipped. A failure while ranging over the metadata
// is returned to the caller instead of being dropped.
func (u *casUploads) list(path string, subtype PathSubType) ([]string, error) {
	uuid, err := GetUploadUUID(path)
	if err != nil {
		return nil, err
	}
	switch subtype {
	case _hashstates:
		var paths []string
		err = u.cas.RangeUploadMetadata(uuid, func(md metadata.Metadata) error {
			if hs, ok := md.(*hashStateMetadata); ok {
				p := stdpath.Join("localstore", "_uploads", uuid, hs.dockerPath())
				paths = append(paths, p)
			}
			return nil
		})
		if err != nil {
			// Returned unwrapped, like the other store errors in this file,
			// so callers see the store's error value itself.
			return nil, err
		}
		return paths, nil
	}
	return nil, InvalidRequestError{path}
}
// move commits the upload at uploadPath as the blob at blobPath.
//
// The upload file is moved into the cache under the hex form of the digest
// parsed from blobPath, then the cache file is reopened and passed to the
// transferer's Upload. The first Upload argument is the literal placeholder
// "TODO" (presumably a namespace; confirm against transfer.ImageTransferer).
//
// The cache file reader is closed before returning so it is not leaked. Every
// error is wrapped with %w so the underlying cause stays reachable through
// errors.Is and errors.As.
func (u *casUploads) move(uploadPath, blobPath string) error {
	uuid, err := GetUploadUUID(uploadPath)
	if err != nil {
		return fmt.Errorf("get upload uuid: %w", err)
	}
	d, err := GetBlobDigest(blobPath)
	if err != nil {
		return fmt.Errorf("get blob uuid: %w", err)
	}
	if err := u.cas.MoveUploadFileToCache(uuid, d.Hex()); err != nil {
		return fmt.Errorf("move upload file to cache: %w", err)
	}
	f, err := u.cas.GetCacheFileReader(d.Hex())
	if err != nil {
		return fmt.Errorf("get cache file: %w", err)
	}
	// The reader is only needed for the duration of Upload. A close error on
	// a read-only handle carries nothing actionable, so it is not reported.
	defer f.Close()

	if err := u.transferer.Upload("TODO", d, f); err != nil {
		return fmt.Errorf("upload: %w", err)
	}
	return nil
}
// errUploadsDisabled is the error returned by every disabledUploads method.
var errUploadsDisabled = errors.New("uploads are disabled")
// disabledUploads is an uploads implementation that carries no state; each of
// its methods fails with errUploadsDisabled.
type disabledUploads struct{}
// reader always fails with errUploadsDisabled; its arguments are ignored.
func (disabledUploads) reader(path string, subtype PathSubType, offset int64) (io.ReadCloser, error) {
	return nil, errUploadsDisabled
}
// getContent always fails with errUploadsDisabled; its arguments are ignored.
func (disabledUploads) getContent(path string, subtype PathSubType) ([]byte, error) {
	return nil, errUploadsDisabled
}
// putContent always fails with errUploadsDisabled; its arguments are ignored.
func (disabledUploads) putContent(path string, subtype PathSubType, content []byte) error {
	return errUploadsDisabled
}
// putBlobContent always fails with errUploadsDisabled; its arguments are
// ignored.
func (disabledUploads) putBlobContent(path string, content []byte) error {
	return errUploadsDisabled
}
// writer always fails with errUploadsDisabled; its arguments are ignored.
func (disabledUploads) writer(path string, subtype PathSubType) (store.FileReadWriter, error) {
	return nil, errUploadsDisabled
}
// stat always fails with errUploadsDisabled; its argument is ignored.
func (disabledUploads) stat(path string) (storagedriver.FileInfo, error) {
	return nil, errUploadsDisabled
}
// list always fails with errUploadsDisabled; its arguments are ignored.
func (disabledUploads) list(path string, subtype PathSubType) ([]string, error) {
	return nil, errUploadsDisabled
}
// move always fails with errUploadsDisabled; its arguments are ignored.
func (disabledUploads) move(uploadPath, blobPath string) error {
	return errUploadsDisabled
}