in internal/gitaly/storage/storagemgr/partition_manager.go [479:654]
func (sm *StorageManager) startPartition(ctx context.Context, partitionID storage.PartitionID) (*partitionHandle, error) {
	for {
		sm.mu.Lock()
		if sm.closed {
			sm.mu.Unlock()
			return nil, ErrPartitionManagerClosed
		}
		var isInactive bool
		// Check whether the partition is already open and actively being accessed.
		ptn, ok := sm.activePartitions[partitionID]
		if !ok {
			// If not, check whether we've kept the partition open and ready for access.
			if ptn, ok = sm.inactivePartitions.Get(partitionID); ok {
				isInactive = true
			}
		}
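		// At this point ok reports whether the partition was found in either the active or the
		// inactive set. isInactive records that it came from the inactive set so it can be moved
		// back to the active partitions below once a reference has been taken.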
		if !ok {
			sm.initializingPartitions.Add(1)
			// The partition isn't running yet so we're responsible for setting it up.
			if err := func() (returnedErr error) {
				// Place the partition's state in the map and release the
				// lock so we don't block retrieval of other partitions
				// while setting up this one.
				ptn = &partition{
					id:              partitionID,
					initialized:     make(chan struct{}),
					closing:         make(chan struct{}),
					closed:          make(chan struct{}),
					managerFinished: make(chan struct{}),
					referenceCount:  1,
				}
				sm.activePartitions[partitionID] = ptn
				sm.mu.Unlock()
				defer func() {
					if returnedErr != nil {
						// If the partition setup failed, set the error so the goroutines waiting
						// for the partition to be set up know to abort.
						ptn.errInitialization = returnedErr
						// Remove the partition immediately from the map. Since the setup failed,
						// there's no goroutine running for the partition.
						sm.mu.Lock()
						delete(sm.activePartitions, partitionID)
						sm.mu.Unlock()
					}
					sm.initializingPartitions.Done()
					close(ptn.initialized)
				}()
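				// Derive the partition's on-disk state directory from its ID and make sure the
				// parent directories exist and are synced to disk before handing the directory
				// to the partition.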
				relativeStateDir := deriveStateDirectory(partitionID)
				absoluteStateDir := filepath.Join(sm.path, relativeStateDir)
				if err := mkdirAllSync(ctx, sm.syncer, filepath.Dir(absoluteStateDir), mode.Directory); err != nil {
					return fmt.Errorf("create state directory hierarchy: %w", err)
				}
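				// Create a temporary staging directory for this partition under the storage's
				// staging directory. In-flight transactions stage their state here, and the
				// directory is removed once the partition has fully closed below.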
				stagingDir, err := os.MkdirTemp(sm.stagingDirectory, "")
				if err != nil {
					return fmt.Errorf("create staging directory: %w", err)
				}
				logger := sm.logger.WithField("partition_id", partitionID)
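				// Construct the Partition through the injected factory, scoping its key-value
				// access to this partition's key prefix in the database.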
				mgr := sm.partitionFactory.New(
					logger,
					partitionID,
					keyvalue.NewPrefixedTransactioner(sm.database, keyPrefixPartition(partitionID)),
					sm.name,
					sm.path,
					absoluteStateDir,
					stagingDir,
				)
				ptn.Partition = mgr
				sm.metrics.partitionsStarted.Inc()
				sm.runningPartitionGoroutines.Add(1)
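				// Run the Partition in a goroutine of its own. The same goroutine performs the
				// partition's cleanup once Run returns.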
				go func() {
					if err := mgr.Run(); err != nil {
						logger.WithError(err).WithField("partition_state_directory", relativeStateDir).Error("partition failed")
					}
					// If the Partition stops running, a new Partition instance needs to be started
					// in order to continue processing transactions. The partition is removed from
					// the active and inactive sets, allowing the next transaction for the
					// repository to create a new Partition instance.
					sm.mu.Lock()
					delete(sm.activePartitions, partitionID)
					sm.inactivePartitions.Remove(partitionID)
					sm.closingPartitions[ptn] = struct{}{}
					sm.mu.Unlock()
					close(ptn.managerFinished)
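					// Closing managerFinished signals callers blocked in startPartition on a
					// closing partition that this goroutine has finished, so they can retry and
					// start a fresh Partition instance.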
					// If the Partition returned due to an error, it could be that there are still
					// in-flight transactions operating on their staged state. Removing the staging
					// directory while they are active can lead to unexpected errors. Wait until
					// they've all finished, and only then remove the staging directory.
					//
					// All transactions must eventually finish, so we don't wait on a context cancellation here.
					<-ptn.closing
					// Now that all handles to the partition have been closed, there can be no more transactions
					// using the snapshots, nor can there be new snapshots starting. Close the snapshots that
					// may have been cached.
					if err := mgr.CloseSnapshots(); err != nil {
						logger.WithError(err).Error("failed closing snapshots")
					}
					if err := os.RemoveAll(stagingDir); err != nil {
						logger.WithError(err).Error("failed removing partition's staging directory")
					}
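					// With the snapshots closed and the staging directory gone, mark the
					// partition as fully closed.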
					sm.mu.Lock()
					close(ptn.closed)
					// Remove the partition from the list of closing partitions so that it can be garbage collected.
					delete(sm.closingPartitions, ptn)
					sm.mu.Unlock()
					sm.metrics.partitionsStopped.Inc()
					sm.runningPartitionGoroutines.Done()
				}()
				return nil
			}(); err != nil {
				return nil, fmt.Errorf("start partition: %w", err)
			}
			// We set up the partition ourselves, so return the handle directly as we know the setup succeeded.
			return newPartitionHandle(sm, ptn), nil
		}
		// Someone else has set up the partition or is in the process of doing so.
		if ptn.isClosing() {
			// If the partition is in the process of shutting down, it should not be used. The
			// lock is released while waiting for the partition to finish closing so as not to
			// block other partitions from processing transactions. Once closing is complete, a
			// new attempt is made to get a valid partition.
			sm.mu.Unlock()
			select {
			case <-ctx.Done():
				return nil, ctx.Err()
			case <-ptn.managerFinished:
				continue
			}
		}
		// Increment the reference count and release the lock. We don't want to hold the lock while waiting
		// for the initialization so other partition retrievals can proceed.
		ptn.referenceCount++
		if isInactive {
			// The partition was inactive, so move it back to the list of active partitions and
			// remove it from the inactive list as it now has a user again.
			sm.activePartitions[partitionID] = ptn
			sm.inactivePartitions.Remove(partitionID)
		}
		sm.mu.Unlock()
		// Wait for the goroutine setting up the partition to finish initializing it. The initialization
		// doesn't take long so we don't wait on context here.
		<-ptn.initialized
		// If there was an error initializing the partition, bail out. We don't reattempt
		// setting up the partition here as it's unlikely to succeed. Subsequent requests
		// that didn't run concurrently with this attempt will retry.
		//
		// We also don't need to worry about the reference count since the goroutine
		// setting up the partition removes it from the map immediately if initialization fails.
		if err := ptn.errInitialization; err != nil {
			return nil, fmt.Errorf("initialize partition: %w", err)
		}
		return newPartitionHandle(sm, ptn), nil
	}
}