internal/gitaly/service/repository/fetch_remote.go (209 lines of code) (raw):
package repository
import (
"bytes"
"context"
"fmt"
"strings"
"time"
"gitlab.com/gitlab-org/gitaly/v16/internal/git"
"gitlab.com/gitlab-org/gitaly/v16/internal/git/gitcmd"
"gitlab.com/gitlab-org/gitaly/v16/internal/git/localrepo"
"gitlab.com/gitlab-org/gitaly/v16/internal/git/quarantine"
"gitlab.com/gitlab-org/gitaly/v16/internal/git/updateref"
"gitlab.com/gitlab-org/gitaly/v16/internal/structerr"
"gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb"
)
// changeTypes is the set of `git-fetch(1)` porcelain reference update types that cause a fetch
// to be reported as having changed the repository. Reference updates of any other type (for
// example pruned, unchanged or failed references) do not, on their own, mark the repository as
// changed.
var changeTypes = map[gitcmd.RefUpdateType]bool{
	gitcmd.RefUpdateTypeFetched:           true,
	gitcmd.RefUpdateTypeFastForwardUpdate: true,
	gitcmd.RefUpdateTypeForcedUpdate:      true,
	gitcmd.RefUpdateTypeTagUpdate:         true,
}
// FetchRemote validates the request and then fetches from the remote described by its remote
// parameters into the target repository. If the request carries a positive timeout (in
// seconds), the whole fetch runs under a context bounded by that timeout. The response reports
// whether tags and the repository changed.
func (s *server) FetchRemote(ctx context.Context, req *gitalypb.FetchRemoteRequest) (*gitalypb.FetchRemoteResponse, error) {
	if err := s.validateFetchRemoteRequest(ctx, req); err != nil {
		return nil, err
	}

	if timeout := req.GetTimeout(); timeout > 0 {
		var cancel context.CancelFunc
		ctx, cancel = context.WithTimeout(ctx, time.Duration(timeout)*time.Second)
		defer cancel()
	}

	tagsChanged, repoChanged, err := s.fetchRemoteAtomic(ctx, req)
	if err != nil {
		return nil, err
	}

	response := &gitalypb.FetchRemoteResponse{
		TagsChanged: tagsChanged,
		RepoChanged: repoChanged,
	}
	return response, nil
}
// fetchRemoteAtomic fetches changes from the remote described by req and applies the resulting
// reference updates to the repository. To be atomic, fetched objects are first written into a
// quarantine directory and are only migrated into the repository right before the final
// reference transaction is committed.
//
// It returns whether tags changed and whether the repository changed. When the request does not
// set `check_tags_changed`, the tags-changed value is always true for historical reasons.
func (s *server) fetchRemoteAtomic(ctx context.Context, req *gitalypb.FetchRemoteRequest) (_ bool, _ bool, returnedErr error) {
	var stdout, stderr bytes.Buffer
	opts := localrepo.FetchOpts{
		Stdout:  &stdout,
		Stderr:  &stderr,
		Force:   req.GetForce(),
		Prune:   !req.GetNoPrune(),
		Tags:    localrepo.FetchOptsTagsAll,
		Verbose: true,
		// Transactions are disabled during the fetch operation because no references are
		// updated when the dry-run option is enabled. Instead, the reference-transaction hook
		// is performed during the subsequent execution of `git-update-ref(1)`.
		DisableTransactions: true,
		// With the `dry-run` option, `git-fetch(1)` receives Git objects without performing
		// any reference updates. This lets the initial fetch place objects into the quarantine
		// directory and defers their migration until references are updated.
		DryRun: true,
		// The `porcelain` option makes `git-fetch(1)` write reference update information to
		// stdout. Since references are not updated during a dry-run, this output is parsed
		// below to queue the corresponding `git-update-ref(1)` updates.
		Porcelain: true,
	}

	if req.GetNoTags() {
		opts.Tags = localrepo.FetchOptsTagsNone
	}

	if err := buildCommandOpts(&opts, req); err != nil {
		return false, false, err
	}

	sshCommand, cleanup, err := gitcmd.BuildSSHInvocation(ctx, s.logger, req.GetSshKey(), req.GetKnownHosts())
	if err != nil {
		return false, false, err
	}
	defer cleanup()

	opts.Env = append(opts.Env, "GIT_SSH_COMMAND="+sshCommand)

	// When performing a fetch, objects are received before references are updated. If the
	// reference updates fail, unreachable objects could be left in the repository that would
	// need to be garbage collected. To be more atomic, a quarantine directory is set up where
	// objects are fetched before being migrated to the main repository when the reference
	// updates are committed.
	quarantineDir, err := quarantine.New(ctx, req.GetRepository(), s.logger, s.locator)
	if err != nil {
		return false, false, fmt.Errorf("creating quarantine directory: %w", err)
	}

	quarantineRepo := s.localRepoFactory.Build(quarantineDir.QuarantinedRepo())
	if err := quarantineRepo.FetchRemote(ctx, "inmemory", opts); err != nil {
		// When `git-fetch(1)` fails to apply all reference updates successfully, the command
		// returns `exit status 1`. Despite this error, successful reference updates should
		// still be applied during the subsequent `git-update-ref(1)`. To differentiate between
		// regular errors and failed reference updates, stderr is checked for an error message.
		// If an error message is present, it is determined that an error occurred and the
		// operation halts.
		errMsg := stderr.String()
		if errMsg != "" {
			return false, false, structerr.NewInternal("fetch remote: %q: %w", errMsg, err)
		}

		// Some errors during the `git-fetch(1)` operation do not print to stderr. If the error
		// message is not `exit status 1`, the error is considered unrelated to failed
		// reference updates and the operation halts. Otherwise, the error is assumed to stem
		// from a failed reference update and the operation proceeds to update references.
		//
		// NOTE(review): this depends on the exact error string; a typed exit-code check would
		// be more robust if the underlying command error exposes one — confirm.
		if err.Error() != "exit status 1" {
			return false, false, structerr.NewInternal("fetch remote: %w", err)
		}
	}

	// A repository cannot contain references with F/D (file/directory) conflicts (i.e.
	// `refs/heads/foo` and `refs/heads/foo/bar`). If fetching from the remote repository
	// results in an F/D conflict, the reference update fails. In some cases a conflicting
	// reference may exist locally that does not exist on the remote. In this scenario, if
	// outdated references are first pruned locally, the F/D conflict can be avoided. When
	// `git-fetch(1)` is performed with the `--prune` and `--dry-run` flags, the pruned
	// references are also included in the output without performing any actual reference
	// updates. Bulk atomic reference updates performed by `git-update-ref(1)` do not support
	// F/D conflicts even if the conflicted reference is being pruned. Therefore, pruned
	// references must be updated first in a separate transaction. To accommodate this, two
	// different instances of `updateref.Updater` are used to keep the transactions separate.
	prunedUpdater, err := updateref.New(ctx, quarantineRepo)
	if err != nil {
		return false, false, fmt.Errorf("spawning pruned updater: %w", err)
	}
	defer func() {
		if err := prunedUpdater.Close(); err != nil && returnedErr == nil {
			returnedErr = fmt.Errorf("cancel pruned updater: %w", err)
		}
	}()

	// All other reference updates can be queued as part of the same transaction.
	refUpdater, err := updateref.New(ctx, quarantineRepo)
	if err != nil {
		return false, false, fmt.Errorf("spawning ref updater: %w", err)
	}
	defer func() {
		if err := refUpdater.Close(); err != nil && returnedErr == nil {
			returnedErr = fmt.Errorf("cancel ref updater: %w", err)
		}
	}()

	if err := prunedUpdater.Start(); err != nil {
		return false, false, fmt.Errorf("start reference transaction: %w", err)
	}

	if err := refUpdater.Start(); err != nil {
		return false, false, fmt.Errorf("start reference transaction: %w", err)
	}

	objectHash, err := quarantineRepo.ObjectHash(ctx)
	if err != nil {
		return false, false, fmt.Errorf("detecting object hash: %w", err)
	}

	// Both flags start out false and are only set when a matching reference update is seen.
	var tagsChanged, repoChanged bool

	// Parse stdout to identify required reference updates. Reference updates are queued to the
	// respective updater based on type.
	scanner := gitcmd.NewFetchPorcelainScanner(&stdout, objectHash)
	for scanner.Scan() {
		status := scanner.StatusLine()

		switch status.Type {
		// Failed and unchanged reference updates do not need to be applied.
		case gitcmd.RefUpdateTypeUpdateFailed, gitcmd.RefUpdateTypeUnchanged:
		// Queue pruned references in a separate transaction to avoid F/D conflicts.
		case gitcmd.RefUpdateTypePruned:
			if err := prunedUpdater.Delete(git.ReferenceName(status.Reference)); err != nil {
				return false, false, fmt.Errorf("queueing pruned ref for deletion: %w", err)
			}
		// Queue all other reference updates in the same transaction.
		default:
			if err := refUpdater.Update(git.ReferenceName(status.Reference), status.NewOID, status.OldOID); err != nil {
				return false, false, fmt.Errorf("queueing ref to be updated: %w", err)
			}

			// While scanning reference updates, check if any tags changed.
			if wereTagsChanged(status) {
				tagsChanged = true
			}

			// While scanning reference updates, check if the repository changed.
			if changeTypes[status.Type] {
				repoChanged = true
			}
		}
	}
	if err := scanner.Err(); err != nil {
		return false, false, fmt.Errorf("scanning fetch output: %w", err)
	}

	// Prepare pruned references in a separate transaction to avoid F/D conflicts.
	if err := prunedUpdater.Prepare(); err != nil {
		return false, false, fmt.Errorf("preparing reference prune: %w", err)
	}

	// Commit pruned references to complete the transaction and apply the changes.
	if err := prunedUpdater.Commit(); err != nil {
		return false, false, fmt.Errorf("committing reference prune: %w", err)
	}

	// Prepare the remaining queued reference updates.
	if err := refUpdater.Prepare(); err != nil {
		return false, false, fmt.Errorf("preparing reference update: %w", err)
	}

	// Before committing the remaining reference updates, fetched objects must be migrated out
	// of the quarantine directory.
	if err := quarantineDir.Migrate(ctx); err != nil {
		return false, false, fmt.Errorf("migrating quarantined objects: %w", err)
	}

	// Commit the remaining queued reference updates so the changes get applied.
	if err := refUpdater.Commit(); err != nil {
		return false, false, fmt.Errorf("committing reference update: %w", err)
	}

	if req.GetCheckTagsChanged() {
		return tagsChanged, repoChanged, nil
	}

	// Historically, "tags have changed" has been reported unconditionally when the caller did
	// not set `check_tags_changed`.
	return true, repoChanged, nil
}
// wereTagsChanged reports whether a `git-fetch(1)` porcelain status line describes a tag change.
// This is either an update of an existing tag (RefUpdateTypeTagUpdate) or a newly fetched
// reference inside the `refs/tags/` namespace.
func wereTagsChanged(status gitcmd.FetchPorcelainStatusLine) bool {
	switch status.Type {
	case gitcmd.RefUpdateTypeTagUpdate:
		return true
	case gitcmd.RefUpdateTypeFetched:
		// Match the full `refs/tags/` prefix including the trailing slash so that unrelated
		// references such as `refs/tagsfoo/bar` are not mistaken for tags.
		return strings.HasPrefix(status.Reference, "refs/tags/")
	default:
		return false
	}
}
// buildCommandOpts appends to opts the git configuration needed to fetch from the "inmemory"
// remote described by req. This consists of:
//   - one fetch refspec per requested mirror refmap;
//   - the remote URL, rewritten by gitcmd.GetURLAndResolveConfig when a resolved address is
//     supplied, together with any resolve configuration it returns;
//   - an optional `Authorization` extra header, keyed on the URL exactly as the caller sent it.
//
// The configuration is attached through gitcmd.WithConfigEnv. An error is returned only if the
// resolved address cannot be turned into a resolve configuration; opts is untouched in that case.
func buildCommandOpts(opts *localrepo.FetchOpts, req *gitalypb.FetchRemoteRequest) error {
	params := req.GetRemoteParams()
	originalURL := params.GetUrl()

	refspecs := getRefspecs(params.GetMirrorRefmaps())
	config := make([]gitcmd.ConfigPair, 0, len(refspecs)+1)
	for _, refspec := range refspecs {
		config = append(config, gitcmd.ConfigPair{Key: "remote.inmemory.fetch", Value: refspec})
	}

	fetchURL := originalURL
	if resolvedAddress := params.GetResolvedAddress(); resolvedAddress != "" {
		modifiedURL, resolveConfig, err := gitcmd.GetURLAndResolveConfig(originalURL, resolvedAddress)
		if err != nil {
			return fmt.Errorf("couldn't get curloptResolve config: %w", err)
		}

		fetchURL = modifiedURL
		config = append(config, resolveConfig...)
	}

	config = append(config, gitcmd.ConfigPair{Key: "remote.inmemory.url", Value: fetchURL})

	if authHeader := params.GetHttpAuthorizationHeader(); authHeader != "" {
		// The header is scoped to the URL as provided by the caller, not the possibly
		// rewritten fetch URL.
		config = append(config, gitcmd.ConfigPair{
			Key:   fmt.Sprintf("http.%s.extraHeader", originalURL),
			Value: "Authorization: " + authHeader,
		})
	}

	opts.CommandOptions = append(opts.CommandOptions, gitcmd.WithConfigEnv(config...))
	return nil
}
// validateFetchRemoteRequest checks the following, in order:
//   - the request targets a repository accepted by the locator;
//   - remote parameters are present;
//   - the remote URL is non-empty.
//
// Every violation is reported as an InvalidArgument error; nil means the request is valid.
func (s *server) validateFetchRemoteRequest(ctx context.Context, req *gitalypb.FetchRemoteRequest) error {
	if err := s.locator.ValidateRepository(ctx, req.GetRepository()); err != nil {
		return structerr.NewInvalidArgument("%w", err)
	}

	switch params := req.GetRemoteParams(); {
	case params == nil:
		return structerr.NewInvalidArgument("missing remote params")
	case params.GetUrl() == "":
		return structerr.NewInvalidArgument("blank or empty remote URL")
	default:
		return nil
	}
}
// getRefspecs converts the caller-supplied mirror refmaps into git fetch refspecs.
//
// The shorthands are expanded as follows:
//   - "all_refs" → "refs/*:refs/*", making the repository equivalent to `git clone --mirror`;
//   - "heads" → "refs/heads/*:refs/heads/*";
//   - "tags" → "refs/tags/*:refs/tags/*".
//
// Any other value is used verbatim as a refspec. Output order matches input order. With no
// refmaps at all, every reference is mirrored.
func getRefspecs(refmaps []string) []string {
	if len(refmaps) == 0 {
		return []string{"refs/*:refs/*"}
	}

	refspecs := make([]string, len(refmaps))
	for i, refmap := range refmaps {
		// Default to treating the refmap as a literal refspec; override known shorthands.
		refspec := refmap
		switch refmap {
		case "all_refs":
			refspec = "refs/*:refs/*"
		case "heads":
			refspec = "refs/heads/*:refs/heads/*"
		case "tags":
			refspec = "refs/tags/*:refs/tags/*"
		}
		refspecs[i] = refspec
	}

	return refspecs
}