registry/storage/registry.go (310 lines of code) (raw):
package storage
import (
"context"
"fmt"
"regexp"
"time"
"github.com/docker/distribution"
"github.com/docker/distribution/reference"
"github.com/docker/distribution/registry/datastore"
"github.com/docker/distribution/registry/storage/cache"
storagedriver "github.com/docker/distribution/registry/storage/driver"
"github.com/docker/distribution/registry/storage/validation"
"github.com/docker/libtrust"
)
// registry is the top-level implementation of Registry for use in the storage
// package. All instances should descend from this object.
type registry struct {
blobStore *blobStore
blobServer *blobServer
statter *blobStatter // global statter service.
blobDescriptorCacheProvider cache.BlobDescriptorCacheProvider
deleteEnabled bool
schema1Enabled bool
schema1PullsDisabled bool
resumableDigestEnabled bool
useDatabase bool
schema1SigningKey libtrust.PrivateKey
blobDescriptorServiceFactory distribution.BlobDescriptorServiceFactory
manifestURLs validation.ManifestURLs
manifestsRefLimit int
manifestsPayloadSizeLimit int
driver storagedriver.StorageDriver
db datastore.LoadBalancer
lockers *lockers
redirectExceptions []*regexp.Regexp
}
// RegistryOption is the type used for functional options for NewRegistry.
type RegistryOption func(*registry) error
// EnableRedirect is a functional option for NewRegistry. It causes the backend
// blob server to attempt using (StorageDriver).URLFor to serve all blobs.
func EnableRedirect(registry *registry) error {
registry.blobServer.redirect.enabled = true
return nil
}
// EnableRedirectWithExceptions enables redirections except for repositories
// whose paths match any of the exceptions.
func EnableRedirectWithExceptions(exceptions []string) RegistryOption {
return func(registry *registry) error {
registry.blobServer.redirect.enabled = true
for _, e := range exceptions {
r, err := regexp.Compile(e)
if err != nil {
return fmt.Errorf("configuring storage redirect exception: %v", err)
}
registry.redirectExceptions = append(registry.redirectExceptions, r)
}
return nil
}
}
// WithRedirectExpiryDelay sets a custom expiry delay for presigned URLs used for redirecting clients to supported
// storage backends for blob downloads.
func WithRedirectExpiryDelay(d time.Duration) RegistryOption {
return func(registry *registry) error {
registry.blobServer.redirect.expiryDelay = d
return nil
}
}
// EnableDelete is a functional option for NewRegistry. It enables deletion on
// the registry.
func EnableDelete(registry *registry) error {
registry.deleteEnabled = true
return nil
}
// EnableSchema1 is a functional option for NewRegistry. It enables pushing of
// schema1 manifests.
func EnableSchema1(registry *registry) error {
registry.schema1Enabled = true
return nil
}
// DisableSchema1Pulls is a functional option for NewRegistry. It disables
// pulling of schema1 manifests.
func DisableSchema1Pulls(registry *registry) error {
registry.schema1PullsDisabled = true
return nil
}
// ManifestURLsAllowRegexp is a functional option for NewRegistry.
func ManifestURLsAllowRegexp(r *regexp.Regexp) RegistryOption {
return func(registry *registry) error {
registry.manifestURLs.Allow = r
return nil
}
}
// ManifestURLsDenyRegexp is a functional option for NewRegistry.
func ManifestURLsDenyRegexp(r *regexp.Regexp) RegistryOption {
return func(registry *registry) error {
registry.manifestURLs.Deny = r
return nil
}
}
// ManifestReferenceLimit is a functional option for NewRegistry.
func ManifestReferenceLimit(n int) RegistryOption {
return func(registry *registry) error {
registry.manifestsRefLimit = n
return nil
}
}
// ManifestPayloadSizeLimit is a functional option for NewRegistry. It sets the
// maximum payload size of a manifest or manifest list.
func ManifestPayloadSizeLimit(n int) RegistryOption {
return func(registry *registry) error {
registry.manifestsPayloadSizeLimit = n
return nil
}
}
// Schema1SigningKey returns a functional option for NewRegistry. It sets the
// key for signing all schema1 manifests.
func Schema1SigningKey(key libtrust.PrivateKey) RegistryOption {
return func(registry *registry) error {
registry.schema1SigningKey = key
return nil
}
}
// BlobDescriptorServiceFactory returns a functional option for NewRegistry. It sets the
// factory to create BlobDescriptorServiceFactory middleware.
func BlobDescriptorServiceFactory(factory distribution.BlobDescriptorServiceFactory) RegistryOption {
return func(registry *registry) error {
registry.blobDescriptorServiceFactory = factory
return nil
}
}
// BlobDescriptorCacheProvider returns a functional option for
// NewRegistry. It creates a cached blob statter for use by the
// registry.
func BlobDescriptorCacheProvider(blobDescriptorCacheProvider cache.BlobDescriptorCacheProvider) RegistryOption {
return func(registry *registry) error {
if blobDescriptorCacheProvider != nil {
statter := cache.NewCachedBlobStatter(blobDescriptorCacheProvider, registry.statter)
registry.blobStore.statter = statter
registry.blobServer.statter = statter
registry.blobDescriptorCacheProvider = blobDescriptorCacheProvider
}
return nil
}
}
// Database configures the registry to use the passed database.
func Database(db datastore.LoadBalancer) RegistryOption {
return func(registry *registry) error {
registry.db = db
return nil
}
}
// UseDatabase sets the flag to true so that we can take different
// code paths based on this value.
func UseDatabase(registry *registry) error {
registry.useDatabase = true
return nil
}
// NewRegistry creates a new registry instance from the provided driver. The
// resulting registry may be shared by multiple goroutines but is cheap to
// allocate. If the Redirect option is specified, the backend blob server will
// attempt to use (StorageDriver).URLFor to serve all blobs.
func NewRegistry(_ context.Context, driver storagedriver.StorageDriver, options ...RegistryOption) (distribution.Namespace, error) {
// create global statter
statter := &blobStatter{
driver: driver,
}
bs := &blobStore{
driver: driver,
statter: statter,
}
registry := ®istry{
blobStore: bs,
blobServer: &blobServer{
driver: driver,
statter: statter,
pathFn: bs.path,
},
statter: statter,
resumableDigestEnabled: true,
driver: driver,
lockers: &lockers{
FS: &FilesystemInUseLocker{Driver: driver},
DB: &DatabaseInUseLocker{Driver: driver},
},
}
for _, option := range options {
if err := option(registry); err != nil {
return nil, err
}
}
return registry, nil
}
// Scope returns the namespace scope for a registry. The registry
// will only serve repositories contained within this scope.
func (*registry) Scope() distribution.Scope {
return distribution.GlobalScope
}
// Repository returns an instance of the repository tied to the registry.
// Instances should not be shared between goroutines but are cheap to
// allocate. In general, they should be request scoped.
func (reg *registry) Repository(ctx context.Context, canonicalName reference.Named) (distribution.Repository, error) {
var descriptorCache distribution.BlobDescriptorService
if reg.blobDescriptorCacheProvider != nil {
var err error
descriptorCache, err = reg.blobDescriptorCacheProvider.RepositoryScoped(canonicalName.Name())
if err != nil {
return nil, err
}
}
return &repository{
ctx: ctx,
registry: reg,
name: canonicalName,
descriptorCache: descriptorCache,
}, nil
}
func (reg *registry) Blobs() distribution.BlobEnumerator {
return reg.blobStore
}
func (reg *registry) BlobStatter() distribution.BlobStatter {
return reg.statter
}
// repository provides name-scoped access to various services.
type repository struct {
*registry
ctx context.Context
name reference.Named
descriptorCache distribution.BlobDescriptorService
}
// Name returns the name of the repository.
func (repo *repository) Named() reference.Named {
return repo.name
}
func (repo *repository) Tags(_ context.Context) distribution.TagService {
tags := &tagStore{
repository: repo,
blobStore: repo.registry.blobStore,
}
return tags
}
// Manifests returns an instance of ManifestService. Instantiation is cheap and
// may be context sensitive in the future. The instance should be used similar
// to a request local.
func (repo *repository) Manifests(ctx context.Context, options ...distribution.ManifestServiceOption) (distribution.ManifestService, error) {
manifestDirectoryPathSpec := manifestRevisionsPathSpec{name: repo.name.Name()}
var statter distribution.BlobDescriptorService = &linkedBlobStatter{
blobStore: repo.blobStore,
repository: repo,
linkPath: manifestRevisionLinkPath,
}
if repo.registry.blobDescriptorServiceFactory != nil {
statter = repo.registry.blobDescriptorServiceFactory.BlobAccessController(statter)
}
blobStore := &linkedBlobStore{
ctx: ctx,
blobStore: repo.blobStore,
repository: repo,
deleteEnabled: repo.registry.deleteEnabled,
blobAccessController: statter,
// LinkPath limits this blob store to only manifests. This instance cannot
// be used for blob checks.
linkPath: manifestRevisionLinkPath,
linkDirectoryPathSpec: manifestDirectoryPathSpec,
}
var v1Handler ManifestHandler
if repo.schema1Enabled {
v1Handler = &signedManifestHandler{
ctx: ctx,
schema1SigningKey: repo.schema1SigningKey,
repository: repo,
blobStore: blobStore,
}
} else {
v1Handler = &v1UnsupportedHandler{
pullsDisabled: repo.schema1PullsDisabled,
innerHandler: &signedManifestHandler{
ctx: ctx,
schema1SigningKey: repo.schema1SigningKey,
repository: repo,
blobStore: blobStore,
},
}
}
ms := &manifestStore{
ctx: ctx,
repository: repo,
blobStore: blobStore,
schema1Handler: v1Handler,
schema2Handler: &schema2ManifestHandler{
ctx: ctx,
repository: repo,
blobStore: blobStore,
manifestURLs: repo.registry.manifestURLs,
manifestRefLimit: repo.registry.manifestsRefLimit,
manifestPayloadSizeLimit: repo.registry.manifestsPayloadSizeLimit,
},
manifestListHandler: &manifestListHandler{
ctx: ctx,
repository: repo,
blobStore: blobStore,
manifestRefLimit: repo.registry.manifestsRefLimit,
manifestPayloadSizeLimit: repo.registry.manifestsPayloadSizeLimit,
},
ocischemaHandler: &ocischemaManifestHandler{
ctx: ctx,
repository: repo,
blobStore: blobStore,
manifestURLs: repo.registry.manifestURLs,
manifestRefLimit: repo.registry.manifestsRefLimit,
manifestPayloadSizeLimit: repo.registry.manifestsPayloadSizeLimit,
},
}
// Apply options
for _, option := range options {
err := option.Apply(ms)
if err != nil {
return nil, err
}
}
return ms, nil
}
// Lockers returns the lockers for this registry.
func (reg *registry) Lockers() distribution.Lockers {
return reg.lockers
}
// Blobs returns an instance of the BlobStore. Instantiation is cheap and
// may be context sensitive in the future. The instance should be used similar
// to a request local.
func (repo *repository) Blobs(ctx context.Context) distribution.BlobStore {
var statter distribution.BlobDescriptorService = &linkedBlobStatter{
blobStore: repo.blobStore,
repository: repo,
linkPath: blobLinkPath,
}
if repo.descriptorCache != nil {
statter = cache.NewCachedBlobStatter(repo.descriptorCache, statter)
}
if repo.registry.blobDescriptorServiceFactory != nil {
statter = repo.registry.blobDescriptorServiceFactory.BlobAccessController(statter)
}
// Shallow copy is fine, we only care about the value of blobServer.redirect
// for the purpose of conditionally disabling redirection for repositories.
blobServer := *repo.blobServer
if blobServer.redirect.enabled {
for _, exception := range repo.redirectExceptions {
if exception.MatchString(repo.name.Name()) {
blobServer.redirect.enabled = false
break
}
}
}
return &linkedBlobStore{
registry: repo.registry,
blobStore: repo.blobStore,
blobServer: &blobServer,
blobAccessController: statter,
repository: repo,
ctx: ctx,
// LinkPath limits this blob store to only layers. This instance cannot be
// used for manifest checks.
linkPath: blobLinkPath,
deleteEnabled: repo.registry.deleteEnabled,
resumableDigestEnabled: repo.resumableDigestEnabled,
useDatabase: repo.useDatabase,
}
}