internal/gitaly/storage/raftmgr/replica_snapshotter.go (124 lines of code) (raw):

package raftmgr

import (
	"errors"
	"fmt"
	"io"
	"os"
	"path/filepath"
	"strings"
	"sync"

	"github.com/prometheus/client_golang/prometheus"
	arc "gitlab.com/gitlab-org/gitaly/v16/internal/archive"
	"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/config"
	"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage"
	"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/mode"
	logging "gitlab.com/gitlab-org/gitaly/v16/internal/log"
)

// snapSuffix is the file extension given to finalized snapshot files.
const snapSuffix = ".snap"

// ReplicaSnapshotFile is the file abstraction backing a snapshot: it can be
// read, written and closed, and it reports its own name.
type ReplicaSnapshotFile interface {
	io.ReadWriteCloser
	Name() string
}

// ReplicaSnapshot pairs the file that holds a snapshot with the metadata
// describing it. Keeping the snapshot in a file avoids holding all of its
// contents in memory. The metadata's index, term and partition ID identify
// when, and for which partition, the snapshot was taken.
type ReplicaSnapshot struct {
	file     ReplicaSnapshotFile
	metadata ReplicaSnapshotMetadata
}

// ReplicaSnapshotMetadata records the last index and term at the moment the
// snapshot was taken, together with the partition it belongs to.
type ReplicaSnapshotMetadata struct {
	index       storage.LSN
	term        uint64
	partitionID storage.PartitionID
}

// replicaSnapshotter manages the creation and handling of snapshots in a Raft network.
// It embeds a mutex that callers can use to serialize snapshot operations.
type replicaSnapshotter struct { sync.Mutex logger logging.Logger dir string metrics RaftMetrics } // ReplicaSnapshotter is an interface to implement snapshotting in raft type ReplicaSnapshotter interface { sync.Locker materializeSnapshot(snapshot ReplicaSnapshotMetadata, tx storage.Transaction) (_ *ReplicaSnapshot, returnErr error) } // NewReplicaSnapshotter creates a new Snapshotter func NewReplicaSnapshotter(cfg config.Raft, logger logging.Logger, metrics RaftMetrics) (ReplicaSnapshotter, error) { logger = logger.WithField("component", "raft.snapshotter") logger.Info("Initializing Raft Snapshotter") // Create the snapshot directory if it doesn't exist if err := os.MkdirAll(cfg.SnapshotDir, mode.Directory); err != nil { return nil, fmt.Errorf("create snapshot directory: %w", err) } return &replicaSnapshotter{ logger: logger, dir: cfg.SnapshotDir, metrics: metrics, }, nil } // writeTarball writes the kv state of db and all folders/files from a partition's root to disk func writeTarball(partitionRoot string, kvFile *os.File, w io.Writer) error { builder := arc.NewTarBuilder(partitionRoot, w) if err := builder.VirtualFileWithContents("kv-state", kvFile); err != nil { return fmt.Errorf("tar builder: virtual file: %w", err) } if err := builder.RecursiveDir(".", "fs", true); err != nil { return fmt.Errorf("tar builder: recursive dir: %w", err) } if err := builder.Close(); err != nil { return fmt.Errorf("tar builder: close: %w", err) } return nil } // materializeSnapshot materializes the snapshot inside a transaction and writes to a compressed tar func (rs *replicaSnapshotter) materializeSnapshot(snapshotMetadata ReplicaSnapshotMetadata, tx storage.Transaction) (_ *ReplicaSnapshot, returnErr error) { saveSnapTimer := prometheus.NewTimer(rs.metrics.snapSaveSec) // Make a tmp file in snapshot dir tmpFile := fmt.Sprintf("%016d-%016d-%016d%s", snapshotMetadata.partitionID, snapshotMetadata.term, snapshotMetadata.index, ".tmp") archive, err := os.Create(filepath.Join(rs.dir, 
tmpFile)) if err != nil { return nil, fmt.Errorf("create snapshot file: %w", err) } rs.logger.WithField("path", archive.Name()).Info("Start snapshot creation") // Clean up tmp file if any errors arise after this point var keep bool defer func() { if keep { return } if err := os.Remove(archive.Name()); err != nil { returnErr = errors.Join(returnErr, fmt.Errorf("clean up temp snapshot: %w", err)) return } }() // Take copy of partition's kv state kvFile, err := storage.CreateKvFile(tx) if err != nil { return nil, fmt.Errorf("write kv file: %w", err) } defer func() { if err := kvFile.Close(); err != nil { returnErr = errors.Join(returnErr, fmt.Errorf("close temp KV file: %w", err)) } }() if err := writeTarball(tx.FS().Root(), kvFile, archive); err != nil { return nil, fmt.Errorf("write tarball: %w", err) } if err := archive.Sync(); err != nil { return nil, fmt.Errorf("sync archive to disk: %w", err) } // Finalize the archive. if err := archive.Close(); err != nil { return nil, fmt.Errorf("finalize snapshot: %w", err) } snapshotName := strings.Replace(tmpFile, filepath.Ext(tmpFile), snapSuffix, 1) outputDestination := filepath.Join(rs.dir, snapshotName) saveSnapTimer.ObserveDuration() rs.logger.WithField("path", outputDestination).Info("Snapshot saved") // Now that we've written all files to the archive, we can rename from a tmp file to a final snapshot if err := os.Rename(archive.Name(), outputDestination); err != nil { return nil, fmt.Errorf("rename temporary file: %w", err) } // Keep the temporary file for later use keep = true // After closing the archive f, err := os.Open(outputDestination) if err != nil { return nil, fmt.Errorf("open archive for verification: %w", err) } defer f.Close() return &ReplicaSnapshot{ file: f, metadata: ReplicaSnapshotMetadata{ index: snapshotMetadata.index, term: snapshotMetadata.term, }, }, nil }