func mutatorStreamParameters()

in internal/praefect/coordinator.go [355:562]


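// mutatorStreamParameters computes the proxy stream parameters for a mutator
// RPC: it derives the call's replication details, resolves the additional
// repository or partitioning hint, routes the request to a primary and its
// secondaries, rewrites the repository paths in the proxied messages, and
// wires up transactional voting and finalizers as needed.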
func (c *Coordinator) mutatorStreamParameters(ctx context.Context, call grpcCall) (*proxy.StreamParameters, error) {
	targetRepo := call.targetRepo
	virtualStorage := call.targetRepo.GetStorageName()

	change, params, err := getReplicationDetails(call.fullMethodName, call.msg)
	if err != nil {
		return nil, fmt.Errorf("mutator call: replication details: %w", err)
	}

	partitioningHintRewritten := false
	var additionalRepoRelativePath string
	if additionalRepo, err := call.methodInfo.AdditionalRepo(call.msg); errors.Is(err, protoregistry.ErrRepositoryFieldNotFound) {
		// We can land here in two cases: either the message type doesn't have an
		// additional repository field at all, or the field exists but wasn't set.
		// The former case is obviously fine, and the latter is fine, too, given
		// that the additional repository may be an optional field for some RPC
		// calls. The Gitaly-side RPC handlers should know to handle this case
		// anyway, so we just leave the field unset.

		// If a partitioning hint is provided in the request metadata, extract it
		// and rewrite it to the appropriate replica path. The hint is the
		// @hashed/... relative path to the repo, and the rewritten hint is the
		// equivalent @cluster/... replica path to the repo.
		partitioningHintRewritten = true
		additionalRepoRelativePath, err = storage.ExtractPartitioningHint(ctx)
		if err != nil {
			return nil, fmt.Errorf("mutator call: extract partitioning hint: %w", err)
		}

		if additionalRepoRelativePath == "" && call.methodInfo.FullMethodName() == gitalypb.RepositoryService_CreateFork_FullMethodName {
			// The source repository is not marked as an additional repository, so we
			// extract it manually. Gitaly needs the rewritten relative path as a
			// partitioning hint.
			additionalRepoRelativePath = call.msg.(*gitalypb.CreateForkRequest).GetSourceRepository().GetRelativePath()
		}
	} else if err != nil {
		return nil, structerr.NewInvalidArgument("%w", err)
	} else {
		// We do not support resolving multiple repositories that reside on
		// different virtual storages. This restriction makes sense from a
		// technical point of view, as Praefect cannot guarantee to resolve both
		// virtual storages. For the time being we accept this restriction and
		// handle it explicitly.
		//
		// Note that this is the same condition as in `rewrittenRepositoryMessage()`. This
		// is done so that we detect such erroneous requests before we try to connect to the
		// target node, which allows us to return a proper error to the user that indicates
		// the underlying issue instead of an unrelated symptom.
		//
		// This limitation may be lifted in the future.
		if virtualStorage != additionalRepo.GetStorageName() {
			return nil, structerr.NewInvalidArgument("resolving additional repository on different storage than target repository is not supported")
		}

		additionalRepoRelativePath = additionalRepo.GetRelativePath()
	}

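	// Repository creations are routed differently from other mutators: the
	// repository does not exist yet, so the router has to pick the nodes that
	// will host it instead of resolving them from existing metadata.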
	var route RepositoryMutatorRoute
	switch change {
	case datastore.CreateRepo:
		route, err = c.router.RouteRepositoryCreation(ctx, virtualStorage, targetRepo.GetRelativePath(), additionalRepoRelativePath)

		// These RPCs are repository upserts: they should succeed even if the
		// repository already exists in Praefect, in which case they are routed
		// as a regular mutator instead.
		if (call.fullMethodName == "/gitaly.RepositoryService/ReplicateRepository" ||
			call.fullMethodName == "/gitaly.RepositoryService/RestoreRepository") &&
			errors.Is(err, datastore.ErrRepositoryAlreadyExists) {
			change = datastore.UpdateRepo
			route, err = c.router.RouteRepositoryMutator(ctx, virtualStorage, targetRepo.GetRelativePath(), additionalRepoRelativePath)
		}
		if err != nil {
			return nil, fmt.Errorf("route repository creation: %w", err)
		}
	default:
		route, err = c.router.RouteRepositoryMutator(ctx, virtualStorage, targetRepo.GetRelativePath(), additionalRepoRelativePath)
		if err != nil {
			if errors.Is(err, ErrRepositoryReadOnly) {
				return nil, err
			}

			return nil, fmt.Errorf("mutator call: route repository mutator: %w", err)
		}
	}

	route.addLogFields(ctx)

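	// Rewrite the repositories in the request so that the primary receives its
	// storage name and the physical replica paths instead of the
	// client-supplied virtual storage and relative paths.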
	primaryMessage, err := rewrittenRepositoryMessage(call.methodInfo, call.msg, route.Primary.Storage, route.ReplicaPath, route.AdditionalReplicaPath)
	if err != nil {
		return nil, fmt.Errorf("mutator call: rewrite storage: %w", err)
	}

	if partitioningHintRewritten && additionalRepoRelativePath != "" {
		// Send the rewritten path to Gitaly as a partitioning hint.
		ctx = storage.ContextWithPartitioningHint(ctx, route.AdditionalReplicaPath)
	}

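	// Finalizers run once the proxied stream has finished. They are collected
	// here and chained into a single request finalizer below.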
	var finalizers []func() error

	primaryDest := proxy.Destination{
		Ctx:  streamParametersContext(ctx),
		Conn: route.Primary.Connection,
		Msg:  primaryMessage,
	}

	var secondaryDests []proxy.Destination

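	// Transactional mutators are proxied to the primary and all secondaries at
	// once, with the nodes reaching agreement through voting. Non-transactional
	// mutators are proxied to the primary only; secondaries are brought up to
	// date asynchronously via replication jobs.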
	if shouldUseTransaction(ctx, call.fullMethodName) {
		c.votersMetric.WithLabelValues(virtualStorage).Observe(float64(1 + len(route.Secondaries)))

		transaction, transactionCleanup, err := c.registerTransaction(ctx, route.Primary, route.Secondaries)
		if err != nil {
			return nil, fmt.Errorf("%w: %v %v", err, route.Primary, route.Secondaries)
		}
		finalizers = append(finalizers, transactionCleanup)

		nodeErrors := &nodeErrors{
			errByNode: make(map[string]error),
		}

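		// Inject the transaction into the primary's outgoing context, marking
		// the node as the primary voter, and record any proxying error so the
		// transaction finalizer can attribute failures to individual nodes.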
		injectedCtx, err := txinfo.InjectTransaction(ctx, transaction.ID(), route.Primary.Storage, true)
		if err != nil {
			return nil, err
		}
		primaryDest.Ctx = streamParametersContext(injectedCtx)
		primaryDest.ErrHandler = func(err error) error {
			nodeErrors.Lock()
			defer nodeErrors.Unlock()
			nodeErrors.errByNode[route.Primary.Storage] = err
			return err
		}

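		// Set up one destination per secondary. Secondaries vote in the same
		// transaction, but as non-primary voters, and a failing secondary does
		// not abort the stream: its voter is cancelled and its error merely
		// recorded. Note that each ErrHandler closure captures its own
		// secondary, which relies on Go 1.22's per-iteration loop variables.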
		for _, secondary := range route.Secondaries {
			secondaryMsg, err := rewrittenRepositoryMessage(call.methodInfo, call.msg, secondary.Storage, route.ReplicaPath, route.AdditionalReplicaPath)
			if err != nil {
				return nil, err
			}

			injectedCtx, err := txinfo.InjectTransaction(ctx, transaction.ID(), secondary.Storage, false)
			if err != nil {
				return nil, err
			}

			secondaryDests = append(secondaryDests, proxy.Destination{
				Ctx:  streamParametersContext(injectedCtx),
				Conn: secondary.Connection,
				Msg:  secondaryMsg,
				ErrHandler: func(err error) error {
					nodeErrors.Lock()
					defer nodeErrors.Unlock()
					nodeErrors.errByNode[secondary.Storage] = err

					c.logger.WithError(err).
						ErrorContext(ctx, "proxying to secondary failed")

					// Cancel the failed node's voter in its current
					// subtransaction. This also updates the subtransaction's
					// internal state to failed and releases blocked voters if
					// quorum becomes impossible.
					if err := c.txMgr.CancelTransactionNodeVoter(transaction.ID(), secondary.Storage); err != nil {
						c.logger.WithError(err).
							ErrorContext(ctx, "canceling secondary voter failed")
					}

					// Return nil so that the error is ignored and we do not
					// abort ongoing transactions, which may still succeed even
					// when a subset of secondaries bails out.
					return nil
				},
			})
		}

		finalizers = append(finalizers,
			c.createTransactionFinalizer(ctx, transaction, route, virtualStorage,
				targetRepo, change, params, call.fullMethodName, nodeErrors),
		)
	} else {
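		// Without transactions only the primary serves the request. All
		// secondaries and replication targets are passed to the request
		// finalizer as outdated nodes so that replication jobs bring them back
		// up to date.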
		finalizers = append(finalizers,
			c.newRequestFinalizer(
				ctx,
				route.RepositoryID,
				virtualStorage,
				targetRepo,
				route.ReplicaPath,
				route.Primary.Storage,
				nil,
				append(routerNodesToStorages(route.Secondaries), route.ReplicationTargets...),
				change,
				params,
				call.fullMethodName,
			))
	}

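	// Chain all finalizers into one: every finalizer runs even when an earlier
	// one has failed, the first error is returned to the proxy, and any
	// subsequent errors are only logged.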
	reqFinalizer := func() error {
		var firstErr error
		for _, finalizer := range finalizers {
			err := finalizer()
			if err == nil {
				continue
			}

			if firstErr == nil {
				firstErr = err
				continue
			}

			c.logger.
				WithError(err).
				ErrorContext(ctx, "coordinator proxy stream finalizer failure")
		}
		return firstErr
	}
	return proxy.NewStreamParameters(primaryDest, secondaryDests, reqFinalizer, nil), nil
}
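
The per-node error bookkeeping above hinges on a mutex-guarded map that all
destination error handlers write to concurrently. Below is a minimal,
self-contained sketch of that pattern; it only assumes what the excerpt shows
(a nodeErrors type with an embedded sync.Mutex and an errByNode map) and uses
made-up storage names:

package main

import (
	"errors"
	"fmt"
	"sync"
)

// nodeErrors mirrors the shape used above: an embedded mutex guarding a map
// from storage name to the error observed while proxying to that node.
type nodeErrors struct {
	sync.Mutex
	errByNode map[string]error
}

// record stores a node's error under its storage name, mimicking what the
// per-destination ErrHandler callbacks do in the coordinator.
func (ne *nodeErrors) record(storage string, err error) {
	ne.Lock()
	defer ne.Unlock()
	ne.errByNode[storage] = err
}

func main() {
	ne := &nodeErrors{errByNode: make(map[string]error)}

	var wg sync.WaitGroup
	for _, storage := range []string{"gitaly-1", "gitaly-2", "gitaly-3"} {
		wg.Add(1)
		go func(storage string) {
			defer wg.Done()
			// Each handler records its own error without failing the others,
			// just like the secondary destinations above.
			ne.record(storage, errors.New("proxying to secondary failed"))
		}(storage)
	}
	wg.Wait()

	for storage, err := range ne.errByNode {
		fmt.Printf("%s: %v\n", storage, err)
	}
}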