in registry/storage/garbagecollect.go [86:487]
func MarkAndSweep(ctx context.Context, storageDriver driver.StorageDriver, registry distribution.Namespace, opts GCOpts) error {
l := log.GetLogger(log.WithContext(ctx)).WithFields(log.Fields{
"stage": "mark",
"driver": storageDriver.Name(),
"dry_run": opts.DryRun,
})
// Always log this info message at the end of MarkAndSweep, even if there was an error or the process was interrupted.
defer l.Info("zero-downtime continuous garbage collection is now available. See the administration documentation for more information: https://docs.gitlab.com/ee/administration/packages/container_registry_metadata_database.html")
// Check that the database does **not** manage this filesystem.
// We both log and return an error in the failure cases because the final
// error goes to stderr, which is not always captured in user reports.
dbLock := DatabaseInUseLocker{Driver: storageDriver}
locked, err := dbLock.IsLocked(ctx)
if err != nil {
l.WithError(err).Error("unable to check for database in use lock, exiting")
return err
}
if locked {
dbLockPath, err := dbLock.path()
if err != nil {
// This error should not be possible, but handle it just in case.
l.WithError(err).Error("failed to get database in use lock path")
}
l.WithFields(log.Fields{
"lock_path": dbLockPath,
}).Error("this filesystem is managed by the metadata database, and offline garbage collection is no longer possible, if you are not using the database anymore, remove the file at the lock_path in this log message")
return fmt.Errorf("database managed filesystem, cannot continue")
}
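// Guard against a non-positive setting: the semaphore used below needs a
// capacity of at least one.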
if opts.MaxParallelManifestGets < 1 {
opts.MaxParallelManifestGets = 1
}
repositoryEnumerator, ok := registry.(distribution.RepositoryEnumerator)
if !ok {
return fmt.Errorf("converting Namespace to RepositoryEnumerator")
}
// mark
markStart := time.Now()
l.Info("starting mark stage")
// Check that the registry root repository path exists. If the path does not
// exist, log a warning indicating that garbage collection does not need to
// run and exit early without an error.
repositoriesRoot, _ := pathFor(repositoriesRootPathSpec{})
if _, err := storageDriver.Stat(ctx, repositoriesRoot); err != nil {
if errors.As(err, &driver.PathNotFoundError{}) {
l.WithError(err).Warn("skipping garbage collection")
return nil
}
return fmt.Errorf("checking root path: %w", err)
}
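// markSet accumulates the digests of every blob (manifest and layer) that
// must be kept; manifestArr collects untagged manifests to be removed during
// the sweep stage.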
markSet := newSyncDigestSet()
manifestArr := syncManifestDelContainer{sync.Mutex{}, make([]ManifestDel, 0)}
err = repositoryEnumerator.Enumerate(ctx, func(repoName string) error {
rLog := l.WithFields(log.Fields{"repository": repoName})
rLog.Info("marking repository")
taggedManifests := newSyncDigestSet()
unTaggedManifests := newSyncDigestSet()
referencedManifests := newSyncDigestSet()
var err error
named, err := reference.WithName(repoName)
if err != nil {
return fmt.Errorf("parsing repo name %s: %w", repoName, err)
}
repository, err := registry.Repository(ctx, named)
if err != nil {
return fmt.Errorf("constructing repository: %w", err)
}
manifestService, err := repository.Manifests(ctx)
if err != nil {
return fmt.Errorf("constructing manifest service: %w", err)
}
manifestEnumerator, ok := manifestService.(distribution.ManifestEnumerator)
if !ok {
return fmt.Errorf("converting ManifestService into ManifestEnumerator")
}
t, ok := repository.Tags(ctx).(*tagStore)
if !ok {
return fmt.Errorf("converting tagService into tagStore")
}
cachedTagStore := newCachedTagStore(t)
// Since we're removing untagged images, retrieving all tags primes the
// cache. This isn't strictly necessary, but it prevents a potentially large
// number of goroutines from being spawned only to wait for priming to
// complete, and allows us to report the number of primed tags.
if opts.RemoveUntagged {
primeStart := time.Now()
rLog.Info("priming tags cache")
allTags, err := cachedTagStore.All(ctx)
if err != nil {
switch err := err.(type) {
case distribution.ErrRepositoryUnknown:
// Ignore path not found error on missing tags folder
default:
return fmt.Errorf("retrieving tags %w", err)
}
}
rLog.WithFields(log.Fields{
"tags_primed": len(allTags),
"duration_s": time.Since(primeStart).Seconds(),
}).Info("tags cache primed")
}
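// Classify every manifest in the repository: when removing untagged images,
// split them into tagged and untagged sets; otherwise mark them all as
// referenced.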
err = manifestEnumerator.Enumerate(ctx, func(dgst digest.Digest) error {
if opts.RemoveUntagged {
// Fetch all tags that currently point at this manifest.
tags, err := cachedTagStore.Lookup(ctx, distribution.Descriptor{Digest: dgst})
if err != nil {
return fmt.Errorf("retrieving tags for digest %v: %w", dgst, err)
}
if len(tags) == 0 {
unTaggedManifests.add(dgst)
return nil
}
rLog.WithFields(log.Fields{
"referenced_by": "tag",
"tag_count": len(tags),
"digest": dgst,
}).Info("marking manifest metadata for repository")
taggedManifests.add(dgst)
return nil
}
referencedManifests.add(dgst)
return nil
})
if err != nil {
// In certain situations such as unfinished uploads, deleting all
// tags in S3 or removing the _manifests folder manually, this
// error may be of type PathNotFound.
//
// In these cases we can continue marking other manifests safely.
//
// If we encounter a MultiError, check each underlying error, returning
// nil only if all errors are of type PathNotFound.
if me, ok := err.(*multierror.Error); ok {
for _, e := range me.Errors {
if !errors.As(e, new(driver.PathNotFoundError)) {
return err
}
}
} else if !errors.As(err, new(driver.PathNotFoundError)) {
return err
}
}
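// The semaphore bounds the number of concurrent manifest fetches performed by
// the goroutines spawned below.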
semaphore := make(chan struct{}, opts.MaxParallelManifestGets)
if opts.RemoveUntagged {
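// Resolve each tagged manifest concurrently: mark its blob and the blobs it
// references, and record any child manifests referenced by manifest lists.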
g, ctx := errgroup.WithContext(ctx)
for dgst := range taggedManifests.members {
semaphore <- struct{}{}
manifestDigest := dgst
g.Go(func() error {
defer func() {
<-semaphore
}()
rLog.WithFields(log.Fields{
"referenced_by": "tag",
"digest_type": "manifest",
"digest": manifestDigest,
}).Info("marking blob")
markSet.add(manifestDigest)
manifest, err := manifestService.Get(ctx, manifestDigest)
if err != nil {
return fmt.Errorf("retrieving manifest for digest %v: %w", manifestDigest, err)
}
if manifestList, ok := manifest.(*manifestlist.DeserializedManifestList); ok {
// Docker buildx incorrectly uses OCI Image Indexes to store lists
// of layer blobs. Account for this so that garbage collection
// preserves these blobs and does not break later when we try to
// fetch these digests as manifests due to the fallback behavior
// introduced in
// https://github.com/distribution/distribution/pull/864
splitRef := mlcompat.References(manifestList)
// Add the manifest references to the set of referenced manifests so
// they are marked below.
for _, r := range splitRef.Manifests {
rLog.WithFields(log.Fields{
"digest_type": "manifest",
"referenced_by": "manifest_list",
"digest": r.Digest,
"mediatype": r.MediaType,
"parent_digest": manifestDigest,
}).Info("marking manifest")
referencedManifests.add(r.Digest)
}
// Do some extra logging here for invalid manifest lists and provide
// the list of tags associated with this manifest.
if mlcompat.ContainsBlobs(manifestList) {
tags, err := cachedTagStore.Lookup(ctx, distribution.Descriptor{Digest: manifestDigest})
if err != nil {
return fmt.Errorf("retrieving tags for digest %v: %w", manifestDigest, err)
}
rLog.WithFields(log.Fields{
"mediatype": manifestList.Versioned.MediaType,
"digest": manifestDigest,
"tags": tags,
}).Warn("nonconformant manifest list with layer references, please report this to GitLab")
}
// Mark the manifest list layer references as normal blobs.
for _, r := range splitRef.Blobs {
rLog.WithFields(log.Fields{
"digest_type": "layer",
"referenced_by": "manifest_list",
"digest": r.Digest,
"mediatype": r.MediaType,
"parent_digest": manifestDigest,
}).Info("marking blob")
markSet.add(r.Digest)
}
} else {
for _, descriptor := range manifest.References() {
rLog.WithFields(log.Fields{
"referenced_by": "manifest",
"digest_type": "layer",
"digest": descriptor.Digest,
"mediatype": descriptor.MediaType,
"parent_digest": manifestDigest,
}).Info("marking blob")
markSet.add(descriptor.Digest)
}
}
return nil
})
}
if err := g.Wait(); err != nil {
return fmt.Errorf("marking tagged manifests: %w", err)
}
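// Queue untagged manifests for deletion, skipping any that a manifest list
// still references.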
for dgst := range unTaggedManifests.members {
if referencedManifests.contains(dgst) {
continue
}
rLog.WithFields(log.Fields{"digest": dgst}).Info("manifest metadata will be deleted from repository")
// Fetch all tags from the repository: any of these tags could reference the
// manifest in its history, which means that we need to check (and delete)
// those references when deleting the manifest.
allTags, err := cachedTagStore.All(ctx)
if err != nil {
switch err := err.(type) {
case distribution.ErrRepositoryUnknown:
// Ignore path not found error on missing tags folder
default:
return fmt.Errorf("retrieving tags %w", err)
}
}
manifestArr.append(ManifestDel{Name: repoName, Digest: dgst, Tags: allTags})
}
}
refType := "tagOrManifest"
// If we're removing untagged, any manifests left at this point were only
// referenced by a manifest list.
if opts.RemoveUntagged {
refType = "manifest_list"
}
markLog := rLog.WithFields(log.Fields{"referenced_by": refType})
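// Mark the referenced manifests and their layers concurrently, reusing the
// same semaphore to bound parallelism.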
g, ctx := errgroup.WithContext(ctx)
for dgst := range referencedManifests.members {
semaphore <- struct{}{}
d := dgst
g.Go(func() error {
defer func() {
<-semaphore
}()
// Mark the manifest's blob
markLog.WithFields(log.Fields{"digest_type": "manifest", "digest": d}).Info("marking blob")
markSet.add(d)
manifest, err := manifestService.Get(ctx, d)
if err != nil {
// If the manifest is not present, then we should continue,
// allowing its layers to be garbage collected.
errUnknownRev := &distribution.ErrManifestUnknownRevision{}
if errors.As(err, errUnknownRev) {
return nil
}
return fmt.Errorf("retrieving manifest for digest %v: %w", d, err)
}
for _, descriptor := range manifest.References() {
markLog.WithFields(log.Fields{"digest_type": "layer", "digest": descriptor.Digest}).Info("marking blob")
markSet.add(descriptor.Digest)
}
return nil
})
}
if err := g.Wait(); err != nil {
return fmt.Errorf("marking referenced manifests: %w", err)
}
return nil
})
if err != nil {
return fmt.Errorf("marking blobs: %w", err)
}
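// Walk the global blob store; any digest missing from markSet is collected in
// deleteSet for deletion during sweep.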
blobService := registry.Blobs()
deleteSet := newSyncDigestSet()
l.Info("finding blobs eligible for deletion. This may take some time...")
sizeChan := make(chan int64)
sizeDone := make(chan struct{})
var totalSizeBytes int64
go func() {
for size := range sizeChan {
totalSizeBytes += size
}
sizeDone <- struct{}{}
}()
err = blobService.Enumerate(ctx, func(desc distribution.Descriptor) error {
// Check whether the digest is in markSet; if not, schedule it for deletion.
if !markSet.contains(desc.Digest) {
l.WithFields(log.Fields{"digest": desc.Digest, "size_bytes": desc.Size}).Info("blob eligible for deletion")
sizeChan <- desc.Size
deleteSet.add(desc.Digest)
}
return nil
})
if err != nil {
return fmt.Errorf("enumerating blobs: %w", err)
}
close(sizeChan)
<-sizeDone
l.WithFields(log.Fields{
"blobs_marked": markSet.len(),
"blobs_to_delete": deleteSet.len(),
"manifests_to_delete": len(manifestArr.manifestDels),
"storage_use_estimate_bytes": totalSizeBytes,
"duration_s": time.Since(markStart).Seconds(),
}).Info("mark stage complete")
// sweep
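// In dry-run mode we stop here: nothing is deleted, only the mark results
// above are reported.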
if opts.DryRun {
return nil
}
sweepStart := time.Now()
l = l.WithFields(log.Fields{"stage": "sweep"})
l.Info("starting sweep stage")
vacuum := NewVacuum(storageDriver)
if len(manifestArr.manifestDels) > 0 {
if err := vacuum.RemoveManifests(ctx, manifestArr.manifestDels); err != nil {
return fmt.Errorf("deleting manifests: %w", err)
}
}
// Lock and unlock manually and access members directly to reduce lock operations.
deleteSet.Lock()
defer deleteSet.Unlock()
dgsts := make([]digest.Digest, 0, len(deleteSet.members))
for dgst := range deleteSet.members {
dgsts = append(dgsts, dgst)
}
if len(dgsts) > 0 {
if err := vacuum.RemoveBlobs(ctx, dgsts); err != nil {
return fmt.Errorf("deleting blobs: %w", err)
}
}
l.WithFields(log.Fields{"duration_s": time.Since(sweepStart).Seconds()}).Info("sweep stage complete")
return nil
}
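// A minimal calling sketch, not part of this function: it assumes ctx, a
// configured storageDriver, and a registry Namespace already exist elsewhere,
// and only uses the GCOpts fields referenced above (DryRun, RemoveUntagged,
// MaxParallelManifestGets):
//
//	opts := GCOpts{
//		DryRun:                  true, // mark only; report what would be deleted
//		RemoveUntagged:          true, // also delete manifests with no tags
//		MaxParallelManifestGets: 4,    // bound concurrent manifest fetches
//	}
//	if err := MarkAndSweep(ctx, storageDriver, registry, opts); err != nil {
//		// Handle the error; all failures are returned rather than fatal.
//	}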