registry/datastore/importer.go (1,177 lines of code) (raw):
package datastore
import (
"context"
"errors"
"fmt"
"net"
"net/http"
"os"
"path/filepath"
"sync"
"syscall"
"time"
"github.com/cenkalti/backoff/v4"
"github.com/docker/distribution"
"github.com/docker/distribution/internal/feature"
"github.com/docker/distribution/log"
"github.com/docker/distribution/manifest/manifestlist"
mlcompat "github.com/docker/distribution/manifest/manifestlist/compat"
"github.com/docker/distribution/manifest/schema1"
"github.com/docker/distribution/reference"
"github.com/docker/distribution/registry/datastore/models"
"github.com/docker/distribution/registry/storage/driver"
"github.com/jackc/pgx/v5/pgconn"
"github.com/opencontainers/go-digest"
v1 "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/schollz/progressbar/v3"
"google.golang.org/api/googleapi"
)
// Package-level sentinel errors and the progress bar options shared by the importer.
var (
// errNegativeTestingDelay reports a negative testing delay.
// NOTE(review): no use is visible in this part of the file; presumably it validates the
// duration given to WithTestSlowImport — confirm.
errNegativeTestingDelay = errors.New("negative testing delay")
// errManifestSkip tells callers that a manifest is invalid and that its (pre)import must be
// skipped rather than treated as a failure of the whole run.
errManifestSkip = errors.New("the manifest is invalid and its (pre)import should be skipped")
// errTagsTableNotEmpty reports that the tags table already contains rows.
// NOTE(review): no use is visible in this part of the file.
errTagsTableNotEmpty = errors.New("tags table is not empty")
// errDBLocked reports that a database-in-use lockfile exists.
// NOTE(review): no use is visible in this part of the file.
errDBLocked = errors.New("database-in-use lockfile exists")
// commonBarOptions holds the progress bar options shared by every progress bar created by
// the importer. Callers copy this slice before appending their own options, so it must be
// treated as read-only.
commonBarOptions = []progressbar.Option{
progressbar.OptionSetElapsedTime(true),
progressbar.OptionShowCount(),
progressbar.OptionSetPredictTime(false),
progressbar.OptionShowElapsedTimeOnFinish(),
progressbar.OptionShowDescriptionAtLineEnd(),
progressbar.OptionShowIts(),
progressbar.OptionSetTheme(progressbar.Theme{
Saucer: "=",
SaucerHead: ">",
SaucerPadding: " ",
BarStart: "[",
BarEnd: "]",
}),
}
)
// mtOctetStream is the generic media type under which layer and configuration blobs are
// stored, regardless of the media type declared for them in the manifest.
const mtOctetStream = "application/octet-stream"
// Importer populates the registry database with filesystem metadata. This is only meant to be used for an initial
// one-off migration, starting with an empty database.
type Importer struct {
// registry provides access to the repositories stored on the filesystem (storage backend).
registry distribution.Namespace
// db is the database handle used to open transactions and to build the initial stores.
db *DB
// The stores below are (re)built by loadStores, either on top of db or on top of a
// transaction opened by beginTx.
repositoryStore RepositoryStore
manifestStore ManifestStore
tagStore TagStore
blobStore BlobStore
// importDanglingManifests makes importRepository import every manifest of a repository
// before importing its tags. Set by WithImportDanglingManifests.
importDanglingManifests bool
// importDanglingBlobs makes ImportAll import blobs before repositories. Set by
// WithImportDanglingBlobs.
importDanglingBlobs bool
// dryRun makes ImportAll run inside a single transaction that is rolled back on return.
dryRun bool
// tagConcurrency is the capacity of the semaphore that bounds concurrent tag lookups in
// importTags. NewImporter defaults it to 1.
tagConcurrency int
// rowCount enables counting and logging table rows on completion.
rowCount bool
// showProgressBar controls the visibility of the progress bars.
showProgressBar bool
// testingDelay is slept by ImportAll after the repositories were imported (testing only).
testingDelay time.Duration
// preImportRetryTimeout is the maximum elapsed time of the manifest pre-import retry
// backoff. NewImporter defaults it to one minute.
preImportRetryTimeout time.Duration
}
// ImporterOption provides functional options for the Importer. NewImporter applies the
// options in the order they are given, after setting its defaults and before loading the
// stores.
type ImporterOption func(*Importer)
// WithImportDanglingManifests configures the Importer to import all manifests
// rather than only tagged manifests.
//
// The function has the ImporterOption signature, so it is passed to NewImporter as a value
// (not called).
//
// Deprecated: WithImportDanglingManifests is a legacy option that is no longer used
// in the import command made available to the user.
func WithImportDanglingManifests(imp *Importer) {
imp.importDanglingManifests = true
}
// WithImportDanglingBlobs configures the Importer to import all blobs
// rather than only blobs referenced by manifests.
//
// The function has the ImporterOption signature, so it is passed to NewImporter as a value
// (not called).
//
// Deprecated: WithImportDanglingBlobs is a legacy option that is no longer used
// in the import command made available to the user.
func WithImportDanglingBlobs(imp *Importer) {
imp.importDanglingBlobs = true
}
// WithDryRun configures the Importer to use a single transaction which is rolled
// back at the end of an import cycle.
//
// The function has the ImporterOption signature, so it is passed to NewImporter as a value
// (not called).
func WithDryRun(imp *Importer) {
imp.dryRun = true
}
// WithRowCount configures the Importer to count and log the number of rows across the most relevant database tables
// on (pre)import completion.
//
// The function has the ImporterOption signature, so it is passed to NewImporter as a value
// (not called).
func WithRowCount(imp *Importer) {
imp.rowCount = true
}
// WithTagConcurrency configures the Importer to retrieve the details of n tags
// concurrently.
//
// n becomes the capacity of the semaphore that bounds concurrent tag lookups in importTags,
// so it should be at least 1. NewImporter uses 1 when this option is not supplied.
func WithTagConcurrency(n int) ImporterOption {
return func(imp *Importer) {
imp.tagConcurrency = n
}
}
// WithTestSlowImport configures the Importer to sleep at the end of the import
// for the given duration. This is useful for testing, but should never be
// enabled on production environments.
//
// The duration is stored as-is; no validation happens in this option.
func WithTestSlowImport(d time.Duration) ImporterOption {
return func(imp *Importer) {
imp.testingDelay = d
}
}
// WithPreImportRetryTimeout configures the Importer with a retry timeout for certain operations
// due to network connection errors or DB timeouts. See shouldRetryManifestPreImport for more details.
//
// d is used as the maximum elapsed time of the exponential backoff in
// retryImportManifestWithBackoff. NewImporter uses one minute when this option is not supplied.
func WithPreImportRetryTimeout(d time.Duration) ImporterOption {
return func(imp *Importer) {
imp.preImportRetryTimeout = d
}
}
// WithProgressBar shows a progress bar and writes detailed logs to a file.
//
// In this file the option only toggles the visibility of the progress bars.
// NOTE(review): redirecting the detailed logs to a file is not done here; presumably the
// caller takes care of it — confirm.
//
// The function has the ImporterOption signature, so it is passed to NewImporter as a value
// (not called).
func WithProgressBar(imp *Importer) {
imp.showProgressBar = true
}
// NewImporter builds an Importer that reads filesystem metadata from registry and writes it
// to db. Defaults (a tag concurrency of 1 and a one minute pre-import retry timeout) are set
// first, then opts are applied in order, and finally the stores are loaded on top of db.
func NewImporter(db *DB, registry distribution.Namespace, opts ...ImporterOption) *Importer {
	importer := new(Importer)
	importer.db = db
	importer.registry = registry

	// Defaults, any of which may be overridden by the options applied below.
	importer.tagConcurrency = 1
	importer.preImportRetryTimeout = time.Minute

	for i := range opts {
		opts[i](importer)
	}

	importer.loadStores(importer.db)

	return importer
}
// beginTx opens a database transaction and points all of the Importer's stores at it, so that
// every subsequent store operation runs inside that transaction. The caller owns the returned
// transaction and is responsible for committing or rolling it back.
func (imp *Importer) beginTx(ctx context.Context) (Transactor, error) {
	tx, txErr := imp.db.BeginTx(ctx, nil)
	if txErr != nil {
		return nil, txErr
	}

	// From here on the stores operate within the transaction rather than on the plain handle.
	imp.loadStores(tx)

	return tx, nil
}
// loadStores (re)creates the repository, manifest, tag and blob stores on top of db, which is
// either the database handle or an open transaction.
func (imp *Importer) loadStores(db Queryer) {
	imp.repositoryStore = NewRepositoryStore(db)
	imp.manifestStore = NewManifestStore(db)
	imp.tagStore = NewTagStore(db)
	imp.blobStore = NewBlobStore(db)
}
// findOrCreateDBManifest returns the manifest with m's digest from dbRepo if the database
// already has it. Otherwise it creates m in the database and returns m itself.
func (imp *Importer) findOrCreateDBManifest(ctx context.Context, dbRepo *models.Repository, m *models.Manifest) (*models.Manifest, error) {
	existing, err := imp.repositoryStore.FindManifestByDigest(ctx, dbRepo, m.Digest)
	switch {
	case err != nil:
		return nil, fmt.Errorf("searching for manifest: %w", err)
	case existing != nil:
		return existing, nil
	}

	// Not in the database yet, so create it.
	if err := imp.manifestStore.Create(ctx, m); err != nil {
		return nil, fmt.Errorf("creating manifest: %w", err)
	}

	return m, nil
}
// importLayer creates (or finds) the blob record for dbLayer and then links that blob to
// dbRepo.
func (imp *Importer) importLayer(ctx context.Context, dbRepo *models.Repository, dbLayer *models.Blob) error {
	err := imp.blobStore.CreateOrFind(ctx, dbLayer)
	if err != nil {
		return fmt.Errorf("creating layer blob: %w", err)
	}

	err = imp.repositoryStore.LinkBlob(ctx, dbRepo, dbLayer.Digest)
	if err != nil {
		return fmt.Errorf("linking layer blob to repository: %w", err)
	}

	return nil
}
// importLayers imports every layer in fsLayers that is accessible through fsRepo and returns
// the imported blobs, each carrying the media type that should be recorded on the
// manifest-layer association.
//
// Layers that are not linked to the repository are skipped with a warning. A layer whose link
// is broken (invalid or unsupported digest) aborts the import with errManifestSkip. On any
// error the blobs imported so far are returned alongside it.
func (imp *Importer) importLayers(ctx context.Context, dbRepo *models.Repository, fsRepo distribution.Repository, fsLayers []distribution.Descriptor) ([]*models.Blob, error) {
	var imported []*models.Blob
	layerCount := len(fsLayers)

	for idx := range fsLayers {
		desc := fsLayers[idx]

		layerLog := log.GetLogger(log.WithContext(ctx)).WithFields(log.Fields{
			"repository": dbRepo.Path,
			"digest":     desc.Digest,
			"media_type": desc.MediaType,
			"size":       desc.Size,
		})
		ctx = log.WithLogger(ctx, layerLog)
		layerLog.WithFields(log.Fields{"total": layerCount, "count": idx + 1}).Info("importing layer")

		_, statErr := fsRepo.Blobs(ctx).Stat(ctx, desc.Digest)
		switch {
		case statErr == nil:
			// The blob is accessible through the repository, carry on with the import.
		case errors.Is(statErr, distribution.ErrBlobUnknown):
			layerLog.Warn("blob is not linked to repository, skipping blob import")
			continue
		case errors.Is(statErr, digest.ErrDigestInvalidFormat), errors.Is(statErr, digest.ErrDigestUnsupported):
			layerLog.WithError(statErr).Warn("broken layer link, skipping manifest import")
			return imported, errManifestSkip
		default:
			return imported, fmt.Errorf("checking for access to blob with digest %s on repository %s: %w", desc.Digest, fsRepo.Named().Name(), statErr)
		}

		// The blob is stored under the generic octet stream media type. Once it is stored, the
		// in-memory object is given the media type to record on the manifest-layer association
		// (see layerMediaType).
		blob := &models.Blob{MediaType: mtOctetStream, Digest: desc.Digest, Size: desc.Size}
		if err := imp.importLayer(ctx, dbRepo, blob); err != nil {
			return imported, err
		}
		blob.MediaType = imp.layerMediaType(ctx, desc)

		imported = append(imported, blob)
	}

	return imported, nil
}
// layerMediaType returns the media type to record for fsLayer on a manifest-layer association.
//
// The layer's own media type is returned when dynamic media types are enabled or when the
// media type is already known to the database. In every other case, including a failure of
// the existence check (which is logged and otherwise ignored), the generic octet stream media
// type is returned.
//
// This code should be removed once dynamic media types are implemented:
// https://gitlab.com/groups/gitlab-org/-/epics/13805
func (imp *Importer) layerMediaType(ctx context.Context, fsLayer distribution.Descriptor) string {
	if feature.DynamicMediaTypes.Enabled() {
		return fsLayer.MediaType
	}

	known, err := NewMediaTypeStore(imp.db).Exists(ctx, fsLayer.MediaType)
	if err != nil {
		// Log and continue on this failure. The boolean result is deliberately not trusted
		// when the check itself failed, so fall back to the generic media type.
		log.GetLogger(log.WithContext(ctx)).WithFields(log.Fields{"media_type": fsLayer.MediaType}).
			WithError(err).Warn("error checking for existence of layer media type")
		return mtOctetStream
	}

	if known {
		return fsLayer.MediaType
	}

	return mtOctetStream
}
// importManifestV2 imports a schema 2 style manifest into dbRepo: its configuration blob, its
// distributable layers, the manifest record itself (found or created by digest) and finally
// the manifest-layer associations.
//
// payload is stored verbatim as the manifest payload and nonConformant is recorded on the
// manifest. errManifestSkip is returned (possibly wrapped) when the configuration or a layer
// link is broken.
func (imp *Importer) importManifestV2(ctx context.Context, fsRepo distribution.Repository, dbRepo *models.Repository, m distribution.ManifestV2, dgst digest.Digest, payload []byte, nonConformant bool) (*models.Manifest, error) {
	cfg := m.Config()

	cfgLog := log.GetLogger(log.WithContext(ctx)).
		WithFields(log.Fields{"repository": dbRepo.Path}).
		WithFields(log.Fields{
			"digest":         cfg.Digest,
			"media_type":     cfg.MediaType,
			"size":           cfg.Size,
			"non_conformant": nonConformant,
		})
	cfgLog.Info("importing configuration")
	ctx = log.WithLogger(ctx, cfgLog)

	configPayload, err := getConfigPayload(ctx, cfg, fsRepo)
	if err != nil {
		return nil, err
	}

	// The configuration blob is stored under the generic octet stream media type. Once it is
	// stored, the in-memory object gets the media type declared in the manifest back so that
	// the configuration saved with the manifest matches the manifest JSON.
	configBlob := &models.Blob{MediaType: mtOctetStream, Digest: cfg.Digest, Size: cfg.Size}
	if err := imp.blobStore.CreateOrFind(ctx, configBlob); err != nil {
		return nil, err
	}
	configBlob.MediaType = cfg.MediaType

	// Link the configuration to the repository.
	if err := imp.repositoryStore.LinkBlob(ctx, dbRepo, configBlob.Digest); err != nil {
		return nil, fmt.Errorf("associating configuration blob with repository: %w", err)
	}

	// Import the manifest layers stored locally on the registry.
	layers, err := imp.importLayers(ctx, dbRepo, fsRepo, m.DistributableLayers())
	if err != nil {
		return nil, fmt.Errorf("importing layers: %w", err)
	}

	candidate := &models.Manifest{
		NamespaceID:   dbRepo.NamespaceID,
		RepositoryID:  dbRepo.ID,
		TotalSize:     m.TotalSize(),
		SchemaVersion: m.Version().SchemaVersion,
		MediaType:     m.Version().MediaType,
		Digest:        dgst,
		Payload:       payload,
		NonConformant: nonConformant,
		Configuration: &models.Configuration{
			MediaType: configBlob.MediaType,
			Digest:    configBlob.Digest,
			Payload:   configPayload,
		},
	}
	dbManifest, err := imp.findOrCreateDBManifest(ctx, dbRepo, candidate)
	if err != nil {
		return nil, err
	}

	// Link the imported layers to the manifest.
	for i := range layers {
		if err := imp.manifestStore.AssociateLayerBlob(ctx, dbManifest, layers[i]); err != nil {
			return nil, fmt.Errorf("associating layer blob with manifest: %w", err)
		}
	}

	return dbManifest, nil
}
// getConfigPayload reads the configuration payload described by m from fsRepo.
//
// A nil payload and a nil error are returned, without reading anything, when the
// configuration is larger than ConfigSizeLimit. errManifestSkip is returned when the
// configuration link is broken (invalid or unsupported digest) or the blob is not linked to
// the repository; a warning is logged in those cases.
func getConfigPayload(ctx context.Context, m distribution.Descriptor, fsRepo distribution.Repository) ([]byte, error) {
	if m.Size > ConfigSizeLimit {
		return nil, nil
	}

	l := log.GetLogger(log.WithContext(ctx))

	configPayload, err := fsRepo.Blobs(ctx).Get(ctx, m.Digest)
	switch {
	case err == nil:
		return configPayload, nil
	case errors.Is(err, digest.ErrDigestInvalidFormat):
		l.WithError(err).Warn("broken configuration layer link, skipping")
		return nil, errManifestSkip
	case errors.Is(err, distribution.ErrBlobUnknown):
		// This error might also happen if the config blob is not present on common, in which
		// case it is reported here as a simple "config unlinked" problem. That has not been
		// observed so far, and such a blob could not be brought back anyway, so the manifest
		// is skipped regardless.
		l.WithError(err).Warn("configuration blob not linked, skipping")
		return nil, errManifestSkip
	case errors.Is(err, digest.ErrDigestUnsupported):
		l.WithError(err).Warn("broken configuration link, skipping")
		return nil, errManifestSkip
	default:
		return nil, fmt.Errorf("obtaining configuration payload: %w", err)
	}
}
// importManifestList imports the manifest list ml, identified by dgst, into dbRepo.
//
// A list that looks like a buildx cache is converted to an OCI manifest and imported as a
// non-conformant manifest that keeps the original index payload. Any other list is found or
// created in the database, then every referenced manifest is imported and associated with it.
// Referenced manifests that are broken or use the v1 schema are skipped, which leaves the
// list partially populated.
func (imp *Importer) importManifestList(ctx context.Context, fsRepo distribution.Repository, dbRepo *models.Repository, ml *manifestlist.DeserializedManifestList, dgst digest.Digest) (*models.Manifest, error) {
	if mlcompat.LikelyBuildxCache(ml) {
		_, indexPayload, err := ml.Payload()
		if err != nil {
			return nil, err
		}

		// Convert to an OCI manifest and process it as if it was one.
		ociManifest, err := mlcompat.OCIManifestFromBuildkitIndex(ml)
		if err != nil {
			return nil, fmt.Errorf("converting buildkit index to manifest: %w", err)
		}

		// Note that indexPayload is the untouched index payload, not the payload of the
		// converted manifest.
		return imp.importManifestV2(ctx, fsRepo, dbRepo, ociManifest, dgst, indexPayload, true)
	}

	_, listPayload, err := ml.Payload()
	if err != nil {
		return nil, fmt.Errorf("parsing payload: %w", err)
	}

	// The media type is either Docker (`application/vnd.docker.distribution.manifest.list.v2+json`)
	// or OCI (empty). Make the OCI one explicit, otherwise the two can't be told apart later.
	listMediaType := ml.MediaType
	if listMediaType == "" {
		listMediaType = v1.MediaTypeImageIndex
	}

	// Create the manifest list on the database.
	dbList, err := imp.findOrCreateDBManifest(ctx, dbRepo, &models.Manifest{
		NamespaceID:   dbRepo.NamespaceID,
		RepositoryID:  dbRepo.ID,
		SchemaVersion: ml.SchemaVersion,
		MediaType:     listMediaType,
		Digest:        dgst,
		Payload:       listPayload,
	})
	if err != nil {
		return nil, fmt.Errorf("creating manifest list in database: %w", err)
	}

	manifests, err := fsRepo.Manifests(ctx)
	if err != nil {
		return nil, fmt.Errorf("constructing manifest service: %w", err)
	}

	// Import the manifests referenced by the list.
	refCount := len(ml.Manifests)
	for idx, ref := range ml.Manifests {
		refLog := log.GetLogger(log.WithContext(ctx)).WithFields(log.Fields{
			"repository": dbRepo.Path,
			"digest":     ref.Digest.String(),
			"count":      idx + 1,
			"total":      refCount,
		})

		fsManifest, err := getFsManifest(ctx, manifests, ref.Digest, refLog)
		switch {
		case errors.Is(err, errManifestSkip):
			// Skipping a broken referenced manifest leaves a partially broken list. The list
			// itself could be skipped too, but it is already broken on the old registry
			// (filesystem metadata), so keeping the pull behavior consistent across old and
			// new is preferable.
			continue
		case err != nil:
			return nil, fmt.Errorf("retrieving referenced manifest %q from filesystem: %w", ref.Digest, err)
		}

		refLog.WithFields(log.Fields{"type": fmt.Sprintf("%T", fsManifest)}).Info("importing manifest referenced in list")

		dbRef, err := imp.importManifest(ctx, fsRepo, dbRepo, fsManifest, ref.Digest)
		if errors.Is(err, distribution.ErrSchemaV1Unsupported) {
			refLog.WithError(err).Warn("skipping v1 manifest")
			continue
		}
		if err != nil {
			return nil, err
		}

		if err := imp.manifestStore.AssociateManifest(ctx, dbList, dbRef); err != nil {
			return nil, err
		}
	}

	return dbList, nil
}
// importManifest imports a single (non-list) manifest into dbRepo. Schema 1 manifests are
// rejected with distribution.ErrSchemaV1Unsupported and any manifest that is not a
// distribution.ManifestV2 is rejected as an unknown manifest class.
func (imp *Importer) importManifest(ctx context.Context, fsRepo distribution.Repository, dbRepo *models.Repository, m distribution.Manifest, dgst digest.Digest) (*models.Manifest, error) {
	if _, isV1 := m.(*schema1.SignedManifest); isV1 {
		return nil, distribution.ErrSchemaV1Unsupported
	}

	v2Manifest, isV2 := m.(distribution.ManifestV2)
	if !isV2 {
		return nil, fmt.Errorf("unknown manifest class digest=%s repository=%s", dgst, dbRepo.Path)
	}

	_, payload, err := m.Payload()
	if err != nil {
		return nil, fmt.Errorf("getting manifest payload: %w", err)
	}

	return imp.importManifestV2(ctx, fsRepo, dbRepo, v2Manifest, dgst, payload, false)
}
// importManifests enumerates every manifest of fsRepo and imports it into dbRepo, whether or
// not it is tagged. Manifests that must be skipped (see getFsManifest) and v1 manifests are
// ignored; any other failure stops the enumeration and is returned.
func (imp *Importer) importManifests(ctx context.Context, fsRepo distribution.Repository, dbRepo *models.Repository) error {
	manifests, err := fsRepo.Manifests(ctx)
	if err != nil {
		return fmt.Errorf("constructing manifest service: %w", err)
	}

	enumerator, ok := manifests.(distribution.ManifestEnumerator)
	if !ok {
		return fmt.Errorf("converting ManifestService into ManifestEnumerator")
	}

	var seen int
	visit := func(dgst digest.Digest) error {
		seen++

		entryLog := log.GetLogger(log.WithContext(ctx)).WithFields(log.Fields{
			"repository": dbRepo.Path,
			"digest":     dgst,
			"count":      seen,
		})

		m, err := getFsManifest(ctx, manifests, dgst, entryLog)
		if errors.Is(err, errManifestSkip) {
			return nil
		}
		if err != nil {
			return err
		}

		entryLog = entryLog.WithFields(log.Fields{"type": fmt.Sprintf("%T", m)})

		if list, isList := m.(*manifestlist.DeserializedManifestList); isList {
			entryLog.Info("importing manifest list")
			_, err = imp.importManifestList(ctx, fsRepo, dbRepo, list, dgst)
			return err
		}

		entryLog.Info("importing manifest")
		_, err = imp.importManifest(ctx, fsRepo, dbRepo, m, dgst)
		if errors.Is(err, distribution.ErrSchemaV1Unsupported) {
			entryLog.WithError(err).Warn("skipping v1 manifest import")
			return nil
		}

		return err
	}

	return enumerator.Enumerate(ctx, visit)
}
// getFsManifest retrieves the manifest identified by dgst from the filesystem.
//
// When the manifest can't be imported for a known reason (empty payload, unknown revision,
// unsupported v1 schema, corrupted link or unsupported digest algorithm) a warning is logged
// on l and a nil manifest is returned together with errManifestSkip, which tells the caller
// to skip the import of this manifest. Any other failure is returned wrapped.
func getFsManifest(ctx context.Context, manifestService distribution.ManifestService, dgst digest.Digest, l log.Logger) (distribution.Manifest, error) {
	m, err := manifestService.Get(ctx, dgst)
	if err == nil {
		return m, nil
	}

	var skipReason string
	switch {
	case errors.As(err, &distribution.ErrManifestEmpty{}):
		// An empty manifest is unrecoverable, so leave it behind and continue.
		skipReason = "empty manifest payload, skipping"
	case errors.As(err, &distribution.ErrManifestUnknownRevision{}):
		// There is no corresponding revision on the filesystem (unexpected), so pulling this
		// manifest through the API on the old code path yields a not found error even though
		// the manifest exists. Preserve that behavior on the new code path by not importing it.
		skipReason = "unknown manifest revision, skipping"
	case errors.Is(err, distribution.ErrSchemaV1Unsupported):
		// v1 schema manifests are no longer supported (both writes and reads).
		skipReason = "unsupported v1 manifest, skipping"
	case errors.Is(err, digest.ErrDigestInvalidFormat):
		// The manifest link is corrupted. Its payload may still be present in common blob
		// storage, but the manifest is no longer accessible from the outside in the scope of
		// this repository. For security reasons the broken link is not repaired.
		skipReason = "broken manifest link, skipping"
	case errors.Is(err, digest.ErrDigestUnsupported):
		// The manifest's digest uses an unsupported algorithm, per
		// https://github.com/opencontainers/go-digest/blob/v1.0.0/algorithm.go#L50
		skipReason = "unsupported manifest digest algorithm, skipping"
	default:
		return nil, fmt.Errorf("retrieving manifest %q from filesystem: %w", dgst, err)
	}

	l.WithError(err).Warn(skipReason)

	return nil, errManifestSkip
}
// tagLookupResponse carries the outcome of a single tag lookup from the lookup goroutines to
// the consumer loop in importTags.
type tagLookupResponse struct {
// name is the name of the tag that was looked up.
name string
// desc is the descriptor returned by the tag service for the tag.
desc distribution.Descriptor
// err is the error returned by the tag lookup, if any.
err error
}
// importTags imports every tag of fsRepo into dbRepo, importing the tagged manifest first if
// it is not in the database yet.
//
// Tag details are looked up concurrently, bounded by the configured tag concurrency (values
// below 1 are treated as 1), and consumed serially. Tags with a missing or broken link, and
// tags pointing to a manifest that must be skipped, are ignored with a warning.
//
// The lookup goroutines are tied to a context that is cancelled when this function returns,
// so an early return never leaves goroutines blocked on the results channel. If ctx is
// cancelled before every tag was processed, the context error is returned instead of
// reporting a successful (but incomplete) import.
func (imp *Importer) importTags(ctx context.Context, fsRepo distribution.Repository, dbRepo *models.Repository) error {
	l := log.GetLogger(log.WithContext(ctx)).WithFields(log.Fields{"repository": dbRepo.Path})

	manifestService, err := fsRepo.Manifests(ctx)
	if err != nil {
		return fmt.Errorf("constructing manifest service: %w", err)
	}

	tagService := fsRepo.Tags(ctx)
	fsTags, err := tagService.All(ctx)
	if err != nil {
		if errors.As(err, &distribution.ErrRepositoryUnknown{}) {
			// No `tags` folder, so no tags and therefore nothing to import. Just handle this gracefully and return.
			// The import will be completed successfully.
			return nil
		}
		return fmt.Errorf("reading tags: %w", err)
	}

	total := len(fsTags)

	// A capacity of zero would block the dispatcher forever and a negative one would panic,
	// so always allow at least one lookup at a time.
	concurrency := imp.tagConcurrency
	if concurrency < 1 {
		concurrency = 1
	}
	semaphore := make(chan struct{}, concurrency)
	tagResChan := make(chan *tagLookupResponse)

	// lookupCtx is cancelled as soon as this function returns (or ctx is cancelled), which
	// releases the dispatcher and any lookup goroutine still waiting to hand over its result.
	lookupCtx, cancelLookups := context.WithCancel(ctx)
	defer cancelLookups()

	l.WithFields(log.Fields{"total": total}).Info("importing tags")

	// Start a goroutine to concurrently dispatch tag details lookup, up to the configured tag concurrency at once.
	go func() {
		// Deferred calls run in reverse order: wait for the in-flight lookups first and only
		// then close the channel, so that nothing ever sends on a closed channel.
		defer close(tagResChan)
		var wg sync.WaitGroup
		defer wg.Wait()

		for _, tag := range fsTags {
			select {
			case semaphore <- struct{}{}:
			case <-lookupCtx.Done():
				// Exit earlier if a tag lookup or import failed.
				return
			}

			wg.Add(1)
			go func(t string) {
				defer func() {
					<-semaphore
					wg.Done()
				}()

				desc, err := tagService.Get(lookupCtx, t)

				select {
				case tagResChan <- &tagLookupResponse{t, desc, err}:
				case <-lookupCtx.Done():
					// Nobody is consuming results anymore, drop this one.
				}
			}(tag)
		}
	}()

	opts := make([]progressbar.Option, len(commonBarOptions), len(commonBarOptions)+3)
	copy(opts, commonBarOptions)
	opts = append(
		opts,
		progressbar.OptionSetDescription(fmt.Sprintf("importing tags in %s", dbRepo.Path)),
		progressbar.OptionSetItsString("tags"),
		progressbar.OptionSetVisibility(imp.showProgressBar),
	)

	bar := progressbar.NewOptions(total, opts...)
	defer func() {
		_ = bar.Finish()
		_ = bar.Close()
	}()

	// Consume the tag lookup details serially. In the ideal case, we only need
	// retrieve the manifest from the database and associate it with a tag. This
	// is fast enough that concurrency really isn't warranted here as well.
	var i int
	for tRes := range tagResChan {
		i++
		fsTag := tRes.name
		desc := tRes.desc
		err := tRes.err

		_ = bar.Add(1)

		l := l.WithFields(log.Fields{"tag_name": fsTag, "count": i, "total": total, "digest": desc.Digest})
		l.Info("importing tag")

		if err != nil {
			l := l.WithError(err)
			if errors.As(err, &distribution.ErrTagUnknown{}) {
				// The tag link is missing, log a warning and skip.
				l.Warn("missing tag link, skipping")
				continue
			}
			if errors.Is(err, digest.ErrDigestInvalidFormat) || errors.Is(err, digest.ErrDigestUnsupported) {
				// The tag link is corrupted, log a warning and skip.
				l.Warn("broken tag link, skipping")
				continue
			}

			return fmt.Errorf("reading tag details: %w", err)
		}

		// Find corresponding manifest in DB or filesystem.
		var dbManifest *models.Manifest
		dbManifest, err = imp.repositoryStore.FindManifestByDigest(ctx, dbRepo, desc.Digest)
		if err != nil {
			return fmt.Errorf("finding tagged manifest in database: %w", err)
		}
		if dbManifest == nil {
			m, err := getFsManifest(ctx, manifestService, desc.Digest, l)
			if err != nil {
				if errors.Is(err, errManifestSkip) {
					continue
				}
				return err
			}

			switch fsManifest := m.(type) {
			case *manifestlist.DeserializedManifestList:
				l.Info("importing manifest list")
				dbManifest, err = imp.importManifestList(ctx, fsRepo, dbRepo, fsManifest, desc.Digest)
			default:
				l.Info("importing manifest")
				dbManifest, err = imp.importManifest(ctx, fsRepo, dbRepo, fsManifest, desc.Digest)
			}
			if err != nil {
				if errors.Is(err, distribution.ErrSchemaV1Unsupported) {
					l.WithError(err).Warn("skipping v1 manifest import")
					continue
				}
				if errors.Is(err, errManifestSkip) {
					l.WithError(err).Warn("skipping manifest import")
					continue
				}
				return fmt.Errorf("importing manifest: %w", err)
			}
		}

		dbTag := &models.Tag{Name: fsTag, RepositoryID: dbRepo.ID, ManifestID: dbManifest.ID, NamespaceID: dbRepo.NamespaceID}
		// NOTE(review): a failure to create the tag is only logged and the import carries on;
		// this looks like a deliberate best-effort — confirm.
		if err := imp.tagStore.CreateOrUpdate(ctx, dbTag); err != nil {
			l.WithError(err).Error("creating tag")
		}
	}

	// The results channel is only closed before every tag was consumed when the lookups were
	// interrupted by a cancelled context. Surface that instead of reporting success.
	if i < total {
		if err := ctx.Err(); err != nil {
			return fmt.Errorf("waiting for tag lookups: %w", err)
		}
	}

	return nil
}
// importRepository imports the repository stored at path: the repository record is found or
// created, all of its manifests are imported when the legacy dangling manifests option is
// enabled, and finally its tags (with the manifests they point to) are imported.
func (imp *Importer) importRepository(ctx context.Context, path string) error {
	named, err := reference.WithName(path)
	if err != nil {
		return fmt.Errorf("parsing repository name: %w", err)
	}

	fsRepo, err := imp.registry.Repository(ctx, named)
	if err != nil {
		return fmt.Errorf("constructing filesystem repository: %w", err)
	}

	// Find or create the repository on the database.
	dbRepo, err := imp.repositoryStore.CreateOrFindByPath(ctx, path)
	if err != nil {
		return fmt.Errorf("creating or finding repository in database: %w", err)
	}

	if imp.importDanglingManifests {
		log.GetLogger(log.WithContext(ctx)).Warn("beginning legacy dangling manifests import")

		// Import all repository manifests, tagged or not.
		if err = imp.importManifests(ctx, fsRepo, dbRepo); err != nil {
			return fmt.Errorf("importing manifests: %w", err)
		}
	}

	// Import the repository tags and the manifests associated with them.
	if err = imp.importTags(ctx, fsRepo, dbRepo); err != nil {
		return fmt.Errorf("importing tags: %w", err)
	}

	return nil
}
// preImportTaggedManifests pre-imports every manifest that is referenced by a tag of fsRepo.
// Tags themselves are not imported. Each distinct manifest is pre-imported at most once per
// call, no matter how many tags point to it. Tags with a missing or broken link, and
// manifests that must be skipped, are ignored.
func (imp *Importer) preImportTaggedManifests(ctx context.Context, fsRepo distribution.Repository, dbRepo *models.Repository) error {
	tagService := fsRepo.Tags(ctx)
	fsTags, err := tagService.All(ctx)
	if err != nil {
		if errors.As(err, &distribution.ErrRepositoryUnknown{}) {
			// No `tags` folder, so no tags and therefore nothing to import. Handle this
			// gracefully and return; the pre-import will be completed successfully.
			return nil
		}
		return fmt.Errorf("reading tags: %w", err)
	}

	total := len(fsTags)
	doneManifests := map[digest.Digest]struct{}{}

	l := log.GetLogger(log.WithContext(ctx)).WithFields(log.Fields{"repository": dbRepo.Path, "total": total})
	l.Info("processing tags")

	barOpts := make([]progressbar.Option, 0, len(commonBarOptions)+3)
	barOpts = append(barOpts, commonBarOptions...)
	barOpts = append(
		barOpts,
		progressbar.OptionSetDescription(fmt.Sprintf("pre importing manifests in %s", dbRepo.Path)),
		progressbar.OptionSetItsString("manifests"),
		progressbar.OptionSetVisibility(imp.showProgressBar),
	)

	bar := progressbar.NewOptions(total, barOpts...)
	defer func() {
		_ = bar.Finish()
		_ = bar.Close()
	}()

	for idx, fsTag := range fsTags {
		tagLog := l.WithFields(log.Fields{"tag_name": fsTag, "count": idx + 1})
		tagLog.Info("processing tag")
		_ = bar.Add(1)

		// Read the tag details from the filesystem.
		desc, err := tagService.Get(ctx, fsTag)
		switch {
		case err == nil:
			// The tag link is readable, carry on.
		case errors.As(err, &distribution.ErrTagUnknown{}):
			// The tag was either deleted since all tags were listed or its link was missing
			// already.
			tagLog.WithError(err).Warn("missing tag link, skipping")
			continue
		case errors.Is(err, digest.ErrDigestInvalidFormat), errors.Is(err, digest.ErrDigestUnsupported):
			// The tag link is corrupted.
			tagLog.WithError(err).Warn("broken tag link, skipping")
			continue
		default:
			return fmt.Errorf("reading tag %q from filesystem: %w", fsTag, err)
		}

		// A manifest (the manifest itself and its references) must always be fully
		// pre-imported at least once per pre-import run to avoid running into
		// https://gitlab.com/gitlab-org/container-registry/-/issues/652. There is however no
		// need to do it more than once per run (e.g. for the same manifest with multiple
		// tags), hence the record of manifests that were already pre-imported.
		if _, done := doneManifests[desc.Digest]; !done {
			err := imp.preImportManifest(ctx, fsRepo, dbRepo, desc.Digest)
			if errors.Is(err, errManifestSkip) {
				continue
			}
			if err != nil {
				return fmt.Errorf("pre importing manifest: %w", err)
			}
			doneManifests[desc.Digest] = struct{}{}
			continue
		}

		// As a precaution, double check that the manifest does indeed exist on the database.
		dbManifest, err := imp.repositoryStore.FindManifestByDigest(ctx, dbRepo, desc.Digest)
		if err != nil {
			return fmt.Errorf("finding tagged manifests in database: %w", err)
		}
		if dbManifest == nil {
			return fmt.Errorf("previously pre-imported manifest %q not found in database", desc.Digest.String())
		}
	}

	return nil
}
// preImportManifest pre-imports the manifest (or manifest list) identified by dgst from
// fsRepo into dbRepo.
//
// errManifestSkip is returned when the manifest must be skipped, which includes v1 manifests.
// For a plain manifest, a failure that shouldRetryManifestPreImport considers transient is
// retried with backoff. Failures of a manifest list import are returned wrapped and are not
// retried.
func (imp *Importer) preImportManifest(ctx context.Context, fsRepo distribution.Repository, dbRepo *models.Repository, dgst digest.Digest) error {
	manifestService, err := fsRepo.Manifests(ctx)
	if err != nil {
		return fmt.Errorf("constructing manifest service: %w", err)
	}

	l := log.GetLogger(log.WithContext(ctx)).WithFields(log.Fields{"repository": dbRepo.Path, "digest": dgst})

	m, err := getFsManifest(ctx, manifestService, dgst, l)
	if err != nil {
		return err
	}

	if list, isList := m.(*manifestlist.DeserializedManifestList); isList {
		l.Info("pre-importing manifest list")
		if _, err := imp.importManifestList(ctx, fsRepo, dbRepo, list, dgst); err != nil {
			return fmt.Errorf("pre importing manifest list: %w", err)
		}
		return nil
	}

	l.Info("pre-importing manifest")
	_, importErr := imp.importManifest(ctx, fsRepo, dbRepo, m, dgst)
	switch {
	case importErr == nil:
		return nil
	case errors.Is(importErr, distribution.ErrSchemaV1Unsupported):
		l.WithError(importErr).Warn("skipping v1 manifest import")
		return errManifestSkip
	case errors.Is(importErr, errManifestSkip):
		l.WithError(importErr).Warn("skipping manifest import")
		return importErr
	case shouldRetryManifestPreImport(importErr):
		return imp.retryImportManifestWithBackoff(l, fsRepo, m, dbRepo, dgst)
	default:
		return importErr
	}
}
// retryImportManifestWithBackoff retries the import of fsManifest with an exponential backoff
// that starts at 100ms and gives up once the configured pre-import retry timeout has elapsed.
// Every attempt runs with a fresh two second context that is not derived from the caller's
// context. Each failed attempt is logged on l and the error of the last attempt is returned.
func (imp *Importer) retryImportManifestWithBackoff(l log.Logger, fsRepo distribution.Repository, fsManifest distribution.Manifest, dbRepo *models.Repository, dgst digest.Digest) error {
	policy := backoff.NewExponentialBackOff()
	policy.InitialInterval = 100 * time.Millisecond
	policy.MaxElapsedTime = imp.preImportRetryTimeout

	var attempt int

	return backoff.Retry(func() error {
		attempt++
		l = l.WithFields(log.Fields{"retry_count": attempt})
		l.Info("retrying pre import manifest")

		// Use a new context with its own deadline just for this attempt.
		retryCtx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
		defer cancel()

		_, retryErr := imp.importManifest(retryCtx, fsRepo, dbRepo, fsManifest, dgst)
		if retryErr == nil {
			return nil
		}

		wrapped := fmt.Errorf("retrying pre import manifest %s: %w", dgst, retryErr)
		l.WithError(wrapped).Warn("pre import retry failed")

		return wrapped
	}, policy)
}
// shouldRetryManifestPreImport reports whether a manifest pre-import that failed with err is
// worth retrying. That is the case for network timeouts, connection resets, database errors
// that pgconn considers safe to retry and GCS 503 (service unavailable) responses.
func shouldRetryManifestPreImport(err error) bool {
	// Generic network timeouts.
	var timeoutErr net.Error
	if errors.As(err, &timeoutErr) && timeoutErr.Timeout() {
		return true
	}

	// Connection resets: a network operation error carrying an ECONNRESET syscall error.
	var (
		opErr  *net.OpError
		sysErr *os.SyscallError
	)
	if errors.As(err, &opErr) && errors.As(err, &sysErr) && sysErr.Err == syscall.ECONNRESET {
		return true
	}

	// Database errors that are safe to retry.
	var pgErr *pgconn.PgError
	if errors.As(err, &pgErr) && pgconn.SafeToRetry(pgErr) {
		return true
	}

	// Special condition for GCS 503 responses used for gitlab.com, we should find a generic
	// way to expose this error for all drivers
	// https://gitlab.com/gitlab-org/container-registry/-/issues/707
	var gcsErr *googleapi.Error
	return errors.As(err, &gcsErr) && gcsErr.Code == http.StatusServiceUnavailable
}
// countRows returns the number of rows of the repositories, manifests, blobs and tags tables,
// keyed by those names. The first count that fails aborts the call and nothing but its error
// is returned.
func (imp *Importer) countRows(ctx context.Context) (map[string]int, error) {
	counts := make(map[string]int, 4)

	n, err := imp.repositoryStore.Count(ctx)
	if err != nil {
		return nil, err
	}
	counts["repositories"] = n

	n, err = imp.manifestStore.Count(ctx)
	if err != nil {
		return nil, err
	}
	counts["manifests"] = n

	n, err = imp.blobStore.Count(ctx)
	if err != nil {
		return nil, err
	}
	counts["blobs"] = n

	n, err = imp.tagStore.Count(ctx)
	if err != nil {
		return nil, err
	}
	counts["tags"] = n

	return counts, nil
}
// isTagsTableEmpty reports whether the tags table has no rows. When counting fails, false is
// returned together with the error.
func (imp *Importer) isTagsTableEmpty(ctx context.Context) (bool, error) {
	numTags, err := imp.tagStore.Count(ctx)
	if err != nil {
		return false, err
	}

	return numTags == 0, nil
}
// ImportAll populates the registry database with metadata from all repositories in the storage backend.
//
// On a dry run everything happens inside a single transaction that is rolled back when the
// function returns. Blobs are imported first when the dangling blobs option is enabled, then
// all repositories. A failure to count rows on completion is logged but does not fail the
// import.
//
// Deprecated: ImportAll is the original implementation and should no longer be used, use FullImport instead.
func (imp *Importer) ImportAll(ctx context.Context) error {
	// Add the pre_import field to all subsequent logging.
	l := log.GetLogger(log.WithContext(ctx)).WithFields(log.Fields{"pre_import": false, "dry_run": imp.dryRun, "legacy": true})
	ctx = log.WithLogger(ctx, l)

	l.Warn("this is the legacy full import method, do not use on production registries")

	// Create a single transaction and roll it back at the end for dry runs.
	if imp.dryRun {
		tx, err := imp.beginTx(ctx)
		if err != nil {
			return fmt.Errorf("beginning dry run transaction: %w", err)
		}
		defer tx.Rollback()
	}

	startedAt := time.Now()
	l.Info("starting metadata import")

	if imp.importDanglingBlobs {
		if err := imp.importBlobsImpl(ctx); err != nil {
			return fmt.Errorf("importing blobs: %w", err)
		}
	}

	if err := imp.importAllRepositoriesImpl(ctx); err != nil {
		return err
	}

	// This should only delay during testing.
	time.Sleep(imp.testingDelay)

	if imp.rowCount {
		counters, err := imp.countRows(ctx)
		if err != nil {
			l.WithError(err).Error("counting table rows")
		}

		// Attach one log field per counted table (none when counting failed).
		counterFields := make(map[string]any, len(counters))
		for table, rows := range counters {
			counterFields[table] = rows
		}
		l = l.WithFields(counterFields)
	}

	elapsed := time.Since(startedAt).Seconds()
	l.WithFields(log.Fields{"duration_s": elapsed}).Info("metadata import complete")

	return nil
}
// step identifies one phase of the import driven by doImport.
type step int

// The import steps accepted by doImport. Their numeric values come from iota,
// so the declaration order below matters.
const (
	unknown step = iota // Always first to catch uninitialized values.
	// preImport makes doImport run preImportAllRepositories ("step one: import manifests").
	preImport
	// repoImport makes doImport run importAllRepositoriesImpl ("step two: import tags").
	repoImport
	// commonBlobs makes doImport run importBlobsImpl ("step three: import blobs").
	commonBlobs
)
// doImport manages which import steps to run and ensures pre and post import
// tasks are handled consistently across import steps. Included import steps
// always run in the following order: pre import, repository import, common
// blobs. The function signature requires at least one step to be included, but
// allows multiple; repeating a step is harmless and any value other than
// preImport, repoImport or commonBlobs is rejected with an error.
//
// When the progress bar is enabled, log output goes to a timestamped file
// created in the current working directory instead of stdout; that file is
// closed when doImport returns. On dry runs a single transaction is started up
// front and rolled back on return.
//
// It returns the first error raised while setting up or running a step. A
// failure to count rows at the end is only logged.
func (imp *Importer) doImport(ctx context.Context, required step, steps ...step) error {
	var pre, repos, blobs bool

	// Assign each valid step to a boolean value. This ensures that we only run a
	// particular step once and in the correct order.
	steps = append(steps, required)
	for _, s := range steps {
		switch s {
		case preImport:
			pre = true
		case repoImport:
			repos = true
		case commonBlobs:
			blobs = true
		default:
			return fmt.Errorf("unknown import step: %v", s)
		}
	}

	bar := progressbar.NewOptions(-1,
		progressbar.OptionShowElapsedTimeOnFinish(),
		progressbar.OptionShowDescriptionAtLineEnd(),
		progressbar.OptionSetVisibility(imp.showProgressBar),
	)
	defer bar.Close()

	// NOTE(review): this appends to package-level state on every call, so the
	// slice grows by one option per import and is not safe for concurrent
	// imports. It is kept because other progress bars built from
	// commonBarOptions may rely on the visibility option being present — confirm
	// before changing.
	commonBarOptions = append(commonBarOptions, progressbar.OptionSetVisibility(imp.showProgressBar))

	// Logs go to stdout unless the progress bar owns the terminal.
	logOut := os.Stdout
	if imp.showProgressBar {
		fn := fmt.Sprintf("%s-registry-import.log", time.Now().Format(time.RFC3339))

		// nolint: gosec // G304
		f, err := os.OpenFile(fn, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0o600)
		if err != nil {
			return fmt.Errorf("opening log file: %w", err)
		}
		// Release the log file once the import is over, including on every early
		// return below. The close error is ignored on purpose: the import outcome
		// is already decided by then and there is nowhere left to report it.
		defer func() { _ = f.Close() }()
		logOut = f

		wd, err := os.Getwd()
		if err != nil {
			return fmt.Errorf("getting working directory: %w", err)
		}

		// A little hacky, but we can use a progress bar to show the overall import
		// progress by printing the bar with different descriptions. Otherwise, the
		// progress bars and a regular logger would step over one another.
		_ = bar.Add(1) // Give the bar some state so we can print it.
		imp.printBar(bar, fmt.Sprintf("registry import starting, detailed log written to: %s", filepath.Join(wd, fn)))
	}

	l := log.GetLogger(log.WithContext(ctx), log.WithWriter(logOut)).WithFields(log.Fields{
		"pre_import":        pre,
		"repository_import": repos,
		"common_blobs":      blobs,
		"dry_run":           imp.dryRun,
	})
	ctx = log.WithLogger(ctx, l)

	// Create a single transaction and roll it back at the end for dry runs.
	if imp.dryRun {
		tx, err := imp.beginTx(ctx)
		if err != nil {
			return fmt.Errorf("beginning dry run transaction: %w", err)
		}
		defer tx.Rollback()
	}

	start := time.Now()
	l.Info("starting metadata import")

	if pre {
		stepStart := time.Now()
		imp.printBar(bar, "step one: import manifests")

		if err := imp.preImportAllRepositories(ctx); err != nil {
			return fmt.Errorf("pre importing all repositories: %w", err)
		}

		imp.printBar(bar, fmt.Sprintf("step one completed in %s", time.Since(stepStart).Round(time.Second)))
	}

	if repos {
		stepStart := time.Now()
		imp.printBar(bar, "step two: import tags")

		if err := imp.importAllRepositoriesImpl(ctx); err != nil {
			return fmt.Errorf("importing all repositories: %w", err)
		}

		imp.printBar(bar, fmt.Sprintf("step two completed in %s", time.Since(stepStart).Round(time.Second)))
	}

	if blobs {
		stepStart := time.Now()
		imp.printBar(bar, "step three: import blobs")

		if err := imp.importBlobsImpl(ctx); err != nil {
			return fmt.Errorf("importing blobs: %w", err)
		}

		imp.printBar(bar, fmt.Sprintf("step three completed in %s", time.Since(stepStart).Round(time.Second)))
	}

	bar.Describe("registry import complete")

	// The duration is taken before counting rows so it covers only the import steps.
	duration := time.Since(start).Seconds()

	if imp.rowCount {
		// A counting failure is logged but does not fail the import.
		counters, err := imp.countRows(ctx)
		if err != nil {
			l.WithError(err).Error("counting table rows")
		}

		logCounters := make(map[string]any, len(counters))
		for table, n := range counters {
			logCounters[table] = n
		}
		l = l.WithFields(logCounters)
	}

	l.WithFields(log.Fields{"duration_s": duration}).Info("metadata import complete")

	return nil
}
// FullImport populates the registry database with metadata from all repositories in the storage backend.
// It delegates to doImport with all three steps: pre import, repository
// import and common blobs.
func (imp *Importer) FullImport(ctx context.Context) error {
	return imp.doImport(ctx, preImport, repoImport, commonBlobs)
}
// PreImportAll populates repository data without including any tag information.
// This command is safe to run without read-only mode enabled on the registry.
// It delegates to doImport with only the pre import step.
func (imp *Importer) PreImportAll(ctx context.Context) error {
	return imp.doImport(ctx, preImport)
}
// ImportAllRepositories populates all repository data, when used after a pre import
// cycle, this data will largely include only tags, but if a tag is not
// associated with an existing manifest, all metadata associated with that
// manifest will be imported. This command must only be used when read-only
// mode is enabled on the registry.
// It delegates to doImport with only the repository import step.
func (imp *Importer) ImportAllRepositories(ctx context.Context) error {
	return imp.doImport(ctx, repoImport)
}
// ImportBlobs populates the registry database with metadata from all blobs in the storage backend.
// It delegates to doImport with only the common blobs step.
func (imp *Importer) ImportBlobs(ctx context.Context) error {
	return imp.doImport(ctx, commonBlobs)
}
// preImportAllRepositories walks every repository exposed by the registry's
// repository enumerator and pre imports its tagged manifests. Repositories
// whose pre import fails with a path-not-found or unknown-repository error are
// logged and skipped; any other error stops the enumeration and is returned.
func (imp *Importer) preImportAllRepositories(ctx context.Context) error {
	enumerator, ok := imp.registry.(distribution.RepositoryEnumerator)
	if !ok {
		return errors.New("building repository enumerator")
	}

	var seen int
	preImportRepo := func(path string) error {
		seen++
		began := time.Now()

		l := log.GetLogger(log.WithContext(ctx)).WithFields(log.Fields{"repository": path, "count": seen})
		l.Info("pre importing repository")

		name, err := reference.WithName(path)
		if err != nil {
			return fmt.Errorf("parsing repository name: %w", err)
		}

		fsRepo, err := imp.registry.Repository(ctx, name)
		if err != nil {
			return fmt.Errorf("constructing filesystem repository: %w", err)
		}

		dbRepo, err := imp.repositoryStore.CreateOrFindByPath(ctx, path)
		if err != nil {
			return fmt.Errorf("creating or finding repository in database: %w", err)
		}

		err = imp.preImportTaggedManifests(ctx, fsRepo, dbRepo)
		if err == nil {
			elapsed := time.Since(began).Seconds()
			l.WithFields(log.Fields{"duration_s": elapsed}).Info("repository pre import complete")
			return nil
		}

		l.WithError(err).Error("pre importing tagged manifests")

		// if the storage driver failed to find a repository path (usually due to missing `_manifests/revisions`
		// or `_manifests/tags` folders) continue to the next one, otherwise stop as the error is unknown.
		if errors.As(err, &driver.PathNotFoundError{}) || errors.As(err, &distribution.ErrRepositoryUnknown{}) {
			return nil
		}

		return fmt.Errorf("pre importing tagged manifests: %w", err)
	}

	return enumerator.Enumerate(ctx, preImportRepo)
}
// importBlobsImpl enumerates every blob known to the registry and creates a
// database record, with the generic octet-stream media type, for each digest
// the blob store does not already have. Progress is reported on its own
// progress bar, which is only visible when showProgressBar is set.
func (imp *Importer) importBlobsImpl(ctx context.Context) error {
	var index int
	start := time.Now()

	l := log.GetLogger(log.WithContext(ctx))
	l.Info("importing all blobs")

	// Work on a private copy so the shared option list is never modified here.
	opts := append(
		make([]progressbar.Option, 0, len(commonBarOptions)+3),
		commonBarOptions...,
	)
	opts = append(
		opts,
		progressbar.OptionSetDescription("importing blobs"),
		progressbar.OptionSetItsString("blobs"),
		progressbar.OptionSetVisibility(imp.showProgressBar),
	)

	bar := progressbar.NewOptions(-1, opts...)
	defer func() {
		_ = bar.Finish()
		_ = bar.Close()
	}()

	importBlob := func(desc distribution.Descriptor) error {
		index++
		_ = bar.Add(1)
		l.WithFields(log.Fields{"digest": desc.Digest, "count": index, "size": desc.Size}).Info("importing blob")

		existing, err := imp.blobStore.FindByDigest(ctx, desc.Digest)
		if err != nil {
			return fmt.Errorf("checking for existence of blob: %w", err)
		}
		if existing != nil {
			// Already recorded, nothing to do.
			return nil
		}

		blob := &models.Blob{MediaType: mtOctetStream, Digest: desc.Digest, Size: desc.Size}
		if err := imp.blobStore.Create(ctx, blob); err != nil {
			return fmt.Errorf("creating blob in database: %w", err)
		}

		return nil
	}

	if err := imp.registry.Blobs().Enumerate(ctx, importBlob); err != nil {
		return err
	}

	elapsed := time.Since(start).Seconds()
	l.WithFields(log.Fields{"duration_s": elapsed}).Info("blob import complete")

	return nil
}
// handleLockers updates the registry lockfiles according to the outcome of an
// import, where err is the error (if any) the import finished with. It does
// nothing when the EnforceLockfiles feature is disabled. After a failed import
// or a dry run the lockfiles are restored through RestoreLockfiles; after a
// successful real import the filesystem-in-use lockfile is released and the
// database-in-use lockfile is taken.
func (imp *Importer) handleLockers(ctx context.Context, err error) error {
	if !feature.EnforceLockfiles.Enabled() {
		return nil
	}

	if err == nil && !imp.dryRun {
		// Once we have finished importing all tags, we need to release the
		// `filesystem-in-use` lockfile in order for the registry to boot in
		// database mode.
		if unlockErr := imp.registry.Lockers().FSUnlock(ctx); unlockErr != nil {
			return fmt.Errorf("could not unlock filesystem-in-use lockfile: %w", unlockErr)
		}

		// Additionally, we need to lock the `database-in-use` lockfile
		// to prevent the registry starting in filesystem mode again.
		if lockErr := imp.registry.Lockers().DBLock(ctx); lockErr != nil {
			return fmt.Errorf("could not lock database-in-use lockfile: %w", lockErr)
		}

		return nil
	}

	// The import failed or was a dry run: put the lockfiles back as they were.
	if restoreErr := imp.RestoreLockfiles(ctx); restoreErr != nil {
		return fmt.Errorf("could not restore lockfiles: %w", restoreErr)
	}

	return nil
}
// importAllRepositoriesImpl imports every repository exposed by the registry's
// repository enumerator. It refuses to run, returning errTagsTableNotEmpty,
// when the tags table already has entries. Outside of dry runs each repository
// is imported under its own transaction, committed once that repository
// succeeds.
//
// Repositories failing with a path-not-found or unknown-repository error are
// logged and skipped (their transaction is rolled back); any other error stops
// the enumeration and is returned.
//
// Whatever the outcome, the lockfiles are updated through handleLockers before
// returning. If that fails as well, the returned error wraps both the import
// error and the lockfile error so callers can still match either one with
// errors.Is / errors.As.
func (imp *Importer) importAllRepositoriesImpl(ctx context.Context) (err error) {
	defer func() {
		lockErr := imp.handleLockers(ctx, err)
		switch {
		case lockErr == nil:
			// Lockfiles handled fine, keep the import outcome untouched.
		case err == nil:
			err = lockErr
		default:
			// Wrap both errors: formatting the import error with %s would keep
			// its text but drop it from the chain, hiding sentinels such as
			// errTagsTableNotEmpty from callers.
			err = fmt.Errorf("%w: %w", err, lockErr)
		}
	}()

	empty, err := imp.isTagsTableEmpty(ctx)
	if err != nil {
		return fmt.Errorf("checking if tags table is empty: %w", err)
	}
	if !empty {
		log.GetLogger(log.WithContext(ctx)).WithError(errTagsTableNotEmpty).Error("cannot import all repositories while the tags table has entries, you must truncate the table manually before retrying, see https://docs.gitlab.com/ee/administration/packages/container_registry_metadata_database.html#troubleshooting")
		return errTagsTableNotEmpty
	}

	repositoryEnumerator, ok := imp.registry.(distribution.RepositoryEnumerator)
	if !ok {
		return errors.New("error building repository enumerator")
	}

	index := 0
	return repositoryEnumerator.Enumerate(ctx, func(path string) error {
		// The transaction is local to each repository; dry runs rely on the
		// caller's transaction instead.
		var tx Transactor
		if !imp.dryRun {
			var txErr error
			if tx, txErr = imp.beginTx(ctx); txErr != nil {
				return fmt.Errorf("beginning repository transaction: %w", txErr)
			}
			defer tx.Rollback()
		}

		index++
		start := time.Now()

		l := log.GetLogger(log.WithContext(ctx)).WithFields(log.Fields{"repository": path, "count": index})
		l.Info("importing repository")

		if err := imp.importRepository(ctx, path); err != nil {
			l.WithError(err).Error("error importing repository")
			// if the storage driver failed to find a repository path (usually due to missing `_manifests/revisions`
			// or `_manifests/tags` folders) continue to the next one, otherwise stop as the error is unknown.
			if !(errors.As(err, &driver.PathNotFoundError{}) || errors.As(err, &distribution.ErrRepositoryUnknown{})) {
				return err
			}
			return nil
		}

		end := time.Since(start).Seconds()
		l.WithFields(log.Fields{"duration_s": end}).Info("repository import complete")

		if !imp.dryRun {
			if err := tx.Commit(); err != nil {
				return fmt.Errorf("commit repository transaction: %w", err)
			}

			// reset stores to use the main connection handler instead of the last (committed/rolled back) transaction
			imp.loadStores(imp.db)
		}

		return nil
	})
}
// Import populates the registry database with metadata from a specific repository in the storage backend.
// The whole import runs under one transaction that is committed at the end,
// except on dry runs where it is left to the deferred rollback.
func (imp *Importer) Import(ctx context.Context, path string) error {
	tx, err := imp.beginTx(ctx)
	if err != nil {
		return fmt.Errorf("begin repository transaction: %w", err)
	}
	defer tx.Rollback()

	// Log entries emitted through the context carry the importer fields.
	base := log.GetLogger(log.WithContext(ctx)).WithFields(log.Fields{
		"pre_import": false,
		"dry_run":    imp.dryRun,
		"component":  "importer",
	})
	ctx = log.WithLogger(ctx, base)

	began := time.Now()
	l := base.WithFields(log.Fields{"repository": path})
	l.Info("starting metadata import")
	l.Info("importing repository")

	if importErr := imp.importRepository(ctx, path); importErr != nil {
		l.WithError(importErr).Error("error importing repository")
		return importErr
	}

	// The delay is expected to be non-zero only in tests.
	// NOTE(review): cancellation while waiting returns nil without committing,
	// so the deferred rollback discards the work — confirm callers expect a nil
	// error here.
	delay := time.NewTimer(imp.testingDelay)
	select {
	case <-ctx.Done():
		return nil
	case <-delay.C:
		l.Debug("done waiting for slow import test")
	}

	if imp.rowCount {
		// A counting failure is logged but does not fail the import.
		counters, countErr := imp.countRows(ctx)
		if countErr != nil {
			l.WithError(countErr).Error("counting table rows")
		}

		fields := make(log.Fields, len(counters))
		for table, rows := range counters {
			fields[table] = rows
		}
		l = l.WithFields(fields)
	}

	elapsed := time.Since(began).Seconds()
	l.WithFields(log.Fields{"duration_s": elapsed}).Info("metadata import complete")

	// Dry runs stop here; the deferred rollback undoes everything.
	if imp.dryRun {
		return nil
	}

	commitErr := tx.Commit()
	if commitErr != nil {
		l.WithError(commitErr).Error("committing transaction for final import")
	}

	return commitErr
}
// PreImport populates repository data without including any tag information.
// Running pre-import can reduce the runtime of an Import against the same
// repository and, with online garbage collection enabled, does not require a
// repository to be read-only.
//
// On dry runs a transaction is started up front and rolled back on return. A
// negative testing delay is rejected with errNegativeTestingDelay, but only
// after the tagged manifests have been pre imported.
func (imp *Importer) PreImport(ctx context.Context, path string) error {
	// Log entries emitted through the context carry the importer fields.
	base := log.GetLogger(log.WithContext(ctx)).WithFields(log.Fields{
		"pre_import": true,
		"component":  "importer",
	})
	ctx = log.WithLogger(ctx, base)

	// Dry runs happen under one transaction which is discarded on return.
	if imp.dryRun {
		tx, txErr := imp.beginTx(ctx)
		if txErr != nil {
			return fmt.Errorf("begin dry run transaction: %w", txErr)
		}
		defer tx.Rollback()
	}

	began := time.Now()
	l := base.WithFields(log.Fields{"repository": path})
	l.Info("starting repository pre-import")

	name, err := reference.WithName(path)
	if err != nil {
		return fmt.Errorf("parsing repository name: %w", err)
	}

	fsRepo, err := imp.registry.Repository(ctx, name)
	if err != nil {
		return fmt.Errorf("constructing filesystem repository: %w", err)
	}

	dbRepo, err := imp.repositoryStore.CreateOrFindByPath(ctx, path)
	if err != nil {
		return fmt.Errorf("creating or finding repository in database: %w", err)
	}

	if manifestErr := imp.preImportTaggedManifests(ctx, fsRepo, dbRepo); manifestErr != nil {
		return fmt.Errorf("pre importing tagged manifests: %w", manifestErr)
	}

	if imp.testingDelay < 0 {
		return errNegativeTestingDelay
	}

	// The delay is expected to be non-zero only in tests.
	// NOTE(review): cancellation while waiting returns nil rather than the
	// context error — confirm callers expect that.
	delay := time.NewTimer(imp.testingDelay)
	select {
	case <-ctx.Done():
		return nil
	case <-delay.C:
		l.Debug("done waiting for slow pre import test")
	}

	if !imp.dryRun {
		// reset stores to use the main connection handler instead of the last (committed/rolled back) transaction
		imp.loadStores(imp.db)
	}

	if imp.rowCount {
		// A counting failure is logged but does not fail the pre import.
		counters, countErr := imp.countRows(ctx)
		if countErr != nil {
			l.WithError(countErr).Error("counting table rows")
		}

		fields := make(log.Fields, len(counters))
		for table, rows := range counters {
			fields[table] = rows
		}
		l = l.WithFields(fields)
	}

	elapsed := time.Since(began).Seconds()
	l.WithFields(log.Fields{"duration_s": elapsed}).Info("pre-import complete")

	return nil
}
// RestoreLockfiles puts the lockfiles back in their pre-import state: the
// database lock is released and the filesystem lock is taken. On a dry run
// with the database already locked it changes nothing and returns errDBLocked.
func (imp *Importer) RestoreLockfiles(ctx context.Context) error {
	locked, err := imp.registry.Lockers().DBIsLocked(ctx)
	if err != nil {
		return err
	}

	// We should not restore lock files if the importer runs with the --dry-run option, and
	// the database-in-use lockfile already exists since the importer only locks the database
	// after import completion. This check will help to prevent a scenario
	// where an admin runs the importer again for whatever reason.
	if locked && imp.dryRun {
		return errDBLocked
	}

	err = imp.registry.Lockers().DBUnlock(ctx)
	if err != nil {
		return err
	}

	err = imp.registry.Lockers().FSLock(ctx)
	if err != nil {
		return fmt.Errorf("creating the filesystem-in-use file in the storage driver: %w", err)
	}

	return nil
}
// printBar writes the bar to standard output, but only when showProgressBar
// is enabled. If at least one string is passed, the first one replaces the
// bar description before printing; any further strings are ignored.
func (imp *Importer) printBar(b *progressbar.ProgressBar, s ...string) {
	if imp.showProgressBar {
		if len(s) != 0 {
			b.Describe(s[0])
		}

		_, _ = fmt.Println(b)
	}
}