registry/datastore/gcmanifesttask.go (268 lines of code) (raw):

//go:generate mockgen -package mocks -destination mocks/gcmanifesttask.go . GCManifestTaskStore

package datastore

import (
	"context"
	"database/sql"
	"errors"
	"fmt"
	"strconv"
	"strings"
	"time"

	"github.com/docker/distribution/registry/datastore/metrics"
	"github.com/docker/distribution/registry/datastore/models"
)

// GCManifestTaskStore is the interface for reading and manipulating the rows of the gc_manifest_review_queue table,
// each of which is represented by a models.GCManifestTask.
type GCManifestTaskStore interface {
	// FindAll returns every task in the manifest review queue.
	FindAll(ctx context.Context) ([]*models.GCManifestTask, error)
	// FindAndLock selects the task identified by the namespace, repository and manifest IDs with FOR UPDATE. A nil
	// task and a nil error are returned if no such row exists.
	FindAndLock(ctx context.Context, namespaceID, repositoryID, manifestID int64) (*models.GCManifestTask, error)
	// FindAndLockBefore behaves like FindAndLock but only matches a task whose review_after is earlier than date.
	FindAndLockBefore(ctx context.Context, namespaceID, repositoryID, manifestID int64, date time.Time) (*models.GCManifestTask, error)
	// FindAndLockNBefore selects with FOR UPDATE the tasks of a repository whose manifest ID is in manifestIDs and
	// whose review_after is earlier than date.
	FindAndLockNBefore(ctx context.Context, namespaceID, repositoryID int64, manifestIDs []int64, date time.Time) ([]*models.GCManifestTask, error)
	// Count returns the number of tasks in the manifest review queue.
	Count(ctx context.Context) (int, error)
	// Next selects with FOR UPDATE SKIP LOCKED one task whose review_after is in the past, ordered by review_after.
	// A nil task and a nil error are returned if no row is available.
	Next(ctx context.Context) (*models.GCManifestTask, error)
	// Postpone adds d to the task's review_after and increments its review_count, both in the database and on b.
	Postpone(ctx context.Context, b *models.GCManifestTask, d time.Duration) error
	// IsDangling reports whether no tag, manifest reference or subject relationship points at the task's manifest.
	IsDangling(ctx context.Context, b *models.GCManifestTask) (bool, error)
	// Delete removes the task from the manifest review queue. An error is returned if no row was deleted.
	Delete(ctx context.Context, b *models.GCManifestTask) error
}

// gcManifestTaskStore is the implementation of GCManifestTaskStore returned by NewGCManifestTaskStore. All of its
// queries are issued through db.
type gcManifestTaskStore struct {
	db Queryer
}

// NewGCManifestTaskStore builds a new gcManifestTaskStore.
func NewGCManifestTaskStore(db Queryer) GCManifestTaskStore { return &gcManifestTaskStore{db: db} } func scanFullGCManifestTasks(rows *sql.Rows) ([]*models.GCManifestTask, error) { rr := make([]*models.GCManifestTask, 0) defer rows.Close() for rows.Next() { r := new(models.GCManifestTask) err := rows.Scan(&r.NamespaceID, &r.RepositoryID, &r.ManifestID, &r.ReviewAfter, &r.ReviewCount, &r.CreatedAt, &r.Event) if err != nil { return nil, fmt.Errorf("scanning GC manifest task: %w", err) } rr = append(rr, r) } if err := rows.Err(); err != nil { return nil, fmt.Errorf("scanning GC manifest tasks: %w", err) } return rr, nil } func scanFullGCManifestTask(row *Row) (*models.GCManifestTask, error) { r := new(models.GCManifestTask) if err := row.Scan(&r.NamespaceID, &r.RepositoryID, &r.ManifestID, &r.ReviewAfter, &r.ReviewCount, &r.CreatedAt, &r.Event); err != nil { if !errors.Is(err, sql.ErrNoRows) { return nil, fmt.Errorf("scanning GC manifest task: %w", err) } return nil, nil } return r, nil } // FindAll finds all GC manifest tasks. func (s *gcManifestTaskStore) FindAll(ctx context.Context) ([]*models.GCManifestTask, error) { defer metrics.InstrumentQuery("gc_manifest_task_find_all")() q := `SELECT top_level_namespace_id, repository_id, manifest_id, review_after, review_count, created_at, event FROM gc_manifest_review_queue` rows, err := s.db.QueryContext(ctx, q) if err != nil { return nil, fmt.Errorf("finding GC manifest tasks: %w", err) } return scanFullGCManifestTasks(rows) } // FindAndLock finds a GC manifest task and locks it against writes. This query blocks if the row exists but is already // locked by another process. 
func (s *gcManifestTaskStore) FindAndLock(ctx context.Context, namespaceID, repositoryID, manifestID int64) (*models.GCManifestTask, error) { defer metrics.InstrumentQuery("gc_manifest_task_find_and_lock")() q := `SELECT top_level_namespace_id, repository_id, manifest_id, review_after, review_count, created_at, event FROM gc_manifest_review_queue WHERE top_level_namespace_id = $1 AND repository_id = $2 AND manifest_id = $3 FOR UPDATE` row := s.db.QueryRowContext(ctx, q, namespaceID, repositoryID, manifestID) return scanFullGCManifestTask(row) } // FindAndLockBefore finds a GC manifest task scheduled for review before date and locks it against writes. This query // blocks if the row exists but is already locked by another process. func (s *gcManifestTaskStore) FindAndLockBefore(ctx context.Context, namespaceID, repositoryID, manifestID int64, date time.Time) (*models.GCManifestTask, error) { defer metrics.InstrumentQuery("gc_manifest_task_find_and_lock_before")() q := `SELECT top_level_namespace_id, repository_id, manifest_id, review_after, review_count, created_at, event FROM gc_manifest_review_queue WHERE top_level_namespace_id = $1 AND repository_id = $2 AND manifest_id = $3 AND review_after < $4 FOR UPDATE` row := s.db.QueryRowContext(ctx, q, namespaceID, repositoryID, manifestID, date) return scanFullGCManifestTask(row) } // FindAndLockNBefore finds multiple GC manifest tasks scheduled for review before date and locks them against writes. // This query blocks if any row exists but is already locked by another process. 
func (s *gcManifestTaskStore) FindAndLockNBefore(ctx context.Context, namespaceID, repositoryID int64, manifestIDs []int64, date time.Time) ([]*models.GCManifestTask, error) { defer metrics.InstrumentQuery("gc_manifest_task_find_and_lock_n_before")() q := `SELECT top_level_namespace_id, repository_id, manifest_id, review_after, review_count, created_at, event FROM gc_manifest_review_queue WHERE top_level_namespace_id = $1 AND repository_id = $2 AND manifest_id IN (%s) AND review_after < $3 ORDER BY top_level_namespace_id, repository_id, manifest_id FOR UPDATE` ids := make([]string, 0, len(manifestIDs)) for _, id := range manifestIDs { ids = append(ids, strconv.FormatInt(id, 10)) } q = fmt.Sprintf(q, strings.Join(ids, ",")) rows, err := s.db.QueryContext(ctx, q, namespaceID, repositoryID, date) if err != nil { return nil, fmt.Errorf("finding and locking GC manifest tasks: %w", err) } return scanFullGCManifestTasks(rows) } // Count counts all GC manifest tasks. func (s *gcManifestTaskStore) Count(ctx context.Context) (int, error) { defer metrics.InstrumentQuery("gc_manifest_task_count")() q := "SELECT COUNT(*) FROM gc_manifest_review_queue" var count int if err := s.db.QueryRowContext(ctx, q).Scan(&count); err != nil { return count, fmt.Errorf("counting GC manifest tasks: %w", err) } return count, nil } // Next reads and locks the manifest review queue row with the oldest review_after before the current date. In case of a // draw (multiple unlocked records with the same review_after) the returned row is the one that was first inserted. // This method may be called safely from multiple concurrent goroutines or processes. A `SELECT FOR UPDATE` is used to // ensure that callers don't get the same record. The operation does not block, and no error is returned if there are // no rows or none is available (i.e., all locked by other processes). A `nil` record is returned in this situation. 
func (s *gcManifestTaskStore) Next(ctx context.Context) (*models.GCManifestTask, error) { defer metrics.InstrumentQuery("gc_manifest_task_next")() q := `SELECT top_level_namespace_id, repository_id, manifest_id, review_after, review_count, created_at, event FROM gc_manifest_review_queue WHERE review_after < NOW() ORDER BY review_after FOR UPDATE SKIP LOCKED LIMIT 1` row := s.db.QueryRowContext(ctx, q) b, err := scanFullGCManifestTask(row) if err != nil { if errors.Is(err, sql.ErrNoRows) { return nil, nil } return nil, fmt.Errorf("fetching next GC manifest task: %w", err) } return b, nil } // Postpone moves the review_after of a manifest task forward by a given amount of time. The review_count is // automatically incremented. func (s *gcManifestTaskStore) Postpone(ctx context.Context, m *models.GCManifestTask, d time.Duration) error { defer metrics.InstrumentQuery("gc_manifest_task_postpone")() q := `UPDATE gc_manifest_review_queue SET review_after = $1, review_count = $2 WHERE top_level_namespace_id = $3 AND repository_id = $4 AND manifest_id = $5` ra := m.ReviewAfter.Add(d) rc := m.ReviewCount + 1 res, err := s.db.ExecContext(ctx, q, ra, rc, m.NamespaceID, m.RepositoryID, m.ManifestID) if err != nil { return fmt.Errorf("postponing GC manifest task: %w", err) } count, err := res.RowsAffected() if err != nil { return fmt.Errorf("postponing GC manifest task: %w", err) } if count == 0 { return fmt.Errorf("GC manifest task not found") } m.ReviewAfter = ra m.ReviewCount = rc return nil } // IsDangling determines if the manifest referenced by the GC manifest task is eligible for deletion or not. 
func (s *gcManifestTaskStore) IsDangling(ctx context.Context, m *models.GCManifestTask) (bool, error) { defer metrics.InstrumentQuery("gc_manifest_task_is_dangling")() q := `SELECT EXISTS ( SELECT 1 FROM tags WHERE top_level_namespace_id = $1 AND repository_id = $2 AND manifest_id = $3 UNION ALL SELECT 1 FROM manifest_references WHERE top_level_namespace_id = $1 AND repository_id = $2 AND child_id = $3 UNION ALL SELECT 1 FROM manifests WHERE top_level_namespace_id = $1 AND repository_id = $2 AND subject_id = $3)` var referenced bool if err := s.db.QueryRowContext(ctx, q, m.NamespaceID, m.RepositoryID, m.ManifestID).Scan(&referenced); err != nil { return false, fmt.Errorf("determining manifest eligibily for deletion: %w", err) } return !referenced, nil } // Delete deletes a manifest task from the manifest review queue. func (s *gcManifestTaskStore) Delete(ctx context.Context, m *models.GCManifestTask) error { defer metrics.InstrumentQuery("gc_manifest_task_delete")() q := `DELETE FROM gc_manifest_review_queue WHERE top_level_namespace_id = $1 AND repository_id = $2 AND manifest_id = $3` res, err := s.db.ExecContext(ctx, q, m.NamespaceID, m.RepositoryID, m.ManifestID) if err != nil { return fmt.Errorf("deleting GC manifest task: %w", err) } count, err := res.RowsAffected() if err != nil { return fmt.Errorf("deleting GC manifest task: %w", err) } if count == 0 { return fmt.Errorf("GC manifest task not found") } return nil }