internal/cli/gitalybackup/restore.go (227 lines of code) (raw):

package gitalybackup

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"runtime"

	cli "github.com/urfave/cli/v3"
	"gitlab.com/gitlab-org/gitaly/v16/internal/backup"
	"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage"
	"gitlab.com/gitlab-org/gitaly/v16/internal/grpc/client"
	"gitlab.com/gitlab-org/gitaly/v16/internal/log"
	"gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

// restoreRequest is one restore instruction decoded from the JSON stream read
// by the restore subcommand. It embeds serverRepository and adds the
// "always_create" option on top of it.
type restoreRequest struct {
	serverRepository
	AlwaysCreate bool `json:"always_create"`
}

// restoreSubcommand holds the parsed command-line options of the restore
// subcommand. Each field mirrors one flag declared by restoreFlags.
type restoreSubcommand struct {
	backupPath            string   // --path
	parallel              int      // --parallel
	parallelStorage       int      // --parallel-storage
	layout                string   // --layout
	removeAllRepositories []string // --remove-all-repositories
	backupID              string   // --id
	serverSide            bool     // --server-side
}

// flags populates cmd with the flag values carried by the given CLI command.
// Every field of the struct is overwritten.
func (cmd *restoreSubcommand) flags(ctx *cli.Command) {
	*cmd = restoreSubcommand{
		backupPath:            ctx.String("path"),
		parallel:              int(ctx.Int("parallel")),
		parallelStorage:       int(ctx.Int("parallel-storage")),
		layout:                ctx.String("layout"),
		removeAllRepositories: ctx.StringSlice("remove-all-repositories"),
		backupID:              ctx.String("id"),
		serverSide:            ctx.Bool("server-side"),
	}
}

// restoreFlags returns the flag definitions accepted by the restore
// subcommand, in the order they are declared below.
func restoreFlags() []cli.Flag {
	pathFlag := &cli.StringFlag{
		Name:  "path",
		Usage: "repository backup path",
	}
	parallelFlag := &cli.IntFlag{
		Name:  "parallel",
		Usage: "maximum number of parallel backups",
		Value: int64(runtime.NumCPU()),
	}
	parallelStorageFlag := &cli.IntFlag{
		Name:  "parallel-storage",
		Usage: "maximum number of parallel backups per storage. Note: actual parallelism when combined with `-parallel` depends on the order the repositories are received.",
		Value: 2,
	}
	layoutFlag := &cli.StringFlag{
		Name:  "layout",
		Usage: "how backup files are located. One of manifest, pointer, or legacy.",
		Value: "manifest",
	}
	removeAllFlag := &cli.StringSliceFlag{
		Name:  "remove-all-repositories",
		Usage: "comma-separated list of storage names to have all repositories removed from before restoring.",
	}
	idFlag := &cli.StringFlag{
		Name:  "id",
		Usage: "ID of full backup to restore. If not specified, the latest backup is restored.",
	}
	serverSideFlag := &cli.BoolFlag{
		Name:  "server-side",
		Usage: "use server-side backups.",
		Value: false,
	}

	return []cli.Flag{
		pathFlag,
		parallelFlag,
		parallelStorageFlag,
		layoutFlag,
		removeAllFlag,
		idFlag,
		serverSideFlag,
	}
}

// newRestoreCommand builds the "restore" CLI command, wiring restoreAction as
// its action and restoreFlags as its flag set.
func newRestoreCommand() *cli.Command {
	command := &cli.Command{
		Name:  "restore",
		Usage: "Restore backup file",
	}
	command.Flags = restoreFlags()
	command.Action = restoreAction
	return command
}

// restoreAction is the entry point of the restore command. It configures a
// logger writing to cmd.Writer, injects the Gitaly server information into
// the context, reads the flags into a restoreSubcommand and runs it with
// cmd.Reader as the request stream. Errors raised after the logger exists are
// logged before being returned.
func restoreAction(ctx context.Context, cmd *cli.Command) error {
	logger, err := log.Configure(cmd.Writer, "json", "info")
	if err != nil {
		// No logger is available yet, so the failure is printed directly.
		fmt.Printf("configuring logger failed: %v", err)
		return err
	}

	// logged records the error through the logger and hands it back unchanged.
	logged := func(failure error) error {
		logger.Error(failure.Error())
		return failure
	}

	ctx, err = storage.InjectGitalyServersEnv(ctx)
	if err != nil {
		return logged(err)
	}

	var restore restoreSubcommand
	restore.flags(cmd)

	if runErr := restore.run(ctx, logger, cmd.Reader); runErr != nil {
		return logged(runErr)
	}

	return nil
}

// run executes the restore. It decodes a stream of JSON restoreRequest values
// from stdin, hands each one to a backup pipeline as a restore command, waits
// for the pipeline to finish and finally removes dangling repositories from
// the storages named by --remove-all-repositories.
//
// With --server-side the server-side adapter is used and --path is rejected;
// otherwise the sink is resolved from --path and the locator from --layout.
func (cmd *restoreSubcommand) run(ctx context.Context, logger log.Logger, stdin io.Reader) error {
	dialOptions := client.WithDialOptions(client.UnaryInterceptor(), client.StreamInterceptor())
	pool := client.NewPool(dialOptions)
	defer func() {
		// The close error is deliberately ignored, as there is nothing left to do with it.
		_ = pool.Close()
	}()

	var manager backup.Strategy
	switch {
	case cmd.serverSide && cmd.backupPath != "":
		return fmt.Errorf("restore: path cannot be used with server-side backups")
	case cmd.serverSide:
		manager = backup.NewServerSideAdapter(pool)
	default:
		sink, err := backup.ResolveSink(ctx, cmd.backupPath)
		if err != nil {
			return fmt.Errorf("restore: resolve sink: %w", err)
		}

		locator, err := backup.ResolveLocator(cmd.layout, sink)
		if err != nil {
			return fmt.Errorf("restore: resolve locator: %w", err)
		}

		manager = backup.NewManager(sink, logger, locator, pool)
	}

	// Snapshot the repositories currently present in each storage that was
	// requested for clean-up. After the restore, anything in this snapshot
	// that was not restored is treated as dangling. A listing failure is only
	// warned about; the storage is still recorded with whatever was returned.
	preexisting := make(map[string][]*gitalypb.Repository)
	for _, storageName := range cmd.removeAllRepositories {
		listed, listErr := listRepositories(ctx, pool, storageName)
		if listErr != nil {
			logger.WithError(listErr).WithField("storage_name", storageName).Warn("failed to list repositories")
		}
		preexisting[storageName] = listed
	}

	var pipelineOpts []backup.PipelineOption
	if cmd.parallel > 0 || cmd.parallelStorage > 0 {
		pipelineOpts = append(pipelineOpts, backup.WithConcurrency(cmd.parallel, cmd.parallelStorage))
	}

	pipeline, err := backup.NewPipeline(logger, pipelineOpts...)
	if err != nil {
		return fmt.Errorf("create pipeline: %w", err)
	}

	requests := json.NewDecoder(stdin)
	for {
		var req restoreRequest

		decodeErr := requests.Decode(&req)
		if errors.Is(decodeErr, io.EOF) {
			// End of the request stream.
			break
		}
		if decodeErr != nil {
			return fmt.Errorf("restore: %w", decodeErr)
		}

		// The same repository message is used as both the repository and the
		// vanity repository of the restore request.
		target := &gitalypb.Repository{
			StorageName:   req.StorageName,
			RelativePath:  req.RelativePath,
			GlProjectPath: req.GlProjectPath,
		}

		restoreCommand := backup.NewRestoreCommand(manager, backup.RestoreRequest{
			Server:           req.ServerInfo,
			Repository:       target,
			VanityRepository: target,
			AlwaysCreate:     req.AlwaysCreate,
			BackupID:         cmd.backupID,
		})
		pipeline.Handle(ctx, restoreCommand)
	}

	restored, err := pipeline.Done()
	if err != nil {
		return fmt.Errorf("restore: %w", err)
	}

	var failures []error
	for storageName, candidates := range preexisting {
		for _, candidate := range candidates {
			// Repositories that were just restored are kept. Pool repositories
			// are not backed up, so they would always look dangling and are
			// left alone as well.
			_, wasRestored := restored[storageName][backup.NewRepositoryKey(candidate)]
			if wasRestored || storage.IsPoolRepository(candidate) {
				continue
			}

			// Anything else existed in the storage but was not part of the
			// restore. It is deleted so the state of repos in Gitaly matches
			// that in the Rails DB.
			if removeErr := removeRepository(ctx, pool, candidate); removeErr != nil {
				failures = append(failures, fmt.Errorf("storage_name %q relative_path %q: %w", storageName, candidate.GetRelativePath(), removeErr))
			}
		}
	}

	if len(failures) == 0 {
		return nil
	}

	return fmt.Errorf("remove dangling repositories: %d failures encountered: %w", len(failures), errors.Join(failures...))
}

// removeRepository removes the specified repository from its storage. The
// server for the repository's storage is extracted from ctx and dialed through
// pool. A NotFound response is treated as success.
func removeRepository(ctx context.Context, pool *client.Pool, repo *gitalypb.Repository) error {
	server, err := storage.ExtractGitalyServer(ctx, repo.GetStorageName())
	if err != nil {
		return fmt.Errorf("remove repo: %w", err)
	}

	conn, err := pool.Dial(ctx, server.Address, server.Token)
	if err != nil {
		return fmt.Errorf("remove repo: %w", err)
	}

	request := &gitalypb.RemoveRepositoryRequest{Repository: repo}
	if _, rpcErr := gitalypb.NewRepositoryServiceClient(conn).RemoveRepository(ctx, request); rpcErr != nil {
		// NotFound error is harmless and doesn't need to be reported.
		if status.Code(rpcErr) == codes.NotFound {
			return nil
		}
		return fmt.Errorf("remove repo: remove: %w", rpcErr)
	}

	return nil
}

// listRepositories returns a list of repositories found in the given storage.
func listRepositories(ctx context.Context, pool *client.Pool, storageName string) (repos []*gitalypb.Repository, err error) { server, err := storage.ExtractGitalyServer(ctx, storageName) if err != nil { return nil, fmt.Errorf("list repos: %w", err) } conn, err := pool.Dial(ctx, server.Address, server.Token) if err != nil { return nil, fmt.Errorf("list repos: %w", err) } internalClient := gitalypb.NewInternalGitalyClient(conn) stream, err := internalClient.WalkRepos(ctx, &gitalypb.WalkReposRequest{StorageName: storageName}) if err != nil { return nil, fmt.Errorf("list repos: walk: %w", err) } for { resp, err := stream.Recv() if errors.Is(err, io.EOF) { break } if err != nil { return nil, fmt.Errorf("list repos: receiving messages: %w", err) } repos = append(repos, &gitalypb.Repository{RelativePath: resp.GetRelativePath(), StorageName: storageName}) } return repos, nil }