internal/backup/backup.go (480 lines of code) (raw):
package backup
import (
"bytes"
"context"
"errors"
"fmt"
"io"
"math"
"strings"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/sirupsen/logrus"
"gitlab.com/gitlab-org/gitaly/v16/internal/git"
"gitlab.com/gitlab-org/gitaly/v16/internal/git/catfile"
"gitlab.com/gitlab-org/gitaly/v16/internal/git/gitcmd"
"gitlab.com/gitlab-org/gitaly/v16/internal/git/localrepo"
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage"
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/counter"
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagemgr/partition/migration"
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/transaction"
"gitlab.com/gitlab-org/gitaly/v16/internal/grpc/client"
"gitlab.com/gitlab-org/gitaly/v16/internal/log"
"gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
var (
	// ErrSkipped means the repository was skipped because there was nothing to back up.
	ErrSkipped = errors.New("repository skipped")

	// ErrDoesntExist means that the data was not found.
	ErrDoesntExist = errors.New("doesn't exist")

	// backupLatency records how long each phase of a repository backup takes.
	// The "phase" label is set by the callers in this file ("refs", "bundle",
	// "custom_hooks").
	backupLatency = promauto.NewHistogramVec(
		prometheus.HistogramOpts{
			Name: "gitaly_backup_latency_seconds",
			Help: "Latency of a repository backup by phase",
		},
		[]string{"phase"})

	// backupBundleSize records the number of bytes written for each bundle.
	// Buckets grow exponentially from 1 byte to 10 GiB over 20 buckets.
	backupBundleSize = promauto.NewHistogram(
		prometheus.HistogramOpts{
			Name:    "gitaly_backup_bundle_bytes",
			Help:    "Size of a Git bundle uploaded in a backup",
			Buckets: prometheus.ExponentialBucketsRange(1, 10*math.Pow(1024, 3), 20), // up to 10GB
		})
)
// Backup represents all the information needed to restore a backup for a
// repository. Only the fields with a non-"-" toml tag are persisted.
type Backup struct {
	// ID is the identifier that uniquely identifies the backup for this repository.
	ID string `toml:"-"`

	// Repository is the repository being backed up.
	Repository storage.Repository `toml:"-"`

	// Empty is true if the repository is empty, i.e. it had no refs when it
	// was backed up. Empty backups carry no bundle.
	Empty bool `toml:"empty"`

	// NonExistent is true if the repository does not exist.
	// Any Project/Non-Project that doesn't have any repository initialised can be considered as non-existent.
	// Manager.Create sets Empty alongside NonExistent.
	NonExistent bool `toml:"non_existent"`

	// Steps are the ordered list of steps required to restore this backup.
	// The last element is the most recent step.
	Steps []Step `toml:"steps"`

	// ObjectFormat is the name of the object hash used by the repository.
	ObjectFormat string `toml:"object_format"`

	// HeadReference is the reference that HEAD points to. It may be empty,
	// for example in backups that did not record it.
	HeadReference string `toml:"head_reference,omitempty"`
}
// Step represents an incremental step that makes up a complete backup for a
// repository. All paths are locations within the backup sink.
type Step struct {
	// BundlePath is the path of the bundle.
	BundlePath string `toml:"bundle_path,omitempty"`

	// RefPath is the path of the ref file, one "<target> <name>" line per ref.
	RefPath string `toml:"ref_path,omitempty"`

	// PreviousRefPath is the path of the previous step's ref file. When set,
	// the bundle for this step is an increment that excludes those targets.
	PreviousRefPath string `toml:"previous_ref_path,omitempty"`

	// CustomHooksPath is the path of the custom hooks archive.
	CustomHooksPath string `toml:"custom_hooks_path,omitempty"`
}
// Locator finds sink backup paths for repositories and records which backups
// have been committed so that they can later be looked up.
type Locator interface {
	// BeginFull returns the tentative backup paths needed to create a full backup.
	// Manager.Create writes into the last element of Steps, so the returned
	// backup is expected to contain at least one step.
	BeginFull(ctx context.Context, repo storage.Repository, backupID string) *Backup

	// BeginIncremental returns the backup with the last element of Steps being
	// the tentative step needed to create an incremental backup.
	BeginIncremental(ctx context.Context, repo storage.Repository, backupID string) (*Backup, error)

	// Commit persists the backup so that it can be looked up by FindLatest. It
	// is expected that the last element of Steps will be the newly created
	// backup.
	Commit(ctx context.Context, backup *Backup) error

	// FindLatest returns the latest backup that was written by Commit.
	// Manager.Restore treats an error wrapping ErrDoesntExist as "no backup".
	FindLatest(ctx context.Context, repo storage.Repository) (*Backup, error)

	// Find returns the repository backup at the given backupID. If the backup does
	// not exist then the error ErrDoesntExist is returned.
	Find(ctx context.Context, repo storage.Repository, backupID string) (*Backup, error)
}
// Repository abstracts git access required to make and restore a repository
// backup. Manager obtains instances through its repositoryFactory, either for
// remote or for local repositories.
type Repository interface {
	// ListRefs returns an iterator to fetch the full set of refs and targets for the repository.
	ListRefs(ctx context.Context) (RefIterator, error)

	// GetCustomHooks fetches the custom hooks archive and writes it to out.
	GetCustomHooks(ctx context.Context, out io.Writer) error

	// CreateBundle fetches a bundle that contains refs matching patterns. When
	// patterns is nil all refs are bundled. Manager.writeBundle handles a
	// localrepo.ErrEmptyBundle error as "nothing to bundle".
	CreateBundle(ctx context.Context, out io.Writer, patterns io.Reader) error

	// Remove removes the repository. Does not return an error if the
	// repository cannot be found.
	Remove(ctx context.Context) error

	// Create creates the repository.
	Create(ctx context.Context, hash git.ObjectHash, defaultBranch string) error

	// FetchBundle fetches references from a bundle. Refs will be mirrored to
	// the repository.
	FetchBundle(ctx context.Context, reader io.Reader, updateHead bool) error

	// SetCustomHooks updates the custom hooks for the repository.
	SetCustomHooks(ctx context.Context, reader io.Reader) error

	// ObjectHash detects the object hash used by the repository. Manager.Create
	// treats a gRPC NotFound status from it as "repository does not exist".
	ObjectHash(ctx context.Context) (git.ObjectHash, error)

	// HeadReference fetches the reference pointed to by HEAD.
	HeadReference(ctx context.Context) (git.ReferenceName, error)

	// ResetRefs attempts to reset the list of refs in the repository to match the
	// specified refs slice. This can fail if objects pointed to by a ref no longer
	// exists in the repository. The list of refs should not include the symbolic
	// HEAD reference.
	ResetRefs(ctx context.Context, refs []git.Reference) error

	// SetHeadReference sets the symbolic HEAD reference of the repository to the
	// given target, for example a branch name.
	SetHeadReference(ctx context.Context, target git.ReferenceName) error
}
// ResolveLocator returns a locator implementation for the named layout.
//
// Whatever the layout, the result is a manifest locator built on sink. The
// layouts differ only in the fallback locator handed to it:
//   - "legacy":   a LegacyLocator
//   - "pointer":  a PointerLocator that itself falls back to a LegacyLocator
//   - "manifest": no fallback (nil)
//
// Any other layout is rejected with an error.
func ResolveLocator(layout string, sink *Sink) (Locator, error) {
	var fallback Locator

	switch layout {
	case "legacy":
		fallback = LegacyLocator{}
	case "pointer":
		fallback = PointerLocator{
			Sink:     sink,
			Fallback: LegacyLocator{},
		}
	case "manifest":
		// Deliberately left nil: the manifest layout has no fallback locator.
	default:
		return nil, fmt.Errorf("unknown layout: %q", layout)
	}

	return NewManifestLocator(sink, fallback), nil
}
// Manager manages the process of creating and restoring repository backups.
type Manager struct {
	// sink is where backup files (ref lists, bundles, custom hooks archives)
	// are written to and read from.
	sink *Sink

	// locator decides the sink paths of a backup and records committed backups.
	locator Locator

	// logger is used to report non-fatal problems, such as a failed in-place
	// ref reset during Restore.
	logger log.Logger

	// repositoryFactory returns an abstraction over git repositories in order
	// to create and restore backups.
	repositoryFactory func(ctx context.Context, repo *gitalypb.Repository, server storage.ServerInfo) (Repository, error)
}
// NewManager returns a Manager that operates on repositories served by a
// remote Gitaly. Connections are dialed through pool.
//
// When the ServerInfo handed to the repository factory is zero, the server
// address and token are taken from ctx instead (see setContextServerInfo).
func NewManager(sink *Sink, logger log.Logger, locator Locator, pool *client.Pool) *Manager {
	remoteFactory := func(ctx context.Context, repo *gitalypb.Repository, server storage.ServerInfo) (Repository, error) {
		if err := setContextServerInfo(ctx, &server, repo.GetStorageName()); err != nil {
			return nil, err
		}

		conn, err := pool.Dial(ctx, server.Address, server.Token)
		if err != nil {
			return nil, err
		}

		return NewRemoteRepository(repo, conn), nil
	}

	return &Manager{
		sink:              sink,
		locator:           locator,
		logger:            logger,
		repositoryFactory: remoteFactory,
	}
}
// NewManagerLocal returns a Manager that operates directly on repositories
// stored on this node. The repository factory ignores the ServerInfo argument
// and wraps each requested repository in a local repository built from the
// given storage, git, transaction, counter and migration dependencies.
func NewManagerLocal(
	sink *Sink,
	logger log.Logger,
	locator Locator,
	storageLocator storage.Locator,
	gitCmdFactory gitcmd.CommandFactory,
	catfileCache catfile.Cache,
	txManager transaction.Manager,
	repoCounter *counter.RepositoryCounter,
	migrationStateManager migration.StateManager,
) *Manager {
	mgr := &Manager{
		sink:    sink,
		locator: locator,
		logger:  logger,
	}

	mgr.repositoryFactory = func(ctx context.Context, repo *gitalypb.Repository, _ storage.ServerInfo) (Repository, error) {
		return NewLocalRepository(
			logger,
			storageLocator,
			gitCmdFactory,
			txManager,
			repoCounter,
			catfileCache,
			localrepo.New(logger, storageLocator, gitCmdFactory, catfileCache, repo),
			migrationStateManager,
		), nil
	}

	return mgr
}
// streamRefs writes every ref of repo to the sink object at path, one
// "<target> <name>" line per ref, and returns how many refs were written.
//
// The elapsed time is recorded under the "refs" phase of backupLatency. A
// failure to close the ref iterator or the sink writer is joined onto
// whatever error the function would otherwise return. An io.EOF reported by
// the iterator after the loop is not treated as an error.
func (mgr *Manager) streamRefs(ctx context.Context, repo Repository, path string) (refCount int, returnErr error) {
	defer prometheus.NewTimer(backupLatency.WithLabelValues("refs")).ObserveDuration()

	writer, err := mgr.sink.GetWriter(ctx, path)
	if err != nil {
		return refCount, fmt.Errorf("stream refs: get writer: %w", err)
	}
	defer func() {
		if closeErr := writer.Close(); closeErr != nil {
			returnErr = errors.Join(returnErr, fmt.Errorf("stream refs: close writer: %w", closeErr))
		}
	}()

	refs, err := repo.ListRefs(ctx)
	if err != nil {
		return refCount, fmt.Errorf("stream refs: %w", err)
	}
	defer func() {
		if closeErr := refs.Close(); closeErr != nil {
			returnErr = errors.Join(returnErr, fmt.Errorf("stream refs: close iterator: %w", closeErr))
		}
	}()

	for refs.Next() {
		ref := refs.Ref()
		if _, err := fmt.Fprintf(writer, "%s %s\n", ref.Target, ref.Name); err != nil {
			return refCount, fmt.Errorf("stream refs: write ref: %w", err)
		}
		refCount++
	}

	switch iterErr := refs.Err(); {
	case iterErr == nil, errors.Is(iterErr, io.EOF):
		return refCount, nil
	default:
		return refCount, fmt.Errorf("stream refs: ref iterator: %w", iterErr)
	}
}
// Create creates a repository backup.
//
// The backup is addressed by req.VanityRepository, while git data is read from
// req.Repository on req.Server. When VanityRepository is nil it defaults to
// req.Repository, and that default is written back into req.
//
// Incremental requests ask the locator for a backup whose last step is a new
// tentative increment. Other requests start a full backup. The last step then
// receives:
//   - the ref list,
//   - a bundle, only when at least one ref exists (otherwise the backup is
//     marked Empty),
//   - the custom hooks archive.
//
// After that, the backup is committed through the locator.
//
// If ObjectHash reports codes.NotFound, the repository is treated as
// non-existent. The backup is then marked NonExistent and Empty and committed
// without any refs, bundle or hooks.
func (mgr *Manager) Create(ctx context.Context, req *CreateRequest) error {
	if req.VanityRepository == nil {
		req.VanityRepository = req.Repository
	}

	repo, err := mgr.repositoryFactory(ctx, req.Repository, req.Server)
	if err != nil {
		return fmt.Errorf("manager: %w", err)
	}

	var backup *Backup
	if req.Incremental {
		// Assign to the outer err rather than shadowing it with a new variable.
		backup, err = mgr.locator.BeginIncremental(ctx, req.VanityRepository, req.BackupID)
		if err != nil {
			return fmt.Errorf("manager: %w", err)
		}
	} else {
		backup = mgr.locator.BeginFull(ctx, req.VanityRepository, req.BackupID)
	}

	hash, err := repo.ObjectHash(ctx)
	switch {
	case status.Code(err) == codes.NotFound:
		// Record that there was no repository, so that a restore knows there is
		// nothing to bring back.
		backup.NonExistent = true
		backup.Empty = true
		if err := mgr.locator.Commit(ctx, backup); err != nil {
			return fmt.Errorf("manager: %w", err)
		}
		return nil
	case err != nil:
		return fmt.Errorf("manager: %w", err)
	}
	backup.ObjectFormat = hash.Format

	headRef, err := repo.HeadReference(ctx)
	if err != nil {
		return fmt.Errorf("manager: %w", err)
	}
	backup.HeadReference = headRef.String()

	// The locator guarantees the step being created is the last one.
	step := &backup.Steps[len(backup.Steps)-1]

	refCount, err := mgr.streamRefs(ctx, repo, step.RefPath)
	if err != nil {
		return fmt.Errorf("manager: %w", err)
	}

	// Git bundles cannot be created for repositories without refs.
	if refCount > 0 {
		if err := mgr.writeBundle(ctx, repo, step); err != nil {
			return fmt.Errorf("manager: %w", err)
		}
	} else {
		backup.Empty = true
	}

	if err := mgr.writeCustomHooks(ctx, repo, step.CustomHooksPath); err != nil {
		return fmt.Errorf("manager: %w", err)
	}

	if err := mgr.locator.Commit(ctx, backup); err != nil {
		return fmt.Errorf("manager: %w", err)
	}

	return nil
}
// Restore restores a repository from a backup. If req.BackupID is empty, the
// latest backup will be used.
//
// How the backup is located:
//   - No latest backup: the target repository is removed, and the locator's
//     error is returned.
//   - A specific backup ID is missing: an error wrapping ErrDoesntExist is
//     returned without touching the repository.
//
// How the backup is applied:
//   - NonExistent backups restore nothing.
//   - Empty backups recreate an empty repository.
//   - Otherwise the refs are first reset in place. If that fails, the
//     repository is rebuilt from the step bundles.
//
// Finally, custom hooks are restored from the latest step.
func (mgr *Manager) Restore(ctx context.Context, req *RestoreRequest) error {
	if req.VanityRepository == nil {
		req.VanityRepository = req.Repository
	}

	repo, err := mgr.repositoryFactory(ctx, req.Repository, req.Server)
	if err != nil {
		return fmt.Errorf("manager: %w", err)
	}

	var backup *Backup
	useLatest := req.BackupID == ""
	if useLatest {
		backup, err = mgr.locator.FindLatest(ctx, req.VanityRepository)
	} else {
		backup, err = mgr.locator.Find(ctx, req.VanityRepository, req.BackupID)
	}
	if err != nil {
		switch {
		case !errors.Is(err, ErrDoesntExist):
			return fmt.Errorf("manager: %w", err)
		case useLatest:
			return removeRepository(ctx, repo, err)
		default:
			return fmt.Errorf("manager: %w: %w", ErrDoesntExist, err)
		}
	}

	if len(backup.Steps) == 0 {
		return errors.New("manager: no backup steps")
	}

	if backup.NonExistent {
		return nil // Nothing to restore for non-existent repositories
	}

	if backup.Empty {
		// Git bundles can not be created for empty repositories, so there is
		// only an empty repository to recreate.
		if _, err := recreateRepo(ctx, repo, backup); err != nil {
			return fmt.Errorf("manager: recreate empty repo: %w", err)
		}
	} else if err := mgr.restoreFromRefs(ctx, repo, backup); err != nil {
		mgr.logger.WithFields(log.Fields{
			"storage":       req.Repository.GetStorageName(),
			"relative_path": req.Repository.GetRelativePath(),
			"backup_id":     backup.ID,
			logrus.ErrorKey: err,
		}).Warn("unable to reset refs. Proceeding with a normal restore")

		// The in-place reset failed: rebuild the repository from its bundles instead.
		if err := mgr.restoreFromBundle(ctx, repo, backup); err != nil {
			return fmt.Errorf("manager: restore from bundle: %w", err)
		}
	}

	// Each custom hooks archive holds the complete set of hooks, so only the
	// newest one matters.
	newest := backup.Steps[len(backup.Steps)-1]
	return mgr.restoreCustomHooks(ctx, repo, newest.CustomHooksPath)
}
// restoreFromRefs restores repo in place. It resets the refs to those recorded
// in the latest backup step, then points HEAD at backup.HeadReference.
//
// It only works when the objects those refs point to are already present (see
// Repository.ResetRefs). Restore falls back to a full bundle restore when this
// returns an error.
func (mgr *Manager) restoreFromRefs(ctx context.Context, repo Repository, backup *Backup) error {
	// HEAD is restored from the manifest rather than from the ref list, so a
	// backup without a HeadReference cannot be restored this way. Check this
	// before touching the repository, so that an attempt that is bound to fail
	// does not mutate refs first.
	headRef := git.ReferenceName(backup.HeadReference)
	if headRef == "" {
		return errors.New("expected HEAD to be a symbolic reference")
	}

	latestStep := backup.Steps[len(backup.Steps)-1]
	refs, err := mgr.readRefs(ctx, latestStep.RefPath)
	if err != nil {
		return fmt.Errorf("read refs from backup: %w", err)
	}
	if len(refs) == 0 {
		return errors.New("no refs in backup")
	}

	// Reset all refs except for HEAD (readRefs already drops the HEAD entry).
	if err := repo.ResetRefs(ctx, refs); err != nil {
		return fmt.Errorf("reset refs: %w", err)
	}

	// Explicitly reset HEAD to the default branch tracked by the manifest. In a
	// bundle restore this would have been done during repository creation.
	return repo.SetHeadReference(ctx, headRef)
}
// restoreFromBundle rebuilds repo from scratch and fetches each step's bundle
// in order.
//
// The repository is first removed and recreated with the backup's object
// format and default branch (see recreateRepo). Bundles only update HEAD when
// backup.HeadReference did not name a branch. Steps whose ref list is empty
// are skipped, because empty-repository backups carry no bundle.
func (mgr *Manager) restoreFromBundle(ctx context.Context, repo Repository, backup *Backup) error {
	branchKnown, err := recreateRepo(ctx, repo, backup)
	if err != nil {
		return fmt.Errorf("recreate repo: %w", err)
	}
	updateHead := !branchKnown

	for i := range backup.Steps {
		step := &backup.Steps[i]

		refs, err := mgr.readRefs(ctx, step.RefPath)
		if err != nil {
			return fmt.Errorf("read refs: %w", err)
		}

		// Older manifests may describe empty-repository steps that have no
		// bundle; skip those.
		// TODO: Remove refs check after couple of releases.
		if len(refs) == 0 {
			continue
		}

		if err := mgr.restoreBundle(ctx, repo, step.BundlePath, updateHead); err != nil {
			return fmt.Errorf("restore bundle: %w", err)
		}
	}

	return nil
}
// removeRepository deletes repo and then returns cause unchanged, so the
// caller can report why the repository was removed. If the removal itself
// fails, the removal failure is returned instead and cause is dropped.
func removeRepository(ctx context.Context, repo Repository, cause error) error {
	removeErr := repo.Remove(ctx)
	if removeErr == nil {
		return cause
	}

	// NOTE(review): this message says "repository removed" although this is
	// the path where removal failed; kept as-is in case callers match on it.
	return fmt.Errorf("manager: repository removed: %w", removeErr)
}
// setContextServerInfo fills *server with the Gitaly connection info for
// storageName carried in ctx metadata, but only when *server is zero. A
// non-zero server is left untouched. On extraction failure, *server still
// receives whatever value ExtractGitalyServer returned, and a wrapped error
// is reported.
func setContextServerInfo(ctx context.Context, server *storage.ServerInfo, storageName string) error {
	if server.Zero() {
		var err error
		if *server, err = storage.ExtractGitalyServer(ctx, storageName); err != nil {
			return fmt.Errorf("set context server info: %w", err)
		}
	}
	return nil
}
// writeBundle writes a Git bundle for step to the sink at step.BundlePath. The
// elapsed time is recorded under the "bundle" phase of backupLatency, and the
// number of bytes written is observed in backupBundleSize.
//
// When step.PreviousRefPath is set, the bundle is an increment. The patterns
// are "--all" plus a "^<oid>" line for every target in the previous ref file,
// so objects reachable from previously backed-up refs are excluded.
//
// If that leaves nothing to bundle, an error wrapping ErrSkipped is returned
// instead of nil. The writer is lazy, so presumably no bundle object is ever
// created in that case. Committing a step whose bundle does not exist would
// make bundle-based restores of that step fail.
func (mgr *Manager) writeBundle(ctx context.Context, repo Repository, step *Step) (returnErr error) {
	timer := prometheus.NewTimer(backupLatency.WithLabelValues("bundle"))
	defer timer.ObserveDuration()

	var patterns io.Reader
	if len(step.PreviousRefPath) > 0 {
		// If there is a previous ref path, then we are creating an increment.
		negatedRefs, err := mgr.negatedKnownRefs(ctx, step)
		if err != nil {
			return fmt.Errorf("write bundle: %w", err)
		}
		defer func() {
			// Closing the reader also unblocks the producer goroutine if
			// CreateBundle stopped reading early.
			if err := negatedRefs.Close(); err != nil && returnErr == nil {
				returnErr = fmt.Errorf("write bundle: %w", err)
			}
		}()
		patterns = io.MultiReader(strings.NewReader("--all\n"), negatedRefs)
	}

	w := NewLazyWriter(func() (io.WriteCloser, error) {
		return mgr.sink.GetWriter(ctx, step.BundlePath)
	})
	defer func() {
		backupBundleSize.Observe(float64(w.BytesWritten()))
		if err := w.Close(); err != nil && returnErr == nil {
			returnErr = fmt.Errorf("write bundle: %w", err)
		}
	}()

	if err := repo.CreateBundle(ctx, w, patterns); err != nil {
		if errors.Is(err, localrepo.ErrEmptyBundle) {
			// Nothing new since the previous step: report a skip rather than
			// success so that no step pointing at a missing bundle gets committed.
			return fmt.Errorf("write bundle: %w: no changes to bundle", ErrSkipped)
		}
		return fmt.Errorf("write bundle: %w", err)
	}

	return nil
}
// negatedKnownRefs returns a stream of "^<target>" lines, one per ref listed
// in the file at step.PreviousRefPath. With no previous ref path, an empty
// reader is returned.
//
// The file is read and decoded by a goroutine that writes into an io.Pipe.
// Read or decode failures reach the consumer as the pipe's read error. The
// caller must close the returned reader. Closing it also makes any pending
// pipe write fail, which lets the goroutine exit.
func (mgr *Manager) negatedKnownRefs(ctx context.Context, step *Step) (io.ReadCloser, error) {
	if step.PreviousRefPath == "" {
		return io.NopCloser(new(bytes.Reader)), nil
	}

	pr, pw := io.Pipe()

	go func() {
		// On success this delivers io.EOF to the reader. After CloseWithError
		// it is a no-op, because the first close error wins.
		defer pw.Close()

		fail := func(err error) { _ = pw.CloseWithError(err) }

		previous, err := mgr.sink.GetReader(ctx, step.PreviousRefPath)
		if err != nil {
			fail(err)
			return
		}
		defer previous.Close()

		decoder := gitcmd.NewShowRefDecoder(previous)
		for {
			var ref git.Reference
			decodeErr := decoder.Decode(&ref)
			switch {
			case errors.Is(decodeErr, io.EOF):
				return
			case decodeErr != nil:
				fail(decodeErr)
				return
			}

			if _, err := fmt.Fprintf(pw, "^%s\n", ref.Target); err != nil {
				fail(err)
				return
			}
		}
	}()

	return pr, nil
}
// readRefs loads the ref list stored at path in the sink. The file holds the
// "<target> <name>" lines that streamRefs writes, decoded with the show-ref
// decoder.
//
// The HEAD entry, if present, is omitted, because HEAD is tracked separately
// in Backup.HeadReference. On any error no refs are returned: a partially
// decoded list must not be acted upon.
func (mgr *Manager) readRefs(ctx context.Context, path string) ([]git.Reference, error) {
	reader, err := mgr.sink.GetReader(ctx, path)
	if err != nil {
		return nil, fmt.Errorf("read refs: %w", err)
	}
	defer reader.Close()

	var refs []git.Reference
	d := gitcmd.NewShowRefDecoder(reader)
	for {
		var ref git.Reference
		if err := d.Decode(&ref); errors.Is(err, io.EOF) {
			break
		} else if err != nil {
			return nil, fmt.Errorf("read refs: %w", err)
		}

		// HEAD is tracked as a symbolic reference in the backup manifest and will be restored separately.
		if ref.Name == "HEAD" {
			continue
		}

		refs = append(refs, ref)
	}

	return refs, nil
}
// restoreBundle fetches the bundle stored at path in the sink into repo.
// updateHead is passed straight through to Repository.FetchBundle. Every
// failure is annotated with the quoted bundle path.
func (mgr *Manager) restoreBundle(ctx context.Context, repo Repository, path string, updateHead bool) error {
	bundle, err := mgr.sink.GetReader(ctx, path)
	if err == nil {
		defer bundle.Close()
		err = repo.FetchBundle(ctx, bundle, updateHead)
	}

	if err != nil {
		return fmt.Errorf("restore bundle: %q: %w", path, err)
	}
	return nil
}
// writeCustomHooks copies repo's custom hooks archive to the sink at path.
// The elapsed time is recorded under the "custom_hooks" phase of
// backupLatency.
//
// The sink writer is created through NewLazyWriter. Presumably this means no
// object is created when the repository produces no hook data (TODO confirm
// against LazyWriter). A close failure is reported only when nothing else
// failed first.
func (mgr *Manager) writeCustomHooks(ctx context.Context, repo Repository, path string) (returnErr error) {
	defer prometheus.NewTimer(backupLatency.WithLabelValues("custom_hooks")).ObserveDuration()

	lazy := NewLazyWriter(func() (io.WriteCloser, error) {
		return mgr.sink.GetWriter(ctx, path)
	})
	defer func() {
		closeErr := lazy.Close()
		if closeErr != nil && returnErr == nil {
			returnErr = fmt.Errorf("write custom hooks: %w", closeErr)
		}
	}()

	if err := repo.GetCustomHooks(ctx, lazy); err != nil {
		return fmt.Errorf("write custom hooks: %w", err)
	}
	return nil
}
// restoreCustomHooks installs the custom hooks archive stored at path in the
// sink into repo. A missing archive (ErrDoesntExist) is not an error: it
// simply means there are no hooks to restore.
func (mgr *Manager) restoreCustomHooks(ctx context.Context, repo Repository, path string) error {
	archive, err := mgr.sink.GetReader(ctx, path)
	switch {
	case errors.Is(err, ErrDoesntExist):
		return nil
	case err != nil:
		return fmt.Errorf("restore custom hooks: %w", err)
	}
	defer archive.Close()

	if err := repo.SetCustomHooks(ctx, archive); err != nil {
		return fmt.Errorf("restore custom hooks, %q: %w", path, err)
	}
	return nil
}
// recreateRepo removes repo and creates it again. The new repository uses the
// object format recorded in backup, and the branch named by
// backup.HeadReference as its default branch.
//
// The boolean result reports whether HeadReference names a branch. It is
// returned even when removal or creation fails. It is false only when the
// object format itself cannot be resolved, in which case repo is left
// untouched.
func recreateRepo(ctx context.Context, repo Repository, backup *Backup) (bool, error) {
	objectHash, err := git.ObjectHashByFormat(backup.ObjectFormat)
	if err != nil {
		return false, err
	}

	branch, known := git.ReferenceName(backup.HeadReference).Branch()

	if err := repo.Remove(ctx); err != nil {
		return known, err
	}
	return known, repo.Create(ctx, objectHash, branch)
}