func()

in registry/datastore/importer.go [594:736]


// importTags imports all tags that fsRepo exposes on the filesystem into the
// database repository dbRepo. For each tag, the tagged manifest (or manifest
// list) is looked up in the database and, when absent, read from the
// filesystem and imported before the tag itself is created or updated.
//
// Tag details are looked up concurrently, with at most imp.tagConcurrency
// lookups in flight, and the results are consumed serially.
//
// The following situations are tolerated and do not fail the import:
//   - the tag listing fails with distribution.ErrRepositoryUnknown (treated as
//     "no tags", nil is returned);
//   - a tag lookup fails with distribution.ErrTagUnknown, or with
//     digest.ErrDigestInvalidFormat / digest.ErrDigestUnsupported (the tag is
//     skipped with a warning);
//   - reading or importing the manifest reports errManifestSkip or
//     distribution.ErrSchemaV1Unsupported (the tag is skipped);
//   - persisting the tag fails (the error is logged and the loop continues).
//
// Any other error aborts the import and is returned wrapped. If ctx is
// canceled before every tag has been processed, the context error is returned.
// All goroutines started here are stopped when the function returns.
func (imp *Importer) importTags(ctx context.Context, fsRepo distribution.Repository, dbRepo *models.Repository) error {
	l := log.GetLogger(log.WithContext(ctx)).WithFields(log.Fields{"repository": dbRepo.Path})

	manifestService, err := fsRepo.Manifests(ctx)
	if err != nil {
		return fmt.Errorf("constructing manifest service: %w", err)
	}

	tagService := fsRepo.Tags(ctx)
	fsTags, err := tagService.All(ctx)
	if err != nil {
		if errors.As(err, &distribution.ErrRepositoryUnknown{}) {
			// No `tags` folder, so no tags and therefore nothing to import. Just handle this gracefully and return.
			// The import will be completed successfully.
			return nil
		}
		return fmt.Errorf("reading tags: %w", err)
	}

	total := len(fsTags)
	semaphore := make(chan struct{}, imp.tagConcurrency)
	tagResChan := make(chan *tagLookupResponse)

	l.WithFields(log.Fields{"total": total}).Info("importing tags")

	// lookupCtx is canceled when the caller's ctx is canceled or when this
	// function returns for any reason (including an early error return below).
	// It is what lets the dispatcher and the lookup goroutines unblock and exit
	// instead of leaking once nobody is reading from tagResChan anymore.
	lookupCtx, cancelLookups := context.WithCancel(ctx)
	defer cancelLookups()

	// Start a goroutine to concurrently dispatch tag details lookup, up to the configured tag concurrency at once.
	go func() {
		var wg sync.WaitGroup
		// Whatever the exit path, wait for in-flight lookups and close the
		// channel so that the consumer loop below always terminates.
		defer func() {
			wg.Wait()
			close(tagResChan)
		}()

		for _, tag := range fsTags {
			// Acquire a slot, or stop dispatching if a tag lookup or import
			// failed (function returned) or the caller canceled the context.
			select {
			case <-lookupCtx.Done():
				return
			case semaphore <- struct{}{}:
			}
			wg.Add(1)

			go func(t string) {
				defer func() {
					<-semaphore
					wg.Done()
				}()

				desc, err := tagService.Get(lookupCtx, t)
				// The consumer may already be gone; never block forever on the
				// unbuffered channel.
				select {
				case tagResChan <- &tagLookupResponse{name: t, desc: desc, err: err}:
				case <-lookupCtx.Done():
				}
			}(tag)
		}
	}()

	opts := make([]progressbar.Option, len(commonBarOptions), len(commonBarOptions)+3)
	copy(opts, commonBarOptions)
	opts = append(
		opts,
		progressbar.OptionSetDescription(fmt.Sprintf("importing tags in %s", dbRepo.Path)),
		progressbar.OptionSetItsString("tags"),
		progressbar.OptionSetVisibility(imp.showProgressBar),
	)
	bar := progressbar.NewOptions(total, opts...)
	defer func() {
		// Best effort: failing to finalize the progress bar must not fail the import.
		_ = bar.Finish()
		_ = bar.Close()
	}()

	// Consume the tag lookup details serially. In the ideal case, we only need
	// to retrieve the manifest from the database and associate it with a tag.
	// This is fast enough that concurrency really isn't warranted here as well.
	var i int
	for tRes := range tagResChan {
		i++
		fsTag := tRes.name
		desc := tRes.desc
		// Best effort: a progress bar rendering error is not actionable here.
		_ = bar.Add(1)

		tagLog := l.WithFields(log.Fields{"tag_name": fsTag, "count": i, "total": total, "digest": desc.Digest})
		tagLog.Info("importing tag")

		if lookupErr := tRes.err; lookupErr != nil {
			errLog := tagLog.WithError(lookupErr)

			if errors.As(lookupErr, &distribution.ErrTagUnknown{}) {
				// The tag link is missing, log a warning and skip.
				errLog.Warn("missing tag link, skipping")
				continue
			}
			if errors.Is(lookupErr, digest.ErrDigestInvalidFormat) || errors.Is(lookupErr, digest.ErrDigestUnsupported) {
				// The tag link is corrupted, log a warning and skip.
				errLog.Warn("broken tag link, skipping")
				continue
			}

			return fmt.Errorf("reading tag details: %w", lookupErr)
		}

		// Find corresponding manifest in DB or filesystem.
		dbManifest, err := imp.repositoryStore.FindManifestByDigest(ctx, dbRepo, desc.Digest)
		if err != nil {
			return fmt.Errorf("finding tagged manifest in database: %w", err)
		}
		if dbManifest == nil {
			m, err := getFsManifest(ctx, manifestService, desc.Digest, tagLog)
			if err != nil {
				if errors.Is(err, errManifestSkip) {
					continue
				}
				return err
			}

			switch fsManifest := m.(type) {
			case *manifestlist.DeserializedManifestList:
				tagLog.Info("importing manifest list")
				dbManifest, err = imp.importManifestList(ctx, fsRepo, dbRepo, fsManifest, desc.Digest)
			default:
				tagLog.Info("importing manifest")
				dbManifest, err = imp.importManifest(ctx, fsRepo, dbRepo, fsManifest, desc.Digest)
			}
			if err != nil {
				if errors.Is(err, distribution.ErrSchemaV1Unsupported) {
					tagLog.WithError(err).Warn("skipping v1 manifest import")
					continue
				}
				if errors.Is(err, errManifestSkip) {
					tagLog.WithError(err).Warn("skipping manifest import")
					continue
				}
				return fmt.Errorf("importing manifest: %w", err)
			}
		}

		dbTag := &models.Tag{Name: fsTag, RepositoryID: dbRepo.ID, ManifestID: dbManifest.ID, NamespaceID: dbRepo.NamespaceID}
		if err := imp.tagStore.CreateOrUpdate(ctx, dbTag); err != nil {
			// Deliberately non-fatal: a single tag failing to persist is
			// reported but does not abort the remaining tags.
			tagLog.WithError(err).Error("creating tag")
		}
	}

	// The channel is closed early only when lookups were canceled. Since the
	// derived context is canceled by this function solely on return, seeing
	// fewer results than tags here means the caller's ctx was canceled; report
	// it rather than pretending the import completed.
	if i < total {
		if err := ctx.Err(); err != nil {
			return fmt.Errorf("importing tags: %w", err)
		}
	}

	return nil
}