in internal/gitaly/storage/storagemgr/middleware.go [257:412]
// beginTransactionForRepository begins a transaction against the repository targeted by req and
// returns the request rewritten to run in that transaction's scope.
//
// No transaction is started in two situations, in which case a non-transactional request is
// returned instead:
//   - an accessor request arrives with a quarantine (object directory or alternate object
//     directories) already configured, or
//   - the target repository's relative path is not found and the RPC is listed in
//     repositoryNotFoundAllowed.
//
// If an error is returned after the transaction was begun, the transaction is rolled back and any
// rollback failure is joined into the returned error.
func beginTransactionForRepository(ctx context.Context, logger log.Logger, txRegistry *TransactionRegistry, node storage.Node, locator storage.Locator, methodInfo protoregistry.MethodInfo, req proto.Message) (_ transactionalizedRequest, returnedErr error) {
	repo, err := methodInfo.TargetRepo(req)
	switch {
	case err == nil:
		// The target repository was extracted successfully.
	case errors.Is(err, protoregistry.ErrRepositoryFieldNotFound):
		// The repository field was not set in the request. Many tests assert on the error the
		// handlers return for this case, so return that error rather than the registry's.
		return transactionalizedRequest{}, structerr.NewInvalidArgument("%w", storage.ErrRepositoryNotSet)
	default:
		return transactionalizedRequest{}, fmt.Errorf("extract target repository: %w", err)
	}

	// Object directories are only expected on a repository that was already set up with a quarantine
	// directory and is being looped back to Gitaly from Rails' authorization checks. Such a request
	// should already be running inside a transaction with the repository rewritten to point to the
	// snapshot repository, so no new transaction is started for it.
	//
	// Tests that manually configure the object directory or the alternate object directories violate
	// this property, which lets them circumvent transaction management. This is left unaddressed for
	// now; the plan is to later remove the options to configure object directories and alternates in
	// a request.
	quarantineConfigured := repo.GetGitObjectDirectory() != "" || len(repo.GetGitAlternateObjectDirectories()) > 0
	if quarantineConfigured {
		// Accessors may legitimately carry a quarantine from Rails' access checks: the RPC that
		// triggered those checks already runs in a transaction and targets a snapshot. Mutators are
		// rejected so that writes cannot unintentionally target the main repository.
		if methodInfo.Operation == protoregistry.OpMutator {
			return transactionalizedRequest{}, ErrQuarantineConfiguredOnMutator
		}

		restoredReq, restoreErr := restoreSnapshotRelativePath(ctx, methodInfo, req)
		if restoreErr != nil {
			return transactionalizedRequest{}, fmt.Errorf("restore snapshot relative path: %w", restoreErr)
		}

		return nonTransactionalRequest(ctx, restoredReq), nil
	}

	// The PartitionManager verifies the repository's storage and relative path as well, but its error
	// messages differ from the ones some RPC tests currently expect. Validate up front and pass the
	// error through unmodified to stay compatible with those tests.
	validationErr := locator.ValidateRepository(ctx, repo, storage.WithSkipRepositoryExistenceCheck())
	if validationErr != nil {
		return transactionalizedRequest{}, validationErr
	}

	// Object pools must live in the same partition as their members. The statements below determine
	// which repository, if any, the target repository has to be partitioned with. The candidates are,
	// in order: an explicit partitioning hint, the source repository of a CreateForkRequest, and the
	// additional repository of the RPC.
	hint, err := storage.ExtractPartitioningHint(ctx)
	if err != nil {
		return transactionalizedRequest{}, fmt.Errorf("extract partitioning hint: %w", err)
	}

	var peerStorageName, peerRelativePath string
	if hint != "" {
		// Sometimes the repository to partition with is not set as an additional repository in the
		// request. In that case a hint sent through the gRPC metadata carries the relative path of
		// the repository the target should be partitioned with.
		peerStorageName = repo.GetStorageName()
		peerRelativePath = hint
	} else if forkReq, isFork := req.(*gitalypb.CreateForkRequest); isFork {
		// The fork and its source must share a partition so they can later be joined to the same
		// pool, so the source repository acts as an implicit partitioning hint. The source is not
		// marked as an additional repository, which keeps Praefect from rewriting it: the handler
		// needs the original form because Gitaly fetches the source through Praefect's API, which
		// routes based on the original repository.
		//
		// Hinting implicitly here spares every call site from adding a hint. It only applies when no
		// explicit hint was given, as Praefect provides an explicit one with CreateForkRequest.
		source := forkReq.GetSourceRepository()
		peerStorageName = source.GetStorageName()
		peerRelativePath = source.GetRelativePath()
	}

	// The general case: partition the target with the RPC's additional repository. Many of the
	// ObjectPoolService's RPCs operate on two repositories, and depending on the RPC the additional
	// one is either the object pool itself or one of its members.
	additionalRepo, err := methodInfo.AdditionalRepo(req)
	switch {
	case err == nil:
		// A hint (explicit or implicit) combined with an additional repository is rejected.
		if peerRelativePath != "" {
			return transactionalizedRequest{}, ErrPartitioningHintAndAdditionalRepoProvided
		}

		peerStorageName = additionalRepo.GetStorageName()
		peerRelativePath = additionalRepo.GetRelativePath()
	case !errors.Is(err, protoregistry.ErrRepositoryFieldNotFound):
		return transactionalizedRequest{}, fmt.Errorf("extract additional repository: %w", err)
	}
	// Any other error means the request simply has no additional repository.

	if peerStorageName != "" && repo.GetStorageName() != peerStorageName {
		return transactionalizedRequest{}, ErrRepositoriesInDifferentStorages
	}

	// Begin fails for a repository that neither exists nor has a partition assignment yet. Repository
	// creating RPCs are the exception: they may create the partition assignment so that the
	// transaction can begin and the repository can be created. Because the assignment is created
	// before the repository, the two are not atomic and a failed creation may leave a stale partition
	// assignment in the key-value store. Making them atomic is planned.
	//
	// See issue: https://gitlab.com/gitlab-org/gitaly/-/issues/5957
	fullMethod := methodInfo.FullMethodName()
	_, createsRepository := repositoryCreatingRPCs[fullMethod]

	storageHandle, err := node.GetStorage(repo.GetStorageName())
	if err != nil {
		return transactionalizedRequest{}, fmt.Errorf("get storage: %w", err)
	}

	txOptions := storage.TransactionOptions{
		ReadOnly:              isReadOnly(methodInfo),
		RelativePath:          repo.GetRelativePath(),
		AlternateRelativePath: peerRelativePath,
		AllowPartitionAssignmentWithoutRepository: createsRepository,
		ForceExclusiveSnapshot:                    forceExclusiveSnapshot[fullMethod],
	}

	tx, err := storageHandle.Begin(ctx, txOptions)
	if err != nil {
		var missingPath relativePathNotFoundError
		if !errors.As(err, &missingPath) {
			return transactionalizedRequest{}, fmt.Errorf("begin transaction: %w", err)
		}

		if _, notFoundAllowed := repositoryNotFoundAllowed[fullMethod]; notFoundAllowed {
			return nonTransactionalRequest(ctx, req), nil
		}

		// The partition assigner has no access to the storage and therefore reports only the
		// relative path. Translate that into the usual repository not found error the RPCs return
		// so the API stays consistent.
		return transactionalizedRequest{}, storage.NewRepositoryNotFoundError(repo.GetStorageName(), string(missingPath))
	}

	// From here on the transaction is open: roll it back if this function ends up returning an error.
	defer func() {
		if returnedErr == nil {
			return
		}

		if rollbackErr := tx.Rollback(ctx); rollbackErr != nil {
			returnedErr = errors.Join(returnedErr, fmt.Errorf("rollback: %w", rollbackErr))
		}
	}()

	rewrittenReq, err := rewriteRequest(tx, methodInfo, req)
	if err != nil {
		return transactionalizedRequest{}, fmt.Errorf("rewrite request: %w", err)
	}

	return newTransactionalizedRequest(ctx, logger, txRegistry, rewrittenReq, tx), nil
}