internal/gitaly/storage/storagemgr/partition_assignment_migration.go (60 lines of code) (raw):

package storagemgr import ( "context" "errors" "fmt" "io/fs" "github.com/dgraph-io/badger/v4" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/config" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/keyvalue" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/keyvalue/databasemgr" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/walk" ) var repoAssignedKey = []byte("repo_assigned_to_partition") // AssignmentWorker assigns repositories and their linked object pools to the same partition // if they haven't been assigned to any partition yet. func AssignmentWorker(ctx context.Context, cfg config.Cfg, mgr storage.Node, dbMgr *databasemgr.DBManager, locator storage.Locator) error { for _, s := range cfg.Storages { storageMgr, err := mgr.GetStorage(s.Name) if err != nil { return fmt.Errorf("getting storage: %w", err) } db, err := dbMgr.GetDB(s.Name) if err != nil { return fmt.Errorf("getting db: %w", err) } reposAssigned := false if err := db.View(func(txn keyvalue.ReadWriter) error { _, err := txn.Get(repoAssignedKey) // key has never been set, which means no repository has been assigned to any partition yet if errors.Is(err, badger.ErrKeyNotFound) { return nil } else if err != nil { return fmt.Errorf("retrieving key %s: %w", repoAssignedKey, err) } // key exists, which means all repositories have been assigned to a partition reposAssigned = true return nil }); err != nil { return fmt.Errorf("reading value from db: %w", err) } if reposAssigned { return nil } if err := walk.FindRepositories(ctx, locator, s.Name, func(relPath string, gitDirInfo fs.FileInfo) error { _, err := storageMgr.MaybeAssignToPartition(ctx, relPath) if err != nil { return fmt.Errorf("maybe assign to partition: %w", err) } return nil }); err != nil { return fmt.Errorf("walking repositories in partition assignment worker: %w", err) } if err := db.Update(func(txn keyvalue.ReadWriter) error { if err := txn.Set(repoAssignedKey, []byte(nil)); err != nil { return fmt.Errorf("set: %w", err) } return nil }); err != nil { return fmt.Errorf("update db with key %s: %w", repoAssignedKey, err) } } return nil }