func (sm *StorageManager) startPartition()

in internal/gitaly/storage/storagemgr/partition_manager.go [479:654]
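startPartition returns a handle to the given partition, starting it if it isn't already running. Partitions are first looked up among the active ones, then among inactive ones that are still open; a partition that is shutting down is waited on before a new attempt is made.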


func (sm *StorageManager) startPartition(ctx context.Context, partitionID storage.PartitionID) (*partitionHandle, error) {
	for {
		sm.mu.Lock()
		if sm.closed {
			sm.mu.Unlock()
			return nil, ErrPartitionManagerClosed
		}

		var isInactive bool
		// Check whether the partition is already active, i.e. open because it's currently being accessed.
		ptn, ok := sm.activePartitions[partitionID]
		if !ok {
			// If not, check whether we've kept the partition open, ready for access.
			if ptn, ok = sm.inactivePartitions.Get(partitionID); ok {
				isInactive = true
			}
		}
		if !ok {
			sm.initializingPartitions.Add(1)
			// The partition isn't running yet so we're responsible for setting it up.
			if err := func() (returnedErr error) {
				// Place the partition's state in the map and release the
				// lock so we don't block retrieval of other partitions
				// while setting up this one.
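				// The channels track the partition's lifecycle: initialized is
				// closed once this setup attempt has finished, managerFinished
				// once Run has returned, closing once all handles to the
				// partition have been closed, and closed once the partition's
				// resources have been cleaned up.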
				ptn = &partition{
					id:              partitionID,
					initialized:     make(chan struct{}),
					closing:         make(chan struct{}),
					closed:          make(chan struct{}),
					managerFinished: make(chan struct{}),
					referenceCount:  1,
				}
				sm.activePartitions[partitionID] = ptn
				sm.mu.Unlock()

				defer func() {
					if returnedErr != nil {
						// If the partition setup failed, set the error so the goroutines waiting
						// for the partition to be set up know to abort.
						ptn.errInitialization = returnedErr
						// Remove the partition immediately from the map. Since the setup failed,
						// there's no goroutine running for the partition.
						sm.mu.Lock()
						delete(sm.activePartitions, partitionID)
						sm.mu.Unlock()
					}

					sm.initializingPartitions.Done()
					close(ptn.initialized)
				}()

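				// Derive the partition's on-disk state directory and ensure its
				// parent hierarchy exists and has been synced to disk.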
				relativeStateDir := deriveStateDirectory(partitionID)
				absoluteStateDir := filepath.Join(sm.path, relativeStateDir)
				if err := mkdirAllSync(ctx, sm.syncer, filepath.Dir(absoluteStateDir), mode.Directory); err != nil {
					return fmt.Errorf("create state directory hierarchy: %w", err)
				}

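				// Create a scratch directory for this partition under the storage's
				// staging directory. In-flight transactions keep their staged state
				// there, which is why its removal below waits until they've finished.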
				stagingDir, err := os.MkdirTemp(sm.stagingDirectory, "")
				if err != nil {
					return fmt.Errorf("create staging directory: %w", err)
				}

				logger := sm.logger.WithField("partition_id", partitionID)

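				// Construct the Partition instance. Its key-value access goes through
				// a transactioner prefixed with the partition's ID, keeping each
				// partition's keys separate in the shared database.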
				mgr := sm.partitionFactory.New(
					logger,
					partitionID,
					keyvalue.NewPrefixedTransactioner(sm.database, keyPrefixPartition(partitionID)),
					sm.name,
					sm.path,
					absoluteStateDir,
					stagingDir,
				)

				ptn.Partition = mgr

				sm.metrics.partitionsStarted.Inc()
				sm.runningPartitionGoroutines.Add(1)
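				// Run the partition in a dedicated goroutine. Once Run returns, the
				// goroutine below tears the partition down and removes it from the
				// maps so a later access can start a fresh instance.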
				go func() {
					if err := mgr.Run(); err != nil {
						logger.WithError(err).WithField("partition_state_directory", relativeStateDir).Error("partition failed")
					}

					// If the Partition stops running, a new Partition instance will need
					// to be started in order to continue processing transactions. The
					// partition instance is deleted, allowing the next transaction for the
					// repository to create a new partition instance.
					sm.mu.Lock()
					delete(sm.activePartitions, partitionID)
					sm.inactivePartitions.Remove(partitionID)
					sm.closingPartitions[ptn] = struct{}{}
					sm.mu.Unlock()

					close(ptn.managerFinished)

					// If the Partition returned due to an error, there may still be
					// in-flight transactions operating on their staged state. Removing the
					// staging directory while they are active can lead to unexpected
					// errors, so the removal waits until they've all finished.
					//
					// All transactions must eventually finish, so we don't wait on context
					// cancellation here.
					<-ptn.closing

					// Now that all handles to the partition have been closed, there can be no more transactions
					// using the snapshots, nor can there be new snapshots starting. Close the snapshots that
					// may have been cached.
					if err := mgr.CloseSnapshots(); err != nil {
						logger.WithError(err).Error("failed closing snapshots")
					}

					if err := os.RemoveAll(stagingDir); err != nil {
						logger.WithError(err).Error("failed removing partition's staging directory")
					}

					sm.mu.Lock()
					close(ptn.closed)
					// Remove the partition from the list of closing partitions so that it can be garbage collected.
					delete(sm.closingPartitions, ptn)
					sm.mu.Unlock()

					sm.metrics.partitionsStopped.Inc()
					sm.runningPartitionGoroutines.Done()
				}()

				return nil
			}(); err != nil {
				return nil, fmt.Errorf("start partition: %w", err)
			}

			// We were the one setting up the partition. Return the handle directly as we know it succeeded.
			return newPartitionHandle(sm, ptn), nil
		}

		// Someone else has set up the partition or is in the process of doing so.

		if ptn.isClosing() {
			// If the partition is in the process of shutting down, it must not be used.
			// The lock is released while waiting for the partition to finish closing so
			// as not to block other partitions from processing transactions. Once closing
			// is complete, a new attempt is made to get a valid partition.
			sm.mu.Unlock()
			select {
			case <-ctx.Done():
				return nil, ctx.Err()
			case <-ptn.managerFinished:
				continue
			}
		}

		// Increment the reference count and release the lock. We don't want to hold the lock while waiting
		// for the initialization so other partition retrievals can proceed.
		ptn.referenceCount++
		if isInactive {
			// The partition was inactive; move it back to the list of active
			// partitions and remove it from the inactive list as it now has
			// a user again.
			sm.activePartitions[partitionID] = ptn
			sm.inactivePartitions.Remove(partitionID)
		}
		sm.mu.Unlock()

		// Wait for the goroutine setting up the partition to finish initializing it. The
		// initialization doesn't take long, so we don't wait on the context here.
		<-ptn.initialized
		// If there was an error initializing the partition, bail out. We don't reattempt
		// setting up the partition here as it's unlikely to succeed. Subsequent requests
		// that didn't run concurrently with this attempt will retry.
		//
		// We also don't need to worry about the reference count since the goroutine
		// setting up the partition removes it from the map immediately if initialization fails.
		if err := ptn.errInitialization; err != nil {
			return nil, fmt.Errorf("initialize partition: %w", err)
		}

		return newPartitionHandle(sm, ptn), nil
	}
}
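
The reference counting above is only half of the lifecycle: startPartition increments referenceCount and moves inactive partitions back into activePartitions, so a matching release path has to decrement the count and park unused partitions in inactivePartitions again. That path isn't shown in this section; below is a minimal sketch of what it could look like. closePartition and inactivePartitions.Add are assumptions for illustration, not part of the code above.

// Sketch only: a plausible release path matching the reference counting in
// startPartition. closePartition and inactivePartitions.Add are hypothetical,
// not taken from the code above.
func (sm *StorageManager) closePartition(ptn *partition) {
	sm.mu.Lock()
	defer sm.mu.Unlock()

	ptn.referenceCount--
	if ptn.referenceCount > 0 {
		// Other handles still reference the partition; keep it active.
		return
	}

	// The last handle was closed. Keep the partition open but move it to the
	// inactive list so it can be reused cheaply; startPartition moves it back
	// to activePartitions on the next access.
	delete(sm.activePartitions, ptn.id)
	sm.inactivePartitions.Add(ptn.id, ptn)
}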