registry/gc/worker/blobworker.go (193 lines of code) (raw):
package worker
import (
"context"
"errors"
"fmt"
"time"
"github.com/docker/distribution/log"
"github.com/docker/distribution/registry/datastore"
"github.com/docker/distribution/registry/datastore/models"
"github.com/docker/distribution/registry/gc/internal/metrics"
"github.com/docker/distribution/registry/storage"
"github.com/docker/distribution/registry/storage/driver"
"github.com/hashicorp/go-multierror"
)
// defaultStorageTimeout is the storage operation timeout that BlobWorker.applyDefaults falls back to when no
// storage timeout was configured (i.e. the field is still zero).
const defaultStorageTimeout = 5 * time.Second
var (
// for test purposes (mocking)
// Both constructors are invoked with the active database transaction whenever a store is needed, so tests can
// swap these variables to inject fakes.
blobTaskStoreConstructor = datastore.NewGCBlobTaskStore
blobStoreConstructor = datastore.NewBlobStore
)
// Compile-time assertion that *BlobWorker satisfies the Worker interface.
var _ Worker = (*BlobWorker)(nil)
// BlobWorker is the online GC worker responsible for processing tasks related with blobs. It consumes tasks from the
// blob review queue, identifies if the corresponding blob is eligible for deletion, and if so, deletes it from storage
// and database backends, in this order.
type BlobWorker struct {
// baseWorker is embedded; this file reads its db, logger, name and txTimeout fields through promotion.
*baseWorker
// vacuum is used to remove blobs from the storage backend.
vacuum *storage.Vacuum
// storageTimeout bounds the duration of each blob deletion request sent to the storage backend.
storageTimeout time.Duration
}
// BlobWorkerOption provides functional options for NewBlobWorker. Each option receives the worker under
// construction and mutates it in place.
type BlobWorkerOption func(*BlobWorker)
// WithBlobLogger returns an option that replaces the worker's logger with l.
func WithBlobLogger(l log.Logger) BlobWorkerOption {
	setLogger := func(bw *BlobWorker) {
		bw.logger = l
	}
	return setLogger
}
// WithBlobTxTimeout returns an option that sets how long the database transaction of each run may last.
// Defaults to 10 seconds.
func WithBlobTxTimeout(d time.Duration) BlobWorkerOption {
	setTxTimeout := func(bw *BlobWorker) {
		bw.txTimeout = d
	}
	return setTxTimeout
}
// WithBlobStorageTimeout returns an option that sets the timeout for storage operations. At present it only limits
// how long a request to delete a dangling blob on the storage backend may take. Defaults to 5 seconds.
func WithBlobStorageTimeout(d time.Duration) BlobWorkerOption {
	setStorageTimeout := func(bw *BlobWorker) {
		bw.storageTimeout = d
	}
	return setStorageTimeout
}
// applyDefaults first lets the embedded baseWorker apply its own defaults and then fills in the storage timeout
// with defaultStorageTimeout if it is still unset (zero).
func (w *BlobWorker) applyDefaults() {
	w.baseWorker.applyDefaults()

	if w.storageTimeout != 0 {
		// a timeout is already set, keep it
		return
	}
	w.storageTimeout = defaultStorageTimeout
}
// NewBlobWorker creates a new BlobWorker that uses db for database access and storageDeleter to remove blobs from
// the storage backend. Defaults are applied before opts, so any option overrides the corresponding default.
func NewBlobWorker(db datastore.Handler, storageDeleter driver.StorageDeleter, opts ...BlobWorkerOption) *BlobWorker {
	w := new(BlobWorker)
	w.baseWorker = &baseWorker{db: db}
	w.vacuum = storage.NewVacuum(storageDeleter)
	w.name = "registry.gc.worker.BlobWorker"

	// seed defaults first, then let the caller-supplied options override them
	w.applyDefaults()
	for i := range opts {
		opts[i](w)
	}

	// attach the worker name to whichever logger ended up configured
	w.logger = w.logger.WithFields(log.Fields{componentKey: w.name})

	return w
}
// Run implements Worker. It hands control to w.run (not defined in the visible part of this file), passing the
// worker itself as the second argument.
func (w *BlobWorker) Run(ctx context.Context) RunResult {
return w.run(ctx, w)
}
// processTask picks the next blob review task (if any) inside a single database transaction bounded by
// w.txTimeout and processes it. If the blob is dangling it is deleted via deleteBlob; in either case the task is
// then removed from the queue and the transaction committed.
//
// The returned RunResult has Found and Event set once a task was picked, Dangling set to the outcome of the
// dangling check, and Err set to the first failure encountered.
func (w *BlobWorker) processTask(ctx context.Context) RunResult {
	logger := log.GetLogger(log.WithContext(ctx))

	// cap the lifetime of the database transaction at w.txTimeout
	deadline := systemClock.Now().Add(w.txTimeout)
	ctx, cancel := context.WithDeadline(ctx, deadline)
	defer cancel()

	tx, err := w.db.BeginTx(ctx, nil)
	if err != nil {
		return RunResult{Err: fmt.Errorf("creating database transaction: %w", err)}
	}
	defer w.rollbackOnExit(ctx, tx)

	taskStore := blobTaskStoreConstructor(tx)
	task, err := taskStore.Next(ctx)
	if err != nil {
		return RunResult{Err: err}
	}

	// nothing queued: close the transaction cleanly and report an empty result
	if task == nil {
		logger.Info("no task available")
		if commitErr := tx.Commit(); commitErr != nil {
			return RunResult{Err: fmt.Errorf("committing database transaction: %w", commitErr)}
		}
		return RunResult{}
	}

	res := RunResult{Found: true, Event: task.Event}
	logger.WithFields(log.Fields{
		"review_after": task.ReviewAfter.UTC(),
		"review_count": task.ReviewCount,
		"digest":       task.Digest,
		"created_at":   task.CreatedAt.UTC(),
		"event":        task.Event,
	}).Info("processing task")

	dangling, err := taskStore.IsDangling(ctx, task)
	if err != nil {
		// On a deadline error the transaction outlived w.txTimeout, so the connection was closed and the task
		// unlocked with it; the next review can't be postponed and we simply return. For any other error we
		// don't know how to react, so we try to postpone the task review before returning.
		if !errors.Is(err, context.DeadlineExceeded) {
			if postponeErr := w.postponeTaskAndCommit(ctx, tx, task); postponeErr != nil {
				err = multierror.Append(err, postponeErr)
			}
		}
		res.Err = err
		return res
	}
	res.Dangling = dangling

	if !dangling {
		logger.Info("the blob is not dangling")
	} else {
		logger.Info("the blob is dangling")
		if delErr := w.deleteBlob(ctx, tx, task); delErr != nil {
			res.Err = delErr
			return res
		}
	}

	// the review is done, so drop the task from the queue and persist everything
	logger.Info("deleting task")
	if delErr := taskStore.Delete(ctx, task); delErr != nil {
		res.Err = delErr
		return res
	}
	if commitErr := tx.Commit(); commitErr != nil {
		res.Err = fmt.Errorf("committing database transaction: %w", commitErr)
	}
	return res
}
// deleteBlob removes the blob referenced by task t, first from the storage backend and then from the database,
// using tx for all database operations.
//
// The storage deletion is bounded by w.storageTimeout. A blob that is already missing from storage or from the
// database is logged as a warning and not treated as an error. On any other storage or database failure (except a
// database deadline error) the task review is postponed and the transaction committed via postponeTaskAndCommit;
// the original error is returned, combined with the postpone error if that fails as well.
//
// It returns nil when the blob was deleted or was already gone, and a non-nil error otherwise.
func (w *BlobWorker) deleteBlob(ctx context.Context, tx datastore.Transactor, t *models.GCBlobTask) error {
	l := log.GetLogger(log.WithContext(ctx))
	bs := blobStoreConstructor(tx)

	// delete blob from storage
	ctx2, cancel := context.WithDeadline(ctx, systemClock.Now().Add(w.storageTimeout))
	defer cancel()

	report := metrics.BlobStorageDelete()
	var err error
	if err = w.vacuum.RemoveBlob(ctx2, t.Digest); err != nil {
		switch {
		case errors.As(err, &driver.PathNotFoundError{}):
			// this is unexpected, but it's not a show stopper for GC
			l.Warn("blob no longer exists on storage")
		default:
			err = fmt.Errorf("deleting blob from storage: %w", err)
			// we don't know how to react here, so just try to postpone the task review and return
			if innerErr := w.postponeTaskAndCommit(ctx, tx, t); innerErr != nil {
				err = multierror.Append(err, innerErr)
			}
			report(err)
			return err
		}
	} else {
		// get blob media type and size for metrics purposes
		b, findErr := bs.FindByDigest(ctx, t.Digest)
		switch {
		case findErr != nil:
			// log and continue, try to delete the blob on the database and handle the failure there (if it persists)
			l.WithError(findErr).Error("failed searching for blob on database")
		case b == nil:
			// this is unexpected, but it's not a show stopper for GC
			l.Warn("blob no longer exists on database")
			// the storage deletion itself succeeded, so make sure it is still recorded before returning early
			report(nil)
			return nil
		default:
			metrics.StorageDeleteBytes(b.Size, b.MediaType)
		}
	}
	report(nil)

	// delete blob from database
	report = metrics.BlobDatabaseDelete()
	if err = bs.Delete(ctx, t.Digest); err != nil {
		switch {
		case errors.Is(err, datastore.ErrNotFound):
			// this is unexpected, but it's not a show stopper for GC
			// (errors.Is also matches the sentinel when it has been wrapped)
			l.Warn("blob no longer exists on database")
			return nil
		case errors.Is(err, context.DeadlineExceeded):
			// the transaction duration exceeded w.txTimeout and therefore the connection was closed, just return
		default:
			// we don't know how to react here, so just try to postpone the task review and return
			if innerErr := w.postponeTaskAndCommit(ctx, tx, t); innerErr != nil {
				err = multierror.Append(err, innerErr)
			}
		}
		report(err)
		return err
	}

	l.WithFields(log.Fields{"digest": t.Digest}).Info("blob deleted")
	report(nil)

	return nil
}
// postponeTaskAndCommit pushes back the next review of task t by a backoff derived from its review count and then
// commits tx. The postpone metric is only recorded once both steps have succeeded. It returns the postpone error
// as is, or the commit error wrapped with context.
func (w *BlobWorker) postponeTaskAndCommit(ctx context.Context, tx datastore.Transactor, t *models.GCBlobTask) error {
	backoff := exponentialBackoff(t.ReviewCount)

	logger := log.GetLogger(log.WithContext(ctx))
	logger.WithFields(log.Fields{"backoff_duration": backoff.String()}).Info("postponing next review")

	taskStore := blobTaskStoreConstructor(tx)
	err := taskStore.Postpone(ctx, t, backoff)
	if err != nil {
		return err
	}

	if err = tx.Commit(); err != nil {
		return fmt.Errorf("committing database transaction: %w", err)
	}

	metrics.ReviewPostpone(w.name)
	return nil
}