registry/datastore/repository.go (1,849 lines of code)
//go:generate mockgen -package mocks -destination mocks/repository.go . RepositoryCache
package datastore
import (
"context"
"database/sql"
"errors"
"fmt"
"path/filepath"
"strconv"
"strings"
"time"
"github.com/docker/distribution"
"github.com/docker/distribution/log"
"github.com/docker/distribution/manifest/manifestlist"
"github.com/docker/distribution/registry/datastore/metrics"
"github.com/docker/distribution/registry/datastore/models"
iredis "github.com/docker/distribution/registry/internal/redis"
"github.com/jackc/pgerrcode"
"github.com/jackc/pgx/v5/pgconn"
"github.com/opencontainers/go-digest"
v1 "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/redis/go-redis/v9"
"gitlab.com/gitlab-org/labkit/errortracking"
)
type SortOrder string
const (
// cacheOpTimeout defines the timeout applied to cache operations against Redis
cacheOpTimeout = 500 * time.Millisecond
// OrderDesc is the normalized string to be used for sorting results in descending order
OrderDesc SortOrder = "desc"
// OrderAsc is the normalized string to be used for sorting results in ascending order
OrderAsc SortOrder = "asc"
orderByName = "name"
lessThan = "<"
greaterThan = ">"
// sizeWithDescendantsKeyTTL is the TTL for the cached value of the "size with descendants" for a given repository.
sizeWithDescendantsKeyTTL = 5 * time.Minute
// lsnKeyTTL is the TTL for the cached primary database Log Sequence Number (LSN) for a given repository.
lsnKeyTTL = 1 * time.Hour
)
// FilterParams contains the filters used to narrow down and paginate the results
// returned by the repositoryStore.
type FilterParams struct {
SortOrder SortOrder
OrderBy string
Name string
ExactName string
BeforeEntry string
LastEntry string
PublishedAt string
MaxEntries int
IncludeReferrers bool
ReferrerTypes []string
}
// RepositoryReader is the interface that defines read operations for a repository store.
type RepositoryReader interface {
FindAll(ctx context.Context) (models.Repositories, error)
FindAllPaginated(ctx context.Context, filters FilterParams) (models.Repositories, error)
FindByPath(ctx context.Context, path string) (*models.Repository, error)
FindDescendantsOf(ctx context.Context, id int64) (models.Repositories, error)
FindAncestorsOf(ctx context.Context, id int64) (models.Repositories, error)
FindSiblingsOf(ctx context.Context, id int64) (models.Repositories, error)
Count(ctx context.Context) (int, error)
CountAfterPath(ctx context.Context, path string) (int, error)
CountPathSubRepositories(ctx context.Context, topLevelNamespaceID int64, path string) (int, error)
Manifests(ctx context.Context, r *models.Repository) (models.Manifests, error)
Tags(ctx context.Context, r *models.Repository) (models.Tags, error)
TagsPaginated(ctx context.Context, r *models.Repository, filters FilterParams) (models.Tags, error)
HasTagsAfterName(ctx context.Context, r *models.Repository, filters FilterParams) (bool, error)
HasTagsBeforeName(ctx context.Context, r *models.Repository, filters FilterParams) (bool, error)
ManifestTags(ctx context.Context, r *models.Repository, m *models.Manifest) (models.Tags, error)
FindManifestByDigest(ctx context.Context, r *models.Repository, d digest.Digest) (*models.Manifest, error)
FindManifestByTagName(ctx context.Context, r *models.Repository, tagName string) (*models.Manifest, error)
FindTagByName(ctx context.Context, r *models.Repository, name string) (*models.Tag, error)
Blobs(ctx context.Context, r *models.Repository) (models.Blobs, error)
FindBlob(ctx context.Context, r *models.Repository, d digest.Digest) (*models.Blob, error)
ExistsBlob(ctx context.Context, r *models.Repository, d digest.Digest) (bool, error)
Size(ctx context.Context, r *models.Repository) (RepositorySize, error)
SizeWithDescendants(ctx context.Context, r *models.Repository) (RepositorySize, error)
EstimatedSizeWithDescendants(ctx context.Context, r *models.Repository) (RepositorySize, error)
TagsDetailPaginated(ctx context.Context, r *models.Repository, filters FilterParams) ([]*models.TagDetail, error)
FindPaginatedRepositoriesForPath(ctx context.Context, r *models.Repository, filters FilterParams) (models.Repositories, error)
TagDetail(ctx context.Context, r *models.Repository, tagName string) (*models.TagDetail, error)
}
// RepositoryWriter is the interface that defines write operations for a repository store.
type RepositoryWriter interface {
Create(ctx context.Context, r *models.Repository) error
CreateByPath(ctx context.Context, path string, opts ...repositoryOption) (*models.Repository, error)
CreateOrFind(ctx context.Context, r *models.Repository) error
CreateOrFindByPath(ctx context.Context, path string, opts ...repositoryOption) (*models.Repository, error)
Update(ctx context.Context, r *models.Repository) error
LinkBlob(ctx context.Context, r *models.Repository, d digest.Digest) error
UnlinkBlob(ctx context.Context, r *models.Repository, d digest.Digest) (bool, error)
DeleteTagByName(ctx context.Context, r *models.Repository, name string) (bool, error)
DeleteManifest(ctx context.Context, r *models.Repository, d digest.Digest) (bool, error)
RenamePathForSubRepositories(ctx context.Context, topLevelNamespaceID int64, oldPath, newPath string) error
Rename(ctx context.Context, r *models.Repository, newPath, newName string) error
UpdateLastPublishedAt(ctx context.Context, r *models.Repository, t *models.Tag) error
}
type repositoryOption func(*models.Repository)
// RepositoryStoreOption allows customizing a repositoryStore with additional options.
type RepositoryStoreOption func(*repositoryStore)
// WithRepositoryCache instantiates the repositoryStore with a cache which will
// attempt to retrieve a *models.Repository from methods that return that type,
// rather than communicating with the database.
func WithRepositoryCache(cache RepositoryCache) RepositoryStoreOption {
return func(rstore *repositoryStore) {
rstore.cache = cache
}
}
// RepositoryStore is the interface that a repository store should conform to.
type RepositoryStore interface {
RepositoryReader
RepositoryWriter
}
// repositoryStore is the concrete implementation of a RepositoryStore.
type repositoryStore struct {
// db can be either a *sql.DB or *sql.Tx
db Queryer
cache RepositoryCache
}
// NewRepositoryStore builds a new repositoryStore.
func NewRepositoryStore(db Queryer, opts ...RepositoryStoreOption) RepositoryStore {
rStore := &repositoryStore{db: db, cache: &noOpRepositoryCache{}}
for _, o := range opts {
o(rStore)
}
return rStore
}
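// Illustrative sketch (not part of the original file): one way to wire a repository
// store with the Redis-backed central cache. The db handle and the *iredis.Cache are
// assumed to be provided by the caller; newCachedRepositoryStore is a hypothetical
// helper name used only for this example.
func newCachedRepositoryStore(db Queryer, redisCache *iredis.Cache) RepositoryStore {
	// with this option, read methods try the cache before querying the database
	return NewRepositoryStore(db, WithRepositoryCache(NewCentralRepositoryCache(redisCache)))
}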
// RepositoryManifestService implements the validation.ManifestExister
// interface for repository-scoped manifests.
type RepositoryManifestService struct {
RepositoryReader
RepositoryPath string
}
// Exists returns true if the manifest is linked in the repository.
func (rms *RepositoryManifestService) Exists(ctx context.Context, dgst digest.Digest) (bool, error) {
r, err := rms.FindByPath(ctx, rms.RepositoryPath)
if err != nil {
return false, err
}
if r == nil {
return false, errors.New("unable to find repository in database")
}
m, err := rms.FindManifestByDigest(ctx, r, dgst)
if err != nil {
return false, err
}
return m != nil, nil
}
// RepositoryBlobService implements the distribution.BlobStatter interface for
// repository-scoped blobs.
type RepositoryBlobService struct {
RepositoryReader
RepositoryPath string
}
// Stat returns the descriptor of the blob with the provided digest, or
// distribution.ErrBlobUnknown if it is not found.
func (rbs *RepositoryBlobService) Stat(ctx context.Context, dgst digest.Digest) (distribution.Descriptor, error) {
r, err := rbs.FindByPath(ctx, rbs.RepositoryPath)
if err != nil {
return distribution.Descriptor{}, err
}
if r == nil {
return distribution.Descriptor{}, errors.New("unable to find repository in database")
}
b, err := rbs.FindBlob(ctx, r, dgst)
if err != nil {
return distribution.Descriptor{}, err
}
if b == nil {
return distribution.Descriptor{}, distribution.ErrBlobUnknown
}
return distribution.Descriptor{Digest: b.Digest, Size: b.Size, MediaType: b.MediaType}, nil
}
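// Illustrative sketch (not part of the original file): how a caller might use
// RepositoryBlobService through the distribution.BlobStatter contract to answer a
// simple "does this blob exist in the repository?" question. statBlobExample is a
// hypothetical helper name.
func statBlobExample(ctx context.Context, reader RepositoryReader, path string, dgst digest.Digest) (bool, error) {
	svc := &RepositoryBlobService{RepositoryReader: reader, RepositoryPath: path}
	if _, err := svc.Stat(ctx, dgst); err != nil {
		// Stat returns distribution.ErrBlobUnknown when the blob is not linked in the repository
		if errors.Is(err, distribution.ErrBlobUnknown) {
			return false, nil
		}
		return false, err
	}
	return true, nil
}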
// RepositoryCache is a cache for *models.Repository objects.
type RepositoryCache interface {
Get(ctx context.Context, path string) *models.Repository
Set(ctx context.Context, repo *models.Repository)
InvalidateSize(ctx context.Context, repo *models.Repository)
SizeWithDescendantsTimedOut(ctx context.Context, r *models.Repository)
HasSizeWithDescendantsTimedOut(ctx context.Context, r *models.Repository) bool
// SetSizeWithDescendants sets the computed "size with descendants" of a given repository with a TTL of
// sizeWithDescendantsKeyTTL.
SetSizeWithDescendants(ctx context.Context, r *models.Repository, size int64)
// GetSizeWithDescendants gets the computed "size with descendants" of a given repository. Returns whether the key
// was found and its value.
GetSizeWithDescendants(ctx context.Context, r *models.Repository) (bool, int64)
// SetLSN records the primary database Log Sequence Number (LSN) associated with a given repository.
// See https://gitlab.com/gitlab-org/container-registry/-/blob/master/docs/spec/gitlab/database-load-balancing.md?ref_type=heads#primary-sticking
SetLSN(ctx context.Context, r *models.Repository, lsn string) error
// GetLSN gets the primary database Log Sequence Number (LSN) associated with a given repository.
// See https://gitlab.com/gitlab-org/container-registry/-/blob/master/docs/spec/gitlab/database-load-balancing.md?ref_type=heads#primary-sticking
GetLSN(ctx context.Context, r *models.Repository) (string, error)
}
// noOpRepositoryCache satisfies the RepositoryCache interface, but does not cache anything.
// Useful as a default and for testing.
type noOpRepositoryCache struct{}
// NewNoOpRepositoryCache creates a new non-operational cache for a repository object.
// This implementation does nothing and returns nothing for all its methods.
func NewNoOpRepositoryCache() RepositoryCache {
return &noOpRepositoryCache{}
}
func (*noOpRepositoryCache) Get(context.Context, string) *models.Repository { return nil }
func (*noOpRepositoryCache) Set(context.Context, *models.Repository) {}
func (*noOpRepositoryCache) InvalidateSize(context.Context, *models.Repository) {}
func (*noOpRepositoryCache) SizeWithDescendantsTimedOut(context.Context, *models.Repository) {}
func (*noOpRepositoryCache) HasSizeWithDescendantsTimedOut(context.Context, *models.Repository) bool {
return false
}
func (*noOpRepositoryCache) SetSizeWithDescendants(context.Context, *models.Repository, int64) {}
func (*noOpRepositoryCache) GetSizeWithDescendants(context.Context, *models.Repository) (bool, int64) {
return false, 0
}
func (*noOpRepositoryCache) InvalidateRootSizeWithDescendants(context.Context, *models.Repository) {
}
func (*noOpRepositoryCache) SetLSN(context.Context, *models.Repository, string) error { return nil }
func (*noOpRepositoryCache) GetLSN(context.Context, *models.Repository) (string, error) {
return "", nil
}
// singleRepositoryCache caches a single repository in-memory. This implementation is not thread-safe. Deprecated in
// favor of centralRepositoryCache.
type singleRepositoryCache struct {
r *models.Repository
}
// NewSingleRepositoryCache creates a new local in-memory cache for a single repository object. This implementation is
// not thread-safe. Deprecated in favor of NewCentralRepositoryCache.
func NewSingleRepositoryCache() RepositoryCache {
return &singleRepositoryCache{}
}
func (c *singleRepositoryCache) Get(_ context.Context, path string) *models.Repository {
if c.r == nil || c.r.Path != path {
return nil
}
return c.r
}
func (c *singleRepositoryCache) Set(_ context.Context, r *models.Repository) {
if r != nil {
c.r = r
}
}
func (c *singleRepositoryCache) InvalidateSize(_ context.Context, r *models.Repository) {
// guard against an empty cache to avoid a nil pointer dereference
if c.r != nil && r != nil {
c.r.Size = nil
}
}
// SizeWithDescendantsTimedOut is a noop. We're phasing out the singleRepositoryCache cache implementation in favor of
// the centralRepositoryCache one, and the only place where we'll be making use of the related functionality (estimated
// size), the GitLab V1 API repositories handler, is explicitly making use of the latter.
func (*singleRepositoryCache) SizeWithDescendantsTimedOut(context.Context, *models.Repository) {}
// HasSizeWithDescendantsTimedOut is a noop. We're phasing out the singleRepositoryCache cache implementation in favor
// of the centralRepositoryCache one, and the only place where we'll be making use of the related functionality
// (estimated size), the GitLab V1 API repositories handler, is explicitly making use of the latter.
func (*singleRepositoryCache) HasSizeWithDescendantsTimedOut(context.Context, *models.Repository) bool {
return false
}
// SetSizeWithDescendants is a noop. We're phasing out the singleRepositoryCache cache implementation in favor of
// the centralRepositoryCache one, the only implementation where we'll be making use of the related functionality.
func (*singleRepositoryCache) SetSizeWithDescendants(context.Context, *models.Repository, int64) {}
// GetSizeWithDescendants is a noop. We're phasing out the singleRepositoryCache cache implementation in favor of
// the centralRepositoryCache one, the only implementation where we'll be making use of the related functionality.
func (*singleRepositoryCache) GetSizeWithDescendants(context.Context, *models.Repository) (bool, int64) {
return false, 0
}
// InvalidateRootSizeWithDescendants is a noop. We're phasing out the singleRepositoryCache cache implementation in favor
// of the centralRepositoryCache one, the only implementation where we'll be making use of the related functionality.
func (*singleRepositoryCache) InvalidateRootSizeWithDescendants(context.Context, *models.Repository) {
}
// SetLSN is a noop as this functionality depends on Redis.
func (*singleRepositoryCache) SetLSN(context.Context, *models.Repository, string) error { return nil }
// GetLSN is a noop as this functionality depends on Redis.
func (*singleRepositoryCache) GetLSN(context.Context, *models.Repository) (string, error) {
return "", nil
}
// centralRepositoryCache is the interface for the centralized repository object cache backed by Redis.
type centralRepositoryCache struct {
cache *iredis.Cache
}
// NewCentralRepositoryCache creates an interface for the centralized repository object cache backed by Redis.
func NewCentralRepositoryCache(cache *iredis.Cache) RepositoryCache {
return &centralRepositoryCache{cache}
}
// key generates a valid Redis key string for a given repository object. The used key format is described in
// https://gitlab.com/gitlab-org/container-registry/-/blob/master/docs/redis-dev-guidelines.md#key-format.
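// For example (illustrative, hash abbreviated): key("gitlab-org/build/cng") returns
// "registry:db:{repository:gitlab-org:<sha256 hex of the full path>}", i.e. the key embeds
// the top-level namespace in clear text plus a digest of the complete repository path.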
func (*centralRepositoryCache) key(path string) string {
nsPrefix := strings.Split(path, "/")[0]
hex := digest.FromString(path).Hex()
return fmt.Sprintf("registry:db:{repository:%s:%s}", nsPrefix, hex)
}
// sizeWithDescendantsTimedOutKey generates a valid Redis key string for a flag used to indicate whether the last "size
// with descendants" query has timed out for a given repository object.
// This flag needs to be stored as a separate key instead of being embedded in the repository struct because we need it
// to expire after a specific TTL, independently of the repository object key TTL.
func (c *centralRepositoryCache) sizeWithDescendantsTimedOutKey(path string) string {
// "swd" stands for "size with descendants" for the sake of compactness (the name of these keys can get really long)
return fmt.Sprintf("%s:swd-timeout", c.key(path))
}
// Get implements RepositoryCache.
func (c *centralRepositoryCache) Get(ctx context.Context, path string) *models.Repository {
l := log.GetLogger(log.WithContext(ctx))
getCtx, cancel := context.WithTimeout(ctx, cacheOpTimeout)
defer cancel()
var repo models.Repository
if err := c.cache.UnmarshalGet(getCtx, c.key(path), &repo); err != nil {
// a wrapped redis.Nil is returned when the key is not found in Redis
if !errors.Is(err, redis.Nil) {
l.WithError(err).Error("failed to read repository from cache")
}
return nil
}
// Double check that the obtained and decoded repository object has the same path that we're looking for. This
// prevents leaking data from other repositories in case of a path hash collision.
if repo.Path != path {
l.WithFields(log.Fields{"path": path, "cached_path": repo.Path}).Warn("path hash collision detected when getting repository from cache")
return nil
}
return &repo
}
// Set implements RepositoryCache.
func (c *centralRepositoryCache) Set(ctx context.Context, r *models.Repository) {
if r == nil {
return
}
setCtx, cancel := context.WithTimeout(ctx, cacheOpTimeout)
defer cancel()
if err := c.cache.MarshalSet(setCtx, c.key(r.Path), r); err != nil {
log.GetLogger(log.WithContext(ctx)).WithError(err).Warn("failed to write repository to cache")
}
}
const sizeTimedOutKeyTTL = 24 * time.Hour
// SizeWithDescendantsTimedOut creates a key in Redis for repository r whenever the SizeWithDescendants query has timed
// out. This key has a TTL of sizeTimedOutKeyTTL to avoid consecutive failures.
func (c *centralRepositoryCache) SizeWithDescendantsTimedOut(ctx context.Context, r *models.Repository) {
if r == nil {
return
}
setCtx, cancel := context.WithTimeout(ctx, cacheOpTimeout)
defer cancel()
if err := c.cache.Set(setCtx, c.sizeWithDescendantsTimedOutKey(r.Path), "true", iredis.WithTTL(sizeTimedOutKeyTTL)); err != nil {
log.GetLogger(log.WithContext(ctx)).WithError(err).Warn("failed to create size with descendants timeout key in cache")
}
}
// HasSizeWithDescendantsTimedOut checks if a size with descendants timeout key exists in Redis for repository r. This
// is then used to avoid consecutive failures during the TTL of this key (sizeTimedOutKeyTTL).
func (c *centralRepositoryCache) HasSizeWithDescendantsTimedOut(ctx context.Context, r *models.Repository) bool {
getCtx, cancel := context.WithTimeout(ctx, cacheOpTimeout)
defer cancel()
if _, err := c.cache.Get(getCtx, c.sizeWithDescendantsTimedOutKey(r.Path)); err != nil {
// a wrapped redis.Nil is returned when the key is not found in Redis
if !errors.Is(err, redis.Nil) {
msg := "failed to read size with descendants timeout key from cache"
log.GetLogger(log.WithContext(ctx)).WithError(err).Error(msg)
errortracking.Capture(fmt.Errorf("%s: %w", msg, err), errortracking.WithContext(ctx), errortracking.WithStackTrace())
}
return false
}
// we don't care about the value of the key, only if it exists
return true
}
// InvalidateSize implements RepositoryCache.
func (c *centralRepositoryCache) InvalidateSize(ctx context.Context, r *models.Repository) {
inValCtx, cancel := context.WithTimeout(ctx, cacheOpTimeout)
defer cancel()
r.Size = nil
if err := c.cache.MarshalSet(inValCtx, c.key(r.Path), r); err != nil {
detail := "failed to invalidate repository size in cache for repo: " + r.Path
log.GetLogger(log.WithContext(ctx)).WithError(err).Warn(detail)
err := fmt.Errorf("%q: %q", detail, err)
errortracking.Capture(err, errortracking.WithContext(ctx), errortracking.WithStackTrace())
}
}
// sizeWithDescendantsKey generates a valid Redis key string for the cached result of the last "size with descendants"
// query for a given repository.
// This flag is stored as a separate key instead of being embedded in the repository struct because we need it to
// expire after a specific TTL, independently of the repository object key TTL.
func (c *centralRepositoryCache) sizeWithDescendantsKey(path string) string {
// "swd" stands for "size with descendants" for the sake of compactness
return fmt.Sprintf("%s:swd", c.key(path))
}
// SetSizeWithDescendants implements RepositoryCache.
func (c *centralRepositoryCache) SetSizeWithDescendants(ctx context.Context, r *models.Repository, size int64) {
if r == nil {
return
}
setCtx, cancel := context.WithTimeout(ctx, cacheOpTimeout)
defer cancel()
if err := c.cache.MarshalSet(setCtx, c.sizeWithDescendantsKey(r.Path), size, iredis.WithTTL(sizeWithDescendantsKeyTTL)); err != nil {
log.GetLogger(log.WithContext(ctx)).WithError(err).Warn("failed to create size with descendants key in cache")
}
}
// GetSizeWithDescendants implements RepositoryCache.
func (c *centralRepositoryCache) GetSizeWithDescendants(ctx context.Context, r *models.Repository) (bool, int64) {
getCtx, cancel := context.WithTimeout(ctx, cacheOpTimeout)
defer cancel()
var size int64
if err := c.cache.UnmarshalGet(getCtx, c.sizeWithDescendantsKey(r.Path), &size); err != nil {
// a wrapped redis.Nil is returned when the key is not found in Redis
if !errors.Is(err, redis.Nil) {
log.GetLogger(log.WithContext(ctx)).WithError(err).Error("failed to read size with descendants key from cache")
}
return false, 0
}
return true, size
}
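// Illustrative sketch (not part of the original file): the read-through pattern that
// SetSizeWithDescendants and GetSizeWithDescendants enable. computeSWD is a hypothetical
// callback standing in for the actual "size with descendants" database query.
func cachedSizeWithDescendantsExample(ctx context.Context, c RepositoryCache, r *models.Repository, computeSWD func() (int64, error)) (int64, error) {
	// serve from cache when a value computed within the last sizeWithDescendantsKeyTTL exists
	if found, size := c.GetSizeWithDescendants(ctx, r); found {
		return size, nil
	}
	size, err := computeSWD()
	if err != nil {
		return 0, err
	}
	// store the freshly computed value so subsequent calls can skip the expensive query
	c.SetSizeWithDescendants(ctx, r, size)
	return size, nil
}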
// lsnKey generates a valid Redis key string for the cached primary database Log Sequence Number (LSN) for a
// given repository.
func (c *centralRepositoryCache) lsnKey(path string) string {
return fmt.Sprintf("%s:lsn", c.key(path))
}
// This Lua script atomically updates a PostgreSQL Log Sequence Number (LSN) stored in Redis. It compares the new LSN
// with the existing LSN (if any) and only writes the key if the new LSN is greater than the stored one or no value is stored yet.
// See https://gitlab.com/gitlab-org/container-registry/-/blob/master/docs/spec/gitlab/database-load-balancing.md#primary-sticking
//
// Conversion process (based on how PostgreSQL represents LSNs using its XLogRecPtr type):
// 1. Split the LSN into major and minor parts using string manipulation;
// 2. Convert the major and minor parts from hexadecimal to decimal;
// 3. The final numeric representation is calculated as: (major_part * 2^32) + minor_part.
//
// Inputs:
// KEYS[1] - The Redis key where the LSN is stored.
// ARGV[1] - The new LSN to compare, in 'X/Y' format (hexadecimal).
// ARGV[2] - The TTL (in seconds) to set on the Redis key if the LSN is updated.
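//
// Worked example (illustrative): for the LSN "16/B374D848", parse_lsn computes
// 0x16 * 2^32 + 0xB374D848. A subsequent SetLSN call with a numerically larger LSN
// overwrites the key and refreshes its TTL, while a smaller or equal one leaves the
// stored value untouched (the script returns false, surfaced as redis.Nil).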
var lsnUpdateScript = redis.NewScript(`
local key, new_lsn, ttl = KEYS[1], ARGV[1], tonumber(ARGV[2])
local current_lsn = redis.call('GET', key)
if not current_lsn then
return redis.call('SET', key, new_lsn, 'EX', ttl)
end
local function parse_lsn(lsn)
local slash_pos = string.find(lsn, '/')
local major_part = tonumber(string.sub(lsn, 1, slash_pos - 1), 16)
local minor_part = tonumber(string.sub(lsn, slash_pos + 1), 16)
return (major_part * 2^32) + minor_part
end
if parse_lsn(new_lsn) > parse_lsn(current_lsn) then
return redis.call('SET', key, new_lsn, 'EX', ttl)
end
return false
`)
// SetLSN implements RepositoryCache.
func (c *centralRepositoryCache) SetLSN(ctx context.Context, r *models.Repository, lsn string) error {
setCtx, cancel := context.WithTimeout(ctx, cacheOpTimeout)
defer cancel()
report := metrics.LSNCacheSet()
_, err := c.cache.RunScript(setCtx, lsnUpdateScript, []string{c.lsnKey(r.Path)}, []any{lsn, lsnKeyTTL.Seconds()})
// ignore a redis.Nil error, which is the result of returning false from the script (no update occurred)
if errors.Is(err, redis.Nil) {
err = nil
}
report(err)
return err
}
// GetLSN implements RepositoryCache.
func (c *centralRepositoryCache) GetLSN(ctx context.Context, r *models.Repository) (string, error) {
getCtx, cancel := context.WithTimeout(ctx, cacheOpTimeout)
defer cancel()
report := metrics.LSNCacheGet()
lsn, err := c.cache.Get(getCtx, c.lsnKey(r.Path))
report(err)
if err != nil {
// a wrapped redis.Nil is returned when the key is not found in Redis
if !errors.Is(err, redis.Nil) {
return "", fmt.Errorf("failed to read LSN key from cache: %w", err)
}
return "", nil
}
return lsn, nil
}
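// Illustrative sketch (not part of the original file): the intended call pattern for the
// LSN helpers under database load balancing. recordPrimaryLSNExample and primaryLSN are
// hypothetical names; how the primary's current LSN is obtained is up to the caller.
func recordPrimaryLSNExample(ctx context.Context, cache RepositoryCache, r *models.Repository, primaryLSN string) {
	// after a write against the primary, remember its LSN so later reads can check
	// (via GetLSN) whether a replica has caught up or the primary must be used instead
	if err := cache.SetLSN(ctx, r, primaryLSN); err != nil {
		log.GetLogger(log.WithContext(ctx)).WithError(err).Warn("failed to record primary LSN")
	}
}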
func scanFullRepository(row *Row) (*models.Repository, error) {
r := new(models.Repository)
if err := row.Scan(&r.ID, &r.NamespaceID, &r.Name, &r.Path, &r.ParentID, &r.CreatedAt, &r.UpdatedAt, &r.LastPublishedAt); err != nil {
if !errors.Is(err, sql.ErrNoRows) {
return nil, fmt.Errorf("scanning repository: %w", err)
}
return nil, nil
}
return r, nil
}
func scanFullRepositories(rows *sql.Rows) (models.Repositories, error) {
rr := make(models.Repositories, 0)
defer rows.Close()
for rows.Next() {
r := new(models.Repository)
if err := rows.Scan(&r.ID, &r.NamespaceID, &r.Name, &r.Path, &r.ParentID, &r.CreatedAt, &r.UpdatedAt); err != nil {
return nil, fmt.Errorf("scanning repository: %w", err)
}
rr = append(rr, r)
}
if err := rows.Err(); err != nil {
return nil, fmt.Errorf("scanning repositories: %w", err)
}
return rr, nil
}
// FindByPath finds a repository by path.
func (s *repositoryStore) FindByPath(ctx context.Context, path string) (*models.Repository, error) {
if cached := s.cache.Get(ctx, path); cached != nil {
return cached, nil
}
defer metrics.InstrumentQuery("repository_find_by_path")()
q := `SELECT
id,
top_level_namespace_id,
name,
path,
parent_id,
created_at,
updated_at,
last_published_at
FROM
repositories
WHERE
path = $1
AND deleted_at IS NULL` // temporary measure for the duration of https://gitlab.com/gitlab-org/container-registry/-/issues/570
row := s.db.QueryRowContext(ctx, q, path)
r, err := scanFullRepository(row)
if err != nil {
return r, err
}
s.cache.Set(ctx, r)
return r, nil
}
// FindAll finds all repositories.
func (s *repositoryStore) FindAll(ctx context.Context) (models.Repositories, error) {
defer metrics.InstrumentQuery("repository_find_all")()
q := `SELECT
id,
top_level_namespace_id,
name,
path,
parent_id,
created_at,
updated_at
FROM
repositories`
rows, err := s.db.QueryContext(ctx, q)
if err != nil {
return nil, fmt.Errorf("finding repositories: %w", err)
}
return scanFullRepositories(rows)
}
// FindAllPaginated finds up to `filters.MaxEntries` repositories with path lexicographically after `filters.LastEntry`. This is used exclusively
// for the GET /v2/_catalog API route, where pagination is done with a marker (`filters.LastEntry`). Empty repositories (which do
// not have at least a manifest) are ignored. Also, even if there is no repository with a path of `filters.LastEntry`, the returned
// repositories will always be those with a path lexicographically after `filters.LastEntry`. Finally, repositories are
// lexicographically sorted. These constraints exist to preserve the existing API behavior (when doing a filesystem
// walk based pagination).
func (s *repositoryStore) FindAllPaginated(ctx context.Context, filters FilterParams) (models.Repositories, error) {
defer metrics.InstrumentQuery("repository_find_all_paginated")()
q := `SELECT
r.id,
r.top_level_namespace_id,
r.name,
r.path,
r.parent_id,
r.created_at,
r.updated_at
FROM
repositories AS r
WHERE
EXISTS (
SELECT
FROM
manifests AS m
WHERE
m.top_level_namespace_id = r.top_level_namespace_id
AND m.repository_id = r.id)
AND r.path > $1
ORDER BY
r.path
LIMIT $2`
rows, err := s.db.QueryContext(ctx, q, filters.LastEntry, filters.MaxEntries)
if err != nil {
return nil, fmt.Errorf("finding repositories with pagination: %w", err)
}
return scanFullRepositories(rows)
}
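// Illustrative sketch (not part of the original file): walking the whole catalog with the
// marker-based pagination used by GET /v2/_catalog. listAllRepositoriesExample is a
// hypothetical helper; the page size and error handling strategy are up to the caller.
func listAllRepositoriesExample(ctx context.Context, store RepositoryStore, pageSize int) (models.Repositories, error) {
	var all models.Repositories
	last := ""
	for {
		page, err := store.FindAllPaginated(ctx, FilterParams{LastEntry: last, MaxEntries: pageSize})
		if err != nil {
			return nil, err
		}
		all = append(all, page...)
		if len(page) < pageSize {
			return all, nil
		}
		// the path of the last repository on this page is the marker for the next page
		last = page[len(page)-1].Path
	}
}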
// FindDescendantsOf finds all descendants of a given repository.
func (s *repositoryStore) FindDescendantsOf(ctx context.Context, id int64) (models.Repositories, error) {
defer metrics.InstrumentQuery("repository_find_descendants_of")()
q := `WITH RECURSIVE descendants AS (
SELECT
id,
top_level_namespace_id,
name,
path,
parent_id,
created_at,
updated_at
FROM
repositories
WHERE
id = $1
UNION ALL
SELECT
r.id,
r.top_level_namespace_id,
r.name,
r.path,
r.parent_id,
r.created_at,
r.updated_at
FROM
repositories AS r
JOIN descendants ON descendants.id = r.parent_id
)
SELECT
*
FROM
descendants
WHERE
descendants.id != $1`
rows, err := s.db.QueryContext(ctx, q, id)
if err != nil {
return nil, fmt.Errorf("finding descendants of repository: %w", err)
}
return scanFullRepositories(rows)
}
// FindAncestorsOf finds all ancestors of a given repository.
func (s *repositoryStore) FindAncestorsOf(ctx context.Context, id int64) (models.Repositories, error) {
defer metrics.InstrumentQuery("repository_find_ancestors_of")()
q := `WITH RECURSIVE ancestors AS (
SELECT
id,
top_level_namespace_id,
name,
path,
parent_id,
created_at,
updated_at
FROM
repositories
WHERE
id = $1
UNION ALL
SELECT
r.id,
r.top_level_namespace_id,
r.name,
r.path,
r.parent_id,
r.created_at,
r.updated_at
FROM
repositories AS r
JOIN ancestors ON ancestors.parent_id = r.id
)
SELECT
*
FROM
ancestors
WHERE
ancestors.id != $1`
rows, err := s.db.QueryContext(ctx, q, id)
if err != nil {
return nil, fmt.Errorf("finding ancestors of repository: %w", err)
}
return scanFullRepositories(rows)
}
// FindSiblingsOf finds all siblings of a given repository.
func (s *repositoryStore) FindSiblingsOf(ctx context.Context, id int64) (models.Repositories, error) {
defer metrics.InstrumentQuery("repository_find_siblings_of")()
q := `SELECT
siblings.id,
siblings.top_level_namespace_id,
siblings.name,
siblings.path,
siblings.parent_id,
siblings.created_at,
siblings.updated_at
FROM
repositories AS siblings
LEFT JOIN repositories AS anchor ON siblings.parent_id = anchor.parent_id
WHERE
anchor.id = $1
AND siblings.id != $1`
rows, err := s.db.QueryContext(ctx, q, id)
if err != nil {
return nil, fmt.Errorf("finding siblings of repository: %w", err)
}
return scanFullRepositories(rows)
}
// Tags finds all tags of a given repository.
func (s *repositoryStore) Tags(ctx context.Context, r *models.Repository) (models.Tags, error) {
defer metrics.InstrumentQuery("repository_tags")()
q := `SELECT
id,
top_level_namespace_id,
name,
repository_id,
manifest_id,
created_at,
updated_at
FROM
tags
WHERE
repository_id = $1`
rows, err := s.db.QueryContext(ctx, q, r.ID)
if err != nil {
return nil, fmt.Errorf("finding tags: %w", err)
}
return scanFullTags(rows)
}
// TagsPaginated finds up to `filters.MaxEntries` tags of a given repository with name lexicographically after `filters.LastEntry`. This is used
// exclusively for the GET /v2/<name>/tags/list API route, where pagination is done with a marker (`filters.LastEntry`). Even if
// there is no tag with a name of `filters.LastEntry`, the returned tags will always be those with a name lexicographically after
// `filters.LastEntry`. Finally, tags are lexicographically sorted. These constraints exist to preserve the existing API behavior
// (when doing a filesystem walk based pagination).
func (s *repositoryStore) TagsPaginated(ctx context.Context, r *models.Repository, filters FilterParams) (models.Tags, error) {
defer metrics.InstrumentQuery("repository_tags_paginated")()
q := `SELECT
id,
top_level_namespace_id,
name,
repository_id,
manifest_id,
created_at,
updated_at
FROM
tags
WHERE
top_level_namespace_id = $1
AND repository_id = $2
AND name > $3
ORDER BY
name
LIMIT $4`
rows, err := s.db.QueryContext(ctx, q, r.NamespaceID, r.ID, filters.LastEntry, filters.MaxEntries)
if err != nil {
return nil, fmt.Errorf("finding tags with pagination: %w", err)
}
return scanFullTags(rows)
}
func scanFullTagsDetail(rows *sql.Rows) ([]*models.TagDetail, error) {
tt := make([]*models.TagDetail, 0)
defer rows.Close()
for rows.Next() {
var dgst Digest
var cfgDgst sql.NullString
t := new(models.TagDetail)
if err := rows.Scan(&t.Name, &dgst, &cfgDgst, &t.MediaType, &t.Size, &t.CreatedAt, &t.UpdatedAt, &t.PublishedAt); err != nil {
return nil, fmt.Errorf("scanning tag details: %w", err)
}
var err error
t.Digest, err = dgst.Parse()
if err != nil {
return nil, err
}
t.ConfigDigest, err = parseConfigDigest(cfgDgst)
if err != nil {
return nil, err
}
tt = append(tt, t)
}
if err := rows.Err(); err != nil {
return nil, fmt.Errorf("scanning tag details: %w", err)
}
return tt, nil
}
func parseConfigDigest(cfgDgst sql.NullString) (models.NullDigest, error) {
var dgst models.NullDigest
if cfgDgst.Valid {
cd, err := Digest(cfgDgst.String).Parse()
if err != nil {
return dgst, err
}
dgst = models.NullDigest{
Digest: cd,
Valid: true,
}
}
return dgst, nil
}
// appendTagsDetailReferrers takes a list of TagDetails and queries for the manifests
// that are referrers of those tags' manifests, i.e. manifests whose `subject_id` points
// to the manifest referenced by one of the tags. The method modifies the `tags`
// collection by populating the `Referrers` field with the query results.
func (s *repositoryStore) appendTagsDetailReferrers(ctx context.Context, r *models.Repository, tags []*models.TagDetail, artifactTypes []string) error {
if len(tags) == 0 {
return nil
}
sbjDigests := make([]string, 0, len(tags))
for _, tag := range tags {
nd, err := NewDigest(tag.Digest)
if err != nil {
return err
}
sbjDigests = append(sbjDigests, nd.HexDecode())
}
q := `SELECT
encode(m.digest, 'hex') AS digest,
COALESCE(at.media_type, cmt.media_type) AS artifact_type,
encode(ms.digest, 'hex') AS subject_digest
FROM
manifests AS m
JOIN manifests AS ms ON m.top_level_namespace_id = ms.top_level_namespace_id
AND m.subject_id = ms.id
LEFT JOIN media_types AS at ON at.id = m.artifact_media_type_id
LEFT JOIN media_types AS cmt ON cmt.id = m.configuration_media_type_id
WHERE
m.top_level_namespace_id = $1
AND m.repository_id = $2
AND m.subject_id IN (
SELECT
id
FROM
manifests
WHERE
top_level_namespace_id = $1
AND repository_id = $2
AND digest = ANY ($3))`
var rows *sql.Rows
var err error
if len(artifactTypes) > 0 {
ats, errInner := s.mediaTypeIDs(ctx, artifactTypes)
if errInner != nil {
return errInner
}
q += " AND (m.artifact_media_type_id = ANY ($4) OR m.configuration_media_type_id = ANY ($4))"
rows, err = s.db.QueryContext(ctx, q, r.NamespaceID, r.ID, sbjDigests, ats) // nolint: staticcheck // err is checked below
} else {
rows, err = s.db.QueryContext(ctx, q, r.NamespaceID, r.ID, sbjDigests)
}
if err != nil {
return err
}
defer rows.Close()
refMap := make(map[string][]models.TagReferrerDetail)
var (
dgst, sbjDgst Digest
at, sbjStr string
)
for rows.Next() {
if err = rows.Scan(&dgst, &at, &sbjDgst); err != nil {
return fmt.Errorf("scanning referrer: %w", err)
}
d, err := dgst.Parse()
if err != nil {
return err
}
sbj, err := sbjDgst.Parse()
if err != nil {
return err
}
sbjStr = sbj.String()
if refMap[sbjStr] == nil {
refMap[sbjStr] = make([]models.TagReferrerDetail, 0)
}
refMap[sbjStr] = append(refMap[sbjStr], models.TagReferrerDetail{
Digest: d.String(),
ArtifactType: at,
})
}
if err := rows.Err(); err != nil {
return fmt.Errorf("scanning referrers: %w", err)
}
for _, tag := range tags {
tag.Referrers = refMap[tag.Digest.String()]
}
return nil
}
// sqlPartialMatch builds a string that can be passed as value for a SQL `LIKE` expression. Besides surrounding the
// input value with `%` wildcard characters for a partial match, this function also escapes the `_` and `%`
// metacharacters supported in Postgres `LIKE` expressions.
// See https://www.postgresql.org/docs/current/functions-matching.html#FUNCTIONS-LIKE for more details.
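// For example (illustrative): sqlPartialMatch("my_app") returns `%my\_app%`, which in a
// LIKE comparison matches any name containing the literal substring "my_app" rather than
// treating the underscore as a single-character wildcard.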
func sqlPartialMatch(value string) string {
value = strings.ReplaceAll(value, "_", `\_`)
value = strings.ReplaceAll(value, "%", `\%`)
return fmt.Sprintf("%%%s%%", value)
}
// TagsDetailPaginated finds up to `filters.MaxEntries` tags of a given repository with name lexicographically after `filters.LastEntry`. This is
// used exclusively for the GET /gitlab/v1/<name>/tags/list API, where pagination is done with a marker (`filters.LastEntry`).
// Even if there is no tag with a name of `filters.LastEntry`, the returned tags will always be those with a name lexicographically
// after `filters.LastEntry`. Tags are lexicographically sorted.
// Optionally, it is possible to pass a string to be used as a partial match filter for tag names using `filters.Name` and exact match using
// `filters.ExactName`. The search is not filtered if both of these values are empty.
func (s *repositoryStore) TagsDetailPaginated(ctx context.Context, r *models.Repository, filters FilterParams) ([]*models.TagDetail, error) {
defer metrics.InstrumentQuery("repository_tags_detail_paginated")()
q, args, err := tagsDetailPaginatedQuery(r, filters)
if err != nil {
return nil, fmt.Errorf("constructing tags detail paginated query: %w", err)
}
rows, err := s.db.QueryContext(ctx, q, args...)
if err != nil {
return nil, fmt.Errorf("finding tags detail with pagination: %w", err)
}
tags, err := scanFullTagsDetail(rows)
if err != nil {
return nil, err
}
if filters.IncludeReferrers {
if err := s.appendTagsDetailReferrers(ctx, r, tags, filters.ReferrerTypes); err != nil {
return nil, fmt.Errorf("populating referrers: %w", err)
}
}
return tags, nil
}
// TagDetail returns the detail of a tag, including the manifest's configuration
// payload for single manifests. The configuration payload is left empty for manifest
// lists and image indexes.
func (s *repositoryStore) TagDetail(ctx context.Context, r *models.Repository, tagName string) (*models.TagDetail, error) {
defer metrics.InstrumentQuery("repository_tag_detail")()
q := `
SELECT
t.name,
encode(m.digest, 'hex') AS digest,
encode(m.configuration_blob_digest, 'hex') AS config_digest,
mt.media_type,
m.total_size,
t.created_at,
t.updated_at,
GREATEST(t.created_at, t.updated_at) as published_at,
m.id,
mtc.media_type as configuration_media_type,
m.configuration_payload
FROM tags t
JOIN manifests AS m ON m.top_level_namespace_id = t.top_level_namespace_id
AND m.repository_id = t.repository_id
AND m.id = t.manifest_id
JOIN media_types AS mt ON mt.id = m.media_type_id
LEFT JOIN media_types AS mtc ON mtc.id = m.configuration_media_type_id
WHERE
t.top_level_namespace_id = $1
AND t.repository_id = $2
AND t.name = $3
`
cfgPayload := new(models.Payload)
td := &models.TagDetail{}
var dgst Digest
var cfgDgst sql.NullString
var cfgMediaType sql.NullString
err := s.db.QueryRowContext(ctx, q, r.NamespaceID, r.ID, tagName).Scan(
&td.Name, &dgst, &cfgDgst, &td.MediaType,
&td.Size, &td.CreatedAt, &td.UpdatedAt, &td.PublishedAt, &td.ManifestID, &cfgMediaType, &cfgPayload)
if err != nil {
if errors.Is(err, sql.ErrNoRows) {
return nil, nil
}
return nil, fmt.Errorf("finding single tag detail: %w", err)
}
td.Digest, err = dgst.Parse()
if err != nil {
return nil, err
}
td.ConfigDigest, err = parseConfigDigest(cfgDgst)
if err != nil {
return nil, err
}
switch td.MediaType {
case manifestlist.MediaTypeManifestList, v1.MediaTypeImageIndex:
// no op
default:
td.Configuration = &models.Configuration{
Digest: td.ConfigDigest.Digest,
MediaType: cfgMediaType.String,
Payload: *cfgPayload,
}
}
return td, nil
}
func (s *repositoryStore) mediaTypeIDs(ctx context.Context, types []string) ([]string, error) {
if len(types) == 0 {
return nil, nil
}
q := "SELECT id FROM media_types WHERE media_type = ANY ($1)"
rows, err := s.db.QueryContext(ctx, q, types)
if err != nil {
return nil, fmt.Errorf("selecting media types by name: %w", err)
}
defer rows.Close()
var ids []string
for rows.Next() {
var id int64
if err := rows.Scan(&id); err != nil {
return nil, fmt.Errorf("scanning media type ids: %w", err)
}
ids = append(ids, strconv.FormatInt(id, 10))
}
return ids, nil
}
func tagsDetailPaginatedQuery(r *models.Repository, filters FilterParams) (string, []any, error) {
qb := NewQueryBuilder()
err := qb.Build(
`SELECT
t.name,
encode(m.digest, 'hex') AS digest,
encode(m.configuration_blob_digest, 'hex') AS config_digest,
mt.media_type,
m.total_size,
t.created_at,
t.updated_at,
GREATEST(t.created_at, t.updated_at) as published_at
FROM
tags AS t
JOIN manifests AS m ON m.top_level_namespace_id = t.top_level_namespace_id
AND m.repository_id = t.repository_id
AND m.id = t.manifest_id
JOIN media_types AS mt ON mt.id = m.media_type_id
WHERE
t.top_level_namespace_id = ?
AND t.repository_id = ?
`,
r.NamespaceID, r.ID,
)
if err != nil {
return "", nil, err
}
if filters.ExactName != "" {
// NOTE(prozlach): When an exact match is requested, the response contains at most a
// single entry, so there is no point in adding pagination and sorting clauses here.
err := qb.Build("AND t.name = ?", filters.ExactName)
if err != nil {
return "", nil, err
}
return qb.SQL(), qb.Params(), nil
}
// NOTE(prozlach): This path handles both an empty and a non-empty `Name` filter.
err = qb.Build("AND t.name LIKE ?\n", sqlPartialMatch(filters.Name))
if err != nil {
return "", nil, err
}
// default to ascending order to keep backwards compatibility
if filters.SortOrder == "" {
filters.SortOrder = OrderAsc
}
if filters.OrderBy == "" {
filters.OrderBy = orderByName
}
switch {
case filters.LastEntry == "" && filters.BeforeEntry == "" && filters.PublishedAt == "":
// this should always return the first page up to filters.MaxEntries
if filters.OrderBy == "published_at" {
err = qb.Build(
fmt.Sprintf(`ORDER BY published_at %s, name %s LIMIT ?`, filters.SortOrder, filters.SortOrder),
filters.MaxEntries,
)
if err != nil {
return "", nil, err
}
} else {
err = qb.Build(
fmt.Sprintf(`ORDER BY name %s LIMIT ?`, filters.SortOrder),
filters.MaxEntries,
)
if err != nil {
return "", nil, err
}
}
case filters.LastEntry != "":
err := getLastEntryQuery(qb, filters)
if err != nil {
return "", nil, err
}
case filters.BeforeEntry != "":
err := getBeforeEntryQuery(qb, filters)
if err != nil {
return "", nil, err
}
case filters.PublishedAt != "":
err := getPublishedAtQuery(qb, filters)
if err != nil {
return "", nil, err
}
}
return qb.SQL(), qb.Params(), nil
}
func getPublishedAtQuery(qb *QueryBuilder, filters FilterParams) error {
f := func(comparisonSign string, sortOrder SortOrder) string {
filterFmt := `AND GREATEST(t.created_at,t.updated_at) %s= ?
ORDER BY
published_at %s,
t.name %s
LIMIT ?`
return fmt.Sprintf(filterFmt, comparisonSign, sortOrder, sortOrder)
}
if filters.SortOrder == OrderDesc {
err := qb.Build(f(lessThan, OrderAsc), filters.PublishedAt, filters.MaxEntries)
if err != nil {
return err
}
// The results will be reversed, so we need to wrap the query in a
// SELECT statement that sorts the tags in the correct order
return qb.WrapIntoSubqueryOf(
fmt.Sprintf(`SELECT * FROM (%%s) AS tags ORDER BY tags.%s DESC`, filters.OrderBy),
)
}
return qb.Build(f(greaterThan, OrderAsc), filters.PublishedAt, filters.MaxEntries)
}
func getLastEntryQuery(qb *QueryBuilder, filters FilterParams) error {
var (
comparisonOperator string
orderDirection SortOrder
)
switch filters.SortOrder {
case OrderDesc:
orderDirection = OrderDesc
comparisonOperator = lessThan
case OrderAsc:
orderDirection = OrderAsc
comparisonOperator = greaterThan
}
if filters.PublishedAt != "" {
return qb.Build(
formatTagFilterWithPublishedAt(comparisonOperator, orderDirection),
filters.PublishedAt, filters.LastEntry, filters.MaxEntries,
)
}
return qb.Build(
formatTagFilter(comparisonOperator, filters.OrderBy, orderDirection),
filters.LastEntry, filters.MaxEntries,
)
}
func getBeforeEntryQuery(qb *QueryBuilder, filters FilterParams) error {
var (
comparisonOperator string
rootQueryOrderDirection string
orderDirection SortOrder
)
switch filters.SortOrder {
case OrderDesc:
orderDirection = OrderAsc
comparisonOperator = greaterThan
rootQueryOderDirection = "DESC"
case OrderAsc:
orderDirection = OrderDesc
comparisonOperator = lessThan
rootQueryOderDirection = "ASC"
}
if filters.PublishedAt != "" {
err := qb.Build(
formatTagFilterWithPublishedAt(comparisonOperator, orderDirection),
filters.PublishedAt, filters.BeforeEntry, filters.MaxEntries,
)
if err != nil {
return err
}
} else {
err := qb.Build(
formatTagFilter(comparisonOperator, filters.OrderBy, orderDirection),
filters.BeforeEntry, filters.MaxEntries,
)
if err != nil {
return err
}
}
// The inner query returns the rows in reverse order, so we wrap it in an outer SELECT
// that re-sorts the tags in the direction originally requested (rootQueryOrderDirection).
return qb.WrapIntoSubqueryOf(
fmt.Sprintf(`SELECT * FROM (%%s) AS tags ORDER BY tags.%s %s`, filters.OrderBy, rootQueryOrderDirection),
)
}
// formatTagFilter builds a name-based filter, ordering and limit clause appended to the base query from tagsDetailPaginatedQuery.
func formatTagFilter(comparisonSign, orderBy string, sortOrder SortOrder) string {
filter := `AND t.name %s ?
ORDER BY
%s %s
LIMIT ?`
return fmt.Sprintf(filter, comparisonSign, orderBy, sortOrder)
}
// formatTagFilterWithPublishedAt builds a (published_at, name) keyset filter, ordering and limit clause appended to the base query from tagsDetailPaginatedQuery.
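// For example (illustrative): with comparisonSign ">" and sortOrder "asc" the generated
// clause is `AND (GREATEST(t.created_at, t.updated_at), t.name) > (?, ?) ORDER BY published_at asc, t.name asc LIMIT ?`,
// a row-value (keyset) condition that resumes the published_at/name ordering right after
// the last row seen on the previous page.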
func formatTagFilterWithPublishedAt(comparisonSign string, sortOrder SortOrder) string {
filter := `AND (GREATEST(t.created_at, t.updated_at), t.name) %s (?, ?)
ORDER BY
published_at %s,
t.name %s
LIMIT ?`
return fmt.Sprintf(filter, comparisonSign, sortOrder, sortOrder)
}
// HasTagsAfterName checks if a given repository has any more tags after `filters.LastEntry`. This is used
// exclusively for the GET /v2/<name>/tags/list API route, where pagination is done with a marker (`filters.LastEntry`). Even if
// there is no tag with a name of `filters.LastEntry`, the counted tags will always be those with a name lexicographically after
// `filters.LastEntry`. This constraint exists to preserve the existing API behavior (when doing a filesystem walk based
// pagination). Optionally, it is possible to pass a string to be used as a partial match filter for tag names using `filters.Name`.
// The search is not filtered if this value is an empty string.
func (s *repositoryStore) HasTagsAfterName(ctx context.Context, r *models.Repository, filters FilterParams) (bool, error) {
defer metrics.InstrumentQuery("repository_tags_count_after_name")()
qb := NewQueryBuilder()
err := qb.Build(`SELECT
1
FROM
tags
WHERE
top_level_namespace_id = ?
AND repository_id = ?
AND name LIKE ?`,
r.NamespaceID, r.ID, sqlPartialMatch(filters.Name),
)
if err != nil {
return false, err
}
comparison := greaterThan
if filters.SortOrder == OrderDesc {
comparison = lessThan
}
if filters.OrderBy != "published_at" {
err = qb.Build(fmt.Sprintf(`AND name %s ?`, comparison), filters.LastEntry)
if err != nil {
return false, err
}
} else {
err = qb.Build(
fmt.Sprintf(`AND (GREATEST(created_at, updated_at), name) %s (?, ?)`, comparison),
filters.PublishedAt, filters.LastEntry,
)
if err != nil {
return false, err
}
}
var count int
if err := s.db.QueryRowContext(ctx, qb.SQL(), qb.Params()...).Scan(&count); err != nil && !errors.Is(err, sql.ErrNoRows) {
return false, fmt.Errorf("checking if there are more tags after name: %w", err)
}
return count == 1, nil
}
// HasTagsBeforeName checks if a given repository has any more tags before `filters.BeforeEntry`. This is used
// exclusively for the GET /v2/<name>/tags/list API route, where pagination is done with a marker (`filters.BeforeEntry`). Even if
// there is no tag with a name of `filters.BeforeEntry`, the counted tags will always be those with a name lexicographically before
// `filters.BeforeEntry`. This constraint exists to preserve the existing API behavior (when doing a filesystem walk based
// pagination). Optionally, it is possible to pass a string to be used as a partial match filter for tag names using `filters.Name`.
// The search is not filtered if this value is an empty string.
func (s *repositoryStore) HasTagsBeforeName(ctx context.Context, r *models.Repository, filters FilterParams) (bool, error) {
// There is no point in querying this as it would mean we need to count ALL the tags
if filters.BeforeEntry == "" {
return false, nil
}
defer metrics.InstrumentQuery("repository_tags_count_before_name")()
qb := NewQueryBuilder()
err := qb.Build(`SELECT
1
FROM
tags
WHERE
top_level_namespace_id = ?
AND repository_id = ?
AND name LIKE ?`,
r.NamespaceID, r.ID, sqlPartialMatch(filters.Name),
)
if err != nil {
return false, err
}
comparison := lessThan
if filters.SortOrder == OrderDesc {
comparison = greaterThan
}
if filters.OrderBy != "published_at" {
err = qb.Build(fmt.Sprintf(`AND name %s ?`, comparison), filters.BeforeEntry)
if err != nil {
return false, err
}
} else {
err = qb.Build(
fmt.Sprintf(`AND (GREATEST(created_at, updated_at), name) %s (?, ?)`, comparison),
filters.PublishedAt, filters.BeforeEntry,
)
if err != nil {
return false, err
}
}
var count int
if err := s.db.QueryRowContext(ctx, qb.SQL(), qb.Params()...).Scan(&count); err != nil && !errors.Is(err, sql.ErrNoRows) {
return false, fmt.Errorf("checking if there are more tags before name: %w", err)
}
return count == 1, nil
}
// ManifestTags finds all tags of a given repository manifest.
func (s *repositoryStore) ManifestTags(ctx context.Context, r *models.Repository, m *models.Manifest) (models.Tags, error) {
defer metrics.InstrumentQuery("repository_manifest_tags")()
q := `SELECT
id,
top_level_namespace_id,
name,
repository_id,
manifest_id,
created_at,
updated_at
FROM
tags
WHERE
top_level_namespace_id = $1
AND repository_id = $2
AND manifest_id = $3`
rows, err := s.db.QueryContext(ctx, q, r.NamespaceID, r.ID, m.ID)
if err != nil {
return nil, fmt.Errorf("finding tags: %w", err)
}
return scanFullTags(rows)
}
// Count counts all repositories.
func (s *repositoryStore) Count(ctx context.Context) (int, error) {
defer metrics.InstrumentQuery("repository_count")()
q := "SELECT COUNT(*) FROM repositories"
var count int
if err := s.db.QueryRowContext(ctx, q).Scan(&count); err != nil {
return count, fmt.Errorf("counting repositories: %w", err)
}
return count, nil
}
// CountAfterPath counts all repositories with path lexicographically after lastPath. This is used exclusively
// for the GET /v2/_catalog API route, where pagination is done with a marker (lastPath). Empty repositories (which do
// not have at least a manifest) are ignored. Also, even if there is no repository with a path of lastPath, the counted
// repositories will always be those with a path lexicographically after lastPath. These constraints exist to preserve
// the existing API behavior (when doing a filesystem walk based pagination).
func (s *repositoryStore) CountAfterPath(ctx context.Context, path string) (int, error) {
defer metrics.InstrumentQuery("repository_count_after_path")()
q := `SELECT
COUNT(*)
FROM
repositories AS r
WHERE
EXISTS (
SELECT
FROM
manifests AS m
WHERE
m.top_level_namespace_id = r.top_level_namespace_id -- PROBLEM - cross partition scan
AND m.repository_id = r.id)
AND r.path > $1`
var count int
if err := s.db.QueryRowContext(ctx, q, path).Scan(&count); err != nil {
return count, fmt.Errorf("counting repositories lexicographically after path: %w", err)
}
return count, nil
}
// CountPathSubRepositories counts all sub repositories of a repository path (including the base repository).
func (s *repositoryStore) CountPathSubRepositories(ctx context.Context, topLevelNamespaceID int64, path string) (int, error) {
defer metrics.InstrumentQuery("repository_count_sub_repositories")()
q := "SELECT COUNT(*) FROM repositories WHERE top_level_namespace_id = $1 AND (path = $2 OR path LIKE $3)"
var count int
if err := s.db.QueryRowContext(ctx, q, topLevelNamespaceID, path, path+"/%").Scan(&count); err != nil {
return count, fmt.Errorf("counting sub-repositories: %w", err)
}
return count, nil
}
// Manifests finds all manifests associated with a repository.
func (s *repositoryStore) Manifests(ctx context.Context, r *models.Repository) (models.Manifests, error) {
defer metrics.InstrumentQuery("repository_manifests")()
q := `SELECT
m.id,
m.top_level_namespace_id,
m.repository_id,
m.total_size,
m.schema_version,
mt.media_type,
at.media_type as artifact_type,
encode(m.digest, 'hex') as digest,
m.payload,
mtc.media_type as configuration_media_type,
encode(m.configuration_blob_digest, 'hex') as configuration_blob_digest,
m.configuration_payload,
m.non_conformant,
m.non_distributable_layers,
m.subject_id,
m.created_at
FROM
manifests AS m
JOIN media_types AS mt ON mt.id = m.media_type_id
LEFT JOIN media_types AS mtc ON mtc.id = m.configuration_media_type_id
LEFT JOIN media_types AS at ON at.id = m.artifact_media_type_id
WHERE
m.top_level_namespace_id = $1
AND m.repository_id = $2
ORDER BY m.id`
rows, err := s.db.QueryContext(ctx, q, r.NamespaceID, r.ID)
if err != nil {
return nil, fmt.Errorf("finding manifests: %w", err)
}
return scanFullManifests(rows)
}
// FindManifestByDigest finds a manifest by digest within a repository.
func (s *repositoryStore) FindManifestByDigest(ctx context.Context, r *models.Repository, d digest.Digest) (*models.Manifest, error) {
defer metrics.InstrumentQuery("repository_find_manifest_by_digest")()
dgst, err := NewDigest(d)
if err != nil {
return nil, err
}
return findManifestByDigest(ctx, s.db, r.NamespaceID, r.ID, dgst)
}
// FindManifestByTagName finds a manifest by tag name within a repository.
func (s *repositoryStore) FindManifestByTagName(ctx context.Context, r *models.Repository, tagName string) (*models.Manifest, error) {
defer metrics.InstrumentQuery("repository_find_manifest_by_tag_name")()
q := `SELECT
m.id,
m.top_level_namespace_id,
m.repository_id,
m.total_size,
m.schema_version,
mt.media_type,
at.media_type as artifact_type,
encode(m.digest, 'hex') as digest,
m.payload,
mtc.media_type as configuration_media_type,
encode(m.configuration_blob_digest, 'hex') as configuration_blob_digest,
m.configuration_payload,
m.non_conformant,
m.non_distributable_layers,
m.subject_id,
m.created_at
FROM
manifests AS m
JOIN media_types AS mt ON mt.id = m.media_type_id
LEFT JOIN media_types AS mtc ON mtc.id = m.configuration_media_type_id
LEFT JOIN media_types AS at ON at.id = m.artifact_media_type_id
JOIN tags AS t ON t.top_level_namespace_id = m.top_level_namespace_id
AND t.repository_id = m.repository_id
AND t.manifest_id = m.id
WHERE
m.top_level_namespace_id = $1
AND m.repository_id = $2
AND t.name = $3`
row := s.db.QueryRowContext(ctx, q, r.NamespaceID, r.ID, tagName)
return scanFullManifest(row)
}
// Blobs finds all blobs associated with the repository.
func (s *repositoryStore) Blobs(ctx context.Context, r *models.Repository) (models.Blobs, error) {
defer metrics.InstrumentQuery("repository_blobs")()
q := `SELECT
mt.media_type,
encode(b.digest, 'hex') as digest,
b.size,
b.created_at
FROM
blobs AS b
JOIN repository_blobs AS rb ON rb.blob_digest = b.digest
JOIN repositories AS r ON r.id = rb.repository_id
JOIN media_types AS mt ON mt.id = b.media_type_id
WHERE
r.top_level_namespace_id = $1
AND r.id = $2`
rows, err := s.db.QueryContext(ctx, q, r.NamespaceID, r.ID)
if err != nil {
return nil, fmt.Errorf("finding blobs: %w", err)
}
return scanFullBlobs(rows)
}
// FindBlob finds a blob by digest within a repository.
func (s *repositoryStore) FindBlob(ctx context.Context, r *models.Repository, d digest.Digest) (*models.Blob, error) {
defer metrics.InstrumentQuery("repository_find_blob")()
q := `SELECT
mt.media_type,
encode(b.digest, 'hex') as digest,
b.size,
b.created_at
FROM
blobs AS b
JOIN media_types AS mt ON mt.id = b.media_type_id
JOIN repository_blobs AS rb ON rb.blob_digest = b.digest
WHERE
rb.top_level_namespace_id = $1
AND rb.repository_id = $2
AND b.digest = decode($3, 'hex')`
dgst, err := NewDigest(d)
if err != nil {
return nil, err
}
row := s.db.QueryRowContext(ctx, q, r.NamespaceID, r.ID, dgst)
return scanFullBlob(row)
}
// ExistsBlob finds if a blob with a given digest exists within a repository.
func (s *repositoryStore) ExistsBlob(ctx context.Context, r *models.Repository, d digest.Digest) (bool, error) {
defer metrics.InstrumentQuery("repository_exists_blob")()
q := `SELECT
EXISTS (
SELECT
1
FROM
repository_blobs
WHERE
top_level_namespace_id = $1
AND repository_id = $2
AND blob_digest = decode($3, 'hex'))`
dgst, err := NewDigest(d)
if err != nil {
return false, err
}
var exists bool
row := s.db.QueryRowContext(ctx, q, r.NamespaceID, r.ID, dgst)
if err := row.Scan(&exists); err != nil {
return false, fmt.Errorf("scanning blob: %w", err)
}
return exists, nil
}
// Create saves a new repository.
func (s *repositoryStore) Create(ctx context.Context, r *models.Repository) error {
defer metrics.InstrumentQuery("repository_create")()
q := `INSERT INTO repositories (top_level_namespace_id, name, path, parent_id)
VALUES ($1, $2, $3, $4)
RETURNING
id, created_at`
row := s.db.QueryRowContext(ctx, q, r.NamespaceID, r.Name, r.Path, r.ParentID)
if err := row.Scan(&r.ID, &r.CreatedAt); err != nil {
return fmt.Errorf("creating repository: %w", err)
}
s.cache.Set(ctx, r)
return nil
}
// FindTagByName finds a tag by name within a repository.
func (s *repositoryStore) FindTagByName(ctx context.Context, r *models.Repository, name string) (*models.Tag, error) {
defer metrics.InstrumentQuery("repository_find_tag_by_name")()
q := `SELECT
id,
top_level_namespace_id,
name,
repository_id,
manifest_id,
created_at,
updated_at
FROM
tags
WHERE
top_level_namespace_id = $1
AND repository_id = $2
AND name = $3`
row := s.db.QueryRowContext(ctx, q, r.NamespaceID, r.ID, name)
return scanFullTag(row)
}
// Size returns the deduplicated size of a repository. This is the sum of the size of all unique layers referenced by
// at least one tagged (directly or indirectly) manifest. No error is returned if the repository does not exist. It is
// the caller's responsibility to ensure it exists before calling this method and proceed accordingly if that matters.
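// Worked example (illustrative): if a repository has two tagged manifests that share a
// 10 MiB base layer and each add one unique 5 MiB layer, Size reports 20 MiB: the shared
// layer is counted once because layer sizes are deduplicated by digest (DISTINCT ON (l.digest)).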
func (s *repositoryStore) Size(ctx context.Context, r *models.Repository) (RepositorySize, error) {
// Check the cached repository object for the size attribute first
if r.Size != nil {
return RepositorySize{bytes: *r.Size}, nil
}
defer metrics.InstrumentQuery("repository_size")()
q := `SELECT
coalesce(sum(q.size), 0)
FROM ( WITH RECURSIVE cte AS (
SELECT
m.id AS manifest_id
FROM
manifests AS m
WHERE
m.top_level_namespace_id = $1
AND m.repository_id = $2
AND EXISTS (
SELECT
FROM
tags AS t
WHERE
t.top_level_namespace_id = m.top_level_namespace_id
AND t.repository_id = m.repository_id
AND t.manifest_id = m.id)
UNION
SELECT
mr.child_id AS manifest_id
FROM
manifest_references AS mr
JOIN cte ON mr.parent_id = cte.manifest_id
WHERE
mr.top_level_namespace_id = $1
AND mr.repository_id = $2)
SELECT DISTINCT ON (l.digest)
l.size
FROM
layers AS l
JOIN cte ON l.top_level_namespace_id = $1
AND l.repository_id = $2
AND l.manifest_id = cte.manifest_id) AS q`
var b int64
if err := s.db.QueryRowContext(ctx, q, r.NamespaceID, r.ID).Scan(&b); err != nil {
return RepositorySize{}, fmt.Errorf("calculating repository size: %w", err)
}
// Update the size attribute for the cached repository object
r.Size = &b
s.cache.Set(ctx, r)
return RepositorySize{bytes: b}, nil
}
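// Usage sketch for Size (illustrative only; it assumes a RepositoryStore `s` is in scope and that FindByPath returns a
// nil repository when the path does not exist). Because Size itself does not report a missing repository, callers that
// care about existence should check it first:
//
//	repo, err := s.FindByPath(ctx, "my-group/my-project")
//	if err != nil {
//		return err
//	}
//	if repo == nil {
//		return errors.New("repository not found") // avoid reporting a misleading zero size
//	}
//	size, err := s.Size(ctx, repo)
//	if err != nil {
//		return err
//	}
//	_ = size.Bytes() // deduplicated size in bytes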
// topLevelSizeWithDescendants is an optimization for SizeWithDescendants when the target repository is a top-level
// repository. This allows using an optimized SQL query for this specific scenario.
func (s *repositoryStore) topLevelSizeWithDescendants(ctx context.Context, r *models.Repository) (int64, error) {
defer metrics.InstrumentQuery("repository_size_with_descendants_top_level")()
q := `SELECT
coalesce(sum(q.size), 0)
FROM ( WITH RECURSIVE cte AS (
SELECT
m.id AS manifest_id,
m.repository_id
FROM
manifests AS m
WHERE
m.top_level_namespace_id = $1
AND EXISTS (
SELECT
FROM
tags AS t
WHERE
t.top_level_namespace_id = m.top_level_namespace_id
AND t.repository_id = m.repository_id
AND t.manifest_id = m.id)
UNION
SELECT
mr.child_id AS manifest_id,
mr.repository_id
FROM
manifest_references AS mr
JOIN cte ON mr.repository_id = cte.repository_id
AND mr.parent_id = cte.manifest_id
WHERE
mr.top_level_namespace_id = $1
)
SELECT DISTINCT ON (l.digest)
l.size
FROM
cte
CROSS JOIN LATERAL (
SELECT
digest,
size
FROM
layers
WHERE
top_level_namespace_id = $1
AND repository_id = cte.repository_id
AND manifest_id = cte.manifest_id
ORDER BY
digest) l) AS q`
var size int64
if err := s.db.QueryRowContext(ctx, q, r.NamespaceID).Scan(&size); err != nil {
return 0, fmt.Errorf("calculating top-level repository size with descendants: %w", err)
}
return size, nil
}
// nonTopLevelSizeWithDescendants is an optimization for SizeWithDescendants when the target repository is not a
// top-level repository. This allows using an optimized SQL query for this specific scenario.
func (s *repositoryStore) nonTopLevelSizeWithDescendants(ctx context.Context, r *models.Repository) (int64, error) {
defer metrics.InstrumentQuery("repository_size_with_descendants")()
q := `SELECT
coalesce(sum(q.size), 0)
FROM ( WITH RECURSIVE repository_ids AS MATERIALIZED (
SELECT
id
FROM
repositories
WHERE
top_level_namespace_id = $1
AND (
path = $2
OR path LIKE $3
)
),
cte AS (
SELECT
m.id AS manifest_id
FROM
manifests AS m
WHERE
m.top_level_namespace_id = $1
AND m.repository_id IN (
SELECT
id
FROM
repository_ids)
AND EXISTS (
SELECT
FROM
tags AS t
WHERE
t.top_level_namespace_id = m.top_level_namespace_id
AND t.repository_id = m.repository_id
AND t.manifest_id = m.id)
UNION
SELECT
mr.child_id AS manifest_id
FROM
manifest_references AS mr
JOIN cte ON mr.parent_id = cte.manifest_id
WHERE
mr.top_level_namespace_id = $1
AND mr.repository_id IN (
SELECT
id
FROM
repository_ids))
SELECT DISTINCT ON (l.digest)
l.size
FROM
layers AS l
JOIN cte ON l.top_level_namespace_id = $1
AND l.repository_id IN (
SELECT
id
FROM
repository_ids)
AND l.manifest_id = cte.manifest_id) AS q`
var size int64
if err := s.db.QueryRowContext(ctx, q, r.NamespaceID, r.Path, r.Path+"/%").Scan(&size); err != nil {
return 0, fmt.Errorf("calculating repository size with descendants: %w", err)
}
return size, nil
}
var ErrSizeHasTimedOut = errors.New("size query timed out previously")
// RepositorySize represents the result of calculating the size of a repository.
type RepositorySize struct {
bytes int64
cached bool
}
// Bytes returns the size in bytes.
func (s RepositorySize) Bytes() int64 {
return s.bytes
}
// Cached indicates whether the size of the repository was obtained from cache.
func (s RepositorySize) Cached() bool {
return s.cached
}
// SizeWithDescendants returns the deduplicated size of a repository, including all descendants (if any). This is the
// sum of the size of all unique layers referenced by at least one tagged (directly or indirectly) manifest. No error is
// returned if the repository does not exist. It is the caller's responsibility to ensure it exists before calling this
// method and proceed accordingly if that matters.
// For top-level repositories, if this method failed with a statement timeout in the last 24h, then ErrSizeHasTimedOut
// is returned to prevent consecutive failures. The caller can then fall back to EstimatedSizeWithDescendants. This is
// a mitigation strategy for https://gitlab.com/gitlab-org/container-registry/-/issues/779.
func (s *repositoryStore) SizeWithDescendants(ctx context.Context, r *models.Repository) (RepositorySize, error) {
if !r.IsTopLevel() {
b, err := s.nonTopLevelSizeWithDescendants(ctx, r)
return RepositorySize{bytes: b}, err
}
if found, b := s.cache.GetSizeWithDescendants(ctx, r); found {
		return RepositorySize{bytes: b, cached: true}, nil
}
if s.cache.HasSizeWithDescendantsTimedOut(ctx, r) {
return RepositorySize{}, ErrSizeHasTimedOut
}
b, err := s.topLevelSizeWithDescendants(ctx, r)
	if err != nil {
		var pgErr *pgconn.PgError
		if errors.As(err, &pgErr) && pgErr.Code == pgerrcode.QueryCanceled {
			// flag query failure for target repository to avoid consecutive failures
			s.cache.SizeWithDescendantsTimedOut(ctx, r)
		}
		// do not cache the (zero) result of a failed query
		return RepositorySize{}, err
	}
	s.cache.SetSizeWithDescendants(ctx, r, b)
	return RepositorySize{bytes: b}, nil
}
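// Usage sketch for SizeWithDescendants (illustrative only; `s` and `repo` are assumed to be a RepositoryStore and a
// previously loaded top-level *models.Repository). When a previous attempt timed out, a caller can fall back to the
// root-only estimate:
//
//	size, err := s.SizeWithDescendants(ctx, repo)
//	if errors.Is(err, ErrSizeHasTimedOut) && repo.IsTopLevel() {
//		size, err = s.EstimatedSizeWithDescendants(ctx, repo)
//	}
//	if err != nil {
//		return err
//	}
//	_ = size.Bytes()  // deduplicated (or estimated) size in bytes
//	_ = size.Cached() // whether the value was served from the cache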
// estimateTopLevelSizeWithDescendants is a significantly faster alternative to topLevelSizeWithDescendants which does
// not exclude unreferenced layers. Therefore, the measured size should be considered an estimate.
func (s *repositoryStore) estimateTopLevelSizeWithDescendants(ctx context.Context, r *models.Repository) (int64, error) {
defer metrics.InstrumentQuery("repository_size_with_descendants_top_level_estimate")()
q := `SELECT
coalesce(sum(q.size), 0)
FROM ( SELECT DISTINCT ON (digest)
size
FROM
layers
WHERE
top_level_namespace_id = $1) q`
var size int64
if err := s.db.QueryRowContext(ctx, q, r.NamespaceID).Scan(&size); err != nil {
return 0, fmt.Errorf("estimating top-level repository size with descendants: %w", err)
}
return size, nil
}
var ErrOnlyRootEstimates = errors.New("only the size of root repositories can be estimated")
// EstimatedSizeWithDescendants is an alternative to SizeWithDescendants that relies on a simpler query that does not
// exclude unreferenced layers. Therefore, the measured size should be considered an estimate. This is a partial
// mitigation for https://gitlab.com/gitlab-org/container-registry/-/issues/779. For now, only top-level namespaces are
// supported. ErrOnlyRootEstimates is returned if attempting to estimate the size of a non-root repository.
func (s *repositoryStore) EstimatedSizeWithDescendants(ctx context.Context, r *models.Repository) (RepositorySize, error) {
if !r.IsTopLevel() {
return RepositorySize{}, ErrOnlyRootEstimates
}
if found, b := s.cache.GetSizeWithDescendants(ctx, r); found {
		return RepositorySize{bytes: b, cached: true}, nil
}
b, err := s.estimateTopLevelSizeWithDescendants(ctx, r)
if err != nil {
return RepositorySize{}, err
}
s.cache.SetSizeWithDescendants(ctx, r, b)
	return RepositorySize{bytes: b}, nil
}
// CreateOrFind attempts to create a repository. If the repository already exists (same path) that record is loaded from
// the database into r. This is similar to a FindByPath followed by a Create, but without being prone to race conditions
// on write operations between the corresponding read (FindByPath) and write (Create) operations. Separate Find* and
// Create method calls should be preferred to this when race conditions are not a concern.
func (s *repositoryStore) CreateOrFind(ctx context.Context, r *models.Repository) error {
if cached := s.cache.Get(ctx, r.Path); cached != nil {
*r = *cached
return nil
}
if r.NamespaceID == 0 {
n := &models.Namespace{Name: r.TopLevelPathSegment()}
ns := NewNamespaceStore(s.db)
if err := ns.SafeFindOrCreate(ctx, n); err != nil {
return fmt.Errorf("finding or creating namespace: %w", err)
}
r.NamespaceID = n.ID
}
defer metrics.InstrumentQuery("repository_create_or_find")()
	// First, check if the repository already exists. This avoids incrementing the repositories.id sequence
	// unnecessarily, as we know that the target repository will already exist for all requests except the first.
tmp, err := s.FindByPath(ctx, r.Path)
if err != nil {
return err
}
if tmp != nil {
*r = *tmp
return nil
}
	// If not, proceed with the creation attempt...
	// ON CONFLICT (path) DO UPDATE SET is a temporary measure until
	// https://gitlab.com/gitlab-org/container-registry/-/issues/625 is addressed. If a repository record already
	// exists for `path` but is marked as soft deleted, we should undo the soft delete and proceed gracefully.
q := `INSERT INTO repositories (top_level_namespace_id, name, path, parent_id)
VALUES ($1, $2, $3, $4)
ON CONFLICT (path)
DO UPDATE SET
deleted_at = NULL
RETURNING
id, created_at, deleted_at` // deleted_at returned for test validation purposes only
row := s.db.QueryRowContext(ctx, q, r.NamespaceID, r.Name, r.Path, r.ParentID)
if err := row.Scan(&r.ID, &r.CreatedAt, &r.DeletedAt); err != nil {
if !errors.Is(err, sql.ErrNoRows) {
return fmt.Errorf("creating repository: %w", err)
}
// if the result set has no rows, then the repository already exists
tmp, err := s.FindByPath(ctx, r.Path)
if err != nil {
return err
}
*r = *tmp
s.cache.Set(ctx, r)
}
return nil
}
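// Usage sketch for CreateOrFind (illustrative only; `s` is an assumed RepositoryStore). The caller only fills in the
// path-derived attributes; the ID, namespace ID and timestamps are populated on return, whether the record was created
// or already existed (including a previously soft-deleted one):
//
//	repo := &models.Repository{Name: "my-project", Path: "my-group/my-project"}
//	if err := s.CreateOrFind(ctx, repo); err != nil {
//		return err
//	}
//	_ = repo.ID // valid in both the "created" and "found" cases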
func splitRepositoryPath(path string) []string {
return strings.Split(filepath.Clean(path), "/")
}
// repositoryName parses a repository path (e.g. `"a/b/c"`) and returns its name (e.g. `"c"`).
func repositoryName(path string) string {
segments := splitRepositoryPath(path)
return segments[len(segments)-1]
}
// CreateByPath creates the repository for a given path. An error is returned if the repository already exists.
func (s *repositoryStore) CreateByPath(ctx context.Context, path string, opts ...repositoryOption) (*models.Repository, error) {
if cached := s.cache.Get(ctx, path); cached != nil {
return cached, nil
}
n := &models.Namespace{Name: strings.Split(path, "/")[0]}
ns := NewNamespaceStore(s.db)
if err := ns.SafeFindOrCreate(ctx, n); err != nil {
return nil, fmt.Errorf("finding or creating namespace: %w", err)
}
defer metrics.InstrumentQuery("repository_create_by_path")()
r := &models.Repository{NamespaceID: n.ID, Name: repositoryName(path), Path: path}
for _, opt := range opts {
opt(r)
}
if err := s.Create(ctx, r); err != nil {
return nil, err
}
s.cache.Set(ctx, r)
return r, nil
}
// CreateOrFindByPath is the fully idempotent version of CreateByPath, where no error is returned if the repository
// already exists.
func (s *repositoryStore) CreateOrFindByPath(ctx context.Context, path string, opts ...repositoryOption) (*models.Repository, error) {
if cached := s.cache.Get(ctx, path); cached != nil {
return cached, nil
}
n := &models.Namespace{Name: strings.Split(path, "/")[0]}
ns := NewNamespaceStore(s.db)
if err := ns.SafeFindOrCreate(ctx, n); err != nil {
return nil, fmt.Errorf("finding or creating namespace: %w", err)
}
defer metrics.InstrumentQuery("repository_create_or_find_by_path")()
r := &models.Repository{NamespaceID: n.ID, Name: repositoryName(path), Path: path}
for _, opt := range opts {
opt(r)
}
if err := s.CreateOrFind(ctx, r); err != nil {
return nil, err
}
s.cache.Set(ctx, r)
return r, nil
}
// Update updates an existing repository.
func (s *repositoryStore) Update(ctx context.Context, r *models.Repository) error {
defer metrics.InstrumentQuery("repository_update")()
q := `UPDATE
repositories
SET
(name, path, parent_id, updated_at) = ($1, $2, $3, now())
WHERE
top_level_namespace_id = $4
AND id = $5
RETURNING
updated_at`
row := s.db.QueryRowContext(ctx, q, r.Name, r.Path, r.ParentID, r.NamespaceID, r.ID)
if err := row.Scan(&r.UpdatedAt); err != nil {
if errors.Is(err, sql.ErrNoRows) {
return fmt.Errorf("repository not found")
}
return fmt.Errorf("updating repository: %w", err)
}
s.cache.Set(ctx, r)
return nil
}
// LinkBlob links a blob to a repository. It does nothing if already linked.
func (s *repositoryStore) LinkBlob(ctx context.Context, r *models.Repository, d digest.Digest) error {
defer metrics.InstrumentQuery("repository_link_blob")()
q := `INSERT INTO repository_blobs (top_level_namespace_id, repository_id, blob_digest)
VALUES ($1, $2, decode($3, 'hex'))
ON CONFLICT (top_level_namespace_id, repository_id, blob_digest)
DO NOTHING`
dgst, err := NewDigest(d)
if err != nil {
return err
}
if _, err := s.db.ExecContext(ctx, q, r.NamespaceID, r.ID, dgst); err != nil {
return fmt.Errorf("linking blob: %w", err)
}
return nil
}
// UnlinkBlob unlinks a blob from a repository. It does nothing if not linked. A boolean is returned to denote whether
// the link was deleted or not. This avoids the need for a separate preceding `SELECT` to find if it exists.
func (s *repositoryStore) UnlinkBlob(ctx context.Context, r *models.Repository, d digest.Digest) (bool, error) {
defer metrics.InstrumentQuery("repository_unlink_blob")()
q := "DELETE FROM repository_blobs WHERE top_level_namespace_id = $1 AND repository_id = $2 AND blob_digest = decode($3, 'hex')"
dgst, err := NewDigest(d)
if err != nil {
return false, err
}
res, err := s.db.ExecContext(ctx, q, r.NamespaceID, r.ID, dgst)
if err != nil {
return false, fmt.Errorf("linking blob: %w", err)
}
count, err := res.RowsAffected()
if err != nil {
return false, fmt.Errorf("linking blob: %w", err)
}
return count == 1, nil
}
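// Usage sketch for UnlinkBlob (illustrative only; `s`, `repo` and `dgst` are assumed to be in scope). The returned
// boolean replaces a separate existence check:
//
//	unlinked, err := s.UnlinkBlob(ctx, repo, dgst)
//	if err != nil {
//		return err
//	}
//	if !unlinked {
//		// the blob was not linked to this repository in the first place
//	}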
// DeleteTagByName deletes a tag by name within a repository. A boolean is returned to denote whether the tag was
// deleted or not. This avoids the need for a separate preceding `SELECT` to find if it exists.
func (s *repositoryStore) DeleteTagByName(ctx context.Context, r *models.Repository, name string) (bool, error) {
defer metrics.InstrumentQuery("repository_delete_tag_by_name")()
q := "DELETE FROM tags WHERE top_level_namespace_id = $1 AND repository_id = $2 AND name = $3"
res, err := s.db.ExecContext(ctx, q, r.NamespaceID, r.ID, name)
if err != nil {
return false, fmt.Errorf("deleting tag: %w", err)
}
count, err := res.RowsAffected()
if err != nil {
return false, fmt.Errorf("deleting tag: %w", err)
}
s.cache.InvalidateSize(ctx, r)
return count == 1, nil
}
// DeleteManifest deletes a manifest from a repository. A boolean is returned to denote whether the manifest was deleted
// or not. This avoids the need for a separate preceding `SELECT` to find if it exists. A manifest cannot be deleted if
// it is referenced by a manifest list.
func (s *repositoryStore) DeleteManifest(ctx context.Context, r *models.Repository, d digest.Digest) (bool, error) {
defer metrics.InstrumentQuery("repository_delete_manifest")()
q := "DELETE FROM manifests WHERE top_level_namespace_id = $1 AND repository_id = $2 AND digest = decode($3, 'hex')"
dgst, err := NewDigest(d)
if err != nil {
return false, err
}
res, err := s.db.ExecContext(ctx, q, r.NamespaceID, r.ID, dgst)
if err != nil {
var pgErr *pgconn.PgError
if errors.As(err, &pgErr) && pgErr.Code == pgerrcode.ForeignKeyViolation && pgErr.TableName == "manifest_references" {
return false, fmt.Errorf("deleting manifest: %w", ErrManifestReferencedInList)
}
return false, fmt.Errorf("deleting manifest: %w", err)
}
count, err := res.RowsAffected()
if err != nil {
return false, fmt.Errorf("deleting manifest: %w", err)
}
s.cache.InvalidateSize(ctx, r)
return count == 1, nil
}
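// Usage sketch for DeleteManifest (illustrative only; `s`, `repo` and `dgst` are assumed to be in scope). Callers can
// detect the "still referenced by a manifest list" case with errors.Is and map it to a conflict response:
//
//	deleted, err := s.DeleteManifest(ctx, repo, dgst)
//	if errors.Is(err, ErrManifestReferencedInList) {
//		return err // refuse the deletion while a manifest list still references the manifest
//	}
//	if err != nil {
//		return err
//	}
//	if !deleted {
//		// the manifest was not present in this repository
//	}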
// FindPaginatedRepositoriesForPath finds repositories (up to `filters.MaxEntries`) that share the base path of the
// requested repository. The results are ordered lexicographically by repository path and begin after `filters.LastEntry`.
// Empty repositories (those without at least 1 tag) are excluded from the returned list.
// Even if no repository has a path equal to `filters.LastEntry`, the returned repositories are still those under the
// base path of the requested repository that sort lexicographically after `filters.LastEntry`.
func (s *repositoryStore) FindPaginatedRepositoriesForPath(ctx context.Context, r *models.Repository, filters FilterParams) (models.Repositories, error) {
	// Start from a path lexicographically just before r.Path when no last entry is available.
	// This improves query performance, as we will not need to filter from `r.path > ""` in the query below.
if filters.LastEntry == "" {
filters.LastEntry = lexicographicallyBeforePath(r.Path)
}
defer metrics.InstrumentQuery("repository_find_paginated_repositories_for_path")()
q := `SELECT
id,
top_level_namespace_id,
name,
path,
parent_id,
created_at,
updated_at
FROM
repositories AS r
WHERE
(r.path = $1 OR r.path LIKE $2)
AND EXISTS ( SELECT FROM tags AS t WHERE t.top_level_namespace_id = r.top_level_namespace_id AND t.repository_id = r.id )
AND (r.path > $3 AND r.path < $4)
AND r.top_level_namespace_id = $5
ORDER BY r.path
LIMIT $6`
rows, err := s.db.QueryContext(ctx, q, r.Path, r.Path+"/%", filters.LastEntry, lexicographicallyNextPath(r.Path), r.NamespaceID, filters.MaxEntries)
if err != nil {
return nil, fmt.Errorf("finding pagingated list of repository for path: %w", err)
}
return scanFullRepositories(rows)
}
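// Usage sketch for FindPaginatedRepositoriesForPath (illustrative only; `s` and `base` are assumed to be a
// RepositoryStore and the base *models.Repository). Pages are advanced by feeding the last returned path back in as
// FilterParams.LastEntry:
//
//	filters := FilterParams{MaxEntries: 100}
//	for {
//		page, err := s.FindPaginatedRepositoriesForPath(ctx, base, filters)
//		if err != nil {
//			return err
//		}
//		if len(page) == 0 {
//			break
//		}
//		// process page...
//		filters.LastEntry = page[len(page)-1].Path
//	}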
// RenamePathForSubRepositories updates all sub-repositories whose path starts with a repository's `oldPath` so that
// they start with `newPath` instead. For example, with `newPath` set to `my-group/my-sub-group/new-repo-name`, all
// sub-repositories under `oldPath` `my-group/my-sub-group/old-repo-name` are updated to start with
// `my-group/my-sub-group/new-repo-name`. This does not, however, change the base repository's own path.
func (s *repositoryStore) RenamePathForSubRepositories(ctx context.Context, topLevelNamespaceID int64, oldPath, newPath string) error {
defer metrics.InstrumentQuery("repository_rename_sub_repositories_path")()
q := "UPDATE repositories SET path = REPLACE(path, $1, $2) WHERE top_level_namespace_id = $3 AND path LIKE $4"
_, err := s.db.ExecContext(ctx, q, oldPath, newPath, topLevelNamespaceID, oldPath+"/%")
if err != nil {
return fmt.Errorf("renaming sub-repository paths: %w", err)
}
return nil
}
// Rename updates a repository's path and name attributes to `newPath` and `newName`, respectively.
// This must always be followed by `RenamePathForSubRepositories` to make sure sub-repositories starting with
// the `oldPath` of the repository are also updated to start with the `newPath`.
func (s *repositoryStore) Rename(ctx context.Context, r *models.Repository, newPath, newName string) error {
defer metrics.InstrumentQuery("repository_rename")()
q := "UPDATE repositories SET path = $1, name = $2 WHERE top_level_namespace_id = $3 AND path = $4 RETURNING updated_at"
row := s.db.QueryRowContext(ctx, q, newPath, newName, r.NamespaceID, r.Path)
if err := row.Scan(&r.UpdatedAt); err != nil {
if errors.Is(err, sql.ErrNoRows) {
return fmt.Errorf("repository not found")
}
return fmt.Errorf("renaming repository: %w", err)
}
	// reflect the new attributes on the in-memory object before caching it
	r.Name = newName
	r.Path = newPath
	s.cache.Set(ctx, r)
return nil
}
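// Usage sketch for Rename (illustrative only; `s` and `repo` are assumed to be in scope, and both statements would
// normally run inside the same transaction so the repository tree is renamed atomically):
//
//	oldPath := repo.Path // e.g. "my-group/old-name"
//	newPath := "my-group/new-name"
//	if err := s.Rename(ctx, repo, newPath, "new-name"); err != nil {
//		return err
//	}
//	if err := s.RenamePathForSubRepositories(ctx, repo.NamespaceID, oldPath, newPath); err != nil {
//		return err
//	}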
// UpdateLastPublishedAt updates the timestamp of the last tag published to the repository. This is the greater of the
// tag's created at and updated at timestamps.
func (s *repositoryStore) UpdateLastPublishedAt(ctx context.Context, r *models.Repository, t *models.Tag) error {
defer metrics.InstrumentQuery("repository_update_last_published_at")()
q := `UPDATE
repositories
SET
last_published_at = $1
WHERE
top_level_namespace_id = $2
AND id = $3
RETURNING
last_published_at`
timestamp := t.CreatedAt
if t.UpdatedAt.Valid {
timestamp = t.UpdatedAt.Time
}
row := s.db.QueryRowContext(ctx, q, timestamp, r.NamespaceID, r.ID)
if err := row.Scan(&r.LastPublishedAt); err != nil {
if errors.Is(err, sql.ErrNoRows) {
return fmt.Errorf("repository not found")
}
return fmt.Errorf("updating repository last published at: %w", err)
}
s.cache.Set(ctx, r)
return nil
}
// lexicographicallyNextPath takes a path string and returns the next lexicographical path string.
// An empty path (i.e. "") results in "a", and a path consisting only of 'z' characters has an 'a' appended to it.
// All other paths result in their next logical lexicographical variation (e.g. gitlab-com => gitlab-con, gitlab-com. => gitlab-com/).
// This function serves primarily as a helper for optimizing the paginated db queries used in `FindPaginatedRepositoriesForPath`.
func lexicographicallyNextPath(path string) string {
// Find first character from right
// which is not z.
i := strings.LastIndexFunc(path, func(r rune) bool {
return r != 'z'
})
	var nextLexPath string
	// If all characters are 'z' or the path is empty, append
	// an 'a' at the end.
	if i == -1 {
		nextLexPath = path + "a"
	} else {
		// If there are some non-'z' characters, increment the
		// first such character from the right.
		rPath := []rune(path)
		rPath[i]++
		nextLexPath = string(rPath)
	}
	return nextLexPath
}
// lexicographicallyBeforePath takes a path string and returns the lexicographical path just before the provided path,
// e.g. gitlab-con => gitlab-com, gitlab-com/ => gitlab-com. (trailing dot).
// If an empty string is provided as the path, it returns "z".
// If the path consists only of 'a' characters, the last 'a' in the path is converted to a 'z'.
// This function serves primarily as a helper for optimizing the paginated db queries used in `FindPaginatedRepositoriesForPath`.
func lexicographicallyBeforePath(path string) string {
	// This shouldn't be possible, but to be safe we return "z" for an empty path.
if path == "" {
return "z"
}
// Find first character from right
// which is not a.
i := strings.LastIndexFunc(path, func(r rune) bool {
return r != 'a'
})
var beforeLexPath string
// If all characters are 'a',
// change the last character to 'z'.
if i == -1 {
beforeLexPath = strings.TrimSuffix(path, "a") + "z"
} else {
		// If there are some non-'a' characters,
		// decrement the last such character to the preceding unicode code point.
rPath := []rune(path)
rPath[i]--
beforeLexPath = string(rPath)
}
return beforeLexPath
}
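// Worked example for the two helpers above (illustrative only). For a base path "gitlab-org", the paginated query in
// FindPaginatedRepositoriesForPath is bounded as follows:
//
//	lexicographicallyBeforePath("gitlab-org") // "gitlab-orf", used as the initial LastEntry lower bound
//	lexicographicallyNextPath("gitlab-org")   // "gitlab-orh", used as the exclusive upper bound
//
// Every path equal to "gitlab-org" or starting with "gitlab-org/" sorts strictly between "gitlab-orf" and
// "gitlab-orh", so the range condition keeps the index scan tight without excluding any matching repository.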