registry/datastore/manifest.go (447 lines of code) (raw):
//go:generate mockgen -package mocks -destination mocks/manifest.go . ManifestStore
package datastore
import (
"context"
"database/sql"
"errors"
"fmt"
"github.com/docker/distribution/registry/datastore/metrics"
"github.com/docker/distribution/registry/datastore/models"
"github.com/jackc/pgerrcode"
"github.com/jackc/pgx/v5/pgconn"
"github.com/opencontainers/go-digest"
)
// ManifestReader is the interface that defines read operations for a Manifest store.
type ManifestReader interface {
FindAll(ctx context.Context) (models.Manifests, error)
Count(ctx context.Context) (int, error)
LayerBlobs(ctx context.Context, m *models.Manifest) (models.Blobs, error)
References(ctx context.Context, m *models.Manifest) (models.Manifests, error)
}
// ManifestWriter is the interface that defines write operations for a Manifest store.
type ManifestWriter interface {
Create(ctx context.Context, m *models.Manifest) error
CreateOrFind(ctx context.Context, m *models.Manifest) error
AssociateManifest(ctx context.Context, ml, m *models.Manifest) error
DissociateManifest(ctx context.Context, ml, m *models.Manifest) error
AssociateLayerBlob(ctx context.Context, m *models.Manifest, b *models.Blob) error
DissociateLayerBlob(ctx context.Context, m *models.Manifest, b *models.Blob) error
Delete(ctx context.Context, namespaceID, repositoryID, id int64) (*digest.Digest, error)
}
// ManifestStore is the interface that a Manifest store should conform to.
type ManifestStore interface {
ManifestReader
ManifestWriter
}
// manifestStore is the concrete implementation of a ManifestStore.
type manifestStore struct {
db Queryer
}
// NewManifestStore builds a new manifest store.
func NewManifestStore(db Queryer) ManifestStore {
return &manifestStore{db: db}
}
func scanFullManifest(row *Row) (*models.Manifest, error) {
var dgst Digest
var cfgDigest, cfgMediaType sql.NullString
var cfgPayload *models.Payload
m := new(models.Manifest)
err := row.Scan(&m.ID, &m.NamespaceID, &m.RepositoryID, &m.TotalSize, &m.SchemaVersion,
&m.MediaType, &m.ArtifactType, &dgst, &m.Payload, &cfgMediaType, &cfgDigest, &cfgPayload,
&m.NonConformant, &m.NonDistributableLayers, &m.SubjectID, &m.CreatedAt)
if err != nil {
if !errors.Is(err, sql.ErrNoRows) {
return nil, fmt.Errorf("scanning manifest: %w", err)
}
return nil, nil
}
d, err := dgst.Parse()
if err != nil {
return nil, err
}
m.Digest = d
if cfgDigest.Valid {
d, err := Digest(cfgDigest.String).Parse()
if err != nil {
return nil, err
}
m.Configuration = &models.Configuration{
MediaType: cfgMediaType.String,
Digest: d,
Payload: *cfgPayload,
}
}
return m, nil
}
func scanFullManifests(rows *sql.Rows) (models.Manifests, error) {
mm := make(models.Manifests, 0)
defer rows.Close()
for rows.Next() {
var dgst Digest
var cfgDigest, cfgMediaType sql.NullString
var cfgPayload *models.Payload
m := new(models.Manifest)
err := rows.Scan(&m.ID, &m.NamespaceID, &m.RepositoryID, &m.TotalSize, &m.SchemaVersion,
&m.MediaType, &m.ArtifactType, &dgst, &m.Payload, &cfgMediaType, &cfgDigest, &cfgPayload,
&m.NonConformant, &m.NonDistributableLayers, &m.SubjectID, &m.CreatedAt)
if err != nil {
return nil, fmt.Errorf("scanning manifest: %w", err)
}
d, err := dgst.Parse()
if err != nil {
return nil, err
}
m.Digest = d
if cfgDigest.Valid {
d, err := Digest(cfgDigest.String).Parse()
if err != nil {
return nil, err
}
m.Configuration = &models.Configuration{
MediaType: cfgMediaType.String,
Digest: d,
Payload: *cfgPayload,
}
}
mm = append(mm, m)
}
if err := rows.Err(); err != nil {
return nil, fmt.Errorf("scanning manifests: %w", err)
}
return mm, nil
}
// FindAll finds all manifests.
func (s *manifestStore) FindAll(ctx context.Context) (models.Manifests, error) {
defer metrics.InstrumentQuery("manifest_find_all")()
q := `SELECT
m.id,
m.top_level_namespace_id,
m.repository_id,
m.total_size,
m.schema_version,
mt.media_type,
at.media_type as artifact_type,
encode(m.digest, 'hex') as digest,
m.payload,
mtc.media_type as configuration_media_type,
encode(m.configuration_blob_digest, 'hex') as configuration_blob_digest,
m.configuration_payload,
m.non_conformant,
m.non_distributable_layers,
m.subject_id,
m.created_at
FROM
manifests AS m
JOIN media_types AS mt ON mt.id = m.media_type_id
LEFT JOIN media_types AS mtc ON mtc.id = m.configuration_media_type_id
LEFT JOIN media_types AS at ON at.id = m.artifact_media_type_id
ORDER BY
id`
rows, err := s.db.QueryContext(ctx, q)
if err != nil {
return nil, fmt.Errorf("finding manifests: %w", err)
}
return scanFullManifests(rows)
}
// Count counts all manifests.
func (s *manifestStore) Count(ctx context.Context) (int, error) {
defer metrics.InstrumentQuery("manifest_count")()
q := "SELECT COUNT(*) FROM manifests"
var count int
if err := s.db.QueryRowContext(ctx, q).Scan(&count); err != nil {
return count, fmt.Errorf("counting manifests: %w", err)
}
return count, nil
}
// LayerBlobs finds layer blobs associated with a manifest, through the `layers` relationship entity.
func (s *manifestStore) LayerBlobs(ctx context.Context, m *models.Manifest) (models.Blobs, error) {
defer metrics.InstrumentQuery("manifest_layer_blobs")()
q := `SELECT
mt.media_type,
encode(b.digest, 'hex') as digest,
b.size,
b.created_at
FROM
layers AS l
INNER JOIN blobs AS b ON l.digest = b.digest
JOIN media_types AS mt ON mt.id = l.media_type_id
WHERE
l.manifest_id = $1
AND l.repository_id = $2
AND l.top_level_namespace_id = $3`
rows, err := s.db.QueryContext(ctx, q, m.ID, m.RepositoryID, m.NamespaceID)
if err != nil {
return nil, fmt.Errorf("finding blobs: %w", err)
}
return scanFullBlobs(rows)
}
// References finds all manifests directly referenced by a manifest (if any).
func (s *manifestStore) References(ctx context.Context, m *models.Manifest) (models.Manifests, error) {
defer metrics.InstrumentQuery("manifest_references")()
q := `SELECT DISTINCT
m.id,
m.top_level_namespace_id,
m.repository_id,
m.total_size,
m.schema_version,
mt.media_type,
at.media_type as artifact_type,
encode(m.digest, 'hex') as digest,
m.payload,
mtc.media_type as configuration_media_type,
encode(m.configuration_blob_digest, 'hex') as configuration_blob_digest,
m.configuration_payload,
m.non_conformant,
m.non_distributable_layers,
m.subject_id,
m.created_at
FROM
manifests AS m
JOIN manifest_references AS mr ON mr.top_level_namespace_id = m.top_level_namespace_id
AND mr.child_id = m.id
JOIN media_types AS mt ON mt.id = m.media_type_id
LEFT JOIN media_types AS mtc ON mtc.id = m.configuration_media_type_id
LEFT JOIN media_types AS at ON at.id = m.artifact_media_type_id
WHERE
m.top_level_namespace_id = $1
AND mr.repository_id = $2
AND mr.parent_id = $3`
rows, err := s.db.QueryContext(ctx, q, m.NamespaceID, m.RepositoryID, m.ID)
if err != nil {
return nil, fmt.Errorf("finding referenced manifests: %w", err)
}
return scanFullManifests(rows)
}
// Create saves a new Manifest.
func (s *manifestStore) Create(ctx context.Context, m *models.Manifest) error {
defer metrics.InstrumentQuery("manifest_create")()
q := `INSERT INTO manifests (top_level_namespace_id, repository_id, total_size, schema_version,
media_type_id, artifact_media_type_id, digest, payload, configuration_media_type_id,
configuration_blob_digest, configuration_payload, non_conformant, non_distributable_layers, subject_id)
VALUES ($1, $2, $3, $4, $5, $6, decode($7, 'hex'), $8, $9, decode($10, 'hex'), $11, $12, $13, $14)
RETURNING
id, created_at`
dgst, err := NewDigest(m.Digest)
if err != nil {
return err
}
mtStore := NewMediaTypeStore(s.db)
mediaTypeID, err := mtStore.SafeFindOrCreateID(ctx, m.MediaType)
if err != nil {
return fmt.Errorf("mapping manifest media type: %w", err)
}
var artifactTypeID sql.NullInt64
if m.ArtifactType.Valid {
aid, err := mtStore.SafeFindOrCreateID(ctx, m.ArtifactType.String)
if err != nil {
return fmt.Errorf("mapping manifest artifact type: %w", err)
}
artifactTypeID.Valid = true
artifactTypeID.Int64 = int64(aid) // nolint: gosec // media type id will always be non-negative
}
var configDgst sql.NullString
var configMediaTypeID sql.NullInt32
var configPayload *models.Payload
if m.Configuration != nil {
dgst, err := NewDigest(m.Configuration.Digest)
if err != nil {
return err
}
configDgst.Valid = true
configDgst.String = dgst.String()
id, err := mtStore.SafeFindOrCreateID(ctx, m.Configuration.MediaType)
if err != nil {
return fmt.Errorf("mapping config media type: %w", err)
}
configMediaTypeID.Valid = true
// nolint: gosec // underlying table uses `smallint` (2 bytes)
configMediaTypeID.Int32 = int32(id)
configPayload = &m.Configuration.Payload
}
row := s.db.QueryRowContext(ctx, q, m.NamespaceID, m.RepositoryID, m.TotalSize, m.SchemaVersion,
mediaTypeID, artifactTypeID, dgst, m.Payload, configMediaTypeID, configDgst, configPayload,
m.NonConformant, m.NonDistributableLayers, m.SubjectID)
if err := row.Scan(&m.ID, &m.CreatedAt); err != nil {
return fmt.Errorf("creating manifest: %w", err)
}
return nil
}
// CreateOrFind attempts to create a manifest. If the manifest already exists (same digest in the scope of a given repository)
// that record is loaded from the database into m. This is similar to a repositoryStore.FindManifestByDigest followed by
// a Create, but without being prone to race conditions on write operations between the corresponding read (FindManifestByDigest)
// and write (Create) operations.
// Separate Find* and Create method calls should be preferred to this when race conditions are not a concern.
func (s *manifestStore) CreateOrFind(ctx context.Context, m *models.Manifest) error {
defer metrics.InstrumentQuery("manifest_create_or_find")()
q := `INSERT INTO manifests (top_level_namespace_id, repository_id, total_size, schema_version,
media_type_id, artifact_media_type_id, digest, payload, configuration_media_type_id,
configuration_blob_digest, configuration_payload, non_conformant, non_distributable_layers, subject_id)
VALUES ($1, $2, $3, $4, $5, $6, decode($7, 'hex'), $8, $9, decode($10, 'hex'), $11, $12, $13, $14)
ON CONFLICT (top_level_namespace_id, repository_id, digest) DO NOTHING
RETURNING
id, created_at`
dgst, err := NewDigest(m.Digest)
if err != nil {
return err
}
mtStore := NewMediaTypeStore(s.db)
mediaTypeID, err := mtStore.SafeFindOrCreateID(ctx, m.MediaType)
if err != nil {
return fmt.Errorf("mapping manifest media type: %w", err)
}
var artifactTypeID sql.NullInt64
if m.ArtifactType.Valid {
aid, err := mtStore.SafeFindOrCreateID(ctx, m.ArtifactType.String)
if err != nil {
return fmt.Errorf("mapping manifest artifact type: %w", err)
}
artifactTypeID.Valid = true
artifactTypeID.Int64 = int64(aid) // nolint: gosec // media type id will always be non-negative
}
var configDgst sql.NullString
var configMediaTypeID sql.NullInt32
var configPayload *models.Payload
if m.Configuration != nil {
dgst, err := NewDigest(m.Configuration.Digest)
if err != nil {
return err
}
configDgst.Valid = true
configDgst.String = dgst.String()
id, err := mtStore.SafeFindOrCreateID(ctx, m.Configuration.MediaType)
if err != nil {
return fmt.Errorf("mapping config media type: %w", err)
}
configMediaTypeID.Valid = true
// nolint: gosec // underlying table uses `smallint` (2 bytes)
configMediaTypeID.Int32 = int32(id)
configPayload = &m.Configuration.Payload
}
row := s.db.QueryRowContext(ctx, q, m.NamespaceID, m.RepositoryID, m.TotalSize, m.SchemaVersion,
mediaTypeID, artifactTypeID, dgst, m.Payload, configMediaTypeID, configDgst, configPayload,
m.NonConformant, m.NonDistributableLayers, m.SubjectID)
if err := row.Scan(&m.ID, &m.CreatedAt); err != nil {
if !errors.Is(err, sql.ErrNoRows) {
return fmt.Errorf("creating manifest: %w", err)
}
tmp, err := findManifestByDigest(ctx, s.db, m.NamespaceID, m.RepositoryID, dgst)
if err != nil {
return err
}
*m = *tmp
}
return nil
}
// AssociateManifest associates a manifest with a manifest list. It does nothing if already associated.
func (s *manifestStore) AssociateManifest(ctx context.Context, ml, m *models.Manifest) error {
defer metrics.InstrumentQuery("manifest_associate_manifest")()
if ml.ID == m.ID {
return fmt.Errorf("cannot associate a manifest with itself")
}
q := `INSERT INTO manifest_references (top_level_namespace_id, repository_id, parent_id, child_id)
VALUES ($1, $2, $3, $4)
ON CONFLICT (top_level_namespace_id, repository_id, parent_id, child_id)
DO NOTHING`
if _, err := s.db.ExecContext(ctx, q, ml.NamespaceID, ml.RepositoryID, ml.ID, m.ID); err != nil {
var pgErr *pgconn.PgError
// this can happen if the child manifest is deleted by the online GC while attempting to create the list
if errors.As(err, &pgErr) && pgErr.Code == pgerrcode.ForeignKeyViolation {
return ErrRefManifestNotFound
}
return fmt.Errorf("associating manifest: %w", err)
}
return nil
}
// DissociateManifest dissociates a manifest and a manifest list. It does nothing if not associated.
func (s *manifestStore) DissociateManifest(ctx context.Context, ml, m *models.Manifest) error {
defer metrics.InstrumentQuery("manifest_dissociate_manifest")()
q := `DELETE FROM manifest_references
WHERE top_level_namespace_id = $1
AND repository_id = $2
AND parent_id = $3
AND child_id = $4`
res, err := s.db.ExecContext(ctx, q, ml.NamespaceID, ml.RepositoryID, ml.ID, m.ID)
if err != nil {
return fmt.Errorf("dissociating manifest: %w", err)
}
if _, err := res.RowsAffected(); err != nil {
return fmt.Errorf("dissociating manifest: %w", err)
}
return nil
}
// AssociateLayerBlob associates a layer blob and a manifest. It does nothing if already associated.
func (s *manifestStore) AssociateLayerBlob(ctx context.Context, m *models.Manifest, b *models.Blob) error {
defer metrics.InstrumentQuery("manifest_associate_layer_blob")()
q := `INSERT INTO layers (top_level_namespace_id, repository_id, manifest_id, digest, media_type_id, size)
VALUES ($1, $2, $3, decode($4, 'hex'), $5, $6)
ON CONFLICT (top_level_namespace_id, repository_id, manifest_id, digest)
DO NOTHING`
dgst, err := NewDigest(b.Digest)
if err != nil {
return err
}
mtStore := NewMediaTypeStore(s.db)
mediaTypeID, err := mtStore.SafeFindOrCreateID(ctx, b.MediaType)
if err != nil {
return err
}
if _, err := s.db.ExecContext(ctx, q, m.NamespaceID, m.RepositoryID, m.ID, dgst, mediaTypeID, b.Size); err != nil {
return fmt.Errorf("associating layer blob: %w", err)
}
return nil
}
// DissociateLayerBlob dissociates a layer blob and a manifest. It does nothing if not associated.
func (s *manifestStore) DissociateLayerBlob(ctx context.Context, m *models.Manifest, b *models.Blob) error {
defer metrics.InstrumentQuery("manifest_dissociate_layer_blob")()
q := `DELETE FROM layers
WHERE top_level_namespace_id = $1
AND repository_id = $2
AND manifest_id = $3
AND digest = decode($4, 'hex')`
dgst, err := NewDigest(b.Digest)
if err != nil {
return err
}
res, err := s.db.ExecContext(ctx, q, m.NamespaceID, m.RepositoryID, m.ID, dgst)
if err != nil {
return fmt.Errorf("dissociating layer blob: %w", err)
}
if _, err := res.RowsAffected(); err != nil {
return fmt.Errorf("dissociating layer blob: %w", err)
}
return nil
}
// Delete deletes a manifest. The digest is returned to denote whether the manifest was deleted or not. This avoids the
// need for a separate preceding `SELECT` to find if it exists. A manifest cannot be deleted if it is referenced by a
// manifest list.
func (s *manifestStore) Delete(ctx context.Context, namespaceID, repositoryID, id int64) (*digest.Digest, error) {
defer metrics.InstrumentQuery("manifest_delete")()
q := `DELETE FROM manifests
WHERE top_level_namespace_id = $1
AND repository_id = $2
AND id = $3
RETURNING
encode(digest, 'hex')`
var tmp Digest
row := s.db.QueryRowContext(ctx, q, namespaceID, repositoryID, id)
if err := row.Scan(&tmp); err != nil {
var pgErr *pgconn.PgError
switch {
case errors.Is(err, sql.ErrNoRows):
return nil, nil
case errors.As(err, &pgErr) && pgErr.Code == pgerrcode.ForeignKeyViolation && pgErr.TableName == "manifest_references":
return nil, fmt.Errorf("deleting manifest ID %d: %w: %w", id, ErrManifestReferencedInList, pgErr)
default:
return nil, fmt.Errorf("deleting manifest: %w", err)
}
}
dgst, err := tmp.Parse()
if err != nil {
return nil, err
}
return &dgst, nil
}
// findManifestByDigest finds a manifest by digest, repository and top level namespace ID
func findManifestByDigest(ctx context.Context, db Queryer, namespaceID, repositoryID int64, dgst Digest) (*models.Manifest, error) {
q := `SELECT
m.id,
m.top_level_namespace_id,
m.repository_id,
m.total_size,
m.schema_version,
mt.media_type,
at.media_type as artifact_type,
encode(m.digest, 'hex') as digest,
m.payload,
mtc.media_type as configuration_media_type,
encode(m.configuration_blob_digest, 'hex') as configuration_blob_digest,
m.configuration_payload,
m.non_conformant,
m.non_distributable_layers,
m.subject_id,
m.created_at
FROM
manifests AS m
JOIN media_types AS mt ON mt.id = m.media_type_id
LEFT JOIN media_types AS mtc ON mtc.id = m.configuration_media_type_id
LEFT JOIN media_types AS at ON at.id = m.artifact_media_type_id
WHERE
m.top_level_namespace_id = $1
AND m.repository_id = $2
AND m.digest = decode($3, 'hex')`
row := db.QueryRowContext(ctx, q, namespaceID, repositoryID, dgst)
return scanFullManifest(row)
}