internal/gitaly/storage/storagemgr/partition/factory.go (197 lines of code) (raw):

package partition import ( "context" "crypto/sha256" "fmt" "os" "path/filepath" "strings" "gitlab.com/gitlab-org/gitaly/v16/internal/git/gitcmd" "gitlab.com/gitlab-org/gitaly/v16/internal/git/localrepo" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/config" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/keyvalue" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/mode" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/raftmgr" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagemgr" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagemgr/partition/log" logger "gitlab.com/gitlab-org/gitaly/v16/internal/log" "gitlab.com/gitlab-org/gitaly/v16/internal/offloading" ) // Factory is factory type that can create new partitions. type Factory struct { cmdFactory gitcmd.CommandFactory repoFactory localrepo.Factory partitionMetrics Metrics logConsumer storage.LogConsumer raftCfg config.Raft raftFactory raftmgr.RaftReplicaFactory offloadingSink *offloading.Sink } // New returns a new Partition instance. func (f Factory) New( logger logger.Logger, partitionID storage.PartitionID, db keyvalue.Transactioner, storageName string, storagePath string, absoluteStateDir string, stagingDir string, ) storagemgr.Partition { // ScopeByStorage takes in context to pass it to the locator. This may be useful in the // RPC handlers to rewrite the storage in the future but never here. Requiring a context // here is more of a structural issue in the code, and is not useful. repoFactory, err := f.repoFactory.ScopeByStorage(context.Background(), storageName) if err != nil { // ScopeByStorage will only error if accessing a non existent storage. This can't // be the case when Factory is used as the storage is already verified. // This is a layering issue in the code, and not a realistic error scenario. We // thus panic out rather than make the error part of the interface. panic(fmt.Errorf("building a partition for a non-existent storage: %q", storageName)) } positionTracker := log.NewPositionTracker() if f.logConsumer != nil { if err := positionTracker.Register(log.ConsumerPosition); err != nil { panic(err) } } var logManager storage.LogManager if f.raftCfg.Enabled { factory := f.raftFactory migrator, err := NewReplicaPartitionMigrator(absoluteStateDir, storageName) if err != nil { panic(fmt.Errorf("creating replica partition migrator: %w", err)) } if err = migrator.Forward(); err != nil { panic(fmt.Errorf("migrating replica partitions: %w", err)) } absoluteStateDir = getRaftPartitionPath(storageName, partitionID, absoluteStateDir) replicaLogStore, err := raftmgr.NewReplicaLogStore( storageName, partitionID, f.raftCfg, db, stagingDir, absoluteStateDir, f.logConsumer, positionTracker, logger, f.partitionMetrics.raft, ) if err != nil { panic(fmt.Errorf("creating raft log store: %w", err)) } raftReplica, err := factory( storageName, partitionID, replicaLogStore, logger, f.partitionMetrics.raft, ) if err != nil { panic(fmt.Errorf("creating raft replica: %w", err)) } logManager = raftReplica } else { logManager = log.NewManager(storageName, partitionID, stagingDir, absoluteStateDir, f.logConsumer, positionTracker) } parameters := &transactionManagerParameters{ PtnID: partitionID, Logger: logger, DB: db, StorageName: storageName, StoragePath: storagePath, StateDir: absoluteStateDir, StagingDir: stagingDir, OffloadingSink: f.offloadingSink, CmdFactory: f.cmdFactory, RepositoryFactory: repoFactory, Metrics: f.partitionMetrics.Scope(storageName), LogManager: logManager, } return NewTransactionManager(parameters) } // getRaftPartitionPath returns the path where a Raft replica should be stored for a partition. func getRaftPartitionPath(storageName string, partitionID storage.PartitionID, absoluteStateDir string) string { hasher := sha256.New() raftPartitionPath := storage.CreateRaftPartitionPath(storageName, partitionID.String()) hasher.Write([]byte(raftPartitionPath)) partitionsDir, err := getPartitionsDir(absoluteStateDir) if err != nil { panic(fmt.Errorf("determining partitions directory: %w", err)) } return storage.HashRaftPartitionPath(hasher, partitionsDir, raftPartitionPath) } // getPartitionsDir determines the partitions directory derived from the state directory // if there is no /partitions in the path, it creates one from the state directory func getPartitionsDir(stateDir string) (string, error) { var partitionsDir string const partitionsSubdir = "/partitions" index := strings.LastIndex(stateDir, partitionsSubdir) // If "/partitions" is not in the path, use the standard partition computation // Typically for tests a tmp file system is used that does not have this structure if index == -1 { partitionsDir = filepath.Join(stateDir, partitionsSubdir) if err := os.MkdirAll(partitionsDir, mode.Directory); err != nil { return "", fmt.Errorf("failed to create partitions directory %s: %w", partitionsDir, err) } } else { index += len(partitionsSubdir) partitionsDir = stateDir[:index] } return partitionsDir, nil } // NewFactory creates a partition factory with the given components: func NewFactory(opts ...FactoryOption) Factory { var options factoryOptions for _, o := range opts { o(&options) } if options.cmdFactory == nil { panic("cmdFactory is required") } if options.repoFactory == nil { panic("repoFactory is required") } if options.partitionMetrics == nil { panic("partitionMetrics is required") } return Factory{ cmdFactory: options.cmdFactory, repoFactory: *options.repoFactory, partitionMetrics: *options.partitionMetrics, logConsumer: options.logConsumer, raftCfg: options.raftCfg, raftFactory: options.raftFactory, offloadingSink: options.offloadingSink, } } // FactoryOption is a functional option that configures a partition factory instance. type FactoryOption func(*factoryOptions) type factoryOptions struct { cmdFactory gitcmd.CommandFactory repoFactory *localrepo.Factory partitionMetrics *Metrics logConsumer storage.LogConsumer raftCfg config.Raft raftFactory raftmgr.RaftReplicaFactory offloadingSink *offloading.Sink } // WithCmdFactory sets the command factory parameter. // The cmdFactory is mandatory and is used to create Git commands // that the partition uses for repository operations. func WithCmdFactory(cf gitcmd.CommandFactory) FactoryOption { return func(o *factoryOptions) { o.cmdFactory = cf } } // WithRepoFactory sets the repository factory parameter. // The repoFactory is mandatory and is used to create local repository instances. func WithRepoFactory(rf localrepo.Factory) FactoryOption { return func(o *factoryOptions) { o.repoFactory = &rf } } // WithMetrics sets the partition metrics parameter. // The partitionMetrics is mandatory and is used to track partition operations. func WithMetrics(m Metrics) FactoryOption { return func(o *factoryOptions) { o.partitionMetrics = &m } } // WithLogConsumer sets the log consumer parameter. // The logConsumer is optional and is used to consume WAL entries. func WithLogConsumer(lc storage.LogConsumer) FactoryOption { return func(o *factoryOptions) { o.logConsumer = lc } } // WithRaftConfig sets the raft configuration parameter. // The raft configuration is optional and is used to config Raft. func WithRaftConfig(rc config.Raft) FactoryOption { return func(o *factoryOptions) { o.raftCfg = rc } } // WithRaftFactory sets the raft factory parameter. // The raft factory is optional and is used to create Raft replicas for replicated partitions. func WithRaftFactory(rf raftmgr.RaftReplicaFactory) FactoryOption { return func(o *factoryOptions) { o.raftFactory = rf } } // WithOffloadingSink sets the offloading sink. // The offloading sink is optional and is used to upload or download offloaded objects. func WithOffloadingSink(s *offloading.Sink) FactoryOption { return func(o *factoryOptions) { o.offloadingSink = s } }