lib/dockerregistry/storage_driver.go (261 lines of code) (raw):
// Copyright (c) 2016-2019 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package dockerregistry
import (
"context"
"errors"
"fmt"
"io"
"os"
"github.com/uber/kraken/lib/dockerregistry/transfer"
"github.com/uber/kraken/lib/store"
"github.com/uber/kraken/utils/log"
"github.com/docker/distribution/registry/storage/driver"
"github.com/docker/distribution/registry/storage/driver/factory"
"github.com/uber-go/tally"
)
// The path layout in the storage backend is roughly as follows:
//
// <root>/v2
// -> repositories/
// -><name>/
// -> _manifests/
// revisions
// -> <manifest digest path>
// -> link
// tags/<tag>
// -> current/link
// -> index
// -> <algorithm>/<hex digest>/link
// -> _layers/
// <layer links to blob store>
// -> _uploads/<id>
// data
// startedat
// hashstates/<algorithm>/<offset>
// -> blobs/<algorithm>
// <split directory content addressable storage>
// Name is the identifier under which this storage driver is registered with
// the docker distribution driver factory. It is also the value reported by
// (*KrakenStorageDriver).Name and used as the DriverName of the
// PathNotFoundError values returned from this file.
const Name = "kraken"
func init() {
factory.Register(Name, &krakenStorageDriverFactory{})
}
// InvalidRequestError is the error returned when a request targets a path
// this driver does not support. It records the offending path.
type InvalidRequestError struct {
	path string
}

// Error implements the error interface; the message embeds the rejected path.
func (e InvalidRequestError) Error() string {
	const format = "invalid request: %s"
	return fmt.Sprintf(format, e.path)
}
// toDriverError translates "not found" style errors into the
// driver.PathNotFoundError expected by the registry. An error matching
// os.ErrNotExist, transfer.ErrBlobNotFound or transfer.ErrTagNotFound is
// replaced by a PathNotFoundError for path; any other error is returned
// unchanged.
func toDriverError(err error, path string) error {
	switch {
	case errors.Is(err, os.ErrNotExist),
		errors.Is(err, transfer.ErrBlobNotFound),
		errors.Is(err, transfer.ErrTagNotFound):
		return driver.PathNotFoundError{Path: path, DriverName: Name}
	default:
		return err
	}
}
// krakenStorageDriverFactory is the stateless factory registered under Name.
// Its Create method builds KrakenStorageDriver instances from the parameter
// map handed over by the registry.
type krakenStorageDriverFactory struct{}
// getParam looks up a required driver parameter by name. If the key is absent
// or maps to a nil value, the problem is reported through log.Fatalf;
// otherwise the stored value is returned as-is for the caller to type-assert.
func getParam(params map[string]interface{}, name string) interface{} {
	// Indexing a missing key yields nil, so one nil check covers both the
	// "absent" and the "explicitly nil" cases.
	value := params[name]
	if value == nil {
		log.Fatalf("Required parameter %s not found", name)
	}
	return value
}
// Create builds a KrakenStorageDriver from the registry's driver parameters.
//
// Parameters required by every constructor:
//   - "constructor": string selecting the read-write (_rw) or read-only (_ro) driver
//   - "config": Config
//   - "transferer": transfer.ImageTransferer
//   - "metrics": tally.Scope
//
// The read-write constructor additionally requires "castore" (*store.CAStore)
// and the read-only constructor requires "blobstore" (BlobStore).
//
// A missing or nil parameter is reported by getParam via log.Fatalf. A
// parameter holding a value of the wrong type, or an unknown constructor, is
// reported through the returned error instead of a type-assertion panic.
func (factory *krakenStorageDriverFactory) Create(
	params map[string]interface{}) (driver.StorageDriver, error) {

	// badType describes a parameter whose dynamic type is not the expected one.
	badType := func(name, want string) error {
		return fmt.Errorf("parameter %s: expected %s, got %T", name, want, params[name])
	}

	// Common parameters.
	constructor, ok := getParam(params, "constructor").(string)
	if !ok {
		return nil, badType("constructor", "string")
	}
	config, ok := getParam(params, "config").(Config)
	if !ok {
		return nil, badType("config", "Config")
	}
	transferer, ok := getParam(params, "transferer").(transfer.ImageTransferer)
	if !ok {
		return nil, badType("transferer", "transfer.ImageTransferer")
	}
	metrics, ok := getParam(params, "metrics").(tally.Scope)
	if !ok {
		return nil, badType("metrics", "tally.Scope")
	}

	switch constructor {
	case _rw:
		castore, ok := getParam(params, "castore").(*store.CAStore)
		if !ok {
			return nil, badType("castore", "*store.CAStore")
		}
		return NewReadWriteStorageDriver(config, castore, transferer, metrics), nil
	case _ro:
		blobstore, ok := getParam(params, "blobstore").(BlobStore)
		if !ok {
			return nil, badType("blobstore", "BlobStore")
		}
		return NewReadOnlyStorageDriver(config, blobstore, transferer, metrics), nil
	default:
		return nil, fmt.Errorf("unknown constructor %s", constructor)
	}
}
// KrakenStorageDriver is a storage driver which dispatches each request to a
// blobs, uploads or manifests handler according to the type of the requested
// path.
type KrakenStorageDriver struct {
// config is the driver configuration supplied to the constructor.
config Config
// transferer is the image transferer supplied to the constructor; the
// constructors also hand it to each of the handlers below.
transferer transfer.ImageTransferer
// blobs serves _blobs and _layers paths.
blobs *blobs
// uploads serves _uploads paths and blob writes made through PutContent.
// The read-only constructor installs disabledUploads here.
uploads uploads
// manifests serves _manifests paths.
manifests *manifests
// metrics is the tally scope supplied to the constructor.
metrics tally.Scope
}
// NewReadWriteStorageDriver returns a KrakenStorageDriver able to both push
// and pull blobs. Blob reads and uploads are both backed by cas.
func NewReadWriteStorageDriver(
	config Config,
	cas *store.CAStore,
	transferer transfer.ImageTransferer,
	metrics tally.Scope) *KrakenStorageDriver {

	d := &KrakenStorageDriver{
		config:     config,
		transferer: transferer,
		metrics:    metrics,
	}
	d.blobs = newBlobs(cas, transferer)
	d.uploads = newCASUploads(cas, transferer)
	d.manifests = newManifests(transferer)
	return d
}
// NewReadOnlyStorageDriver returns a KrakenStorageDriver which can only pull
// blobs: reads are backed by bs and uploads are handled by disabledUploads.
func NewReadOnlyStorageDriver(
	config Config,
	bs BlobStore,
	transferer transfer.ImageTransferer,
	metrics tally.Scope) *KrakenStorageDriver {

	d := &KrakenStorageDriver{
		config:     config,
		transferer: transferer,
		metrics:    metrics,
	}
	d.blobs = newBlobs(bs, transferer)
	d.uploads = disabledUploads{}
	d.manifests = newManifests(transferer)
	return d
}
// Name returns the driver name, i.e. the package-level Name constant.
func (d *KrakenStorageDriver) Name() string {
return Name
}
// GetContent returns the content stored at path.
//
// The path is classified with ParsePath and dispatched by type: manifest and
// layer paths are served by getDigest on the manifests and blobs handlers,
// upload paths by the uploads handler, and blob paths by the blobs handler's
// getContent. Any other path type yields InvalidRequestError. Handler errors
// are passed through toDriverError.
//
// Sample path: /docker/registry/v2/repositories/external/ubuntu/_layers/sha256/a3ed95caeb02ffe68cdd9fd84406680ae93d633cb16422d00e8a7c22955b46d4/link
func (d *KrakenStorageDriver) GetContent(ctx context.Context, path string) ([]byte, error) {
	log.Debugf("(*KrakenStorageDriver).GetContent %s", path)

	kind, subKind, err := ParsePath(path)
	if err != nil {
		return nil, err
	}

	var content []byte
	switch kind {
	case _blobs:
		content, err = d.blobs.getContent(ctx, path)
	case _layers:
		content, err = d.blobs.getDigest(path)
	case _uploads:
		content, err = d.uploads.getContent(path, subKind)
	case _manifests:
		content, err = d.manifests.getDigest(path, subKind)
	default:
		return nil, InvalidRequestError{path: path}
	}

	if err == nil {
		return content, nil
	}
	return nil, toDriverError(err, path)
}
// Reader opens the content at path for reading, starting at offset.
//
// Only upload and blob paths are readable; every other path type yields
// InvalidRequestError. Handler errors are passed through toDriverError.
func (d *KrakenStorageDriver) Reader(ctx context.Context, path string, offset int64) (io.ReadCloser, error) {
	log.Debugf("(*KrakenStorageDriver).Reader %s", path)

	kind, subKind, err := ParsePath(path)
	if err != nil {
		return nil, err
	}

	var rc io.ReadCloser
	switch kind {
	case _blobs:
		rc, err = d.blobs.reader(ctx, path, offset)
	case _uploads:
		rc, err = d.uploads.reader(path, subKind, offset)
	default:
		return nil, InvalidRequestError{path: path}
	}

	if err == nil {
		return rc, nil
	}
	return nil, toDriverError(err, path)
}
// PutContent stores content at path.
//
// Manifest, upload and blob paths are forwarded to the matching handler (blob
// writes go through the uploads handler). Writes to layer paths are accepted
// and ignored. Any other path type yields InvalidRequestError. Handler errors
// are passed through toDriverError.
func (d *KrakenStorageDriver) PutContent(ctx context.Context, path string, content []byte) error {
	log.Debugf("(*KrakenStorageDriver).PutContent %s", path)

	kind, subKind, err := ParsePath(path)
	if err != nil {
		return err
	}

	switch kind {
	case _layers:
		// Nothing to do for layer paths.
		return nil
	case _blobs:
		err = d.uploads.putBlobContent(path, content)
	case _uploads:
		err = d.uploads.putContent(path, subKind, content)
	case _manifests:
		err = d.manifests.putContent(path, subKind)
	default:
		return InvalidRequestError{path: path}
	}

	if err == nil {
		return nil
	}
	return toDriverError(err, path)
}
// Writer opens a writer for path. Only upload paths are writable; every other
// path type yields InvalidRequestError.
//
// When append is true the writer is positioned at the end of the existing
// data before it is returned. If that seek fails, the writer is closed before
// the seek error is returned so the underlying handle is not leaked.
//
// Errors from opening the writer are passed through toDriverError.
func (d *KrakenStorageDriver) Writer(ctx context.Context, path string, append bool) (driver.FileWriter, error) {
	log.Debugf("(*KrakenStorageDriver).Writer %s", path)
	pathType, pathSubType, err := ParsePath(path)
	if err != nil {
		return nil, err
	}
	switch pathType {
	case _uploads:
		w, err := d.uploads.writer(path, pathSubType)
		if err != nil {
			return nil, toDriverError(err, path)
		}
		if append {
			if _, err := w.Seek(0, io.SeekEnd); err != nil {
				// The caller never receives w on this path, so release it
				// here. The seek failure is the error worth reporting; a
				// secondary close error is intentionally dropped.
				_ = w.Close()
				return nil, err
			}
		}
		return w, nil
	default:
		return nil, InvalidRequestError{path}
	}
}
// Stat returns file info for path.
//
// Upload, blob and manifest paths are forwarded to the matching handler; any
// other path type yields InvalidRequestError. Handler errors are passed
// through toDriverError.
func (d *KrakenStorageDriver) Stat(ctx context.Context, path string) (driver.FileInfo, error) {
	log.Debugf("(*KrakenStorageDriver).Stat %s", path)

	kind, _, err := ParsePath(path)
	if err != nil {
		return nil, err
	}

	var fi driver.FileInfo
	switch kind {
	case _manifests:
		fi, err = d.manifests.stat(path)
	case _blobs:
		fi, err = d.blobs.stat(ctx, path)
	case _uploads:
		fi, err = d.uploads.stat(path)
	default:
		return nil, InvalidRequestError{path: path}
	}

	if err == nil {
		return fi, nil
	}
	return nil, toDriverError(err, path)
}
// List returns the entries found under path.
//
// Only upload and manifest paths can be listed; any other path type yields
// InvalidRequestError. Handler errors are passed through toDriverError.
func (d *KrakenStorageDriver) List(ctx context.Context, path string) ([]string, error) {
	log.Debugf("(*KrakenStorageDriver).List %s", path)

	kind, subKind, err := ParsePath(path)
	if err != nil {
		return nil, err
	}

	var entries []string
	switch kind {
	case _manifests:
		entries, err = d.manifests.list(path)
	case _uploads:
		entries, err = d.uploads.list(path, subKind)
	default:
		return nil, InvalidRequestError{path: path}
	}

	if err == nil {
		return entries, nil
	}
	return nil, toDriverError(err, path)
}
// Move moves the content at sourcePath to destPath.
//
// The decision is based on the type of sourcePath alone: only upload paths
// can be moved, and anything else yields an InvalidRequestError naming both
// paths. Handler errors are passed through toDriverError using sourcePath.
func (d *KrakenStorageDriver) Move(ctx context.Context, sourcePath string, destPath string) error {
	log.Debugf("(*KrakenStorageDriver).Move %s %s", sourcePath, destPath)

	kind, _, err := ParsePath(sourcePath)
	if err != nil {
		return err
	}

	switch kind {
	case _uploads:
		err = d.uploads.move(sourcePath, destPath)
	default:
		return InvalidRequestError{path: sourcePath + " to " + destPath}
	}

	if err == nil {
		return nil
	}
	return toDriverError(err, sourcePath)
}
// Delete does not remove anything: after logging the request it always
// answers with a PathNotFoundError for path.
func (d *KrakenStorageDriver) Delete(ctx context.Context, path string) error {
	log.Debugf("(*KrakenStorageDriver).Delete %s", path)

	var notFound driver.PathNotFoundError
	notFound.DriverName = Name
	notFound.Path = path
	return notFound
}
// URLFor is not supported by this driver: after logging the request it always
// returns an empty URL together with a "Not implemented" error.
func (d *KrakenStorageDriver) URLFor(ctx context.Context, path string, options map[string]interface{}) (string, error) {
	log.Debugf("(*KrakenStorageDriver).URLFor %s", path)

	var url string
	return url, fmt.Errorf("Not implemented")
}
// Walk is not supported by this driver; it ignores its arguments and always
// returns a "walk not implemented" error.
func (d *KrakenStorageDriver) Walk(ctx context.Context, path string, f driver.WalkFn) error {
	err := errors.New("walk not implemented")
	return err
}