internal/gitaly/service/repository/replicate.go (338 lines of code) (raw):
package repository
import (
"bytes"
"context"
"errors"
"fmt"
"io"
"os"
"path/filepath"
"strings"
"gitlab.com/gitlab-org/gitaly/v16/internal/command"
"gitlab.com/gitlab-org/gitaly/v16/internal/git"
"gitlab.com/gitlab-org/gitaly/v16/internal/git/gitcmd"
"gitlab.com/gitlab-org/gitaly/v16/internal/git/localrepo"
"gitlab.com/gitlab-org/gitaly/v16/internal/git/remoterepo"
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/repoutil"
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage"
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/mode"
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/transaction"
"gitlab.com/gitlab-org/gitaly/v16/internal/grpc/client"
"gitlab.com/gitlab-org/gitaly/v16/internal/grpc/metadata"
"gitlab.com/gitlab-org/gitaly/v16/internal/safe"
"gitlab.com/gitlab-org/gitaly/v16/internal/structerr"
"gitlab.com/gitlab-org/gitaly/v16/internal/tempdir"
"gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb"
"gitlab.com/gitlab-org/gitaly/v16/streamio"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
// ErrInvalidSourceRepository is returned when attempting to replicate from an invalid source repository.
// It is a gRPC status error carrying the NotFound code, so it can be matched with errors.Is and
// returned to clients as-is.
var ErrInvalidSourceRepository = status.Error(codes.NotFound, "invalid source repository")
// ReplicateRepository brings the target repository of the request in line with its source
// repository, which lives on a different storage. If the target does not pass validation it is
// first (re)created from a snapshot of the source. Afterwards the following components are
// synchronized from the source:
//
//   - Git config
//   - References and objects
//   - Custom Git hooks
//
// Request validation failures are reported as InvalidArgument, a missing source repository as
// ErrInvalidSourceRepository, and every other failure as Internal.
func (s *server) ReplicateRepository(ctx context.Context, in *gitalypb.ReplicateRepositoryRequest) (*gitalypb.ReplicateRepositoryResponse, error) {
	if validationErr := validateReplicateRepository(ctx, s.locator, in); validationErr != nil {
		return nil, structerr.NewInvalidArgument("%w", validationErr)
	}

	target, source := in.GetRepository(), in.GetSource()

	// A target that fails validation is rebuilt from a snapshot of the source before the
	// incremental synchronization below runs.
	if s.locator.ValidateRepository(ctx, target) != nil {
		targetPath, pathErr := s.locator.GetRepoPath(ctx, target, storage.WithRepositoryVerificationSkipped())
		if pathErr != nil {
			return nil, structerr.NewInternal("%w", pathErr)
		}

		switch createErr := s.create(ctx, in, targetPath); {
		case createErr == nil:
			// Repository was created, carry on with the synchronization.
		case errors.Is(createErr, ErrInvalidSourceRepository):
			// Hand out the bare sentinel so the client sees its NotFound status.
			return nil, ErrInvalidSourceRepository
		default:
			return nil, structerr.NewInternal("%w", createErr)
		}
	}

	sourceClient, err := s.newRepoClient(ctx, source.GetStorageName())
	if err != nil {
		return nil, structerr.NewInternal("new client: %w", err)
	}

	// We're checking for repository existence up front such that we can give a conclusive error
	// in case it doesn't. Otherwise, the error message returned to the client would depend on
	// the order in which the sync functions were executed. Most importantly, given that
	// `syncRepository` uses FetchInternalRemote which in turn uses gitaly-ssh, this code path
	// cannot pass up NotFound errors given that there is no communication channel between
	// Gitaly and gitaly-ssh.
	existsResponse, err := sourceClient.RepositoryExists(ctx, &gitalypb.RepositoryExistsRequest{
		Repository: source,
	})
	if err != nil {
		return nil, structerr.NewInternal("checking for repo existence: %w", err)
	}
	if !existsResponse.GetExists() {
		return nil, ErrInvalidSourceRepository
	}

	// The partitioning hint should not be forwarded to other Gitaly nodes as the path is
	// irrelevant for them.
	outgoingCtx := metadata.IncomingToOutgoing(storage.ContextWithoutPartitioningHint(ctx))

	if err := s.replicateRepository(outgoingCtx, source, target); err != nil {
		return nil, structerr.NewInternal("replicating repository: %w", err)
	}

	return &gitalypb.ReplicateRepositoryResponse{}, nil
}
// replicateRepository synchronizes an existing target repository with source. It runs three
// steps in order and stops at the first failure: Git config, references, custom hooks.
func (s *server) replicateRepository(ctx context.Context, source, target *gitalypb.Repository) error {
	// writeAndRecordConfig writes the config through writeFile and, when the context carries a
	// transaction, additionally records the removal of the old file and the creation of the new
	// one with the transaction's file system.
	writeAndRecordConfig := func(ctx context.Context, path string, content io.Reader) error {
		if err := s.writeFile(ctx, path, content); err != nil {
			return err
		}

		tx := storage.ExtractTransaction(ctx)
		if tx == nil {
			return nil
		}

		txFS := tx.FS()

		// The recording calls take the path relative to the transaction's file system root.
		relativeConfigPath, err := filepath.Rel(txFS.Root(), path)
		if err != nil {
			return fmt.Errorf("original config relative path: %w", err)
		}

		if err := txFS.RecordRemoval(relativeConfigPath); err != nil {
			return fmt.Errorf("record old config removal: %w", err)
		}

		if err := txFS.RecordFile(relativeConfigPath); err != nil {
			return fmt.Errorf("record new config creation: %w", err)
		}

		return nil
	}

	if err := s.syncGitconfig(ctx, source, target, writeAndRecordConfig); err != nil {
		return fmt.Errorf("synchronizing gitconfig: %w", err)
	}

	if err := s.syncReferences(ctx, source, target); err != nil {
		return fmt.Errorf("synchronizing references: %w", err)
	}

	if err := s.syncCustomHooks(ctx, source, target); err != nil {
		return fmt.Errorf("synchronizing custom hooks: %w", err)
	}

	return nil
}
// validateReplicateRepository checks the request before any work is done. The target repository
// must pass the locator's validation (its existence is not required), a source repository must
// be set, and source and target must be located on different storages.
func validateReplicateRepository(ctx context.Context, locator storage.Locator, in *gitalypb.ReplicateRepositoryRequest) error {
	target, source := in.GetRepository(), in.GetSource()

	if err := locator.ValidateRepository(ctx, target, storage.WithSkipRepositoryExistenceCheck()); err != nil {
		return err
	}

	switch {
	case source == nil:
		return errors.New("source repository cannot be empty")
	case target.GetStorageName() == source.GetStorageName():
		return errors.New("repository and source have the same storage")
	default:
		return nil
	}
}
// create builds the target repository of the request from a snapshot of the source repository.
// If something can be stat'ed at repoPath, it is first moved out of the way into a newly created
// temporary directory on the target's storage. Any Stat error, not only "does not exist", skips
// that move.
func (s *server) create(ctx context.Context, in *gitalypb.ReplicateRepositoryRequest, repoPath string) error {
	if _, statErr := os.Stat(repoPath); statErr == nil {
		trashDir, err := tempdir.NewWithoutContext(in.GetRepository().GetStorageName(), s.logger, s.locator)
		if err != nil {
			return err
		}

		// Rename the leftover directory into the temporary directory, keeping its base name.
		destination := filepath.Join(trashDir.Path(), filepath.Base(repoPath))
		if err := os.Rename(repoPath, destination); err != nil {
			return fmt.Errorf("error deleting invalid repo: %w", err)
		}

		s.logger.WithField("repo_path", repoPath).WarnContext(ctx, "removed invalid repository")
	}

	if err := s.createFromSnapshot(ctx, in.GetSource(), in.GetRepository()); err != nil {
		return fmt.Errorf("could not create repository from snapshot: %w", err)
	}

	return nil
}
// createFromSnapshot creates target through repoutil.Create, seeding it with a snapshot of
// source. When the context carries a transaction, the source's Git config is additionally
// written into the new repository and a migration key is recorded for the target's original
// relative path.
func (s *server) createFromSnapshot(ctx context.Context, source, target *gitalypb.Repository) error {
	// writeConfigInPlace truncates (or creates) the file at path and copies content into it. A
	// failure to close the file is joined onto whatever error is already being returned.
	writeConfigInPlace := func(_ context.Context, path string, content io.Reader) (returnedErr error) {
		configFile, err := os.OpenFile(path, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, mode.File)
		if err != nil {
			return fmt.Errorf("open: %w", err)
		}
		defer func() {
			if closeErr := configFile.Close(); closeErr != nil {
				returnedErr = errors.Join(returnedErr, fmt.Errorf("close: %w", closeErr))
			}
		}()

		if _, err := io.Copy(configFile, content); err != nil {
			return fmt.Errorf("copy: %w", err)
		}

		return nil
	}

	// seedRepository is the callback handed to repoutil.Create. It populates the repository
	// that is being created.
	seedRepository := func(repo *gitalypb.Repository) error {
		if err := s.extractSnapshot(ctx, source, repo); err != nil {
			return fmt.Errorf("extracting snapshot: %w", err)
		}

		// The archive extracted above does not contain the configuration file. If SHA256 is
		// used, this would lead to an invalid repository as the object format configuration is
		// not present. This leads to failures with transactions as we need to pack the object
		// directory after the repository is created. Sync the config here to ensure the correct
		// object format is configured before returning. We write it directly to disk without
		// voting as the voting is handled by the voting round of the repository creation. We
		// also don't want to record the config creating operation separately as it is already
		// recorded by `repoutil.Create` when it records the entire repository.
		//
		// We only run this with transactions as the file update is not atomic without
		// transactions.
		if storage.ExtractTransaction(ctx) == nil {
			return nil
		}

		if err := s.syncGitconfig(ctx, source, repo, writeConfigInPlace); err != nil {
			return fmt.Errorf("sync gitconfig: %w", err)
		}

		return nil
	}

	if err := repoutil.Create(
		ctx,
		s.logger,
		s.locator,
		s.gitCmdFactory,
		s.catfileCache,
		s.txManager,
		s.repositoryCounter,
		target,
		seedRepository,
	); err != nil {
		return fmt.Errorf("creating repository: %w", err)
	}

	tx := storage.ExtractTransaction(ctx)
	if tx == nil {
		return nil
	}

	originalRelativePath := tx.OriginalRepository(target).GetRelativePath()
	if err := s.migrationStateManager.RecordKeyCreation(tx, originalRelativePath); err != nil {
		return fmt.Errorf("recording migration key: %w", err)
	}

	return nil
}
// extractSnapshot requests a snapshot archive of source through the GetSnapshot RPC and unpacks
// it with `tar` into the on-disk path of target.
//
// It returns ErrInvalidSourceRepository when the first message of the snapshot stream fails with
// an error identifying the source as missing or invalid. Any other failure is returned wrapped.
// If tar itself fails, the returned error carries tar's stderr output as a string under the
// "stderr" metadata key.
func (s *server) extractSnapshot(ctx context.Context, source, target *gitalypb.Repository) error {
	repoClient, err := s.newRepoClient(ctx, source.GetStorageName())
	if err != nil {
		return fmt.Errorf("new client: %w", err)
	}

	stream, err := repoClient.GetSnapshot(ctx, &gitalypb.GetSnapshotRequest{Repository: source})
	if err != nil {
		return fmt.Errorf("get snapshot: %w", err)
	}

	// We need to catch a possible 'invalid repository' error from GetSnapshot. On an empty read,
	// BSD tar exits with code 0 so we'd receive the error when waiting for the command. GNU tar on
	// Linux exits with a non-zero code, which causes Go to return an os.ExitError hiding the original
	// error reading from stdin. To get access to the error on both Linux and macOS, we read the first
	// message from the stream here to get access to the possible 'invalid repository' first on both
	// platforms.
	firstBytes, err := stream.Recv()
	if err != nil {
		code, message := structerr.GRPCCode(err), err.Error()

		switch {
		case code == codes.NotFound && strings.Contains(message, "GetRepoPath: not a git repository:"):
			// The error condition exists for backwards compatibility purposes, only,
			// and can be removed in the next release.
			return ErrInvalidSourceRepository
		case code == codes.NotFound && strings.Contains(message, storage.ErrRepositoryNotFound.Error()):
			return ErrInvalidSourceRepository
		case code == codes.FailedPrecondition && strings.Contains(message, storage.ErrRepositoryNotValid.Error()):
			return ErrInvalidSourceRepository
		default:
			return fmt.Errorf("first snapshot read: %w", err)
		}
	}

	// Stitch the already consumed first message back in front of the remaining stream so tar
	// sees the complete archive.
	snapshotReader := io.MultiReader(
		bytes.NewReader(firstBytes.GetData()),
		streamio.NewReader(func() ([]byte, error) {
			resp, err := stream.Recv()
			return resp.GetData(), err
		}),
	)

	targetPath, err := s.locator.GetRepoPath(ctx, target, storage.WithRepositoryVerificationSkipped())
	if err != nil {
		return fmt.Errorf("target path: %w", err)
	}

	var stderr bytes.Buffer
	cmd, err := command.New(ctx, s.logger, []string{"tar", "-C", targetPath, "-xvf", "-"},
		command.WithStdin(snapshotReader),
		command.WithStderr(&stderr),
	)
	if err != nil {
		return fmt.Errorf("create tar command: %w", err)
	}

	if err = cmd.Wait(); err != nil {
		// Attach the captured output as a string rather than the buffer itself. A *bytes.Buffer
		// has no exported fields, so encoders that serialize the metadata value structurally
		// would drop tar's output.
		return structerr.New("wait for tar: %w", err).WithMetadata("stderr", stderr.String())
	}

	return nil
}
// syncReferences builds a local repository handle for target and fetches from source into it
// via fetchInternalRemote.
func (s *server) syncReferences(ctx context.Context, source, target *gitalypb.Repository) error {
	targetRepo := s.localRepoFactory.Build(target)

	err := fetchInternalRemote(ctx, s.txManager, s.conns, targetRepo, source)
	if err != nil {
		return fmt.Errorf("fetch internal remote: %w", err)
	}

	return nil
}
// fetchInternalRemote fetches from remoteRepoProto into repo using the mirror refspec with
// pruning enabled, and afterwards points repo's default branch at the remote's HEAD reference
// if the two differ.
//
// A failed fetch that is a localrepo.FetchFailedError is returned with Git's stderr output
// attached under the "stderr" metadata key.
func fetchInternalRemote(
	ctx context.Context,
	txManager transaction.Manager,
	conns *client.Pool,
	repo *localrepo.Repo,
	remoteRepoProto *gitalypb.Repository,
) error {
	var fetchStderr bytes.Buffer

	fetchOpts := localrepo.FetchOpts{
		Prune:  true,
		Stderr: &fetchStderr,
		// By default, Git will fetch any tags that point into the fetched references. This check
		// requires time, and is ultimately a waste of compute because we already mirror all refs
		// anyway, including tags. By adding `--no-tags` we can thus ask Git to skip that and thus
		// accelerate the fetch.
		Tags: localrepo.FetchOptsTagsNone,
		CommandOptions: []gitcmd.CmdOpt{
			gitcmd.WithConfig(gitcmd.ConfigPair{Key: "fetch.negotiationAlgorithm", Value: "skipping"}),
			// Disable the consistency checks of objects fetched into the replicated repository.
			// These fetched objects come from preexisting internal sources, thus it would be
			// problematic for the fetch to fail consistency checks due to altered requirements.
			gitcmd.WithConfig(gitcmd.ConfigPair{Key: "fetch.fsckObjects", Value: "false"}),
		},
	}

	if err := repo.FetchInternal(ctx, remoteRepoProto, []string{git.MirrorRefSpec}, fetchOpts); err != nil {
		if errors.As(err, &localrepo.FetchFailedError{}) {
			return structerr.New("%w", err).WithMetadata("stderr", fetchStderr.String())
		}

		return fmt.Errorf("fetch: %w", err)
	}

	remoteRepo, err := remoterepo.New(ctx, remoteRepoProto, conns)
	if err != nil {
		return structerr.NewInternal("%w", err)
	}

	remoteHead, err := remoteRepo.HeadReference(ctx)
	if err != nil {
		return structerr.NewInternal("getting remote default branch: %w", err)
	}

	localHead, err := repo.HeadReference(ctx)
	if err != nil {
		return structerr.NewInternal("getting local default branch: %w", err)
	}

	// Nothing to do when both repositories already agree on the default branch.
	if localHead == remoteHead {
		return nil
	}

	if err := repo.SetDefaultBranch(ctx, txManager, remoteHead); err != nil {
		return structerr.NewInternal("setting default branch: %w", err)
	}

	return nil
}
// syncCustomHooks streams the custom hooks of source via the GetCustomHooks RPC and applies
// them to target through repoutil.SetCustomHooks.
func (s *server) syncCustomHooks(ctx context.Context, source, target *gitalypb.Repository) error {
	sourceClient, err := s.newRepoClient(ctx, source.GetStorageName())
	if err != nil {
		return fmt.Errorf("creating repo client: %w", err)
	}

	hooksStream, err := sourceClient.GetCustomHooks(ctx, &gitalypb.GetCustomHooksRequest{Repository: source})
	if err != nil {
		return fmt.Errorf("getting custom hooks: %w", err)
	}

	// Expose the chunked RPC responses as one continuous reader.
	hooksReader := streamio.NewReader(func() ([]byte, error) {
		chunk, recvErr := hooksStream.Recv()
		return chunk.GetData(), recvErr
	})

	if err := repoutil.SetCustomHooks(ctx, s.logger, s.locator, s.txManager, hooksReader, target); err != nil {
		return fmt.Errorf("setting custom hooks: %w", err)
	}

	return nil
}
// syncGitconfig streams the Git config of source via the GetConfig RPC and hands it to
// writeConfig together with the path of the "config" file inside target's repository directory.
// How the content ends up on disk is entirely up to writeConfig; its error is returned as-is.
func (s *server) syncGitconfig(ctx context.Context, source, target *gitalypb.Repository, writeConfig func(ctx context.Context, path string, content io.Reader) error) error {
	sourceClient, err := s.newRepoClient(ctx, source.GetStorageName())
	if err != nil {
		return err
	}

	targetPath, err := s.locator.GetRepoPath(ctx, target)
	if err != nil {
		return err
	}

	configStream, err := sourceClient.GetConfig(ctx, &gitalypb.GetConfigRequest{Repository: source})
	if err != nil {
		return err
	}

	// Expose the chunked RPC responses as one continuous reader.
	configReader := streamio.NewReader(func() ([]byte, error) {
		chunk, recvErr := configStream.Recv()
		return chunk.GetData(), recvErr
	})

	return writeConfig(ctx, filepath.Join(targetPath, "config"), configReader)
}
// writeFile writes everything read from reader to path through a locking file writer and
// commits the result via transaction.CommitLockedFile. Missing parent directories are created
// first. The writer is always closed; a close error is only reported if no earlier error
// occurred.
func (s *server) writeFile(ctx context.Context, path string, reader io.Reader) (returnedErr error) {
	if err := os.MkdirAll(filepath.Dir(path), mode.Directory); err != nil {
		return err
	}

	writer, err := safe.NewLockingFileWriter(path, safe.LockingFileWriterConfig{
		FileWriterConfig: safe.FileWriterConfig{FileMode: mode.File},
	})
	if err != nil {
		return fmt.Errorf("creating file writer: %w", err)
	}
	defer func() {
		// Close unconditionally, but never let its result mask an earlier failure.
		closeErr := writer.Close()
		if returnedErr == nil {
			returnedErr = closeErr
		}
	}()

	if _, err := io.Copy(writer, reader); err != nil {
		return err
	}

	return transaction.CommitLockedFile(ctx, s.txManager, writer)
}
// newRepoClient returns a RepositoryServiceClient for the Gitaly server that hosts storageName.
// The server's address and token are taken from the context and the connection is dialed
// through the server's connection pool.
func (s *server) newRepoClient(ctx context.Context, storageName string) (gitalypb.RepositoryServiceClient, error) {
	serverInfo, err := storage.ExtractGitalyServer(ctx, storageName)
	if err != nil {
		return nil, err
	}

	conn, err := s.conns.Dial(ctx, serverInfo.Address, serverInfo.Token)
	if err != nil {
		return nil, err
	}

	return gitalypb.NewRepositoryServiceClient(conn), nil
}