// File: internal/gitaly/service/remote/update_remote_mirror.go

package remote
import (
"context"
"errors"
"fmt"
"io"
"regexp"
"strings"
"gitlab.com/gitlab-org/gitaly/v16/internal/git"
"gitlab.com/gitlab-org/gitaly/v16/internal/git/gitcmd"
"gitlab.com/gitlab-org/gitaly/v16/internal/git/localrepo"
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage"
"gitlab.com/gitlab-org/gitaly/v16/internal/helper/text"
"gitlab.com/gitlab-org/gitaly/v16/internal/structerr"
"gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb"
)
// pushBatchSize caps how many refspecs are handed to a single push invocation when
// updating a remote mirror.
const pushBatchSize = 10

// maxDivergentRefs caps how many divergent reference names UpdateRemoteMirror reports
// back to the caller in its response.
const maxDivergentRefs = 100
// UpdateRemoteMirror implements the RemoteService.UpdateRemoteMirror client-streaming RPC.
// The first message carries the repository, the remote and the mirroring options, and it is
// validated before any mirroring work starts.
//
// Errors are reported as follows:
//   - failing to receive the first message: Internal;
//   - validation failures: InvalidArgument;
//   - failures during mirroring: Internal.
func (s *server) UpdateRemoteMirror(stream gitalypb.RemoteService_UpdateRemoteMirrorServer) error {
	request, err := stream.Recv()
	if err != nil {
		return structerr.NewInternal("receive first request: %w", err)
	}

	ctx := stream.Context()
	if validationErr := validateUpdateRemoteMirrorRequest(ctx, s.locator, request); validationErr != nil {
		return structerr.NewInvalidArgument("%w", validationErr)
	}

	err = s.updateRemoteMirror(stream, request)
	if err == nil {
		return nil
	}

	return structerr.NewInternal("%w", err)
}
// updateRemoteMirror performs the mirroring work for a request that has already been
// validated, and finishes the RPC by sending the response.
//
// Steps, in order:
//  1. Drain the rest of the client stream. OnlyBranchesMatching patterns from every message
//     (including firstRequest) are combined into a single reference matcher.
//  2. List the mirror's refs/heads/* and refs/tags/* references through a randomly named
//     remote ("inmemory-<hex>"). Its URL, optional Authorization header and optional resolve
//     settings are carried in remoteConfig and passed to each git call.
//  3. Compare the mirror's references with the local refs/heads/ and refs/tags/ references.
//     This decides what must be created or updated (toUpdate) and what may be deleted
//     (toDelete).
//  4. Push the refspecs accepted by the matcher in batches of at most pushBatchSize. The
//     default branch is placed in the first batch.
//  5. Reply with the divergent references that were found.
//
// When KeepDivergentRefs is set:
//   - refs whose mirror target is not an ancestor of the local target are left untouched and
//     reported (matching ones only, capped at maxDivergentRefs);
//   - pushes are not forced;
//   - nothing is deleted.
//
// Otherwise:
//   - differing refs are force-pushed;
//   - mirror-only refs are deleted, but only if the local default branch exists and the
//     mirror's commit is an ancestor of it.
//
// Errors are returned as wrapped Go errors. UpdateRemoteMirror turns them into an Internal
// status.
func (s *server) updateRemoteMirror(stream gitalypb.RemoteService_UpdateRemoteMirrorServer, firstRequest *gitalypb.UpdateRemoteMirrorRequest) error {
	ctx := stream.Context()

	// Branch patterns may arrive on any message of the client stream, so keep receiving until
	// EOF and accumulate them before building the matcher.
	branchMatchers := firstRequest.GetOnlyBranchesMatching()
	for {
		req, err := stream.Recv()
		if err != nil {
			if errors.Is(err, io.EOF) {
				break
			}
			return fmt.Errorf("receive: %w", err)
		}
		branchMatchers = append(branchMatchers, req.GetOnlyBranchesMatching()...)
	}

	referenceMatcher, err := newReferenceMatcher(branchMatchers)
	if err != nil {
		return fmt.Errorf("create reference matcher: %w", err)
	}

	repo := s.localRepoFactory.Build(firstRequest.GetRepository())
	remote := firstRequest.GetRemote()

	// The mirror is addressed through a throwaway remote name with a random suffix. Its
	// settings exist only in remoteConfig, which is handed to every git call below.
	remoteSuffix, err := text.RandomHex(8)
	if err != nil {
		return fmt.Errorf("generating remote suffix: %w", err)
	}
	remoteName := "inmemory-" + remoteSuffix

	var remoteConfig []gitcmd.ConfigPair
	remoteURL := remote.GetUrl()
	// With a pre-resolved address, gitcmd.GetURLAndResolveConfig may rewrite the URL and/or
	// return extra config so that git connects to that address (the error text below refers
	// to curloptResolve).
	if resolvedAddress := remote.GetResolvedAddress(); resolvedAddress != "" {
		modifiedURL, resolveConfig, err := gitcmd.GetURLAndResolveConfig(remoteURL, resolvedAddress)
		if err != nil {
			return fmt.Errorf("couldn't get curloptResolve config: %w", err)
		}
		remoteURL = modifiedURL
		remoteConfig = append(remoteConfig, resolveConfig...)
	}
	remoteConfig = append(remoteConfig, gitcmd.ConfigPair{
		Key: fmt.Sprintf("remote.%s.url", remoteName), Value: remoteURL,
	})
	// The extraHeader key uses the original URL from the request (remote.GetUrl()), not the
	// possibly rewritten remoteURL.
	// NOTE(review): the header only applies if git's http.<url> matching still matches the
	// original URL after the rewrite above. Confirm this for HTTP(S) remotes that use a
	// resolved address.
	if authHeader := remote.GetHttpAuthorizationHeader(); authHeader != "" {
		remoteConfig = append(remoteConfig, gitcmd.ConfigPair{
			Key:   fmt.Sprintf("http.%s.extraHeader", remote.GetUrl()),
			Value: "Authorization: " + authHeader,
		})
	}

	sshCommand, clean, err := gitcmd.BuildSSHInvocation(ctx, s.logger, firstRequest.GetSshKey(), firstRequest.GetKnownHosts())
	if err != nil {
		return fmt.Errorf("build ssh invocation: %w", err)
	}
	// clean presumably releases whatever BuildSSHInvocation set up (e.g. key files). Confirm
	// in gitcmd.
	defer clean()

	remoteRefsSlice, err := repo.GetRemoteReferences(ctx, remoteName,
		localrepo.WithPatterns("refs/heads/*", "refs/tags/*"),
		localrepo.WithConfig(remoteConfig...),
		localrepo.WithSSHCommand(sshCommand),
	)
	if err != nil {
		return fmt.Errorf("get remote references: %w", err)
	}

	localRefs, err := repo.GetReferences(ctx, "refs/heads/", "refs/tags/")
	if err != nil {
		return fmt.Errorf("get local references: %w", err)
	}

	defaultBranch, err := repo.HeadReference(ctx)
	if err != nil {
		return fmt.Errorf("get default branch: %w", err)
	}

	// Index the mirror's references by name. Entries are removed as local refs are matched
	// against them, so whatever remains after the next loop exists only on the mirror.
	remoteRefs := make(map[git.ReferenceName]string, len(remoteRefsSlice))
	for _, ref := range remoteRefsSlice {
		if ref.IsSymbolic {
			// There should be no symbolic refs in refs/heads/ or refs/tags, so we'll just ignore
			// them if something has placed one there.
			continue
		}
		remoteRefs[ref.Name] = ref.Target
	}

	var defaultBranchExists bool
	var divergentRefs [][]byte
	toUpdate := map[git.ReferenceName]string{}
	for _, localRef := range localRefs {
		// Checked before the symbolic-ref skip, so a symbolic default branch still counts as
		// existing.
		if localRef.Name == defaultBranch {
			defaultBranchExists = true
		}

		// NOTE(review): this skip also bypasses delete(remoteRefs, ...). A mirror ref with the
		// same name as a local symbolic ref therefore remains a deletion candidate below.
		// Confirm this is intended.
		if localRef.IsSymbolic {
			continue
		}

		remoteTarget, ok := remoteRefs[localRef.Name]
		if !ok {
			// ref does not exist on the mirror, it should be created
			toUpdate[localRef.Name] = localRef.Target
			delete(remoteRefs, localRef.Name)
			continue
		}

		if remoteTarget == localRef.Target {
			// ref is up to date on the mirror
			delete(remoteRefs, localRef.Name)
			continue
		}

		if firstRequest.GetKeepDivergentRefs() {
			// InvalidCommitError for the mirror's commit (presumably: the commit is missing
			// locally) is tolerated, not returned. The ref then takes the divergent path below,
			// assuming IsAncestor reports false alongside that error. Confirm in localrepo.
			isAncestor, err := repo.IsAncestor(ctx, git.Revision(remoteTarget), git.Revision(localRef.Target))
			if err != nil && !errors.Is(err, localrepo.InvalidCommitError(remoteTarget)) {
				return fmt.Errorf("checking for ancestry: %w", err)
			}

			if !isAncestor {
				// The mirror's reference has diverged from the local ref, or the mirror contains a commit
				// which is not present in the local repository.
				if referenceMatcher.MatchString(localRef.Name.String()) && len(divergentRefs) < maxDivergentRefs {
					// diverged branches on the mirror are only included in the response if they match
					// one of the branches in the selector
					divergentRefs = append(divergentRefs, []byte(localRef.Name))
				}

				// Leave the diverged ref untouched on the mirror: it is neither updated nor
				// deleted.
				delete(remoteRefs, localRef.Name)
				continue
			}
		}

		// the mirror's ref does not match ours, we should update it.
		toUpdate[localRef.Name] = localRef.Target
		delete(remoteRefs, localRef.Name)
	}

	// Refs still in remoteRefs exist only on the mirror. They are considered for deletion only
	// when the local default branch exists and divergent refs are not being kept.
	toDelete := remoteRefs
	if !defaultBranchExists || firstRequest.GetKeepDivergentRefs() {
		toDelete = map[git.ReferenceName]string{}
	}

	// Deleting entries from a map while ranging over it is permitted in Go.
	for remoteRef, remoteCommitOID := range toDelete {
		isAncestor, err := repo.IsAncestor(ctx, git.Revision(remoteCommitOID), defaultBranch.Revision())
		if err != nil && !errors.Is(err, localrepo.InvalidCommitError(remoteCommitOID)) {
			return fmt.Errorf("checking for default branch ancestry: %w", err)
		}

		if isAncestor {
			continue
		}

		// The commit in the extra branch in the remote repository has not been merged in to the
		// local repository's default branch. Keep it to avoid losing work.
		delete(toDelete, remoteRef)
	}

	// Build git refspecs:
	//   - "<ref>" pushes the local ref;
	//   - ":<ref>" deletes the ref on the mirror.
	// Only references accepted by referenceMatcher are pushed. Map iteration order is
	// unspecified, so apart from the default branch (moved to index 0) the order is arbitrary.
	var refspecs []string
	for prefix, references := range map[string]map[git.ReferenceName]string{
		"": toUpdate, ":": toDelete,
	} {
		for reference := range references {
			if !referenceMatcher.MatchString(reference.String()) {
				continue
			}

			refspecs = append(refspecs, prefix+reference.String())

			if reference == defaultBranch {
				// The default branch needs to be pushed in the first batch of refspecs as some features
				// depend on it existing in the repository. The default branch may not exist in the repo
				// yet if this is the first mirroring push.
				last := len(refspecs) - 1
				refspecs[0], refspecs[last] = refspecs[last], refspecs[0]
			}
		}
	}

	// Push in batches of at most pushBatchSize refspecs, consuming refspecs from the front.
	// Force-pushing is disabled when divergent refs must be kept.
	for len(refspecs) > 0 {
		batch := refspecs
		if len(refspecs) > pushBatchSize {
			batch = refspecs[:pushBatchSize]
		}

		refspecs = refspecs[len(batch):]

		if err := repo.Push(ctx, remoteName, batch, localrepo.PushOptions{
			SSHCommand: sshCommand,
			Force:      !firstRequest.GetKeepDivergentRefs(),
			Config:     remoteConfig,
		}); err != nil {
			return fmt.Errorf("push to mirror: %w", err)
		}
	}

	return stream.SendAndClose(&gitalypb.UpdateRemoteMirrorResponse{DivergentRefs: divergentRefs})
}
// newReferenceMatcher compiles the regular expression that decides which references are
// mirrored.
//
// Matching rules:
//   - Every tag under refs/tags/ matches.
//   - A branch under refs/heads/ matches when its short name (without the refs/heads/ prefix)
//     matches one of branchMatchers as a whole.
//   - Within a pattern, "*" is a wildcard and every other character is taken literally.
//   - When branchMatchers is empty, every branch matches.
func newReferenceMatcher(branchMatchers [][]byte) (*regexp.Regexp, error) {
	branchPattern := ".+"

	if len(branchMatchers) > 0 {
		alternatives := make([]string, 0, len(branchMatchers))
		for _, matcher := range branchMatchers {
			// Quote the literal pieces between wildcards, then rejoin them with ".*".
			literalParts := strings.Split(string(matcher), "*")
			for idx, part := range literalParts {
				literalParts[idx] = regexp.QuoteMeta(part)
			}
			alternatives = append(alternatives, strings.Join(literalParts, ".*"))
		}
		branchPattern = strings.Join(alternatives, "|")
	}

	return regexp.Compile("^refs/tags/.+$|^refs/heads/(" + branchPattern + ")$")
}
// validateUpdateRemoteMirrorRequest checks the first UpdateRemoteMirror message. The request
// must name a repository that locator accepts and must carry a remote with a non-empty URL.
// The locator's error is returned unchanged. A missing remote or URL yields a plain error
// describing what is absent.
func validateUpdateRemoteMirrorRequest(ctx context.Context, locator storage.Locator, req *gitalypb.UpdateRemoteMirrorRequest) error {
	if repoErr := locator.ValidateRepository(ctx, req.GetRepository()); repoErr != nil {
		return repoErr
	}

	switch mirror := req.GetRemote(); {
	case mirror == nil:
		return errors.New("missing Remote")
	case mirror.GetUrl() == "":
		return errors.New("remote is missing URL")
	default:
		return nil
	}
}