in internal/praefect/coordinator.go [355:562]
func (c *Coordinator) mutatorStreamParameters(ctx context.Context, call grpcCall) (*proxy.StreamParameters, error) {
targetRepo := call.targetRepo
virtualStorage := call.targetRepo.GetStorageName()
change, params, err := getReplicationDetails(call.fullMethodName, call.msg)
if err != nil {
return nil, fmt.Errorf("mutator call: replication details: %w", err)
}
partitioningHintRewritten := false
var additionalRepoRelativePath string
if additionalRepo, err := call.methodInfo.AdditionalRepo(call.msg); errors.Is(err, protoregistry.ErrRepositoryFieldNotFound) {
// We can land here in two cases: either the message doesn't have an additional
// repository, or the repository wasn't set. The former case is obviously fine, but
// the latter case is fine, too, given that the additional repository may be an
// optional field for some RPC calls. The Gitaly-side RPC handlers should know to
// handle this case anyway, so we just leave the field unset in that case.
// If a partitioning hint is provided in the request metadata, extract and rewrite it to the appropriate
// replicate path. The hint is the @hashed/... relative path to the repo and the rewritten hint is the
// equivalent @cluster/... replica path tto the repo.
partitioningHintRewritten = true
additionalRepoRelativePath, err = storage.ExtractPartitioningHint(ctx)
if err != nil {
return nil, fmt.Errorf("mutator call: extract partitioning hint: %w", err)
}
if additionalRepoRelativePath == "" && call.methodInfo.FullMethodName() == gitalypb.RepositoryService_CreateFork_FullMethodName {
// The source repository is not marked as an additional repository so extract it manually.
// Gitaly needs the rewritten relative path as a partitioning hint.
additionalRepoRelativePath = call.msg.(*gitalypb.CreateForkRequest).GetSourceRepository().GetRelativePath()
}
} else if err != nil {
return nil, structerr.NewInvalidArgument("%w", err)
} else {
// We do not support resolving multiple different repositories that reside on
// different virtual storages. This kind of makes sense from a technical point of
// view as Praefect cannot guarantee to resolve both virtual storages. So for the
// time being we accept this restriction and handle it explicitly.
//
// Note that this is the same condition as in `rewrittenRepositoryMessage()`. This
// is done so that we detect such erroneous requests before we try to connect to the
// target node, which allows us to return a proper error to the user that indicates
// the underlying issue instead of an unrelated symptom.
//
// This limitation may be lifted in the future.
if virtualStorage != additionalRepo.GetStorageName() {
return nil, structerr.NewInvalidArgument("resolving additional repository on different storage than target repository is not supported")
}
additionalRepoRelativePath = additionalRepo.GetRelativePath()
}
var route RepositoryMutatorRoute
switch change {
case datastore.CreateRepo:
route, err = c.router.RouteRepositoryCreation(ctx, virtualStorage, targetRepo.GetRelativePath(), additionalRepoRelativePath)
// These RPCs are repository upserts. They should work if the
// repository ID already exists in Praefect.
if (call.fullMethodName == "/gitaly.RepositoryService/ReplicateRepository" ||
call.fullMethodName == "/gitaly.RepositoryService/RestoreRepository") &&
errors.Is(err, datastore.ErrRepositoryAlreadyExists) {
change = datastore.UpdateRepo
route, err = c.router.RouteRepositoryMutator(ctx, virtualStorage, targetRepo.GetRelativePath(), additionalRepoRelativePath)
}
if err != nil {
return nil, fmt.Errorf("route repository creation: %w", err)
}
default:
route, err = c.router.RouteRepositoryMutator(ctx, virtualStorage, targetRepo.GetRelativePath(), additionalRepoRelativePath)
if err != nil {
if errors.Is(err, ErrRepositoryReadOnly) {
return nil, err
}
return nil, fmt.Errorf("mutator call: route repository mutator: %w", err)
}
}
route.addLogFields(ctx)
primaryMessage, err := rewrittenRepositoryMessage(call.methodInfo, call.msg, route.Primary.Storage, route.ReplicaPath, route.AdditionalReplicaPath)
if err != nil {
return nil, fmt.Errorf("mutator call: rewrite storage: %w", err)
}
if partitioningHintRewritten && additionalRepoRelativePath != "" {
// Send the rewritten path as partitioning hint to Gitaly.
ctx = storage.ContextWithPartitioningHint(ctx, route.AdditionalReplicaPath)
}
var finalizers []func() error
primaryDest := proxy.Destination{
Ctx: streamParametersContext(ctx),
Conn: route.Primary.Connection,
Msg: primaryMessage,
}
var secondaryDests []proxy.Destination
if shouldUseTransaction(ctx, call.fullMethodName) {
c.votersMetric.WithLabelValues(virtualStorage).Observe(float64(1 + len(route.Secondaries)))
transaction, transactionCleanup, err := c.registerTransaction(ctx, route.Primary, route.Secondaries)
if err != nil {
return nil, fmt.Errorf("%w: %v %v", err, route.Primary, route.Secondaries)
}
finalizers = append(finalizers, transactionCleanup)
nodeErrors := &nodeErrors{
errByNode: make(map[string]error),
}
injectedCtx, err := txinfo.InjectTransaction(ctx, transaction.ID(), route.Primary.Storage, true)
if err != nil {
return nil, err
}
primaryDest.Ctx = streamParametersContext(injectedCtx)
primaryDest.ErrHandler = func(err error) error {
nodeErrors.Lock()
defer nodeErrors.Unlock()
nodeErrors.errByNode[route.Primary.Storage] = err
return err
}
for _, secondary := range route.Secondaries {
secondaryMsg, err := rewrittenRepositoryMessage(call.methodInfo, call.msg, secondary.Storage, route.ReplicaPath, route.AdditionalReplicaPath)
if err != nil {
return nil, err
}
injectedCtx, err := txinfo.InjectTransaction(ctx, transaction.ID(), secondary.Storage, false)
if err != nil {
return nil, err
}
secondaryDests = append(secondaryDests, proxy.Destination{
Ctx: streamParametersContext(injectedCtx),
Conn: secondary.Connection,
Msg: secondaryMsg,
ErrHandler: func(err error) error {
nodeErrors.Lock()
defer nodeErrors.Unlock()
nodeErrors.errByNode[secondary.Storage] = err
c.logger.WithError(err).
ErrorContext(ctx, "proxying to secondary failed")
// Cancels failed node's voter in its current subtransaction.
// Also updates internal state of subtransaction to fail and
// release blocked voters if quorum becomes impossible.
if err := c.txMgr.CancelTransactionNodeVoter(transaction.ID(), secondary.Storage); err != nil {
c.logger.WithError(err).
ErrorContext(ctx, "canceling secondary voter failed")
}
// The error is ignored, so we do not abort transactions
// which are ongoing and may succeed even with a subset
// of secondaries bailing out.
return nil
},
})
}
finalizers = append(finalizers,
c.createTransactionFinalizer(ctx, transaction, route, virtualStorage,
targetRepo, change, params, call.fullMethodName, nodeErrors),
)
} else {
finalizers = append(finalizers,
c.newRequestFinalizer(
ctx,
route.RepositoryID,
virtualStorage,
targetRepo,
route.ReplicaPath,
route.Primary.Storage,
nil,
append(routerNodesToStorages(route.Secondaries), route.ReplicationTargets...),
change,
params,
call.fullMethodName,
))
}
reqFinalizer := func() error {
var firstErr error
for _, finalizer := range finalizers {
err := finalizer()
if err == nil {
continue
}
if firstErr == nil {
firstErr = err
continue
}
c.logger.
WithError(err).
ErrorContext(ctx, "coordinator proxy stream finalizer failure")
}
return firstErr
}
return proxy.NewStreamParameters(primaryDest, secondaryDests, reqFinalizer, nil), nil
}