registry/datastore/gcblobtask.go (189 lines of code) (raw):
//go:generate mockgen -package mocks -destination mocks/gcblobtask.go . GCBlobTaskStore
package datastore
import (
"context"
"database/sql"
"errors"
"fmt"
"time"
"github.com/docker/distribution/registry/datastore/metrics"
"github.com/docker/distribution/registry/datastore/models"
)
type GCBlobTaskStore interface {
FindAll(ctx context.Context) ([]*models.GCBlobTask, error)
Count(ctx context.Context) (int, error)
Next(ctx context.Context) (*models.GCBlobTask, error)
Postpone(ctx context.Context, b *models.GCBlobTask, d time.Duration) error
IsDangling(ctx context.Context, b *models.GCBlobTask) (bool, error)
Delete(ctx context.Context, b *models.GCBlobTask) error
}
type gcBlobTaskStore struct {
db Queryer
}
// NewGCBlobTaskStore builds a new gcBlobTaskStore.
func NewGCBlobTaskStore(db Queryer) GCBlobTaskStore {
return &gcBlobTaskStore{db: db}
}
func scanFullGCBlobTasks(rows *sql.Rows) ([]*models.GCBlobTask, error) {
rr := make([]*models.GCBlobTask, 0)
defer rows.Close()
for rows.Next() {
var dgst Digest
r := new(models.GCBlobTask)
err := rows.Scan(&r.ReviewAfter, &r.ReviewCount, &dgst, &r.CreatedAt, &r.Event)
if err != nil {
return nil, fmt.Errorf("scanning GC blob task: %w", err)
}
d, err := dgst.Parse()
if err != nil {
return nil, err
}
r.Digest = d
rr = append(rr, r)
}
if err := rows.Err(); err != nil {
return nil, fmt.Errorf("scanning GC blob tasks: %w", err)
}
return rr, nil
}
func scanFullGCBlobTask(row *Row) (*models.GCBlobTask, error) {
b := new(models.GCBlobTask)
var dgst Digest
err := row.Scan(&b.ReviewAfter, &b.ReviewCount, &dgst, &b.CreatedAt, &b.Event)
if err != nil {
return nil, fmt.Errorf("scanning GC blob task: %w", err)
}
d, err := dgst.Parse()
if err != nil {
return nil, err
}
b.Digest = d
return b, nil
}
// FindAll finds all GC blob tasks.
func (s *gcBlobTaskStore) FindAll(ctx context.Context) ([]*models.GCBlobTask, error) {
defer metrics.InstrumentQuery("gc_blob_task_find_all")()
q := `SELECT
review_after,
review_count,
encode(digest, 'hex') as digest,
created_at,
event
FROM
gc_blob_review_queue`
rows, err := s.db.QueryContext(ctx, q)
if err != nil {
return nil, fmt.Errorf("finding GC blob tasks: %w", err)
}
return scanFullGCBlobTasks(rows)
}
// Count counts all GC blob tasks.
func (s *gcBlobTaskStore) Count(ctx context.Context) (int, error) {
defer metrics.InstrumentQuery("gc_blob_task_count")()
q := "SELECT COUNT(*) FROM gc_blob_review_queue"
var count int
if err := s.db.QueryRowContext(ctx, q).Scan(&count); err != nil {
return count, fmt.Errorf("counting GC blob tasks: %w", err)
}
return count, nil
}
// Next reads and locks the blob review queue row with the oldest review_after before the current date. In case of a
// draw (multiple unlocked records with the same review_after) the returned row is the one that was first inserted.
// This method may be called safely from multiple concurrent goroutines or processes. A `SELECT FOR UPDATE` is used to
// ensure that callers don't get the same record. The operation does not block, and no error is returned if there are
// no rows or none is available (i.e., all locked by other processes). A `nil` record is returned in this situation.
func (s *gcBlobTaskStore) Next(ctx context.Context) (*models.GCBlobTask, error) {
defer metrics.InstrumentQuery("gc_blob_task_next")()
q := `SELECT
review_after,
review_count,
encode(digest, 'hex') AS digest,
created_at,
event
FROM
gc_blob_review_queue
WHERE
review_after < NOW()
ORDER BY
review_after
FOR UPDATE
SKIP LOCKED
LIMIT 1`
row := s.db.QueryRowContext(ctx, q)
b, err := scanFullGCBlobTask(row)
if err != nil {
if errors.Is(err, sql.ErrNoRows) {
return nil, nil
}
return nil, fmt.Errorf("fetching next GC blob task: %w", err)
}
return b, nil
}
// Postpone moves the review_after of a blob task forward by a given amount of time. The review_count is automatically
// incremented.
func (s *gcBlobTaskStore) Postpone(ctx context.Context, b *models.GCBlobTask, d time.Duration) error {
defer metrics.InstrumentQuery("gc_blob_task_postpone")()
q := `UPDATE
gc_blob_review_queue
SET
review_after = $1,
review_count = $2
WHERE
digest = decode($3, 'hex')`
ra := b.ReviewAfter.Add(d)
rc := b.ReviewCount + 1
dgst, err := NewDigest(b.Digest)
if err != nil {
return err
}
res, err := s.db.ExecContext(ctx, q, ra, rc, dgst)
if err != nil {
return fmt.Errorf("postponing GC blob task: %w", err)
}
count, err := res.RowsAffected()
if err != nil {
return fmt.Errorf("postponing GC blob task: %w", err)
}
if count == 0 {
return fmt.Errorf("GC blob task not found")
}
b.ReviewAfter = ra
b.ReviewCount = rc
return nil
}
// Delete deletes a blob task from the blob review queue.
func (s *gcBlobTaskStore) Delete(ctx context.Context, b *models.GCBlobTask) error {
defer metrics.InstrumentQuery("gc_blob_task_delete")()
q := "DELETE FROM gc_blob_review_queue WHERE digest = decode($1, 'hex')"
dgst, err := NewDigest(b.Digest)
if err != nil {
return err
}
res, err := s.db.ExecContext(ctx, q, dgst)
if err != nil {
return fmt.Errorf("deleting GC blob task: %w", err)
}
count, err := res.RowsAffected()
if err != nil {
return fmt.Errorf("deleting GC blob task: %w", err)
}
if count == 0 {
return fmt.Errorf("GC blob task not found")
}
return nil
}
// IsDangling determines if the blob referenced by the GC blob task is eligible for deletion or not.
func (s *gcBlobTaskStore) IsDangling(ctx context.Context, b *models.GCBlobTask) (bool, error) {
defer metrics.InstrumentQuery("gc_blob_task_is_dangling")()
q := `SELECT
EXISTS (
SELECT
1
FROM
gc_blobs_configurations
WHERE
digest = decode($1, 'hex')
UNION
SELECT
1
FROM
gc_blobs_layers
WHERE
digest = decode($1, 'hex'))`
dgst, err := NewDigest(b.Digest)
if err != nil {
return false, err
}
var referenced bool
if err := s.db.QueryRowContext(ctx, q, dgst).Scan(&referenced); err != nil {
return false, fmt.Errorf("determining blob eligibily for deletion: %w", err)
}
return !referenced, nil
}