registry/datastore/blob.go (183 lines of code) (raw):
//go:generate mockgen -package mocks -destination mocks/blob.go . BlobStore
package datastore
import (
"context"
"database/sql"
"errors"
"fmt"
"github.com/docker/distribution/registry/datastore/metrics"
"github.com/docker/distribution/registry/datastore/models"
"github.com/opencontainers/go-digest"
)
// BlobReader is the interface that defines read operations for a blob store.
type BlobReader interface {
FindAll(ctx context.Context) (models.Blobs, error)
FindByDigest(ctx context.Context, d digest.Digest) (*models.Blob, error)
Count(ctx context.Context) (int, error)
}
// BlobWriter is the interface that defines write operations for a blob store.
type BlobWriter interface {
Create(ctx context.Context, b *models.Blob) error
CreateOrFind(ctx context.Context, b *models.Blob) error
Delete(ctx context.Context, d digest.Digest) error
}
// BlobStore is the interface that a blob store should conform to.
type BlobStore interface {
BlobReader
BlobWriter
}
// blobStore is the concrete implementation of a BlobStore.
type blobStore struct {
db Queryer
}
// NewBlobStore builds a new blobStore.
func NewBlobStore(db Queryer) BlobStore {
return &blobStore{db: db}
}
func scanFullBlob(row *Row) (*models.Blob, error) {
var dgst Digest
b := new(models.Blob)
if err := row.Scan(&b.MediaType, &dgst, &b.Size, &b.CreatedAt); err != nil {
if !errors.Is(err, sql.ErrNoRows) {
return nil, fmt.Errorf("scanning blob: %w", err)
}
return nil, nil
}
d, err := dgst.Parse()
if err != nil {
return nil, err
}
b.Digest = d
return b, nil
}
func scanFullBlobs(rows *sql.Rows) (models.Blobs, error) {
bb := make(models.Blobs, 0)
defer rows.Close()
for rows.Next() {
var dgst Digest
b := new(models.Blob)
err := rows.Scan(&b.MediaType, &dgst, &b.Size, &b.CreatedAt)
if err != nil {
return nil, fmt.Errorf("scanning blob: %w", err)
}
d, err := dgst.Parse()
if err != nil {
return nil, err
}
b.Digest = d
bb = append(bb, b)
}
if err := rows.Err(); err != nil {
return nil, fmt.Errorf("scanning blobs: %w", err)
}
return bb, nil
}
// FindByDigest finds a blob by digest.
func (s *blobStore) FindByDigest(ctx context.Context, d digest.Digest) (*models.Blob, error) {
defer metrics.InstrumentQuery("blob_find_by_digest")()
q := `SELECT
mt.media_type,
encode(b.digest, 'hex') as digest,
b.size,
b.created_at
FROM
blobs AS b
JOIN media_types AS mt ON b.media_type_id = mt.id
WHERE
b.digest = decode($1, 'hex')`
dgst, err := NewDigest(d)
if err != nil {
return nil, err
}
row := s.db.QueryRowContext(ctx, q, dgst)
return scanFullBlob(row)
}
// FindAll finds all blobs.
func (s *blobStore) FindAll(ctx context.Context) (models.Blobs, error) {
defer metrics.InstrumentQuery("blob_find_all")()
q := `SELECT
mt.media_type,
encode(b.digest, 'hex') as digest,
b.size,
b.created_at
FROM
blobs AS b
JOIN media_types AS mt ON b.media_type_id = mt.id`
rows, err := s.db.QueryContext(ctx, q)
if err != nil {
return nil, fmt.Errorf("finding blobs: %w", err)
}
return scanFullBlobs(rows)
}
// Count counts all blobs.
func (s *blobStore) Count(ctx context.Context) (int, error) {
defer metrics.InstrumentQuery("blob_count")()
q := "SELECT COUNT(*) FROM blobs"
var count int
if err := s.db.QueryRowContext(ctx, q).Scan(&count); err != nil {
return count, fmt.Errorf("counting blobs: %w", err)
}
return count, nil
}
// Create saves a new blob.
func (s *blobStore) Create(ctx context.Context, b *models.Blob) error {
defer metrics.InstrumentQuery("blob_create")()
q := `INSERT INTO blobs (digest, media_type_id, size)
VALUES (decode($1, 'hex'), $2, $3)
RETURNING
created_at`
dgst, err := NewDigest(b.Digest)
if err != nil {
return err
}
mtStore := NewMediaTypeStore(s.db)
mediaTypeID, err := mtStore.FindID(ctx, b.MediaType)
if err != nil {
return err
}
row := s.db.QueryRowContext(ctx, q, dgst, mediaTypeID, b.Size)
if err := row.Scan(&b.CreatedAt); err != nil {
return fmt.Errorf("creating blob: %w", err)
}
return nil
}
// CreateOrFind attempts to create a blob. If the blob already exists (same digest_hex) that record is loaded from the
// database into b. This is similar to a FindByDigest followed by a Create, but without being prone to race conditions
// on write operations between the corresponding read (FindByDigest) and write (Create) operations. Separate Find* and
// Create method calls should be preferred to this when race conditions are not a concern.
func (s *blobStore) CreateOrFind(ctx context.Context, b *models.Blob) error {
defer metrics.InstrumentQuery("blob_create_or_find")()
q := `INSERT INTO blobs (digest, media_type_id, size)
VALUES (decode($1, 'hex'), $2, $3)
ON CONFLICT (digest)
DO NOTHING
RETURNING
created_at`
dgst, err := NewDigest(b.Digest)
if err != nil {
return err
}
mtStore := NewMediaTypeStore(s.db)
mediaTypeID, err := mtStore.FindID(ctx, b.MediaType)
if err != nil {
return err
}
row := s.db.QueryRowContext(ctx, q, dgst, mediaTypeID, b.Size)
if err := row.Scan(&b.CreatedAt); err != nil {
if !errors.Is(err, sql.ErrNoRows) {
return fmt.Errorf("creating blob: %w", err)
}
// if the result set has no rows, then the blob already exists
tmp, err := s.FindByDigest(ctx, b.Digest)
if err != nil {
return err
}
*b = *tmp
}
return nil
}
// Delete deletes a blob.
func (s *blobStore) Delete(ctx context.Context, d digest.Digest) error {
defer metrics.InstrumentQuery("blob_delete")()
q := "DELETE FROM blobs WHERE digest = decode($1, 'hex')"
dgst, err := NewDigest(d)
if err != nil {
return err
}
res, err := s.db.ExecContext(ctx, q, dgst)
if err != nil {
return fmt.Errorf("deleting blob: %w", err)
}
n, err := res.RowsAffected()
if err != nil {
return fmt.Errorf("deleting blob: %w", err)
}
if n == 0 {
return ErrNotFound
}
return nil
}