in registry/datastore/importer.go [594:736]
func (imp *Importer) importTags(ctx context.Context, fsRepo distribution.Repository, dbRepo *models.Repository) error {
l := log.GetLogger(log.WithContext(ctx)).WithFields(log.Fields{"repository": dbRepo.Path})
manifestService, err := fsRepo.Manifests(ctx)
if err != nil {
return fmt.Errorf("constructing manifest service: %w", err)
}
tagService := fsRepo.Tags(ctx)
fsTags, err := tagService.All(ctx)
if err != nil {
if errors.As(err, &distribution.ErrRepositoryUnknown{}) {
// No `tags` folder, so no tags and therefore nothing to import. Just handle this gracefully and return.
// The import will be completed successfully.
return nil
}
return fmt.Errorf("reading tags: %w", err)
}
total := len(fsTags)
semaphore := make(chan struct{}, imp.tagConcurrency)
tagResChan := make(chan *tagLookupResponse)
l.WithFields(log.Fields{"total": total}).Info("importing tags")
// Start a goroutine to concurrently dispatch tag details lookup, up to the configured tag concurrency at once.
go func() {
var wg sync.WaitGroup
for _, tag := range fsTags {
semaphore <- struct{}{}
wg.Add(1)
select {
case <-ctx.Done():
// Exit earlier if a tag lookup or import failed.
return
default:
}
go func(t string) {
defer func() {
<-semaphore
wg.Done()
}()
desc, err := tagService.Get(ctx, t)
tagResChan <- &tagLookupResponse{t, desc, err}
}(tag)
}
wg.Wait()
close(tagResChan)
}()
opts := make([]progressbar.Option, len(commonBarOptions), len(commonBarOptions)+3)
copy(opts, commonBarOptions)
opts = append(
opts,
progressbar.OptionSetDescription(fmt.Sprintf("importing tags in %s", dbRepo.Path)),
progressbar.OptionSetItsString("tags"),
progressbar.OptionSetVisibility(imp.showProgressBar),
)
bar := progressbar.NewOptions(total, opts...)
defer func() {
_ = bar.Finish()
_ = bar.Close()
}()
// Consume the tag lookup details serially. In the ideal case, we only need
// retrieve the manifest from the database and associate it with a tag. This
// is fast enough that concurrency really isn't warranted here as well.
var i int
for tRes := range tagResChan {
i++
fsTag := tRes.name
desc := tRes.desc
err := tRes.err
_ = bar.Add(1)
l := l.WithFields(log.Fields{"tag_name": fsTag, "count": i, "total": total, "digest": desc.Digest})
l.Info("importing tag")
if err != nil {
l := l.WithError(err)
if errors.As(err, &distribution.ErrTagUnknown{}) {
// The tag link is missing, log a warning and skip.
l.Warn("missing tag link, skipping")
continue
}
if errors.Is(err, digest.ErrDigestInvalidFormat) || errors.Is(err, digest.ErrDigestUnsupported) {
// The tag link is corrupted, log a warning and skip.
l.Warn("broken tag link, skipping")
continue
}
return fmt.Errorf("reading tag details: %w", err)
}
// Find corresponding manifest in DB or filesystem.
var dbManifest *models.Manifest
dbManifest, err = imp.repositoryStore.FindManifestByDigest(ctx, dbRepo, desc.Digest)
if err != nil {
return fmt.Errorf("finding tagged manifest in database: %w", err)
}
if dbManifest == nil {
m, err := getFsManifest(ctx, manifestService, desc.Digest, l)
if err != nil {
if errors.Is(err, errManifestSkip) {
continue
}
return err
}
switch fsManifest := m.(type) {
case *manifestlist.DeserializedManifestList:
l.Info("importing manifest list")
dbManifest, err = imp.importManifestList(ctx, fsRepo, dbRepo, fsManifest, desc.Digest)
default:
l.Info("importing manifest")
dbManifest, err = imp.importManifest(ctx, fsRepo, dbRepo, fsManifest, desc.Digest)
}
if err != nil {
if errors.Is(err, distribution.ErrSchemaV1Unsupported) {
l.WithError(err).Warn("skipping v1 manifest import")
continue
}
if errors.Is(err, errManifestSkip) {
l.WithError(err).Warn("skipping manifest import")
continue
}
return fmt.Errorf("importing manifest: %w", err)
}
}
dbTag := &models.Tag{Name: fsTag, RepositoryID: dbRepo.ID, ManifestID: dbManifest.ID, NamespaceID: dbRepo.NamespaceID}
if err := imp.tagStore.CreateOrUpdate(ctx, dbTag); err != nil {
l.WithError(err).Error("creating tag")
}
}
return nil
}