registry/datastore/repositorylease.go (153 lines of code) (raw):
package datastore
import (
"context"
"errors"
"fmt"
"strings"
"time"
"github.com/docker/distribution/log"
"github.com/docker/distribution/registry/datastore/models"
iredis "github.com/docker/distribution/registry/internal/redis"
"github.com/opencontainers/go-digest"
"github.com/redis/go-redis/v9"
)
var (
errLeaseNotFound = errors.New("repository lease not found")
errLeaseUpsertIsEmpty = errors.New("repository lease to be upserted can not be empty")
)
const (
// renameLeaseTTL defines how long a rename lease should last in Redis
renameLeaseTTL = 60 * time.Second
)
// enforces repositoryLeaseStore implements RepositoryLeaseStore
var _ RepositoryLeaseStore = &repositoryLeaseStore{}
// RepositoryLeaseStoreOption allows customizing a repositoryLeaseStore with additional options.
type RepositoryLeaseStoreOption func(*repositoryLeaseStore)
// WithRepositoryLeaseCache instantiates the repositoryStore with a cache for lease management
func WithRepositoryLeaseCache(cache RepositoryLeaseCache) RepositoryLeaseStoreOption {
return func(rlstore *repositoryLeaseStore) {
rlstore.cache = cache
}
}
// RepositoryLeaseStore is the interface that a repository store should conform to.
type RepositoryLeaseStore interface {
// FindRenameByPath searches the underlying store for the rename lease.
FindRenameByPath(ctx context.Context, path string) (*models.RepositoryLease, error)
// GetTTL returns the amount of time left till a lease expires.
GetTTL(ctx context.Context, lease *models.RepositoryLease) (time.Duration, error)
// UpsertRename creates a new repository lease (or updates an existing one) with ttl = `renameLeaseTTL`.
UpsertRename(ctx context.Context, r *models.RepositoryLease) (*models.RepositoryLease, error)
// Destroy removes a repository lease from the cache.
Destroy(ctx context.Context, r *models.RepositoryLease) error
}
// repositoryLeaseStore is the concrete implementation of a RepositoryLeaseStore.
type repositoryLeaseStore struct {
// this struct can be extended to have a db field
// which can be used as a drop in replacement for the cache
// or to work in tandem with the cache
cache RepositoryLeaseCache
}
// NewRepositoryLeaseStore builds a new repositoryLeaseStore.
func NewRepositoryLeaseStore(opts ...RepositoryLeaseStoreOption) RepositoryLeaseStore {
rlStore := &repositoryLeaseStore{cache: &noOpRepositoryLeaseCache{}}
for _, o := range opts {
o(rlStore)
}
return rlStore
}
// RepositoryLeaseCache is a cache for *models.RepositoryLease objects.
type RepositoryLeaseCache interface {
Get(ctx context.Context, path string, leaseType models.LeaseType) (*models.RepositoryLease, error)
Set(ctx context.Context, lease *models.RepositoryLease, ttl time.Duration) error
Invalidate(ctx context.Context, path string) error
TTL(ctx context.Context, lease *models.RepositoryLease) (time.Duration, error)
}
// noOpRepositoryLeaseCache satisfies the RepositoryLeaseCache, but does not do anything.
// Useful as a default and for testing.
type noOpRepositoryLeaseCache struct{}
// NewNoOpRepositoryLeaseCache creates a new non-operational cache for a repository lease object.
// This implementation does nothing and returns nothing for all its methods.
func NewNoOpRepositoryLeaseCache() RepositoryLeaseCache {
return &noOpRepositoryLeaseCache{}
}
func (*noOpRepositoryLeaseCache) Get(_ context.Context, _ string, _ models.LeaseType) (*models.RepositoryLease, error) {
return nil, nil
}
func (*noOpRepositoryLeaseCache) Set(_ context.Context, _ *models.RepositoryLease, _ time.Duration) error {
return nil
}
func (*noOpRepositoryLeaseCache) TTL(_ context.Context, _ *models.RepositoryLease) (time.Duration, error) {
return 0, nil
}
func (*noOpRepositoryLeaseCache) Invalidate(_ context.Context, _ string) error {
return nil
}
// centralRepositoryLeaseCache is the interface for the centralized repository lease cache backed by Redis.
type centralRepositoryLeaseCache struct {
cache *iredis.Cache
}
// NewCentralRepositoryLeaseCache creates an interface for the centralized repository object cache backed by Redis.
func NewCentralRepositoryLeaseCache(cache *iredis.Cache) RepositoryLeaseCache {
return ¢ralRepositoryLeaseCache{cache}
}
// key generates a valid Redis key string for a given repository lease object. The used key format is described in
// https://gitlab.com/gitlab-org/container-registry/-/blob/master/docs/redis-dev-guidelines.md#key-format.
func (*centralRepositoryLeaseCache) key(path string) string {
nsPrefix := strings.Split(path, "/")[0]
hex := digest.FromString(path).Hex()
return fmt.Sprintf("registry:api:{repository-lease:%s:%s}", nsPrefix, hex)
}
// Get a repository lease from the cache.
func (c *centralRepositoryLeaseCache) Get(ctx context.Context, path string, leaseType models.LeaseType) (*models.RepositoryLease, error) {
l := log.GetLogger(log.WithContext(ctx))
getCtx, cancel := context.WithTimeout(ctx, cacheOpTimeout)
defer cancel()
var lease models.RepositoryLease
if err := c.cache.UnmarshalGet(getCtx, c.key(path), &lease); err != nil {
l.WithError(err).Warn("repository lease cache: failed to read lease from cache")
// redis.Nil is returned when the key is not found in Redis
if errors.Is(err, redis.Nil) {
return nil, nil
}
return nil, err
}
if lease.Type != leaseType {
l.Warn("failed to find the repository lease matching the lease type")
return nil, nil
}
return &lease, nil
}
// Set a repository lease in the cache.
func (c *centralRepositoryLeaseCache) Set(ctx context.Context, lease *models.RepositoryLease, ttl time.Duration) error {
setCtx, cancel := context.WithTimeout(ctx, cacheOpTimeout)
defer cancel()
if err := c.cache.MarshalSet(setCtx, c.key(lease.Path), lease, iredis.WithTTL(ttl)); err != nil {
return fmt.Errorf("failed to write repository lease to cache: %w", err)
}
return nil
}
// TTL gets the object's TTL from the cache.
func (c *centralRepositoryLeaseCache) TTL(ctx context.Context, lease *models.RepositoryLease) (time.Duration, error) {
l := log.GetLogger(log.WithContext(ctx))
// find any existing ttl for the lease path
var cachedLease models.RepositoryLease
ttl, err := c.cache.UnmarshalGetWithTTL(ctx, c.key(lease.Path), &cachedLease)
if err != nil {
if errors.Is(err, redis.Nil) {
return 0, errLeaseNotFound
}
return ttl, fmt.Errorf("failed to read lease TTL from cache: %w", err)
}
// verify the lease was granted to the same repository requesting the TTL
if cachedLease.GrantedTo != lease.GrantedTo {
l.Warn("the lease retrieved has a different grantor from the one requested")
return 0, nil
}
return ttl, nil
}
// Invalidate the lease for a given path in the cache.
func (c *centralRepositoryLeaseCache) Invalidate(ctx context.Context, path string) error {
invalCtx, cancel := context.WithTimeout(ctx, cacheOpTimeout)
defer cancel()
if err := c.cache.Delete(invalCtx, c.key(path)); err != nil {
detail := "failed to invalidate repository lease in cache for leased path: " + path
log.GetLogger(log.WithContext(ctx)).WithError(err).Warn(detail)
return fmt.Errorf("failed to invalidate repository lease: %w", err)
}
return nil
}
// FindRenameByPath returns a rename lease object if it exists.
func (s *repositoryLeaseStore) FindRenameByPath(ctx context.Context, path string) (*models.RepositoryLease, error) {
rl, err := s.cache.Get(ctx, path, models.RenameLease)
if err != nil {
return nil, fmt.Errorf("finding rename lease: %w", err)
}
return rl, nil
}
// UpsertRename creates (or updates the ttl of) a rename lease object with ttl = `renameLeaseTTL`
func (s *repositoryLeaseStore) UpsertRename(ctx context.Context, rl *models.RepositoryLease) (*models.RepositoryLease, error) {
if rl == nil {
return nil, errLeaseUpsertIsEmpty
}
rl.Type = models.RenameLease
err := s.cache.Set(ctx, rl, renameLeaseTTL)
if err != nil {
log.GetLogger(log.WithContext(ctx)).Warn("upserting rename lease failed")
return nil, fmt.Errorf("upserting rename lease: %w", err)
}
return rl, nil
}
// GetTTL returns the TTL of an existing lease
func (s *repositoryLeaseStore) GetTTL(ctx context.Context, lease *models.RepositoryLease) (time.Duration, error) {
return s.cache.TTL(ctx, lease)
}
// Destroy invalidates a lease object
func (s *repositoryLeaseStore) Destroy(ctx context.Context, lease *models.RepositoryLease) error {
return s.cache.Invalidate(ctx, lease.Path)
}