// internal/cli/gitaly/subcmd_recovery.go
package gitaly
import (
"archive/tar"
"bytes"
"container/list"
"context"
"errors"
"fmt"
"io"
"os"
"path/filepath"
"strconv"
"strings"
"sync/atomic"
"time"
"github.com/urfave/cli/v3"
"gitlab.com/gitlab-org/gitaly/v16"
"gitlab.com/gitlab-org/gitaly/v16/internal/backup"
"gitlab.com/gitlab-org/gitaly/v16/internal/git/catfile"
"gitlab.com/gitlab-org/gitaly/v16/internal/git/gitcmd"
"gitlab.com/gitlab-org/gitaly/v16/internal/git/housekeeping"
"gitlab.com/gitlab-org/gitaly/v16/internal/git/localrepo"
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/config"
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage"
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/keyvalue"
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/keyvalue/databasemgr"
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/mode"
nodeimpl "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/node"
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagemgr"
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagemgr/partition"
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagemgr/partition/migration"
"gitlab.com/gitlab-org/gitaly/v16/internal/helper"
"gitlab.com/gitlab-org/gitaly/v16/internal/log"
"golang.org/x/sync/errgroup"
)
const (
flagAll = "all"
flagParallel = "parallel"
flagPartition = "partition"
)
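// recoveryContext bundles the state shared by the recovery subcommands: the parsed CLI
// command, the resolved storage and its name, the partitions to operate on, the WAL backup
// log entry store, and the cleanup functions registered during setup.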
type recoveryContext struct {
cmd *cli.Command
parallel int
nodeStorage storage.Storage
storageName string
partitions []storage.PartitionID
logEntryStore backup.LogEntryStore
cleanupFuncs *list.List
}
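// newRecoveryCommand returns the "gitaly recovery" command with its "status" and "replay"
// subcommands for inspecting and replaying archived write-ahead log entries offline.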
func newRecoveryCommand() *cli.Command {
return &cli.Command{
Name: "recovery",
Usage: "manage partitions offline",
UsageText: "gitaly recovery --config <gitaly_config_file> command [command options]",
Flags: []cli.Flag{
gitalyConfigFlag(),
},
Commands: []*cli.Command{
{
Name: "status",
Usage: "shows the status of a partition",
UsageText: `gitaly recovery --config <gitaly_config_file> status [command options]
Example: gitaly recovery --config gitaly.config.toml status --storage default --partition 2`,
Action: recoveryStatusAction,
Flags: []cli.Flag{
&cli.StringFlag{
Name: flagStorage,
Usage: "storage containing the partition",
},
&cli.StringFlag{
Name: flagPartition,
Usage: "partition ID",
},
&cli.StringFlag{
Name: flagRepository,
Usage: "relative path to the repository",
},
&cli.BoolFlag{
Name: flagAll,
Usage: "runs the command for all partitions in the storage",
},
&cli.IntFlag{
Name: flagParallel,
Usage: "maximum number of parallel queries per storage",
Value: 2,
},
},
},
{
Name: "replay",
Usage: "apply all available contiguous archived log entries for a partition, gitaly must be stopped before running this command",
UsageText: `gitaly recovery --config <gitaly_config_file> replay [command options]
Example: gitaly recovery --config gitaly.config.toml replay --storage default --partition 2`,
Action: recoveryReplayAction,
Flags: []cli.Flag{
&cli.StringFlag{
Name: flagStorage,
Usage: "storage containing the partition",
},
&cli.StringFlag{
Name: flagPartition,
Usage: "partition ID",
},
&cli.StringFlag{
Name: flagRepository,
Usage: "relative path to the repository",
},
&cli.BoolFlag{
Name: flagAll,
Usage: "runs the command for all partitions in the storage",
},
&cli.IntFlag{
Name: flagParallel,
Usage: "maximum number of parallel restores per storage",
Value: 2,
},
},
},
},
}
}
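// recoveryStatusAction prints, for each selected partition, the applied LSN and the range of
// contiguous WAL backup entries available beyond it, then reports an overall summary.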
func recoveryStatusAction(ctx context.Context, cmd *cli.Command) (returnErr error) {
recoveryContext, err := setupRecoveryContext(ctx, cmd)
if err != nil {
return fmt.Errorf("setup recovery context: %w", err)
}
defer func() {
returnErr = errors.Join(returnErr, recoveryContext.Cleanup())
}()
g, _ := errgroup.WithContext(ctx)
g.SetLimit(recoveryContext.parallel)
var successCount, errCount atomic.Uint64
for _, partitionID := range recoveryContext.partitions {
g.Go(func() error {
err := recoveryContext.printPartitionStatus(ctx, partitionID)
if err != nil {
fmt.Fprintf(cmd.ErrWriter, "restore status for partition %d failed: %v\n", partitionID, err)
errCount.Add(1)
} else {
successCount.Add(1)
}
return nil
})
}
err = g.Wait()
success := successCount.Load()
failure := errCount.Load()
fmt.Fprintf(recoveryContext.cmd.Writer, "recovery status completed: %d succeeded, %d failed", success, failure)
if err == nil && errCount.Load() > 0 {
err = fmt.Errorf("recovery status failed for %d out of %d partition(s)", failure, success+failure)
}
return err
}
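// printPartitionStatus reports the applied LSN and partition relative paths, then scans the
// log entry store for the highest contiguous archived LSN following the applied LSN, noting
// any gap in the WAL archive.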
func (rc *recoveryContext) printPartitionStatus(ctx context.Context, partitionID storage.PartitionID) (returnErr error) {
var appliedLSN storage.LSN
var relativePaths []string
ptn, err := rc.nodeStorage.GetPartition(ctx, partitionID)
if err != nil {
return fmt.Errorf("getting partition %s: %w", partitionID.String(), err)
}
defer ptn.Close()
txn, err := ptn.Begin(ctx, storage.BeginOptions{
Write: false,
RelativePaths: []string{},
})
if err != nil {
return fmt.Errorf("begin: %w", err)
}
appliedLSN = txn.SnapshotLSN()
relativePaths = txn.PartitionRelativePaths()
err = txn.Rollback(ctx)
if err != nil {
return fmt.Errorf("rollback: %w", err)
}
var buffer bytes.Buffer
buffer.WriteString("---------------------------------------------\n")
buffer.WriteString(fmt.Sprintf("Partition ID: %s - Applied LSN: %s\n", partitionID.String(), appliedLSN.String()))
if len(relativePaths) > 0 {
buffer.WriteString("Relative paths:\n")
for _, relativePath := range relativePaths {
buffer.WriteString(fmt.Sprintf(" - %s\n", relativePath))
}
}
entries := rc.logEntryStore.Query(backup.PartitionInfo{
PartitionID: partitionID,
StorageName: rc.storageName,
}, appliedLSN+1)
var lastLSN storage.LSN
discontinuity := false
for entries.Next(ctx) {
currentLSN := entries.LSN()
// First iteration
if lastLSN == storage.LSN(0) {
lastLSN = currentLSN
continue
}
if currentLSN != lastLSN+1 {
// We've found a gap
discontinuity = true
break
}
lastLSN = currentLSN
}
if lastLSN == storage.LSN(0) {
buffer.WriteString("Available WAL backup entries: No entries found\n")
} else {
buffer.WriteString(fmt.Sprintf("Available WAL backup entries: up to LSN: %s\n", lastLSN.String()))
if discontinuity {
buffer.WriteString(fmt.Sprintf("There is a gap in WAL archive after LSN: %s\n", lastLSN.String()))
}
}
_, _ = buffer.WriteTo(rc.cmd.Writer)
if err := entries.Err(); err != nil {
return fmt.Errorf("query log entry store: %w", err)
}
return nil
}
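// recoveryReplayAction downloads and applies archived WAL entries to each selected partition
// in parallel, printing per-partition errors and an overall summary.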
func recoveryReplayAction(ctx context.Context, cmd *cli.Command) (returnErr error) {
recoveryContext, err := setupRecoveryContext(ctx, cmd)
if err != nil {
return fmt.Errorf("setup recovery context: %w", err)
}
defer func() {
returnErr = errors.Join(returnErr, recoveryContext.Cleanup())
}()
tempDir, err := os.MkdirTemp("", "gitaly-recovery-replay-*")
if err != nil {
return fmt.Errorf("create temp dir: %w", err)
}
defer func() {
if err := os.RemoveAll(tempDir); err != nil {
returnErr = errors.Join(returnErr, fmt.Errorf("removing temp dir: %w", err))
}
}()
g, _ := errgroup.WithContext(ctx)
g.SetLimit(recoveryContext.parallel)
var successCount, errCount atomic.Uint64
for _, partitionID := range recoveryContext.partitions {
g.Go(func() error {
fmt.Fprintf(cmd.Writer, "started processing partition %d\n", partitionID)
err := recoveryContext.processPartition(ctx, tempDir, partitionID)
if err != nil {
fmt.Fprintf(cmd.ErrWriter, "restore replay for partition %d failed: %v\n", partitionID, err)
errCount.Add(1)
} else {
successCount.Add(1)
}
return nil
})
}
err = g.Wait()
success := successCount.Load()
failure := errCount.Load()
fmt.Fprintf(recoveryContext.cmd.Writer, "recovery replay completed: %d succeeded, %d failed", success, failure)
if err == nil && errCount.Load() > 0 {
err = fmt.Errorf("recovery replay failed for %d out of %d partition(s)", failure, success+failure)
}
return err
}
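// processPartition replays archived log entries for a single partition, starting with the
// entry following the applied LSN. It verifies each entry is applied before continuing and
// stops at the first gap or failure, then prints how far the replay progressed.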
func (rc *recoveryContext) processPartition(ctx context.Context, tempDir string, partitionID storage.PartitionID) error {
var appliedLSN storage.LSN
ptn, err := rc.nodeStorage.GetPartition(ctx, partitionID)
if err != nil {
return fmt.Errorf("getting partition %s: %w", partitionID.String(), err)
}
defer ptn.Close()
txn, err := ptn.Begin(ctx, storage.BeginOptions{
Write: false,
RelativePaths: []string{},
})
if err != nil {
return fmt.Errorf("begin: %w", err)
}
appliedLSN = txn.SnapshotLSN()
err = txn.Rollback(ctx)
if err != nil {
return fmt.Errorf("rollback: %w", err)
}
partitionInfo := backup.PartitionInfo{
PartitionID: partitionID,
StorageName: rc.storageName,
}
nextLSN := appliedLSN + 1
finalLSN := appliedLSN
iterator := rc.logEntryStore.Query(partitionInfo, nextLSN)
for iterator.Next(ctx) {
if nextLSN != iterator.LSN() {
return fmt.Errorf("there is discontinuity in the WAL entries. Expected LSN: %s, Got: %s", nextLSN.String(), iterator.LSN().String())
}
reader, err := rc.logEntryStore.GetReader(ctx, partitionInfo, nextLSN)
if err != nil {
return fmt.Errorf("get reader for entry with LSN %s: %w", nextLSN, err)
}
if err := processLogEntry(reader, tempDir, ptn.GetLogWriter(), nextLSN); err != nil {
reader.Close()
return fmt.Errorf("process log entry %s: %w", nextLSN, err)
}
reader.Close()
// Wait for the log entry to be applied and verify the result
txn, err = ptn.Begin(ctx, storage.BeginOptions{
Write: false,
RelativePaths: []string{},
})
if err != nil || txn.SnapshotLSN() != nextLSN {
// If a log entry cannot be applied for any reason (broken, wrong bucket, etc.), the user will
// find out, but it requires an in-depth investigation. Until the cause is found, the
// partition remains in a broken state. There is nothing this tool can do to resolve the
// situation automatically; it's up to the user to decide the next course of action. At the
// very least, the malformed log entry is removed here so it does not leave the partition
// completely broken.
return errors.Join(
fmt.Errorf("failed to apply latest log entry: %w", err),
ptn.GetLogWriter().DeleteLogEntry(nextLSN),
)
}
// Close the verification snapshot before moving on to the next log entry.
if err := txn.Rollback(ctx); err != nil {
return fmt.Errorf("rollback: %w", err)
}
finalLSN = nextLSN
nextLSN++
}
if err := iterator.Err(); err != nil {
return fmt.Errorf("query log entry store: %w", err)
}
var buffer bytes.Buffer
buffer.WriteString("---------------------------------------------\n")
buffer.WriteString(fmt.Sprintf("Partition ID: %s - Applied LSN: %s\n", partitionID.String(), appliedLSN.String()))
buffer.WriteString(fmt.Sprintf("Successfully processed log entries up to LSN: %s\n", finalLSN.String()))
_, _ = buffer.WriteTo(rc.cmd.Writer)
return nil
}
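// processLogEntry downloads a single archived log entry into a staging directory, extracts
// it, verifies the entry was unpacked into a directory named after its LSN, and appends it to
// the partition's write-ahead log, notifying the log writer of new entries.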
func processLogEntry(reader io.Reader, tempDir string, logWriter storage.LogWriter, lsn storage.LSN) (returnErr error) {
stagingDir, err := os.MkdirTemp(tempDir, "staging-*")
if err != nil {
return fmt.Errorf("create staging dir: %w", err)
}
defer func() {
if err := os.RemoveAll(stagingDir); err != nil {
returnErr = errors.Join(returnErr, fmt.Errorf("removing temp staging dir: %w", err))
}
}()
if err := downloadArchive(reader, stagingDir); err != nil {
return fmt.Errorf("download archive: %w", err)
}
if err := extractArchive(stagingDir); err != nil {
return fmt.Errorf("extract archive: %w", err)
}
// Validate that the WAL entry was extracted correctly into its own directory
entryPath := filepath.Join(stagingDir, lsn.String())
info, err := os.Stat(entryPath)
if err != nil {
return fmt.Errorf("WAL entry not found after archive extraction: %w", err)
}
if !info.IsDir() {
return fmt.Errorf("expected WAL entry path %s to be a directory", entryPath)
}
if _, err := logWriter.CompareAndAppendLogEntry(lsn, entryPath); err != nil {
return fmt.Errorf("append log entry: %w", err)
}
logWriter.NotifyNewEntries()
return nil
}
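// downloadArchive copies the archived log entry from the reader into a tar file located at
// path + ".tar".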
func downloadArchive(reader io.Reader, path string) error {
archivePath := path + ".tar"
file, err := os.Create(archivePath)
if err != nil {
return fmt.Errorf("create archive file: %w", err)
}
defer file.Close()
_, err = io.Copy(file, reader)
if err != nil {
return fmt.Errorf("copy archive content: %w", err)
}
return nil
}
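// extractArchive unpacks the tar archive at path + ".tar" into the given directory, creating
// directories and regular files; any other tar entry type is rejected.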
func extractArchive(path string) error {
if err := os.MkdirAll(path, mode.Directory); err != nil {
return fmt.Errorf("create destination directory: %w", err)
}
archivePath := path + ".tar"
archiveFile, err := os.Open(archivePath)
if err != nil {
return fmt.Errorf("open archive file: %w", err)
}
defer archiveFile.Close()
tr := tar.NewReader(archiveFile)
for {
header, err := tr.Next()
if err == io.EOF {
break
}
if err != nil {
return fmt.Errorf("read tar header: %w", err)
}
target := filepath.Join(path, header.Name)
switch header.Typeflag {
case tar.TypeDir:
if err := os.MkdirAll(target, mode.Directory); err != nil {
return fmt.Errorf("create directory: %w", err)
}
case tar.TypeReg:
f, err := os.OpenFile(target, os.O_CREATE|os.O_RDWR, os.FileMode(header.Mode))
if err != nil {
return fmt.Errorf("create file: %w", err)
}
if _, err := io.Copy(f, tr); err != nil {
f.Close()
return fmt.Errorf("write file content: %w", err)
}
f.Close()
default:
return fmt.Errorf("tar header type not supported: %d", header.Typeflag)
}
}
return nil
}
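// setupRecoveryContext loads the Gitaly configuration, resolves the WAL backup sink, unpacks
// the Git auxiliary binaries, and constructs the database manager, Git command factory,
// catfile cache, and storage node needed to open partitions offline. It also resolves which
// partitions the command operates on based on the --all, --partition, and --repository flags.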
func setupRecoveryContext(ctx context.Context, cmd *cli.Command) (rc recoveryContext, returnErr error) {
recoveryContext := recoveryContext{
cmd: cmd,
partitions: make([]storage.PartitionID, 0),
cleanupFuncs: list.New(),
}
defer func() {
if returnErr != nil {
returnErr = errors.Join(returnErr, recoveryContext.Cleanup())
}
}()
parallel := cmd.Int(flagParallel)
if parallel < 1 {
parallel = 1
}
recoveryContext.parallel = int(parallel)
logger := log.ConfigureCommand()
cfg, err := loadConfig(cmd.String(flagConfig))
if err != nil {
return recoveryContext, fmt.Errorf("load config: %w", err)
}
if cfg.Backup.WALGoCloudURL == "" {
return recoveryContext, fmt.Errorf("write-ahead log backup is not configured")
}
sink, err := backup.ResolveSink(ctx, cfg.Backup.WALGoCloudURL)
if err != nil {
return recoveryContext, fmt.Errorf("resolve sink: %w", err)
}
recoveryContext.logEntryStore = backup.NewLogEntryStore(sink)
runtimeDir, err := os.MkdirTemp("", "gitaly-recovery-*")
if err != nil {
return recoveryContext, fmt.Errorf("creating runtime dir: %w", err)
}
recoveryContext.cleanupFuncs.PushFront(func() error {
return os.RemoveAll(runtimeDir)
})
cfg.RuntimeDir = runtimeDir
if err := gitaly.UnpackAuxiliaryBinaries(cfg.RuntimeDir, func(binaryName string) bool {
return strings.HasPrefix(binaryName, "gitaly-git")
}); err != nil {
return recoveryContext, fmt.Errorf("unpack auxiliary binaries: %w", err)
}
dbMgr, err := databasemgr.NewDBManager(
ctx,
cfg.Storages,
keyvalue.NewBadgerStore,
helper.NewTimerTickerFactory(time.Minute),
logger,
)
if err != nil {
return recoveryContext, fmt.Errorf("new db manager: %w", err)
}
recoveryContext.cleanupFuncs.PushFront(func() error {
dbMgr.Close()
return nil
})
gitCmdFactory, cleanup, err := gitcmd.NewExecCommandFactory(cfg, logger)
if err != nil {
return recoveryContext, fmt.Errorf("creating Git command factory: %w", err)
}
recoveryContext.cleanupFuncs.PushFront(func() error {
cleanup()
return nil
})
catfileCache := catfile.NewCache(cfg)
recoveryContext.cleanupFuncs.PushFront(func() error {
catfileCache.Stop()
return nil
})
partitionFactoryOptions := []partition.FactoryOption{
partition.WithCmdFactory(gitCmdFactory),
partition.WithRepoFactory(localrepo.NewFactory(logger, config.NewLocator(cfg), gitCmdFactory, catfileCache)),
partition.WithMetrics(partition.NewMetrics(housekeeping.NewMetrics(cfg.Prometheus))),
partition.WithRaftConfig(cfg.Raft),
}
node, err := nodeimpl.NewManager(
cfg.Storages,
storagemgr.NewFactory(
logger,
dbMgr,
migration.NewFactory(
partition.NewFactory(partitionFactoryOptions...),
migration.NewMetrics(),
[]migration.Migration{},
),
1,
storagemgr.NewMetrics(cfg.Prometheus),
),
)
if err != nil {
return recoveryContext, fmt.Errorf("new node: %w", err)
}
recoveryContext.cleanupFuncs.PushFront(func() error {
node.Close()
return nil
})
storageName := cmd.String(flagStorage)
if storageName == "" {
if len(cfg.Storages) != 1 {
return recoveryContext, fmt.Errorf("multiple storages configured: use --storage to specify the one you want")
}
storageName = cfg.Storages[0].Name
}
nodeStorage, err := node.GetStorage(storageName)
if err != nil {
return recoveryContext, fmt.Errorf("get storage: %w", err)
}
recoveryContext.storageName = storageName
recoveryContext.nodeStorage = nodeStorage
if cmd.Bool("all") {
iter, err := nodeStorage.ListPartitions(storage.PartitionID(0))
if err != nil {
return recoveryContext, fmt.Errorf("list partitions: %w", err)
}
defer iter.Close()
for iter.Next() {
recoveryContext.partitions = append(recoveryContext.partitions, iter.GetPartitionID())
}
if err := iter.Err(); err != nil {
return recoveryContext, fmt.Errorf("partition iterator: %w", err)
}
} else {
partitionString := cmd.String(flagPartition)
repositoryPath := cmd.String(flagRepository)
if partitionString != "" && repositoryPath != "" {
return recoveryContext, fmt.Errorf("--partition and --repository flags can not be provided at the same time")
}
if partitionString == "" && repositoryPath == "" {
return recoveryContext, fmt.Errorf("this command requires one of --all, --partition or --repository flags")
}
var err error
var partitionID storage.PartitionID
if partitionString != "" {
if err = parsePartitionID(&partitionID, partitionString); err != nil {
return recoveryContext, fmt.Errorf("parse partition ID: %w", err)
}
} else {
partitionID, err = nodeStorage.GetAssignedPartitionID(repositoryPath)
if err != nil {
return recoveryContext, fmt.Errorf("partition ID not found for the given relative path: %w", err)
}
}
if partitionID == storage.PartitionID(0) {
return recoveryContext, fmt.Errorf("invalid partition ID %s", partitionID)
}
recoveryContext.partitions = append(recoveryContext.partitions, partitionID)
}
return recoveryContext, nil
}
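// parsePartitionID parses a decimal string into a storage.PartitionID.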
func parsePartitionID(id *storage.PartitionID, value string) error {
parsedID, err := strconv.ParseUint(value, 10, 64)
if err != nil {
return fmt.Errorf("parse uint: %w", err)
}
*id = storage.PartitionID(parsedID)
return nil
}
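// Cleanup runs all registered cleanup functions in reverse order of registration and joins
// any errors they return.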
func (rc *recoveryContext) Cleanup() error {
var err error
for i := rc.cleanupFuncs.Front(); i != nil; i = i.Next() {
err = errors.Join(err, i.Value.(func() error)())
}
return err
}