func beginTransactionForRepository()

in internal/gitaly/storage/storagemgr/middleware.go [257:412]


// beginTransactionForRepository begins a storage transaction for the repository targeted by req and
// returns the rewritten request wrapped together with that transaction.
//
// The function proceeds in this order:
//   - extract the target repository from the request,
//   - short-circuit requests that carry quarantine object directories,
//   - validate the target repository,
//   - determine which repository, if any, the target must share a partition with,
//   - begin the transaction and rewrite the request.
//
// The request is handed back through nonTransactionalRequest, without a transaction being begun here,
// in two situations: a non-mutator request that has object directories configured, and a method listed
// in repositoryNotFoundAllowed whose relative path was not found when beginning the transaction.
//
// If an error is returned after the transaction has been begun, the transaction is rolled back and any
// rollback failure is joined to the returned error.
func beginTransactionForRepository(ctx context.Context, logger log.Logger, txRegistry *TransactionRegistry, node storage.Node, locator storage.Locator, methodInfo protoregistry.MethodInfo, req proto.Message) (_ transactionalizedRequest, returnedErr error) {
	repo, err := methodInfo.TargetRepo(req)
	if err != nil {
		if !errors.Is(err, protoregistry.ErrRepositoryFieldNotFound) {
			return transactionalizedRequest{}, fmt.Errorf("extract target repository: %w", err)
		}

		// ErrRepositoryFieldNotFound means the repository field was not set in the request. Many tests
		// assert on the error the handlers return for that case, so return that error instead.
		return transactionalizedRequest{}, structerr.NewInvalidArgument("%w", storage.ErrRepositoryNotSet)
	}

	// Object directories should only be set on a repository that comes from a request which already had
	// a quarantine directory configured and is being looped back to Gitaly from Rails' authorization
	// checks. Such a request should already be running in the scope of a transaction with the repository
	// rewritten to point to the snapshot repository, so no new transaction is started for it.
	//
	// Tests that manually configure the object directory or the alternate object directories violate
	// this property, which allows circumventing the transaction management by configuring either of the
	// object directories. This is left unaddressed for now and is to be addressed later by removing the
	// options to configure object directories and alternates in a request.
	quarantineConfigured := repo.GetGitObjectDirectory() != "" || len(repo.GetGitAlternateObjectDirectories()) > 0
	if quarantineConfigured {
		// Accessors may legitimately arrive with a quarantine configured by Rails' access checks; the
		// RPC that triggered those checks already runs in a transaction and targets a snapshot.
		// Mutators are rejected to prevent writes from unintentionally targeting the main repository.
		if methodInfo.Operation == protoregistry.OpMutator {
			return transactionalizedRequest{}, ErrQuarantineConfiguredOnMutator
		}

		snapshotReq, err := restoreSnapshotRelativePath(ctx, methodInfo, req)
		if err != nil {
			return transactionalizedRequest{}, fmt.Errorf("restore snapshot relative path: %w", err)
		}

		return nonTransactionalRequest(ctx, snapshotReq), nil
	}

	// The PartitionManager verifies the repository's storage and relative path as well, but it does not
	// return exactly the error messages some RPC tests currently check for. To stay compatible with
	// those tests the repository is validated here up front and a validation error is returned as is.
	if validationErr := locator.ValidateRepository(ctx, repo, storage.WithSkipRepositoryExistenceCheck()); validationErr != nil {
		return transactionalizedRequest{}, validationErr
	}

	// Object pools need to be placed in the same partition as their members. The variables below record
	// which repository, if any, the target repository must be partitioned with. Three sources are
	// consulted: an explicit partitioning hint, the source repository of a CreateForkRequest, and the
	// additional repository of the request.
	var partnerStorage, partnerPath string

	hint, err := storage.ExtractPartitioningHint(ctx)
	if err != nil {
		return transactionalizedRequest{}, fmt.Errorf("extract partitioning hint: %w", err)
	}

	if hint != "" {
		// Sometimes a repository has to be partitioned with a repository that is not set as an
		// additional repository in the request. In that case a partitioning hint sent through the gRPC
		// metadata provides the relative path of the repository to partition the target with.
		partnerStorage, partnerPath = repo.GetStorageName(), hint
	} else if forkReq, isFork := req.(*gitalypb.CreateForkRequest); isFork {
		// CreateFork is special cased: the fork must be partitioned with the source repository in
		// order to join them to the same pool later. The source repository is not marked as an
		// additional repository so that Praefect doesn't rewrite it; the handler needs the original
		// form because Gitaly fetches the source repository through Praefect's API, which needs the
		// original repository to route the request correctly.
		//
		// Treating the source repository as an implicit hint avoids adding hints at every callsite.
		// It only applies when no explicit hint was provided, as Praefect provides an explicit hint
		// with CreateForkRequest.
		source := forkReq.GetSourceRepository()
		partnerStorage, partnerPath = source.GetStorageName(), source.GetRelativePath()
	}

	// The general case: take the additional repository from the RPC and partition the target repository
	// with it. Many of the ObjectPoolService's RPCs operate on two repositories; depending on the RPC,
	// the additional repository is either the object pool itself or a member of the pool.
	additionalRepo, err := methodInfo.AdditionalRepo(req)
	switch {
	case err == nil:
		if partnerPath != "" {
			return transactionalizedRequest{}, ErrPartitioningHintAndAdditionalRepoProvided
		}

		partnerStorage, partnerPath = additionalRepo.GetStorageName(), additionalRepo.GetRelativePath()
	case !errors.Is(err, protoregistry.ErrRepositoryFieldNotFound):
		return transactionalizedRequest{}, fmt.Errorf("extract additional repository: %w", err)
	}
	// Any remaining error is ErrRepositoryFieldNotFound: the request has no additional repository.

	if partnerStorage != "" && partnerStorage != repo.GetStorageName() {
		return transactionalizedRequest{}, ErrRepositoriesInDifferentStorages
	}

	// Begin fails when accessing a repository that doesn't exist and doesn't have a partition assignment
	// yet. Repository creating RPCs are the exception: they are allowed to create the partition
	// assignment so that the transaction can begin and the repository can be created. The assignment is
	// created before the repository and the two steps are thus not atomic, so failed creations may leave
	// stale partition assignments in the key-value store. Making them atomic is planned for later.
	//
	// See issue: https://gitlab.com/gitlab-org/gitaly/-/issues/5957
	_, createsRepository := repositoryCreatingRPCs[methodInfo.FullMethodName()]

	store, err := node.GetStorage(repo.GetStorageName())
	if err != nil {
		return transactionalizedRequest{}, fmt.Errorf("get storage: %w", err)
	}

	txOptions := storage.TransactionOptions{
		ReadOnly:              isReadOnly(methodInfo),
		RelativePath:          repo.GetRelativePath(),
		AlternateRelativePath: partnerPath,
		AllowPartitionAssignmentWithoutRepository: createsRepository,
		ForceExclusiveSnapshot:                    forceExclusiveSnapshot[methodInfo.FullMethodName()],
	}

	tx, err := store.Begin(ctx, txOptions)
	if err != nil {
		var missingPath relativePathNotFoundError
		if !errors.As(err, &missingPath) {
			return transactionalizedRequest{}, fmt.Errorf("begin transaction: %w", err)
		}

		if _, mayBeMissing := repositoryNotFoundAllowed[methodInfo.FullMethodName()]; mayBeMissing {
			return nonTransactionalRequest(ctx, req), nil
		}

		// The partition assigner does not have the storage available and therefore returns an error
		// carrying only the relative path. Convert it into the usual repository not found error the
		// RPCs return in order to conform to the API.
		return transactionalizedRequest{}, storage.NewRepositoryNotFoundError(repo.GetStorageName(), string(missingPath))
	}

	// From here on a transaction is open: roll it back if the function ends up returning an error.
	defer func() {
		if returnedErr == nil {
			return
		}

		if rollbackErr := tx.Rollback(ctx); rollbackErr != nil {
			returnedErr = errors.Join(returnedErr, fmt.Errorf("rollback: %w", rollbackErr))
		}
	}()

	txReq, err := rewriteRequest(tx, methodInfo, req)
	if err != nil {
		return transactionalizedRequest{}, fmt.Errorf("rewrite request: %w", err)
	}

	return newTransactionalizedRequest(ctx, logger, txRegistry, txReq, tx), nil
}