in internal/gitaly/repoutil/create.go [73:330]
// Create creates the given repository with atomic semantics: the repository is
// first initialized and seeded in a temporary directory and only moved into its
// final location once that has succeeded. When running behind Praefect,
// transactional votes on a hash of the repository's on-disk state ensure that
// all Gitaly nodes agree on the resulting repository before and after it is
// moved into place.
//
// seedRepository is invoked with the temporary repository so that the caller
// can populate it before it is moved into place. Errors returned by the
// callback are propagated as-is so that any gRPC error codes it carries
// survive.
//
// Returns an AlreadyExists error when the target repository already exists on
// disk; this is checked both before and after the repository lock is taken.
func Create(
	ctx context.Context,
	logger log.Logger,
	locator storage.Locator,
	gitCmdFactory gitcmd.CommandFactory,
	catfileCache catfile.Cache,
	txManager transaction.Manager,
	repoCounter *counter.RepositoryCounter,
	repository storage.Repository,
	seedRepository func(repository *gitalypb.Repository) error,
	options ...CreateOption,
) error {
	targetPath, err := locator.GetRepoPath(ctx, repository, storage.WithRepositoryVerificationSkipped())
	if err != nil {
		return structerr.NewInvalidArgument("locate repository: %w", err)
	}

	// The repository must not exist on disk already, or otherwise we won't be able to
	// create it with atomic semantics.
	if _, err := os.Stat(targetPath); !errors.Is(err, fs.ErrNotExist) {
		if err == nil {
			return structerr.NewAlreadyExists("repository exists already")
		}
		return fmt.Errorf("pre-lock stat: %w", err)
	}

	// Note that we do not create the repository directly in its target location, but
	// instead create it in a temporary directory, first. This is done such that we can
	// guarantee atomicity and roll back the change easily in case an error happens.
	newRepoProto, newRepoDir, err := tempdir.NewRepository(ctx, repository.GetStorageName(), logger, locator)
	if err != nil {
		return fmt.Errorf("creating temporary repository: %w", err)
	}
	defer func() {
		// We don't really care about whether this succeeds or not. It will either get
		// cleaned up after the context is done, or eventually by the tempdir walker when
		// it's old enough.
		_ = os.RemoveAll(newRepoDir.Path())
	}()

	var cfg createConfig
	for _, option := range options {
		option(&cfg)
	}

	if !cfg.skipInit {
		// Initialize a bare repository in the temporary directory via git-init(1).
		stderr := &bytes.Buffer{}
		cmd, err := gitCmdFactory.NewWithoutRepo(ctx, gitcmd.Command{
			Name: "init",
			Flags: append([]gitcmd.Option{
				gitcmd.Flag{Name: "--bare"},
				gitcmd.Flag{Name: "--quiet"},
			}, cfg.gitOptions...),
			Args: []string{newRepoDir.Path()},
		}, gitcmd.WithStderr(stderr))
		if err != nil {
			return fmt.Errorf("spawning git-init: %w", err)
		}

		if err := cmd.Wait(); err != nil {
			return fmt.Errorf("creating repository: %w, stderr: %q", err, stderr.String())
		}
	} else {
		// Initialization was skipped, so the seeding callback is expected to create the
		// repository itself. Remove the pre-created temporary directory to make way for it.
		if err := os.Remove(newRepoDir.Path()); err != nil {
			return fmt.Errorf("removing precreated directory: %w", err)
		}
	}

	if err := seedRepository(newRepoProto); err != nil {
		// Return the error returned by the callback function as-is so we don't clobber any
		// potential returned gRPC error codes.
		return err
	}

	newRepo := localrepo.New(logger, locator, gitCmdFactory, catfileCache, newRepoProto)

	// The reference backend (files vs. reftables) determines which paths must be
	// excluded from the vote hash below.
	refBackend, err := newRepo.ReferenceBackend(ctx)
	if err != nil {
		return fmt.Errorf("detecting reference backend: %w", err)
	}

	// In order to guarantee that the repository is going to be the same across all
	// Gitalies in case we're behind Praefect, we walk the repository and hash all of
	// its files.
	voteHash := voting.NewVoteHash()
	if err := filepath.WalkDir(newRepoDir.Path(), func(path string, entry fs.DirEntry, err error) error {
		if err != nil {
			return err
		}

		switch path {
		// The way packfiles are generated may not be deterministic, so we skip over the
		// object database.
		case filepath.Join(newRepoDir.Path(), "objects"):
			return fs.SkipDir
		// FETCH_HEAD refers to the remote we're fetching from. This URL may not be
		// deterministic, e.g. when fetching from a temporary file like we do in
		// CreateRepositoryFromBundle.
		case filepath.Join(newRepoDir.Path(), "FETCH_HEAD"):
			return nil
		// With the reftables backend the "refs" directory carries no meaningful ref
		// state, so it is excluded from the hash.
		case filepath.Join(newRepoDir.Path(), "refs"):
			if refBackend == git.ReferenceBackendReftables {
				return fs.SkipDir
			}
		// Reftables creates files with a random suffix, which can be different from node
		// to node. So we instead capture the ref information directly.
		//
		// TODO: Ideally we want to also use the same ideology for the files backend too
		// https://gitlab.com/gitlab-org/gitaly/-/issues/6050
		case filepath.Join(newRepoDir.Path(), "reftable"):
			if refBackend == git.ReferenceBackendReftables {
				if err := writeRefs(ctx, voteHash, newRepo); err != nil {
					return err
				}
				return fs.SkipDir
			}
		}

		// We do not care about directories.
		if entry.IsDir() {
			return nil
		}

		file, err := os.Open(path)
		if err != nil {
			return fmt.Errorf("opening %q: %w", entry.Name(), err)
		}
		// The deferred Close fires when this callback returns, i.e. once per visited
		// file, so file descriptors do not pile up across the walk.
		defer file.Close()

		if _, err := io.Copy(voteHash, file); err != nil {
			return fmt.Errorf("hashing %q: %w", entry.Name(), err)
		}

		return nil
	}); err != nil {
		return fmt.Errorf("walking repository: %w", err)
	}

	vote, err := voteHash.Vote()
	if err != nil {
		return fmt.Errorf("computing vote: %w", err)
	}

	// Create the full-repack timestamp in the new repository. This is done so that we don't
	// consider new repositories to have never been repacked yet, which would cause repository
	// housekeeping to perform a full repack right away. And in general, this would not really
	// be needed as the end result for most of the repository-creating RPCs would either be an
	// empty or a neatly-packed repository anyway.
	//
	// As this timestamp should never impact the user-observable state of a repository we do not
	// include it in the voting hash.
	if err := stats.UpdateFullRepackTimestamp(newRepoDir.Path(), time.Now()); err != nil {
		return fmt.Errorf("creating full-repack timestamp: %w", err)
	}

	// We're now entering the critical section where we want to have exclusive access
	// over creation of the repository. So we:
	//
	// 1. Lock the repository path such that no other process can create it at the same
	//    time.
	// 2. Vote on the new repository's state.
	// 3. Move the repository into place.
	// 4. Do another confirmatory vote to signal that we performed the change.
	// 5. Unlock the repository again.
	//
	// This sequence guarantees that the change is atomic and can trivially be rolled
	// back in case we fail to either lock the repository or reach quorum in the initial
	// vote.
	unlock, err := Lock(ctx, logger, locator, repository)
	if err != nil {
		return fmt.Errorf("locking repository: %w", err)
	}
	defer unlock()

	// Now that the repository is locked, we must assert that it _still_ doesn't exist.
	// Otherwise, it could have happened that a concurrent RPC call created it while we created
	// and seeded our temporary repository. While we would notice this at the point of moving
	// the repository into place, we want to be as sure as possible that the action will succeed
	// previous to the first transactional vote.
	if _, err := os.Stat(targetPath); !errors.Is(err, fs.ErrNotExist) {
		if err == nil {
			return structerr.NewAlreadyExists("repository exists already")
		}
		return fmt.Errorf("post-lock stat: %w", err)
	}

	if err := transaction.VoteOnContext(ctx, txManager, vote, voting.Prepared); err != nil {
		return structerr.NewFailedPrecondition("preparatory vote: %w", err)
	}

	// Sync the temporary repository to disk (when the context requires it) so that the
	// subsequent rename publishes fully-persisted state.
	syncer := safe.NewSyncer()
	if storage.NeedsSync(ctx) {
		if err := syncer.SyncRecursive(ctx, newRepoDir.Path()); err != nil {
			return fmt.Errorf("sync recursive: %w", err)
		}
	}

	// Now that we have locked the repository and all Gitalies have agreed that they
	// want to do the same change we can move the repository into place.
	if err := os.Rename(newRepoDir.Path(), targetPath); err != nil {
		return fmt.Errorf("moving repository into place: %w", err)
	}

	storagePath, err := locator.GetStorageByName(ctx, repository.GetStorageName())
	if err != nil {
		return fmt.Errorf("get storage by name: %w", err)
	}

	// Sync the directory hierarchy leading up to the new repository so that the rename
	// itself is persisted as well.
	if storage.NeedsSync(ctx) {
		if err := syncer.SyncHierarchy(ctx, storagePath, repository.GetRelativePath()); err != nil {
			return fmt.Errorf("sync hierarchy: %w", err)
		}
	}

	if err := transaction.VoteOnContext(ctx, txManager, vote, voting.Committed); err != nil {
		return structerr.NewFailedPrecondition("committing vote: %w", err)
	}

	repoCounter.Increment(repository)

	if tx := storage.ExtractTransaction(ctx); tx != nil {
		// Git allows writing unreachable objects into the repository that are missing their dependencies. The reachable
		// ones are checked through connectivity checks but unreachable ones are not.
		//
		// Transactions rely on a property that all objects in the repository have all of their dependencies met. This allows
		// us to skip full connectivity checks, and simply check that the immediate dependencies of the newly written objects
		// are satisfied. Repository creations are used in various contexts and not all of them guarantee this property. Perform
		// a full repack to drop all unreachable objects. This way we're certain all of the objects committed through a repository
		// creation have their dependencies satisfied. Ideally we would only perform a connectivity check of the new objects,
		// and record the dependencies that must exist in the repository already. Repository creations should generally include
		// all objects so the rewriting should not be needed. Issue: https://gitlab.com/gitlab-org/gitaly/-/issues/5969
		if err := performFullRepack(ctx, localrepo.New(logger, locator, gitCmdFactory, catfileCache, &gitalypb.Repository{
			StorageName:  repository.GetStorageName(),
			RelativePath: repository.GetRelativePath(),
		})); err != nil {
			return fmt.Errorf("perform full repack: %w", err)
		}

		// Record the creation in the transaction's snapshot file system and key-value
		// store, keyed by the path relative to the transaction's FS root.
		originalRelativePath, err := filepath.Rel(tx.FS().Root(), targetPath)
		if err != nil {
			return fmt.Errorf("original relative path: %w", err)
		}

		if err := storage.RecordDirectoryCreation(tx.FS(), originalRelativePath); err != nil {
			return fmt.Errorf("record directory creation: %w", err)
		}

		if err := tx.KV().Set(storage.RepositoryKey(originalRelativePath), nil); err != nil {
			return fmt.Errorf("store repository key: %w", err)
		}
	}

	// The repository lock is released via the deferred unlock() call.
	return nil
}